diff --git a/ozgcloud-common-lib/src/main/java/de/ozgcloud/common/binaryfile/GrpcBinaryFileServerDownloader.java b/ozgcloud-common-lib/src/main/java/de/ozgcloud/common/binaryfile/GrpcBinaryFileServerDownloader.java index 70020e179a1c53540f64fb011255f4eb3e91f633..3128ba021a0f4c30f568285842fe47ee41266fa6 100644 --- a/ozgcloud-common-lib/src/main/java/de/ozgcloud/common/binaryfile/GrpcBinaryFileServerDownloader.java +++ b/ozgcloud-common-lib/src/main/java/de/ozgcloud/common/binaryfile/GrpcBinaryFileServerDownloader.java @@ -23,15 +23,6 @@ */ package de.ozgcloud.common.binaryfile; -import com.google.protobuf.ByteString; -import de.ozgcloud.common.errorhandling.TechnicalException; -import io.grpc.Context; -import io.grpc.stub.CallStreamObserver; -import lombok.Builder; -import lombok.extern.log4j.Log4j2; -import org.apache.commons.io.IOUtils; -import org.springframework.core.task.TaskExecutor; - import java.io.IOException; import java.io.OutputStream; import java.io.PipedInputStream; @@ -40,6 +31,17 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Consumer; import java.util.function.Function; +import org.apache.commons.io.IOUtils; +import org.springframework.core.task.TaskExecutor; + +import com.google.protobuf.ByteString; + +import de.ozgcloud.common.errorhandling.TechnicalException; +import io.grpc.Context; +import io.grpc.stub.CallStreamObserver; +import lombok.Builder; +import lombok.extern.log4j.Log4j2; + @Log4j2 public class GrpcBinaryFileServerDownloader<T> { @@ -52,7 +54,6 @@ public class GrpcBinaryFileServerDownloader<T> { private final byte[] buffer = new byte[GrpcBinaryFileServerDownloader.CHUNK_SIZE]; private final AtomicBoolean started = new AtomicBoolean(false); - private final AtomicBoolean downloadInProgress = new AtomicBoolean(false); private PipedInputStream inputStream; private PipedOutputStream outputStream; @@ -79,7 +80,7 @@ public class GrpcBinaryFileServerDownloader<T> { void doStart() { handleSafety(this::setupStreams); taskExecutor.execute(Context.current().wrap(this::startDownload)); - callObserver.setOnReadyHandler(this::onReadyHandler); + sendChunks(); } void setupStreams() throws IOException { @@ -93,31 +94,21 @@ public class GrpcBinaryFileServerDownloader<T> { } void doDownload() throws IOException { - downloadInProgress.set(true); downloadConsumer.accept(outputStream); - downloadInProgress.set(false); outputStream.close(); } - synchronized void onReadyHandler() { - if (callObserver.isReady()) { - sendChunks(); - } - } - void sendChunks() { handleSafety(this::doSendChunks); } void doSendChunks() throws IOException { int bytesRead; - while (callObserver.isReady() && (bytesRead = inputStream.read(buffer)) != -1) { + while ((bytesRead = inputStream.read(buffer)) != -1) { callObserver.onNext(chunkBuilder.apply(ByteString.copyFrom(buffer, 0, bytesRead))); } - if (!downloadInProgress.get()) { - inputStream.close(); - callObserver.onCompleted(); - } + inputStream.close(); + callObserver.onCompleted(); } void handleSafety(ExceptionalRunnable runnable) {