From 913f11af48ef76eae767a90aa1b45c8049e70cc7 Mon Sep 17 00:00:00 2001 From: Felix Reichenbach <felix.reichenbach@mgm-tp.com> Date: Wed, 19 Mar 2025 09:57:30 +0100 Subject: [PATCH 1/2] OZG-7573 allow multi file upload by not closing requestObserver on EOF --- .gitignore | 1 + .../binaryfile/GrpcFileUploadUtils.java | 26 +++- .../binaryfile/GrpcFileUploadUtilsTest.java | 129 ++++++++++++++++++ 3 files changed, 152 insertions(+), 4 deletions(-) diff --git a/.gitignore b/.gitignore index c10edf4..bc1ffde 100644 --- a/.gitignore +++ b/.gitignore @@ -10,3 +10,4 @@ target .idea *.iml +.vscode/settings.json diff --git a/ozgcloud-common-lib/src/main/java/de/ozgcloud/common/binaryfile/GrpcFileUploadUtils.java b/ozgcloud-common-lib/src/main/java/de/ozgcloud/common/binaryfile/GrpcFileUploadUtils.java index e7aae2e..783d965 100644 --- a/ozgcloud-common-lib/src/main/java/de/ozgcloud/common/binaryfile/GrpcFileUploadUtils.java +++ b/ozgcloud-common-lib/src/main/java/de/ozgcloud/common/binaryfile/GrpcFileUploadUtils.java @@ -55,7 +55,12 @@ public class GrpcFileUploadUtils { */ public static <Q, S> FileSender<Q, S> createSender(BiFunction<byte[], Integer, Q> chunkBuilder, InputStream inputStream, Function<StreamObserver<S>, CallStreamObserver<Q>> reqObserverBuilder) { - return new FileSender<>(chunkBuilder, reqObserverBuilder, inputStream); + return createSender(chunkBuilder, inputStream, reqObserverBuilder, true); + } + + public static <Q, S> FileSender<Q, S> createSender(BiFunction<byte[], Integer, Q> chunkBuilder, InputStream inputStream, + Function<StreamObserver<S>, CallStreamObserver<Q>> reqObserverBuilder, boolean completeOnFileSent) { + return new FileSender<>(chunkBuilder, reqObserverBuilder, inputStream, completeOnFileSent); } public static class FileSender<Q, S> { @@ -72,12 +77,19 @@ public class GrpcFileUploadUtils { private final AtomicBoolean done = new AtomicBoolean(false); private final StreamReader streamReader; + private final boolean completeOnFileSent; FileSender(BiFunction<byte[], Integer, Q> chunkBuilder, Function<StreamObserver<S>, CallStreamObserver<Q>> reqObserverBuilder, InputStream inputStream) { + this(chunkBuilder, reqObserverBuilder, inputStream, true); + } + + FileSender(BiFunction<byte[], Integer, Q> chunkBuilder, Function<StreamObserver<S>, CallStreamObserver<Q>> reqObserverBuilder, + InputStream inputStream, boolean completeOnFileSent) { this.chunkBuilder = chunkBuilder; this.inputStream = inputStream; this.reqObserverBuilder = reqObserverBuilder; + this.completeOnFileSent = completeOnFileSent; this.streamReader = new StreamReader(this.inputStream); } @@ -152,16 +164,22 @@ public class GrpcFileUploadUtils { } private void endTransfer() { - requestObserver.onCompleted(); + if (completeOnFileSent) + requestObserver.onCompleted(); + else + sendEndOfFile(); done.set(true); LOG.debug("File Transfer done."); closeStreams(); } - private void closeStreams() { + private void sendEndOfFile() { + sendChunk(new byte[0], streamReader.getLastReadSize()); + } + + void closeStreams() { LOG.debug("Closing streams"); - IOUtils.closeQuietly(inputStream); streamReader.close(); } diff --git a/ozgcloud-common-lib/src/test/java/de/ozgcloud/common/binaryfile/GrpcFileUploadUtilsTest.java b/ozgcloud-common-lib/src/test/java/de/ozgcloud/common/binaryfile/GrpcFileUploadUtilsTest.java index 83e18f8..6a01183 100644 --- a/ozgcloud-common-lib/src/test/java/de/ozgcloud/common/binaryfile/GrpcFileUploadUtilsTest.java +++ b/ozgcloud-common-lib/src/test/java/de/ozgcloud/common/binaryfile/GrpcFileUploadUtilsTest.java @@ -27,11 +27,13 @@ import static org.assertj.core.api.Assertions.*; import static org.mockito.ArgumentMatchers.*; import static org.mockito.Mockito.*; +import java.io.ByteArrayInputStream; import java.io.IOException; import java.io.InputStream; import java.util.function.BiFunction; import java.util.function.Function; +import org.apache.commons.lang3.RandomUtils; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Nested; @@ -47,6 +49,7 @@ import de.ozgcloud.common.binaryfile.GrpcFileUploadUtils.FileSender; import de.ozgcloud.common.errorhandling.TechnicalException; import io.grpc.stub.CallStreamObserver; import io.grpc.stub.StreamObserver; +import lombok.SneakyThrows; class GrpcFileUploadUtilsTest { @@ -122,6 +125,132 @@ class GrpcFileUploadUtilsTest { } + @Nested + class TestSendNextChunk { + + private final byte[] content = RandomUtils.insecure().randomBytes(GrpcFileUploadUtils.CHUNK_SIZE / 2); + private ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(content); + + @Captor + private ArgumentCaptor<byte[]> chunkCaptor; + + @Nested + class TestOnDataAvailable { + @BeforeEach + void initObserver() { + fileSender = spy(GrpcFileUploadUtils.createSender(chunkBuilder, byteArrayInputStream, reqObserverBuilder)); + fileSender.send(); + } + + @Test + void shouldCallSendChunk() { + fileSender.sendNextChunk(); + + verify(fileSender).sendChunk(chunkCaptor.capture(), eq(content.length)); + assertThat(chunkCaptor.getValue()).contains(content); + } + + @Test + void shouldReturnChunkSize() { + var cotnentLength = fileSender.sendNextChunk(); + + assertThat(cotnentLength).isEqualTo(GrpcFileUploadUtils.CHUNK_SIZE); + } + } + + @Nested + class TestOnNoBytesLeftToRead { + + @Nested + class TestOnCompleteOnFileSent { + private static final boolean COMPLETE_ON_FILE_SENT = true; + + @BeforeEach + void initialize() { + var buffer = new byte[GrpcFileUploadUtils.CHUNK_SIZE]; + byteArrayInputStream.read(buffer, 0, GrpcFileUploadUtils.CHUNK_SIZE); + fileSender = spy(GrpcFileUploadUtils.createSender(chunkBuilder, byteArrayInputStream, reqObserverBuilder, COMPLETE_ON_FILE_SENT)); + fileSender.send(); + } + + @Test + void shouldCallOnCompleted() { + + fileSender.sendNextChunk(); + + verify(requestObserver).onCompleted(); + } + + @Test + void shouldNotCallSendChunk() { + fileSender.sendNextChunk(); + + verify(fileSender, never()).sendChunk(any(), anyInt()); + } + + @Test + @SneakyThrows + void shouldCallCloseStreams() { + fileSender.sendNextChunk(); + + verify(fileSender).closeStreams(); + } + } + + @Nested + class TestOnNotCompleteOnFileSent { + private static final boolean COMPLETE_ON_FILE_SENT = false; + + @BeforeEach + void initialize() { + var buffer = new byte[GrpcFileUploadUtils.CHUNK_SIZE]; + byteArrayInputStream.read(buffer, 0, GrpcFileUploadUtils.CHUNK_SIZE); + fileSender = spy(GrpcFileUploadUtils.createSender(chunkBuilder, byteArrayInputStream, reqObserverBuilder, COMPLETE_ON_FILE_SENT)); + fileSender.send(); + } + + @Test + void shouldNotCallOnCompleted() { + + fileSender.sendNextChunk(); + + verify(requestObserver, never()).onCompleted(); + } + + @Test + void shouldCallSendChunk() { + fileSender.sendNextChunk(); + + verify(fileSender).sendChunk(chunkCaptor.capture(), eq(-1)); + assertThat(chunkCaptor.getValue()).isEmpty(); + } + + @Test + @SneakyThrows + void shouldCallCloseStreams() { + fileSender.sendNextChunk(); + + verify(fileSender).closeStreams(); + } + } + } + + } + + @Nested + class TestCloseStreams { + + @Test + @SneakyThrows + void shouldCloseInputStream() { + fileSender.send(); + + fileSender.closeStreams(); + + verify(inputStream).close(); + } + } + @Nested class TestSendChunk { -- GitLab From 6f11b70fcc5ae0f423c32de5836a7b77a8408372 Mon Sep 17 00:00:00 2001 From: Felix Reichenbach <felix.reichenbach@mgm-tp.com> Date: Thu, 20 Mar 2025 13:53:49 +0100 Subject: [PATCH 2/2] OZG-7573 apply code review --- .../ozgcloud/common/binaryfile/GrpcFileUploadUtils.java | 3 +-- .../common/binaryfile/GrpcFileUploadUtilsTest.java | 9 --------- 2 files changed, 1 insertion(+), 11 deletions(-) diff --git a/ozgcloud-common-lib/src/main/java/de/ozgcloud/common/binaryfile/GrpcFileUploadUtils.java b/ozgcloud-common-lib/src/main/java/de/ozgcloud/common/binaryfile/GrpcFileUploadUtils.java index 783d965..1cbd50d 100644 --- a/ozgcloud-common-lib/src/main/java/de/ozgcloud/common/binaryfile/GrpcFileUploadUtils.java +++ b/ozgcloud-common-lib/src/main/java/de/ozgcloud/common/binaryfile/GrpcFileUploadUtils.java @@ -152,7 +152,7 @@ public class GrpcFileUploadUtils { } - long sendNextChunk() { + void sendNextChunk() { byte[] contentToSend = streamReader.getNextData(); if (streamReader.getLastReadSize() > 0) { @@ -160,7 +160,6 @@ public class GrpcFileUploadUtils { } else { endTransfer(); } - return contentToSend.length; } private void endTransfer() { diff --git a/ozgcloud-common-lib/src/test/java/de/ozgcloud/common/binaryfile/GrpcFileUploadUtilsTest.java b/ozgcloud-common-lib/src/test/java/de/ozgcloud/common/binaryfile/GrpcFileUploadUtilsTest.java index 6a01183..fb77f3c 100644 --- a/ozgcloud-common-lib/src/test/java/de/ozgcloud/common/binaryfile/GrpcFileUploadUtilsTest.java +++ b/ozgcloud-common-lib/src/test/java/de/ozgcloud/common/binaryfile/GrpcFileUploadUtilsTest.java @@ -149,13 +149,6 @@ class GrpcFileUploadUtilsTest { verify(fileSender).sendChunk(chunkCaptor.capture(), eq(content.length)); assertThat(chunkCaptor.getValue()).contains(content); } - - @Test - void shouldReturnChunkSize() { - var cotnentLength = fileSender.sendNextChunk(); - - assertThat(cotnentLength).isEqualTo(GrpcFileUploadUtils.CHUNK_SIZE); - } } @Nested @@ -175,7 +168,6 @@ class GrpcFileUploadUtilsTest { @Test void shouldCallOnCompleted() { - fileSender.sendNextChunk(); verify(requestObserver).onCompleted(); @@ -211,7 +203,6 @@ class GrpcFileUploadUtilsTest { @Test void shouldNotCallOnCompleted() { - fileSender.sendNextChunk(); verify(requestObserver, never()).onCompleted(); -- GitLab