Skip to content
Snippets Groups Projects

Ozg 7262 fix unfinished downloads

Merged Krzysztof Witukiewicz requested to merge OZG-7262-fix-unfinished-downloads into main
All threads resolved!
3 files
+ 90
87
Compare changes
  • Side-by-side
  • Inline
Files
3
@@ -48,8 +48,8 @@ public class GrpcBinaryFileServerDownloader<T> {
static final int CHUNK_SIZE = 255 * 1024;
private static final int END_OF_STREAM = -1;
private static final int NOTHING_READ = 0;
static final int END_OF_STREAM = -1;
static final int NOTHING_READ = 0;
private final CallStreamObserver<T> callObserver;
private final Function<ByteString, T> chunkBuilder;
@@ -126,7 +126,7 @@ public class GrpcBinaryFileServerDownloader<T> {
try {
doSendChunks();
} catch (Exception e) {
completeRequestWithError(new TechnicalException("Error while sending chunks", e));
handleError(new TechnicalException("Error while sending chunks", e));
}
}
@@ -144,36 +144,37 @@ public class GrpcBinaryFileServerDownloader<T> {
var bytesRead = inputStream.read(buffer);
switch (bytesRead) {
case END_OF_STREAM:
completeRequest();
finishProcessing();
break;
case NOTHING_READ:
break;
default:
callObserver.onNext(chunkBuilder.apply(ByteString.copyFrom(buffer, 0, bytesRead)));
LOG.debug("Sent {} bytes", bytesRead);
sendBytesToCallObserver(bytesRead);
}
}
void completeRequest() {
void sendBytesToCallObserver(int bytesRead) {
var bytes = ByteString.copyFrom(buffer, 0, bytesRead);
var chunk = chunkBuilder.apply(bytes);
callObserver.onNext(chunk);
LOG.debug("Sent {} bytes", bytesRead);
}
void finishProcessing() {
if (Objects.nonNull(downloadError.get())) {
throw downloadError.get();
} else {
completeRequestNormally();
finishRequest();
callObserver.onCompleted();
}
}
void completeRequestWithError(TechnicalException e) {
void handleError(TechnicalException e) {
LOG.debug("Complete download request with error");
finishRequest();
throw e;
}
void completeRequestNormally() {
LOG.debug("Complete download request");
finishRequest();
callObserver.onCompleted();
}
private void finishRequest() {
requestFinished.set(true);
closeInputStream();
Loading