From 133aa19b29b5464ade61fe480a63de94e050665f Mon Sep 17 00:00:00 2001
From: Krzysztof Witukiewicz <krzysztof.witukiewicz@mgm-tp.com>
Date: Fri, 7 Feb 2025 15:40:39 +0100
Subject: [PATCH 1/3] OZG-7262 OZG-7680 Fix onReadyHandler not finishing

---
 .../GrpcBinaryFileServerDownloader.java       |  42 +--
 .../GrpcBinaryFileServerDownloaderITCase.java | 185 +++++++++++
 .../GrpcBinaryFileServerDownloaderTest.java   | 287 +++++++++++-------
 3 files changed, 393 insertions(+), 121 deletions(-)
 create mode 100644 ozgcloud-common-lib/src/test/java/de/ozgcloud/common/binaryfile/GrpcBinaryFileServerDownloaderITCase.java

diff --git a/ozgcloud-common-lib/src/main/java/de/ozgcloud/common/binaryfile/GrpcBinaryFileServerDownloader.java b/ozgcloud-common-lib/src/main/java/de/ozgcloud/common/binaryfile/GrpcBinaryFileServerDownloader.java
index 07379d8..7326002 100644
--- a/ozgcloud-common-lib/src/main/java/de/ozgcloud/common/binaryfile/GrpcBinaryFileServerDownloader.java
+++ b/ozgcloud-common-lib/src/main/java/de/ozgcloud/common/binaryfile/GrpcBinaryFileServerDownloader.java
@@ -46,7 +46,10 @@ import lombok.extern.log4j.Log4j2;
 @Log4j2
 public class GrpcBinaryFileServerDownloader<T> {
 
-	private static final int CHUNK_SIZE = 255 * 1024;
+	static final int CHUNK_SIZE = 255 * 1024;
+
+	private static final int END_OF_STREAM = -1;
+	private static final int NOTHING_READ = 0;
 
 	private final CallStreamObserver<T> callObserver;
 	private final Function<ByteString, T> chunkBuilder;
@@ -55,7 +58,6 @@ public class GrpcBinaryFileServerDownloader<T> {
 
 	private final byte[] buffer = new byte[CHUNK_SIZE];
 	private final AtomicBoolean started = new AtomicBoolean(false);
-	private final AtomicBoolean downloadFinished = new AtomicBoolean(false);
 	private final AtomicBoolean requestFinished = new AtomicBoolean(false);
 	private final AtomicReference<TechnicalException> downloadError = new AtomicReference<>();
 
@@ -118,7 +120,6 @@ public class GrpcBinaryFileServerDownloader<T> {
 		LOG.debug("Downloading file content...");
 		downloadConsumer.accept(outputStream);
 		LOG.debug("Download completed.");
-		downloadFinished.set(true);
 	}
 
 	synchronized void sendChunks() {
@@ -130,28 +131,33 @@ public class GrpcBinaryFileServerDownloader<T> {
 	}
 
 	void doSendChunks() throws IOException {
-		if (requestFinished.get()) {
-			return;
-		}
-		int bytesRead;
-		while (isReady()) {
-			if ((bytesRead = inputStream.read(buffer)) == -1) {
-				tryCompleteRequest();
-				break;
-			}
-			callObserver.onNext(chunkBuilder.apply(ByteString.copyFrom(buffer, 0, bytesRead)));
-			LOG.debug("Sent {} bytes", bytesRead);
+		while (canSendChunks()) {
+			processDataFromInputStream();
 		}
 	}
 
-	private boolean isReady() {
-		return callObserver.isReady();
+	boolean canSendChunks() {
+		return !requestFinished.get() && callObserver.isReady();
+	}
+
+	void processDataFromInputStream() throws IOException {
+		var bytesRead = inputStream.read(buffer);
+		switch (bytesRead) {
+			case END_OF_STREAM:
+				completeRequest();
+				break;
+			case NOTHING_READ:
+				break;
+			default:
+				callObserver.onNext(chunkBuilder.apply(ByteString.copyFrom(buffer, 0, bytesRead)));
+				LOG.debug("Sent {} bytes", bytesRead);
+		}
 	}
 
-	void tryCompleteRequest() {
+	void completeRequest() {
 		if (Objects.nonNull(downloadError.get())) {
 			throw downloadError.get();
-		} else if (downloadFinished.get()) {
+		} else {
 			completeRequestNormally();
 		}
 	}
diff --git a/ozgcloud-common-lib/src/test/java/de/ozgcloud/common/binaryfile/GrpcBinaryFileServerDownloaderITCase.java b/ozgcloud-common-lib/src/test/java/de/ozgcloud/common/binaryfile/GrpcBinaryFileServerDownloaderITCase.java
new file mode 100644
index 0000000..039f873
--- /dev/null
+++ b/ozgcloud-common-lib/src/test/java/de/ozgcloud/common/binaryfile/GrpcBinaryFileServerDownloaderITCase.java
@@ -0,0 +1,185 @@
+/*
+ * Copyright (C) 2025 Das Land Schleswig-Holstein vertreten durch den
+ * Ministerpräsidenten des Landes Schleswig-Holstein
+ * Staatskanzlei
+ * Abteilung Digitalisierung und zentrales IT-Management der Landesregierung
+ *
+ * Lizenziert unter der EUPL, Version 1.2 oder - sobald
+ * diese von der Europäischen Kommission genehmigt wurden -
+ * Folgeversionen der EUPL ("Lizenz");
+ * Sie dürfen dieses Werk ausschließlich gemäß
+ * dieser Lizenz nutzen.
+ * Eine Kopie der Lizenz finden Sie hier:
+ *
+ * https://joinup.ec.europa.eu/collection/eupl/eupl-text-eupl-12
+ *
+ * Sofern nicht durch anwendbare Rechtsvorschriften
+ * gefordert oder in schriftlicher Form vereinbart, wird
+ * die unter der Lizenz verbreitete Software "so wie sie
+ * ist", OHNE JEGLICHE GEWÄHRLEISTUNG ODER BEDINGUNGEN -
+ * ausdrücklich oder stillschweigend - verbreitet.
+ * Die sprachspezifischen Genehmigungen und Beschränkungen
+ * unter der Lizenz sind dem Lizenztext zu entnehmen.
+ */
+package de.ozgcloud.common.binaryfile;
+
+import static org.assertj.core.api.Assertions.*;
+import static org.mockito.Mockito.*;
+
+import java.io.OutputStream;
+import java.util.Arrays;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Nested;
+import org.junit.jupiter.api.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Captor;
+import org.mockito.Mock;
+import org.springframework.core.task.SimpleAsyncTaskExecutor;
+
+import com.google.protobuf.ByteString;
+
+import de.ozgcloud.common.errorhandling.TechnicalException;
+import io.grpc.stub.CallStreamObserver;
+import lombok.SneakyThrows;
+
+class GrpcBinaryFileServerDownloaderITCase {
+
+	@Mock
+	private CallStreamObserver<GrpcResponseDummy> callObserver;
+	private SimpleAsyncTaskExecutor taskExecutor;
+	private GrpcBinaryFileServerDownloader<GrpcResponseDummy> downloader;
+
+	@BeforeEach
+	void init() {
+		taskExecutor = new SimpleAsyncTaskExecutor();
+		when(callObserver.isReady()).thenReturn(true);
+	}
+
+	@AfterEach
+	void cleanup() {
+		taskExecutor.close();
+	}
+
+	@Nested
+	class OnNoError {
+
+		private static final int DOWNLOAD_DATA_LENGTH = Double.valueOf(GrpcBinaryFileServerDownloader.CHUNK_SIZE * 1.5).intValue();
+		@Captor
+		private ArgumentCaptor<GrpcResponseDummy> captor;
+
+		@BeforeEach
+		void init() {
+			downloader = spy(downloaderBuilder()
+					.downloadConsumer(this::downloadData)
+					.build());
+		}
+
+		@SneakyThrows
+		@Test
+		void shouldCallOnCompleted() {
+			start();
+
+			verify(callObserver).onCompleted();
+		}
+
+		@Test
+		void shouldReadAllData() {
+			start();
+
+			verify(callObserver, times(2)).onNext(captor.capture());
+			var totalBytesRead = captor.getAllValues().stream().mapToInt(GrpcResponseDummy::bytesRead).sum();
+			assertThat(totalBytesRead).isEqualTo(DOWNLOAD_DATA_LENGTH);
+		}
+
+		@Test
+		void shouldCloseStreams() {
+			start();
+
+			verify(downloader).closeInputStream();
+			verify(downloader).closeOutputStream();
+		}
+
+		@Test
+		void shouldCompleteIfDownloadConsumerClosedOutputStream() {
+			downloader = spy(downloaderBuilder()
+					.downloadConsumer(this::downloadDataAndCloseStream)
+					.build());
+
+			start();
+
+			verify(callObserver).onCompleted();
+		}
+
+		@SneakyThrows
+		private void downloadData(OutputStream outputStream) {
+			byte[] bytes = new byte[DOWNLOAD_DATA_LENGTH];
+			Arrays.fill(bytes, (byte) 1);
+			outputStream.write(bytes);
+		}
+
+		@SneakyThrows
+		private void downloadDataAndCloseStream(OutputStream outputStream) {
+			downloadData(outputStream);
+			outputStream.close();
+			Thread.sleep(100); // delay, so that the onReadyHandler gets end of stream before this method returns
+		}
+	}
+
+	@Nested
+	class OnError {
+
+		private final Throwable error = new TechnicalException("error");
+
+		@BeforeEach
+		void init() {
+			downloader = spy(downloaderBuilder()
+					.downloadConsumer(this::downloadData)
+					.build());
+		}
+
+		@Test
+		void shouldThrowException() {
+			assertThatThrownBy(GrpcBinaryFileServerDownloaderITCase.this::start).isInstanceOf(TechnicalException.class);
+		}
+
+		@Test
+		void shouldNotCallOnCompleted() {
+			catchException(GrpcBinaryFileServerDownloaderITCase.this::start);
+
+			verify(callObserver, never()).onCompleted();
+		}
+
+		@Test
+		void shouldCloseStreams() {
+			catchException(GrpcBinaryFileServerDownloaderITCase.this::start);
+
+			verify(downloader).closeInputStream();
+			verify(downloader).closeOutputStream();
+		}
+
+		@SneakyThrows
+		private void downloadData(OutputStream outputStream) {
+			throw error;
+		}
+	}
+
+	private GrpcBinaryFileServerDownloader.GrpcBinaryFileServerDownloaderBuilder<GrpcResponseDummy> downloaderBuilder() {
+		return GrpcBinaryFileServerDownloader.<GrpcResponseDummy>builder()
+				.taskExecutor(taskExecutor)
+				.callObserver(callObserver)
+				.chunkBuilder(this::buildChunk);
+	}
+
+	private GrpcResponseDummy buildChunk(ByteString data) {
+		return new GrpcResponseDummy(data.size());
+	}
+
+	private void start() {
+		downloader.start();
+		downloader.sendChunks();
+	}
+
+	private record GrpcResponseDummy(int bytesRead) {}
+}
diff --git a/ozgcloud-common-lib/src/test/java/de/ozgcloud/common/binaryfile/GrpcBinaryFileServerDownloaderTest.java b/ozgcloud-common-lib/src/test/java/de/ozgcloud/common/binaryfile/GrpcBinaryFileServerDownloaderTest.java
index f2f2ef4..e2f860e 100644
--- a/ozgcloud-common-lib/src/test/java/de/ozgcloud/common/binaryfile/GrpcBinaryFileServerDownloaderTest.java
+++ b/ozgcloud-common-lib/src/test/java/de/ozgcloud/common/binaryfile/GrpcBinaryFileServerDownloaderTest.java
@@ -27,6 +27,7 @@ import static org.assertj.core.api.Assertions.*;
 import static org.mockito.ArgumentMatchers.*;
 import static org.mockito.Mockito.*;
 
+import java.io.FileInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
@@ -37,11 +38,15 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Consumer;
 import java.util.function.Function;
+import java.util.stream.Stream;
 
+import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.DisplayName;
 import org.junit.jupiter.api.Nested;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
 import org.mockito.ArgumentCaptor;
 import org.mockito.Captor;
 import org.mockito.Mock;
@@ -300,18 +305,6 @@ class GrpcBinaryFileServerDownloaderTest {
 
 			verify(downloadConsumer).accept(outputStream);
 		}
-
-		@Test
-		void shouldDownloadFinishedBeInitiallyFalse() {
-			assertThat(getDownloadFinished()).isFalse();
-		}
-
-		@Test
-		void shouldSetDownloadFinished() {
-			downloader.doDownload();
-
-			assertThat(getDownloadFinished()).isTrue();
-		}
 	}
 
 	@Nested
@@ -362,103 +355,157 @@ class GrpcBinaryFileServerDownloaderTest {
 	@Nested
 	class TestDoSendChunks {
 
+		@SneakyThrows
+		@Test
+		void shouldCheckIfCanSendChunks() {
+			doReturn(false).when(downloader).canSendChunks();
+
+			downloader.doSendChunks();
+
+			verify(downloader).canSendChunks();
+		}
+
+		@SneakyThrows
+		@Test
+		void shouldNotProcessIfCannotSendChunks() {
+			doReturn(false).when(downloader).canSendChunks();
+
+			downloader.doSendChunks();
+
+			verify(downloader, never()).processDataFromInputStream();
+		}
+
+		@SneakyThrows
+		@Test
+		void shouldProcessAsLongAsCanSendChunks() {
+			doReturn(true, true, false).when(downloader).canSendChunks();
+			doNothing().when(downloader).processDataFromInputStream();
+
+			downloader.doSendChunks();
+
+			verify(downloader, times(2)).processDataFromInputStream();
+		}
+	}
+
+	@Nested
+	class TestProcessDataFromInputStream {
+
+		@Mock
+		private PipedInputStream inputStream;
+
 		@Nested
-		class OnRequestFinished {
+		class OnEndOfStreamReached {
 
+			@SneakyThrows
 			@BeforeEach
 			void init() {
-				setRequestFinishedField(true);
+				doNothing().when(downloader).completeRequest();
+				when(inputStream.read(any())).thenReturn(-1);
+				setInputStreamField(inputStream);
 			}
 
+			@SneakyThrows
 			@Test
-			void shouldNotInteractWithCallObserver() {
-				doSendChunks();
+			void shouldCompleteRequest() {
+				downloader.processDataFromInputStream();
+
+				verify(downloader).completeRequest();
+			}
+
+			@SneakyThrows
+			@Test
+			void shouldNotCallCallObserver() {
+				downloader.processDataFromInputStream();
 
 				verifyNoInteractions(callObserver);
 			}
 		}
 
 		@Nested
-		class OnRequestNotFinished {
+		class OnNoBytesWereReceived {
 
-			@Nested
-			class OnNotReady {
-
-				@BeforeEach
-				void init() {
-					when(callObserver.isReady()).thenReturn(false);
-				}
+			@SneakyThrows
+			@BeforeEach
+			void init() {
+				when(inputStream.read(any())).thenReturn(0);
+				setInputStreamField(inputStream);
+			}
 
-				@Test
-				void shouldOnlyCallIsReadyOnObserver() {
-					doSendChunks();
+			@SneakyThrows
+			@Test
+			void shouldNotCallCallObserver() {
+				downloader.processDataFromInputStream();
 
-					verify(callObserver).isReady();
-					verifyNoMoreInteractions(callObserver);
-				}
+				verifyNoInteractions(callObserver);
 			}
+		}
 
-			@Nested
-			class OnReady {
+		@Nested
+		class OnBytesWereReceived {
 
-				@Mock
-				private PipedInputStream inputStream;
-				@Captor
-				private ArgumentCaptor<ByteString> byteStringCaptor;
+			@Captor
+			private ArgumentCaptor<ByteString> byteStringCaptor;
 
-				private final int readBytes = 20;
-				private final byte[] buffer = new byte[readBytes];
-				private final GrpcResponseDummy grpcResponseDummy = new GrpcResponseDummy();
+			private final int readBytes = 20;
+			private final byte[] buffer = new byte[readBytes];
+			private final GrpcResponseDummy grpcResponseDummy = new GrpcResponseDummy();
 
-				@SneakyThrows
-				@BeforeEach
-				void mock() {
-					doNothing().when(downloader).tryCompleteRequest();
-					when(callObserver.isReady()).thenReturn(true);
-					when(inputStream.read(any())).thenReturn(readBytes, -1);
-					setInputStreamField(inputStream);
-					new Random().nextBytes(buffer);
-					ReflectionTestUtils.setField(downloader, "buffer", buffer);
-				}
+			@SneakyThrows
+			@BeforeEach
+			void mock() {
+				when(inputStream.read(any())).thenReturn(readBytes);
+				setInputStreamField(inputStream);
+				new Random().nextBytes(buffer);
+				ReflectionTestUtils.setField(downloader, "buffer", buffer);
+			}
 
-				@Test
-				void shouldCallChunkBuilder() {
-					doSendChunks();
+			@SneakyThrows
+			@Test
+			void shouldCallChunkBuilder() {
+				downloader.processDataFromInputStream();
 
-					verify(chunkBuilder).apply(byteStringCaptor.capture());
-					assertThat(byteStringCaptor.getValue().toByteArray()).isEqualTo(buffer);
-				}
+				verify(chunkBuilder).apply(byteStringCaptor.capture());
+				assertThat(byteStringCaptor.getValue().toByteArray()).isEqualTo(buffer);
+			}
 
-				@DisplayName("should send next chunk if callObserver is ready and stream already received data")
-				@Test
-				void shouldCallOnNext() {
-					when(chunkBuilder.apply(any())).thenReturn(grpcResponseDummy);
+			@SneakyThrows
+			@Test
+			void shouldCallOnNext() {
+				when(chunkBuilder.apply(any())).thenReturn(grpcResponseDummy);
 
-					doSendChunks();
+				downloader.processDataFromInputStream();
 
-					verify(callObserver).onNext(grpcResponseDummy);
-				}
+				verify(callObserver).onNext(grpcResponseDummy);
+			}
+		}
+	}
 
-				@DisplayName("should call complete grpc stream if download has finished and stream has no data left")
-				@Test
-				void shouldTryCompleteRequest() {
-					setDownloadFinishedField(true);
+	@Nested
+	class TestCanSendChunks {
 
-					doSendChunks();
+		@ParameterizedTest
+		@MethodSource("provideArguments")
+		void shouldReturnValue(boolean requestFinished, boolean ready, boolean expected) {
+			ReflectionTestUtils.setField(downloader, "requestFinished", new AtomicBoolean(requestFinished));
+			lenient().when(callObserver.isReady()).thenReturn(ready);
 
-					verify(downloader).tryCompleteRequest();
-				}
-			}
+			var canSendChunks = downloader.canSendChunks();
+
+			assertThat(canSendChunks).isEqualTo(expected);
 		}
 
-		@SneakyThrows
-		private void doSendChunks() {
-			downloader.doSendChunks();
+		private static Stream<Arguments> provideArguments() {
+			return Stream.of(
+				Arguments.of(false,  false, false),
+				Arguments.of(false, true, true),
+				Arguments.of(true, false, false),
+				Arguments.of(true, true, false)
+			);
 		}
 	}
 
 	@Nested
-	class TestTryCompleteRequest {
+	class TestCompleteRequest {
 
 		@Nested
 		class OnError {
@@ -472,56 +519,32 @@ class GrpcBinaryFileServerDownloaderTest {
 
 			@Test
 			void shouldThrowException() {
-				assertThatThrownBy(downloader::tryCompleteRequest).isSameAs(exception);
+				assertThatThrownBy(downloader::completeRequest).isSameAs(exception);
 			}
 		}
 
 		@Nested
-		class OnDownloadFinished {
+		class OnNoError {
 
 			@BeforeEach
 			void init() {
-				setDownloadFinishedField(true);
 				doNothing().when(downloader).completeRequestNormally();
 			}
 
 			@Test
 			void shouldNotCompleteRequestWithError() {
-				downloader.tryCompleteRequest();
+				downloader.completeRequest();
 
 				verify(downloader, never()).completeRequestWithError(any());
 			}
 
 			@Test
 			void shouldCompleteRequestNormally() {
-				downloader.tryCompleteRequest();
+				downloader.completeRequest();
 
 				verify(downloader).completeRequestNormally();
 			}
 		}
-
-		@Nested
-		class OnDownloadNotFinished {
-
-			@BeforeEach
-			void init() {
-				setDownloadFinishedField(false);
-			}
-
-			@Test
-			void shouldNotCompleteRequestNormally() {
-				downloader.tryCompleteRequest();
-
-				verify(downloader, never()).completeRequestNormally();
-			}
-
-			@Test
-			void shouldNotCompleteRequestWithError() {
-				downloader.tryCompleteRequest();
-
-				verify(downloader, never()).completeRequestWithError(any());
-			}
-		}
 	}
 
 	@Nested
@@ -622,4 +645,62 @@ class GrpcBinaryFileServerDownloaderTest {
 
 	private static class GrpcResponseDummy {
 	}
+
+	@Nested
+	class TestStreams {
+
+		private static final int CHUNK_SIZE = 255 * 1024;
+
+		private PipedInputStream inputStream;
+		private PipedOutputStream outputStream;
+
+		@SneakyThrows
+		@BeforeEach
+		void init() {
+			outputStream = new PipedOutputStream();
+			inputStream = new PipedInputStream(CHUNK_SIZE);
+			outputStream.connect(inputStream);
+		}
+
+		@SneakyThrows
+		@Test
+		void shouldReadIncompleteFile() {
+			var fileBuffer = new byte[CHUNK_SIZE];
+			var readBuffer = new byte[CHUNK_SIZE];
+			try (FileInputStream fileInputStream = new FileInputStream("/Users/kwitukiewicz/Documents/books/__Debt__The_First_5_000_Years.pdf")) {
+				fileInputStream.read(fileBuffer, 0, 255);
+				outputStream.write(fileBuffer, 0, 1);
+
+				var read = inputStream.read(readBuffer, 0, CHUNK_SIZE);
+
+				assertThat(read).isEqualTo(1);
+			}
+		}
+
+		@SneakyThrows
+		@Test
+		void shouldReadAfterOutputStreamWasClosed() {
+			var fileBuffer = new byte[CHUNK_SIZE];
+			var readBuffer = new byte[CHUNK_SIZE * 2];
+			try (FileInputStream fileInputStream = new FileInputStream("/Users/kwitukiewicz/Documents/books/__Debt__The_First_5_000_Years.pdf")) {
+				fileInputStream.read(fileBuffer, 0, fileBuffer.length);
+				outputStream.write(fileBuffer);
+				outputStream.close();
+
+				var read = inputStream.read(readBuffer);
+
+				assertThat(read).isEqualTo(CHUNK_SIZE);
+
+				read = inputStream.read(readBuffer);
+				assertThat(read).isEqualTo(-1);
+			}
+		}
+
+		@SneakyThrows
+		@AfterEach
+		void cleanup() {
+			outputStream.close();
+			inputStream.close();
+		}
+	}
 }
\ No newline at end of file
-- 
GitLab


From cf0635063f18e2d6a73924291df2709a7fb1e4ad Mon Sep 17 00:00:00 2001
From: Krzysztof Witukiewicz <krzysztof.witukiewicz@mgm-tp.com>
Date: Fri, 7 Feb 2025 15:49:37 +0100
Subject: [PATCH 2/3] OZG-7262 OZG-7680 Remove unneeded code

---
 .../GrpcBinaryFileServerDownloaderTest.java   | 70 +------------------
 1 file changed, 1 insertion(+), 69 deletions(-)

diff --git a/ozgcloud-common-lib/src/test/java/de/ozgcloud/common/binaryfile/GrpcBinaryFileServerDownloaderTest.java b/ozgcloud-common-lib/src/test/java/de/ozgcloud/common/binaryfile/GrpcBinaryFileServerDownloaderTest.java
index e2f860e..ba9b67a 100644
--- a/ozgcloud-common-lib/src/test/java/de/ozgcloud/common/binaryfile/GrpcBinaryFileServerDownloaderTest.java
+++ b/ozgcloud-common-lib/src/test/java/de/ozgcloud/common/binaryfile/GrpcBinaryFileServerDownloaderTest.java
@@ -27,7 +27,6 @@ import static org.assertj.core.api.Assertions.*;
 import static org.mockito.ArgumentMatchers.*;
 import static org.mockito.Mockito.*;
 
-import java.io.FileInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
@@ -40,7 +39,6 @@ import java.util.function.Consumer;
 import java.util.function.Function;
 import java.util.stream.Stream;
 
-import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Nested;
 import org.junit.jupiter.api.Test;
@@ -486,7 +484,7 @@ class GrpcBinaryFileServerDownloaderTest {
 		@ParameterizedTest
 		@MethodSource("provideArguments")
 		void shouldReturnValue(boolean requestFinished, boolean ready, boolean expected) {
-			ReflectionTestUtils.setField(downloader, "requestFinished", new AtomicBoolean(requestFinished));
+			setRequestFinishedField(requestFinished);
 			lenient().when(callObserver.isReady()).thenReturn(ready);
 
 			var canSendChunks = downloader.canSendChunks();
@@ -635,72 +633,6 @@ class GrpcBinaryFileServerDownloaderTest {
 		return (TechnicalException) ReflectionTestUtils.getField(downloader, "downloadError", AtomicReference.class).get();
 	}
 
-	private void setDownloadFinishedField(boolean downloadFinished) {
-		ReflectionTestUtils.setField(downloader, "downloadFinished", new AtomicBoolean(downloadFinished));
-	}
-
-	private boolean getDownloadFinished() {
-		return ReflectionTestUtils.getField(downloader, "downloadFinished", AtomicBoolean.class).get();
-	}
-
 	private static class GrpcResponseDummy {
 	}
-
-	@Nested
-	class TestStreams {
-
-		private static final int CHUNK_SIZE = 255 * 1024;
-
-		private PipedInputStream inputStream;
-		private PipedOutputStream outputStream;
-
-		@SneakyThrows
-		@BeforeEach
-		void init() {
-			outputStream = new PipedOutputStream();
-			inputStream = new PipedInputStream(CHUNK_SIZE);
-			outputStream.connect(inputStream);
-		}
-
-		@SneakyThrows
-		@Test
-		void shouldReadIncompleteFile() {
-			var fileBuffer = new byte[CHUNK_SIZE];
-			var readBuffer = new byte[CHUNK_SIZE];
-			try (FileInputStream fileInputStream = new FileInputStream("/Users/kwitukiewicz/Documents/books/__Debt__The_First_5_000_Years.pdf")) {
-				fileInputStream.read(fileBuffer, 0, 255);
-				outputStream.write(fileBuffer, 0, 1);
-
-				var read = inputStream.read(readBuffer, 0, CHUNK_SIZE);
-
-				assertThat(read).isEqualTo(1);
-			}
-		}
-
-		@SneakyThrows
-		@Test
-		void shouldReadAfterOutputStreamWasClosed() {
-			var fileBuffer = new byte[CHUNK_SIZE];
-			var readBuffer = new byte[CHUNK_SIZE * 2];
-			try (FileInputStream fileInputStream = new FileInputStream("/Users/kwitukiewicz/Documents/books/__Debt__The_First_5_000_Years.pdf")) {
-				fileInputStream.read(fileBuffer, 0, fileBuffer.length);
-				outputStream.write(fileBuffer);
-				outputStream.close();
-
-				var read = inputStream.read(readBuffer);
-
-				assertThat(read).isEqualTo(CHUNK_SIZE);
-
-				read = inputStream.read(readBuffer);
-				assertThat(read).isEqualTo(-1);
-			}
-		}
-
-		@SneakyThrows
-		@AfterEach
-		void cleanup() {
-			outputStream.close();
-			inputStream.close();
-		}
-	}
 }
\ No newline at end of file
-- 
GitLab


From c10e6482acea52f680e3db96fd9b554e0266db4a Mon Sep 17 00:00:00 2001
From: Krzysztof Witukiewicz <krzysztof.witukiewicz@mgm-tp.com>
Date: Mon, 10 Feb 2025 10:37:28 +0100
Subject: [PATCH 3/3] OZG-7262 OZG-7680 Renaming methods

---
 .../GrpcBinaryFileServerDownloader.java       |  31 ++--
 .../GrpcBinaryFileServerDownloaderITCase.java |   2 +-
 .../GrpcBinaryFileServerDownloaderTest.java   | 144 +++++++++---------
 3 files changed, 90 insertions(+), 87 deletions(-)

diff --git a/ozgcloud-common-lib/src/main/java/de/ozgcloud/common/binaryfile/GrpcBinaryFileServerDownloader.java b/ozgcloud-common-lib/src/main/java/de/ozgcloud/common/binaryfile/GrpcBinaryFileServerDownloader.java
index 7326002..9a41471 100644
--- a/ozgcloud-common-lib/src/main/java/de/ozgcloud/common/binaryfile/GrpcBinaryFileServerDownloader.java
+++ b/ozgcloud-common-lib/src/main/java/de/ozgcloud/common/binaryfile/GrpcBinaryFileServerDownloader.java
@@ -48,8 +48,8 @@ public class GrpcBinaryFileServerDownloader<T> {
 
 	static final int CHUNK_SIZE = 255 * 1024;
 
-	private static final int END_OF_STREAM = -1;
-	private static final int NOTHING_READ = 0;
+	static final int END_OF_STREAM = -1;
+	static final int NOTHING_READ = 0;
 
 	private final CallStreamObserver<T> callObserver;
 	private final Function<ByteString, T> chunkBuilder;
@@ -126,7 +126,7 @@ public class GrpcBinaryFileServerDownloader<T> {
 		try {
 			doSendChunks();
 		} catch (Exception e) {
-			completeRequestWithError(new TechnicalException("Error while sending chunks", e));
+			handleError(new TechnicalException("Error while sending chunks", e));
 		}
 	}
 
@@ -144,36 +144,37 @@ public class GrpcBinaryFileServerDownloader<T> {
 		var bytesRead = inputStream.read(buffer);
 		switch (bytesRead) {
 			case END_OF_STREAM:
-				completeRequest();
+				finishProcessing();
 				break;
 			case NOTHING_READ:
 				break;
 			default:
-				callObserver.onNext(chunkBuilder.apply(ByteString.copyFrom(buffer, 0, bytesRead)));
-				LOG.debug("Sent {} bytes", bytesRead);
+				sendBytesToCallObserver(bytesRead);
 		}
 	}
 
-	void completeRequest() {
+	void sendBytesToCallObserver(int bytesRead) {
+		var bytes = ByteString.copyFrom(buffer, 0, bytesRead);
+		var chunk = chunkBuilder.apply(bytes);
+		callObserver.onNext(chunk);
+		LOG.debug("Sent {} bytes", bytesRead);
+	}
+
+	void finishProcessing() {
 		if (Objects.nonNull(downloadError.get())) {
 			throw downloadError.get();
 		} else {
-			completeRequestNormally();
+			finishRequest();
+			callObserver.onCompleted();
 		}
 	}
 
-	void completeRequestWithError(TechnicalException e) {
+	void handleError(TechnicalException e) {
 		LOG.debug("Complete download request with error");
 		finishRequest();
 		throw e;
 	}
 
-	void completeRequestNormally() {
-		LOG.debug("Complete download request");
-		finishRequest();
-		callObserver.onCompleted();
-	}
-
 	private void finishRequest() {
 		requestFinished.set(true);
 		closeInputStream();
diff --git a/ozgcloud-common-lib/src/test/java/de/ozgcloud/common/binaryfile/GrpcBinaryFileServerDownloaderITCase.java b/ozgcloud-common-lib/src/test/java/de/ozgcloud/common/binaryfile/GrpcBinaryFileServerDownloaderITCase.java
index 039f873..fed3cc3 100644
--- a/ozgcloud-common-lib/src/test/java/de/ozgcloud/common/binaryfile/GrpcBinaryFileServerDownloaderITCase.java
+++ b/ozgcloud-common-lib/src/test/java/de/ozgcloud/common/binaryfile/GrpcBinaryFileServerDownloaderITCase.java
@@ -65,7 +65,7 @@ class GrpcBinaryFileServerDownloaderITCase {
 	@Nested
 	class OnNoError {
 
-		private static final int DOWNLOAD_DATA_LENGTH = Double.valueOf(GrpcBinaryFileServerDownloader.CHUNK_SIZE * 1.5).intValue();
+		private static final int DOWNLOAD_DATA_LENGTH = (int) (GrpcBinaryFileServerDownloader.CHUNK_SIZE * 1.5);
 		@Captor
 		private ArgumentCaptor<GrpcResponseDummy> captor;
 
diff --git a/ozgcloud-common-lib/src/test/java/de/ozgcloud/common/binaryfile/GrpcBinaryFileServerDownloaderTest.java b/ozgcloud-common-lib/src/test/java/de/ozgcloud/common/binaryfile/GrpcBinaryFileServerDownloaderTest.java
index ba9b67a..1817466 100644
--- a/ozgcloud-common-lib/src/test/java/de/ozgcloud/common/binaryfile/GrpcBinaryFileServerDownloaderTest.java
+++ b/ozgcloud-common-lib/src/test/java/de/ozgcloud/common/binaryfile/GrpcBinaryFileServerDownloaderTest.java
@@ -23,6 +23,7 @@
  */
 package de.ozgcloud.common.binaryfile;
 
+import static de.ozgcloud.common.binaryfile.GrpcBinaryFileServerDownloader.*;
 import static org.assertj.core.api.Assertions.*;
 import static org.mockito.ArgumentMatchers.*;
 import static org.mockito.Mockito.*;
@@ -337,14 +338,14 @@ class GrpcBinaryFileServerDownloaderTest {
 			@BeforeEach
 			void init() {
 				doThrow(exception).when(downloader).doSendChunks();
-				doNothing().when(downloader).completeRequestWithError(any());
+				doNothing().when(downloader).handleError(any());
 			}
 
 			@Test
-			void shouldCompleteRequestWithError() {
+			void shouldHandleError() {
 				downloader.sendChunks();
 
-				verify(downloader).completeRequestWithError(argumentCaptor.capture());
+				verify(downloader).handleError(argumentCaptor.capture());
 				assertThat(argumentCaptor.getValue()).isInstanceOf(TechnicalException.class).hasCause(exception);
 			}
 		}
@@ -397,25 +398,25 @@ class GrpcBinaryFileServerDownloaderTest {
 			@SneakyThrows
 			@BeforeEach
 			void init() {
-				doNothing().when(downloader).completeRequest();
-				when(inputStream.read(any())).thenReturn(-1);
+				doNothing().when(downloader).finishProcessing();
+				when(inputStream.read(any())).thenReturn(END_OF_STREAM);
 				setInputStreamField(inputStream);
 			}
 
 			@SneakyThrows
 			@Test
-			void shouldCompleteRequest() {
+			void shouldFinishProcessing() {
 				downloader.processDataFromInputStream();
 
-				verify(downloader).completeRequest();
+				verify(downloader).finishProcessing();
 			}
 
 			@SneakyThrows
 			@Test
-			void shouldNotCallCallObserver() {
+			void shouldNotSendBytesToCallObserver() {
 				downloader.processDataFromInputStream();
 
-				verifyNoInteractions(callObserver);
+				verify(downloader, never()).sendBytesToCallObserver(anyInt());
 			}
 		}
 
@@ -425,56 +426,73 @@ class GrpcBinaryFileServerDownloaderTest {
 			@SneakyThrows
 			@BeforeEach
 			void init() {
-				when(inputStream.read(any())).thenReturn(0);
+				when(inputStream.read(any())).thenReturn(NOTHING_READ);
 				setInputStreamField(inputStream);
 			}
 
 			@SneakyThrows
 			@Test
-			void shouldNotCallCallObserver() {
+			void shouldNotSendBytesToCallObserver() {
 				downloader.processDataFromInputStream();
 
-				verifyNoInteractions(callObserver);
+				verify(downloader, never()).sendBytesToCallObserver(anyInt());
 			}
 		}
 
 		@Nested
 		class OnBytesWereReceived {
 
-			@Captor
-			private ArgumentCaptor<ByteString> byteStringCaptor;
-
-			private final int readBytes = 20;
-			private final byte[] buffer = new byte[readBytes];
-			private final GrpcResponseDummy grpcResponseDummy = new GrpcResponseDummy();
+			private final int bytesRead = 20;
 
 			@SneakyThrows
 			@BeforeEach
 			void mock() {
-				when(inputStream.read(any())).thenReturn(readBytes);
+				when(inputStream.read(any())).thenReturn(bytesRead);
 				setInputStreamField(inputStream);
-				new Random().nextBytes(buffer);
-				ReflectionTestUtils.setField(downloader, "buffer", buffer);
 			}
 
 			@SneakyThrows
 			@Test
-			void shouldCallChunkBuilder() {
+			void shouldSendBytesToCallObserver() {
 				downloader.processDataFromInputStream();
 
-				verify(chunkBuilder).apply(byteStringCaptor.capture());
-				assertThat(byteStringCaptor.getValue().toByteArray()).isEqualTo(buffer);
+				verify(downloader).sendBytesToCallObserver(bytesRead);
 			}
+		}
+	}
 
-			@SneakyThrows
-			@Test
-			void shouldCallOnNext() {
-				when(chunkBuilder.apply(any())).thenReturn(grpcResponseDummy);
+	@Nested
+	class TestSendBytesToCallObserver {
 
-				downloader.processDataFromInputStream();
+		@Captor
+		private ArgumentCaptor<ByteString> byteStringCaptor;
+		private final int bytesRead = 20;
+		private final byte[] buffer = new byte[bytesRead];
+		private final GrpcResponseDummy grpcResponseDummy = new GrpcResponseDummy();
 
-				verify(callObserver).onNext(grpcResponseDummy);
-			}
+		@BeforeEach
+		void init() {
+			new Random().nextBytes(buffer);
+			ReflectionTestUtils.setField(downloader, "buffer", buffer);
+		}
+
+		@SneakyThrows
+		@Test
+		void shouldCallChunkBuilder() {
+			downloader.sendBytesToCallObserver(bytesRead);
+
+			verify(chunkBuilder).apply(byteStringCaptor.capture());
+			assertThat(byteStringCaptor.getValue().toByteArray()).isEqualTo(buffer);
+		}
+
+		@SneakyThrows
+		@Test
+		void shouldCallOnNext() {
+			when(chunkBuilder.apply(any())).thenReturn(grpcResponseDummy);
+
+			downloader.sendBytesToCallObserver(bytesRead);
+
+			verify(callObserver).onNext(grpcResponseDummy);
 		}
 	}
 
@@ -503,7 +521,7 @@ class GrpcBinaryFileServerDownloaderTest {
 	}
 
 	@Nested
-	class TestCompleteRequest {
+	class TestFinishProcessing {
 
 		@Nested
 		class OnError {
@@ -517,7 +535,7 @@ class GrpcBinaryFileServerDownloaderTest {
 
 			@Test
 			void shouldThrowException() {
-				assertThatThrownBy(downloader::completeRequest).isSameAs(exception);
+				assertThatThrownBy(downloader::finishProcessing).isSameAs(exception);
 			}
 		}
 
@@ -526,59 +544,43 @@ class GrpcBinaryFileServerDownloaderTest {
 
 			@BeforeEach
 			void init() {
-				doNothing().when(downloader).completeRequestNormally();
+				doNothing().when(downloader).closeInputStream();
 			}
 
 			@Test
 			void shouldNotCompleteRequestWithError() {
-				downloader.completeRequest();
+				downloader.finishProcessing();
 
-				verify(downloader, never()).completeRequestWithError(any());
+				verify(downloader, never()).handleError(any());
 			}
 
 			@Test
-			void shouldCompleteRequestNormally() {
-				downloader.completeRequest();
+			void shouldSetRequestFinished() {
+				assertThat(getRequestFinished()).isFalse();
 
-				verify(downloader).completeRequestNormally();
-			}
-		}
-	}
+				downloader.finishProcessing();
 
-	@Nested
-	class TestCompleteRequestNormally {
-
-		@BeforeEach
-		void init() {
-			doNothing().when(downloader).closeInputStream();
-		}
-
-		@Test
-		void shouldSetRequestFinished() {
-			assertThat(getRequestFinished()).isFalse();
-
-			downloader.completeRequestNormally();
-
-			assertThat(getRequestFinished()).isTrue();
-		}
+				assertThat(getRequestFinished()).isTrue();
+			}
 
-		@Test
-		void shouldCloseInputStream() {
-			downloader.completeRequestNormally();
+			@Test
+			void shouldCloseInputStream() {
+				downloader.finishProcessing();
 
-			verify(downloader).closeInputStream();
-		}
+				verify(downloader).closeInputStream();
+			}
 
-		@Test
-		void shouldNotifyObserver() {
-			downloader.completeRequestNormally();
+			@Test
+			void shouldNotifyObserver() {
+				downloader.finishProcessing();
 
-			verify(callObserver).onCompleted();
+				verify(callObserver).onCompleted();
+			}
 		}
 	}
 
 	@Nested
-	class TestCompleteRequestWithError {
+	class TestHandleError {
 
 		private final TechnicalException error = new TechnicalException("error");
 
@@ -591,21 +593,21 @@ class GrpcBinaryFileServerDownloaderTest {
 		void shouldSetRequestFinished() {
 			assertThat(getRequestFinished()).isFalse();
 
-			catchException(() -> downloader.completeRequestWithError(error));
+			catchException(() -> downloader.handleError(error));
 
 			assertThat(getRequestFinished()).isTrue();
 		}
 
 		@Test
 		void shouldCloseInputStream() {
-			catchException(() -> downloader.completeRequestWithError(error));
+			catchException(() -> downloader.handleError(error));
 
 			verify(downloader).closeInputStream();
 		}
 
 		@Test
 		void shouldThrowException() {
-			assertThatThrownBy(() -> downloader.completeRequestWithError(error)).isSameAs(error);
+			assertThatThrownBy(() -> downloader.handleError(error)).isSameAs(error);
 		}
 	}
 
-- 
GitLab