diff --git a/ozgcloud-common-lib/src/main/java/de/ozgcloud/common/binaryfile/GrpcFileUploadUtils.java b/ozgcloud-common-lib/src/main/java/de/ozgcloud/common/binaryfile/GrpcFileUploadUtils.java index bd2e1af28ab65d530f37fcf10e6be9b35282aad3..ff39dacb3fc868ab6efb03ebcedd823e33da7c22 100644 --- a/ozgcloud-common-lib/src/main/java/de/ozgcloud/common/binaryfile/GrpcFileUploadUtils.java +++ b/ozgcloud-common-lib/src/main/java/de/ozgcloud/common/binaryfile/GrpcFileUploadUtils.java @@ -27,12 +27,14 @@ import java.io.IOException; import java.io.InputStream; import java.util.Objects; import java.util.Optional; +import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.BiFunction; import java.util.function.Function; import org.apache.commons.io.IOUtils; +import org.slf4j.MDC; import de.ozgcloud.common.errorhandling.TechnicalException; import io.grpc.stub.CallStreamObserver; @@ -62,6 +64,7 @@ public class GrpcFileUploadUtils { private final BiFunction<byte[], Integer, Q> chunkBuilder; private final InputStream inputStream; + private final String fileSenderId = UUID.randomUUID().toString(); @Getter private final CompletableFuture<S> resultFuture = new CompletableFuture<>(); private final Function<StreamObserver<S>, CallStreamObserver<Q>> reqObserverBuilder; @@ -88,7 +91,7 @@ public class GrpcFileUploadUtils { } public FileSender<Q, S> send() { - LOG.debug("Start sending File."); + LOG.debug("Start sending File. (fileSenderId = {})", fileSenderId); var responseObserver = BinaryFileUploadStreamObserver.create(resultFuture, this::sendNext); requestObserver = reqObserverBuilder.apply(responseObserver); @@ -96,14 +99,14 @@ public class GrpcFileUploadUtils { } public void cancelOnTimeout() { - LOG.warn("File transfer canceled on timeout"); + LOG.warn("File transfer canceled on timeout (fileSenderId = {})", fileSenderId); resultFuture.cancel(true); requestObserver.onError(new TechnicalException("Timeout on waiting for upload.")); closeStreams(); } public void cancelOnError(Throwable t) { - LOG.error("File transfer canceled on error.", t); + LOG.error("File transfer canceled on error. (fileSenderId = {})", fileSenderId, t); resultFuture.cancel(true); requestObserver.onError(t); closeStreams(); @@ -113,27 +116,26 @@ public class GrpcFileUploadUtils { if (done.get()) { return; } - waitForOberver(); + waitForObserver(); sendMetaData(); while (!done.get() && isReady()) { - LOG.debug("Sending next chunk"); sendNextChunk(); } - LOG.debug("Finished or waiting to become ready."); + LOG.debug("Finished or waiting to become ready. (fileSenderId = {})", fileSenderId); } boolean isReady() { return requestObserver.isReady(); } - private void waitForOberver() { + private void waitForObserver() { synchronized (this) { while (Objects.isNull(requestObserver)) { try { - LOG.debug("wait for observer"); + LOG.debug("wait for observer (fileSenderId = {})", fileSenderId); wait(300); } catch (InterruptedException e) { - LOG.error("Error on waiting for request Observer.", e); + LOG.error("Error on waiting for request Observer. (fileSenderId = {})", fileSenderId, e); Thread.currentThread().interrupt(); } } @@ -155,19 +157,19 @@ public class GrpcFileUploadUtils { private void endTransfer() { requestObserver.onCompleted(); done.set(true); - LOG.debug("File Transfer done."); + LOG.debug("File Transfer done. (fileSenderId = {})", fileSenderId); closeStreams(); - + MDC.remove("fileSenderId"); } private void closeStreams() { - LOG.debug("Closing streams"); + LOG.debug("Closing streams (fileSenderId = {})", fileSenderId); IOUtils.closeQuietly(inputStream); streamReader.close(); } void sendChunk(byte[] content, int length) { - LOG.debug("Sending {} bytes.", length); + LOG.trace("Sending {} bytes. (fileSenderId = {})", length, fileSenderId); var chunk = chunkBuilder.apply(content, length); requestObserver.onNext(chunk); } @@ -185,19 +187,19 @@ public class GrpcFileUploadUtils { } private void doSendMetaData(Q metadata) { - LOG.debug("Sending Metadata."); + LOG.debug("Sending Metadata. (fileSenderId = {})", fileSenderId); requestObserver.onNext(metadata); metaDataSent.set(true); } void checkForEndOfStream(long sentSize) { if (sentSize < CHUNK_SIZE) { - LOG.debug("File Transfer done. Closing stream."); + LOG.debug("File Transfer done. Closing stream. (fileSenderId = {})", fileSenderId); IOUtils.closeQuietly(inputStream); requestObserver.onCompleted(); done.set(true); } else { - LOG.debug("File Transfer not jet done - need to tranfer another chunk."); + LOG.debug("File Transfer not jet done - need to tranfer another chunk. (fileSenderId = {})", fileSenderId); } }