From 913f11af48ef76eae767a90aa1b45c8049e70cc7 Mon Sep 17 00:00:00 2001
From: Felix Reichenbach <felix.reichenbach@mgm-tp.com>
Date: Wed, 19 Mar 2025 09:57:30 +0100
Subject: [PATCH 1/2] OZG-7573 allow multi file upload by not closing
 requestObserver on EOF

---
 .gitignore                                    |   1 +
 .../binaryfile/GrpcFileUploadUtils.java       |  26 +++-
 .../binaryfile/GrpcFileUploadUtilsTest.java   | 129 ++++++++++++++++++
 3 files changed, 152 insertions(+), 4 deletions(-)

diff --git a/.gitignore b/.gitignore
index c10edf4..bc1ffde 100644
--- a/.gitignore
+++ b/.gitignore
@@ -10,3 +10,4 @@ target
 
 .idea
 *.iml
+.vscode/settings.json
diff --git a/ozgcloud-common-lib/src/main/java/de/ozgcloud/common/binaryfile/GrpcFileUploadUtils.java b/ozgcloud-common-lib/src/main/java/de/ozgcloud/common/binaryfile/GrpcFileUploadUtils.java
index e7aae2e..783d965 100644
--- a/ozgcloud-common-lib/src/main/java/de/ozgcloud/common/binaryfile/GrpcFileUploadUtils.java
+++ b/ozgcloud-common-lib/src/main/java/de/ozgcloud/common/binaryfile/GrpcFileUploadUtils.java
@@ -55,7 +55,12 @@ public class GrpcFileUploadUtils {
 	 */
 	public static <Q, S> FileSender<Q, S> createSender(BiFunction<byte[], Integer, Q> chunkBuilder, InputStream inputStream,
 			Function<StreamObserver<S>, CallStreamObserver<Q>> reqObserverBuilder) {
-		return new FileSender<>(chunkBuilder, reqObserverBuilder, inputStream);
+		return createSender(chunkBuilder, inputStream, reqObserverBuilder, true);
+	}
+
+	public static <Q, S> FileSender<Q, S> createSender(BiFunction<byte[], Integer, Q> chunkBuilder, InputStream inputStream,
+			Function<StreamObserver<S>, CallStreamObserver<Q>> reqObserverBuilder, boolean completeOnFileSent) {
+		return new FileSender<>(chunkBuilder, reqObserverBuilder, inputStream, completeOnFileSent);
 	}
 
 	public static class FileSender<Q, S> {
@@ -72,12 +77,19 @@ public class GrpcFileUploadUtils {
 		private final AtomicBoolean done = new AtomicBoolean(false);
 
 		private final StreamReader streamReader;
+		private final boolean completeOnFileSent;
 
 		FileSender(BiFunction<byte[], Integer, Q> chunkBuilder, Function<StreamObserver<S>, CallStreamObserver<Q>> reqObserverBuilder,
 				InputStream inputStream) {
+			this(chunkBuilder, reqObserverBuilder, inputStream, true);
+		}
+
+		FileSender(BiFunction<byte[], Integer, Q> chunkBuilder, Function<StreamObserver<S>, CallStreamObserver<Q>> reqObserverBuilder,
+				InputStream inputStream, boolean completeOnFileSent) {
 			this.chunkBuilder = chunkBuilder;
 			this.inputStream = inputStream;
 			this.reqObserverBuilder = reqObserverBuilder;
+			this.completeOnFileSent = completeOnFileSent;
 
 			this.streamReader = new StreamReader(this.inputStream);
 		}
@@ -152,16 +164,22 @@ public class GrpcFileUploadUtils {
 		}
 
 		private void endTransfer() {
-			requestObserver.onCompleted();
+			if (completeOnFileSent)
+				requestObserver.onCompleted();
+			else
+				sendEndOfFile();
 			done.set(true);
 			LOG.debug("File Transfer done.");
 			closeStreams();
 
 		}
 
-		private void closeStreams() {
+		private void sendEndOfFile() {
+			sendChunk(new byte[0], streamReader.getLastReadSize());
+		}
+
+		void closeStreams() {
 			LOG.debug("Closing streams");
-			IOUtils.closeQuietly(inputStream);
 			streamReader.close();
 		}
 
diff --git a/ozgcloud-common-lib/src/test/java/de/ozgcloud/common/binaryfile/GrpcFileUploadUtilsTest.java b/ozgcloud-common-lib/src/test/java/de/ozgcloud/common/binaryfile/GrpcFileUploadUtilsTest.java
index 83e18f8..6a01183 100644
--- a/ozgcloud-common-lib/src/test/java/de/ozgcloud/common/binaryfile/GrpcFileUploadUtilsTest.java
+++ b/ozgcloud-common-lib/src/test/java/de/ozgcloud/common/binaryfile/GrpcFileUploadUtilsTest.java
@@ -27,11 +27,13 @@ import static org.assertj.core.api.Assertions.*;
 import static org.mockito.ArgumentMatchers.*;
 import static org.mockito.Mockito.*;
 
+import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.util.function.BiFunction;
 import java.util.function.Function;
 
+import org.apache.commons.lang3.RandomUtils;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Nested;
@@ -47,6 +49,7 @@ import de.ozgcloud.common.binaryfile.GrpcFileUploadUtils.FileSender;
 import de.ozgcloud.common.errorhandling.TechnicalException;
 import io.grpc.stub.CallStreamObserver;
 import io.grpc.stub.StreamObserver;
+import lombok.SneakyThrows;
 
 class GrpcFileUploadUtilsTest {
 
@@ -122,6 +125,132 @@ class GrpcFileUploadUtilsTest {
 
 	}
 
+	@Nested
+	class TestSendNextChunk {
+
+		private final byte[] content = RandomUtils.insecure().randomBytes(GrpcFileUploadUtils.CHUNK_SIZE / 2);
+		private ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(content);
+
+		@Captor
+		private ArgumentCaptor<byte[]> chunkCaptor;
+
+		@Nested
+		class TestOnDataAvailable {
+			@BeforeEach
+			void initObserver() {
+				fileSender = spy(GrpcFileUploadUtils.createSender(chunkBuilder, byteArrayInputStream, reqObserverBuilder));
+				fileSender.send();
+			}
+
+			@Test
+			void shouldCallSendChunk() {
+				fileSender.sendNextChunk();
+
+				verify(fileSender).sendChunk(chunkCaptor.capture(), eq(content.length));
+				assertThat(chunkCaptor.getValue()).contains(content);
+			}
+
+			@Test
+			void shouldReturnChunkSize() {
+				var cotnentLength = fileSender.sendNextChunk();
+
+				assertThat(cotnentLength).isEqualTo(GrpcFileUploadUtils.CHUNK_SIZE);
+			}
+		}
+
+		@Nested
+		class TestOnNoBytesLeftToRead {
+
+			@Nested
+			class TestOnCompleteOnFileSent {
+				private static final boolean COMPLETE_ON_FILE_SENT = true;
+
+				@BeforeEach
+				void initialize() {
+					var buffer = new byte[GrpcFileUploadUtils.CHUNK_SIZE];
+					byteArrayInputStream.read(buffer, 0, GrpcFileUploadUtils.CHUNK_SIZE);
+					fileSender = spy(GrpcFileUploadUtils.createSender(chunkBuilder, byteArrayInputStream, reqObserverBuilder, COMPLETE_ON_FILE_SENT));
+					fileSender.send();
+				}
+
+				@Test
+				void shouldCallOnCompleted() {
+
+					fileSender.sendNextChunk();
+
+					verify(requestObserver).onCompleted();
+				}
+
+				@Test
+				void shouldNotCallSendChunk() {
+					fileSender.sendNextChunk();
+
+					verify(fileSender, never()).sendChunk(any(), anyInt());
+				}
+
+				@Test
+				@SneakyThrows
+				void shouldCallCloseStreams() {
+					fileSender.sendNextChunk();
+
+					verify(fileSender).closeStreams();
+				}
+			}
+
+			@Nested
+			class TestOnNotCompleteOnFileSent {
+				private static final boolean COMPLETE_ON_FILE_SENT = false;
+
+				@BeforeEach
+				void initialize() {
+					var buffer = new byte[GrpcFileUploadUtils.CHUNK_SIZE];
+					byteArrayInputStream.read(buffer, 0, GrpcFileUploadUtils.CHUNK_SIZE);
+					fileSender = spy(GrpcFileUploadUtils.createSender(chunkBuilder, byteArrayInputStream, reqObserverBuilder, COMPLETE_ON_FILE_SENT));
+					fileSender.send();
+				}
+
+				@Test
+				void shouldNotCallOnCompleted() {
+
+					fileSender.sendNextChunk();
+
+					verify(requestObserver, never()).onCompleted();
+				}
+
+				@Test
+				void shouldCallSendChunk() {
+					fileSender.sendNextChunk();
+
+					verify(fileSender).sendChunk(chunkCaptor.capture(), eq(-1));
+					assertThat(chunkCaptor.getValue()).isEmpty();
+				}
+
+				@Test
+				@SneakyThrows
+				void shouldCallCloseStreams() {
+					fileSender.sendNextChunk();
+
+					verify(fileSender).closeStreams();
+				}
+			}
+		}
+
+	}
+
+	@Nested
+	class TestCloseStreams {
+
+		@Test
+		@SneakyThrows
+		void shouldCloseInputStream() {
+			fileSender.send();
+
+			fileSender.closeStreams();
+
+			verify(inputStream).close();
+		}
+	}
+
 	@Nested
 	class TestSendChunk {
 
-- 
GitLab


From 6f11b70fcc5ae0f423c32de5836a7b77a8408372 Mon Sep 17 00:00:00 2001
From: Felix Reichenbach <felix.reichenbach@mgm-tp.com>
Date: Thu, 20 Mar 2025 13:53:49 +0100
Subject: [PATCH 2/2] OZG-7573 apply code review

---
 .../ozgcloud/common/binaryfile/GrpcFileUploadUtils.java  | 3 +--
 .../common/binaryfile/GrpcFileUploadUtilsTest.java       | 9 ---------
 2 files changed, 1 insertion(+), 11 deletions(-)

diff --git a/ozgcloud-common-lib/src/main/java/de/ozgcloud/common/binaryfile/GrpcFileUploadUtils.java b/ozgcloud-common-lib/src/main/java/de/ozgcloud/common/binaryfile/GrpcFileUploadUtils.java
index 783d965..1cbd50d 100644
--- a/ozgcloud-common-lib/src/main/java/de/ozgcloud/common/binaryfile/GrpcFileUploadUtils.java
+++ b/ozgcloud-common-lib/src/main/java/de/ozgcloud/common/binaryfile/GrpcFileUploadUtils.java
@@ -152,7 +152,7 @@ public class GrpcFileUploadUtils {
 
 		}
 
-		long sendNextChunk() {
+		void sendNextChunk() {
 			byte[] contentToSend = streamReader.getNextData();
 
 			if (streamReader.getLastReadSize() > 0) {
@@ -160,7 +160,6 @@ public class GrpcFileUploadUtils {
 			} else {
 				endTransfer();
 			}
-			return contentToSend.length;
 		}
 
 		private void endTransfer() {
diff --git a/ozgcloud-common-lib/src/test/java/de/ozgcloud/common/binaryfile/GrpcFileUploadUtilsTest.java b/ozgcloud-common-lib/src/test/java/de/ozgcloud/common/binaryfile/GrpcFileUploadUtilsTest.java
index 6a01183..fb77f3c 100644
--- a/ozgcloud-common-lib/src/test/java/de/ozgcloud/common/binaryfile/GrpcFileUploadUtilsTest.java
+++ b/ozgcloud-common-lib/src/test/java/de/ozgcloud/common/binaryfile/GrpcFileUploadUtilsTest.java
@@ -149,13 +149,6 @@ class GrpcFileUploadUtilsTest {
 				verify(fileSender).sendChunk(chunkCaptor.capture(), eq(content.length));
 				assertThat(chunkCaptor.getValue()).contains(content);
 			}
-
-			@Test
-			void shouldReturnChunkSize() {
-				var cotnentLength = fileSender.sendNextChunk();
-
-				assertThat(cotnentLength).isEqualTo(GrpcFileUploadUtils.CHUNK_SIZE);
-			}
 		}
 
 		@Nested
@@ -175,7 +168,6 @@ class GrpcFileUploadUtilsTest {
 
 				@Test
 				void shouldCallOnCompleted() {
-
 					fileSender.sendNextChunk();
 
 					verify(requestObserver).onCompleted();
@@ -211,7 +203,6 @@ class GrpcFileUploadUtilsTest {
 
 				@Test
 				void shouldNotCallOnCompleted() {
-
 					fileSender.sendNextChunk();
 
 					verify(requestObserver, never()).onCompleted();
-- 
GitLab