From 136822b7f04d68783f504adadb8e9473033505bb Mon Sep 17 00:00:00 2001 From: OZGCloud <ozgcloud@mgm-tp.com> Date: Mon, 18 Nov 2024 14:11:29 +0100 Subject: [PATCH] OZG-7143 add debug logs --- .../GrpcBinaryFileServerDownloader.java | 47 ++++++++++++------- 1 file changed, 31 insertions(+), 16 deletions(-) diff --git a/ozgcloud-common-lib/src/main/java/de/ozgcloud/common/binaryfile/GrpcBinaryFileServerDownloader.java b/ozgcloud-common-lib/src/main/java/de/ozgcloud/common/binaryfile/GrpcBinaryFileServerDownloader.java index 3128ba0..9056318 100644 --- a/ozgcloud-common-lib/src/main/java/de/ozgcloud/common/binaryfile/GrpcBinaryFileServerDownloader.java +++ b/ozgcloud-common-lib/src/main/java/de/ozgcloud/common/binaryfile/GrpcBinaryFileServerDownloader.java @@ -23,6 +23,15 @@ */ package de.ozgcloud.common.binaryfile; +import com.google.protobuf.ByteString; +import de.ozgcloud.common.errorhandling.TechnicalException; +import io.grpc.Context; +import io.grpc.stub.CallStreamObserver; +import lombok.Builder; +import lombok.extern.log4j.Log4j2; +import org.apache.commons.io.IOUtils; +import org.springframework.core.task.TaskExecutor; + import java.io.IOException; import java.io.OutputStream; import java.io.PipedInputStream; @@ -31,17 +40,6 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Consumer; import java.util.function.Function; -import org.apache.commons.io.IOUtils; -import org.springframework.core.task.TaskExecutor; - -import com.google.protobuf.ByteString; - -import de.ozgcloud.common.errorhandling.TechnicalException; -import io.grpc.Context; -import io.grpc.stub.CallStreamObserver; -import lombok.Builder; -import lombok.extern.log4j.Log4j2; - @Log4j2 public class GrpcBinaryFileServerDownloader<T> { @@ -54,6 +52,7 @@ public class GrpcBinaryFileServerDownloader<T> { private final byte[] buffer = new byte[GrpcBinaryFileServerDownloader.CHUNK_SIZE]; private final AtomicBoolean started = new AtomicBoolean(false); + private final AtomicBoolean downloadInProgress = new AtomicBoolean(false); private PipedInputStream inputStream; private PipedOutputStream outputStream; @@ -78,9 +77,10 @@ public class GrpcBinaryFileServerDownloader<T> { } void doStart() { + LOG.debug("[{}] Starting download.", Thread.currentThread().getName()); handleSafety(this::setupStreams); taskExecutor.execute(Context.current().wrap(this::startDownload)); - sendChunks(); + callObserver.setOnReadyHandler(this::onReadyHandler); } void setupStreams() throws IOException { @@ -94,21 +94,36 @@ public class GrpcBinaryFileServerDownloader<T> { } void doDownload() throws IOException { + downloadInProgress.set(true); + LOG.debug("[{}] Downloading file content.", Thread.currentThread().getName()); downloadConsumer.accept(outputStream); + LOG.debug("[{}] Downloading file content finished.", Thread.currentThread().getName()); + downloadInProgress.set(false); outputStream.close(); } + synchronized void onReadyHandler() { + if (callObserver.isReady()) { + sendChunks(); + } + } + void sendChunks() { handleSafety(this::doSendChunks); } void doSendChunks() throws IOException { - int bytesRead; - while ((bytesRead = inputStream.read(buffer)) != -1) { + int bytesRead = 0; + while (callObserver.isReady() && (bytesRead = inputStream.read(buffer)) != -1) { callObserver.onNext(chunkBuilder.apply(ByteString.copyFrom(buffer, 0, bytesRead))); } - inputStream.close(); - callObserver.onCompleted(); + LOG.debug("[{}] Sending file content finished. isReady: {}, bytesRead: {}", Thread.currentThread().getName(), callObserver.isReady(), + bytesRead); + if (!downloadInProgress.get()) { + inputStream.close(); + LOG.debug("[{}] Complete request.", Thread.currentThread().getName()); + callObserver.onCompleted(); + } } void handleSafety(ExceptionalRunnable runnable) { -- GitLab