From 133aa19b29b5464ade61fe480a63de94e050665f Mon Sep 17 00:00:00 2001
From: Krzysztof Witukiewicz <krzysztof.witukiewicz@mgm-tp.com>
Date: Fri, 7 Feb 2025 15:40:39 +0100
Subject: [PATCH] OZG-7262 OZG-7680 Fix onReadyHandler not finishing

---
 .../GrpcBinaryFileServerDownloader.java       |  42 +--
 .../GrpcBinaryFileServerDownloaderITCase.java | 185 +++++++++++
 .../GrpcBinaryFileServerDownloaderTest.java   | 287 +++++++++++-------
 3 files changed, 393 insertions(+), 121 deletions(-)
 create mode 100644 ozgcloud-common-lib/src/test/java/de/ozgcloud/common/binaryfile/GrpcBinaryFileServerDownloaderITCase.java

diff --git a/ozgcloud-common-lib/src/main/java/de/ozgcloud/common/binaryfile/GrpcBinaryFileServerDownloader.java b/ozgcloud-common-lib/src/main/java/de/ozgcloud/common/binaryfile/GrpcBinaryFileServerDownloader.java
index 07379d8..7326002 100644
--- a/ozgcloud-common-lib/src/main/java/de/ozgcloud/common/binaryfile/GrpcBinaryFileServerDownloader.java
+++ b/ozgcloud-common-lib/src/main/java/de/ozgcloud/common/binaryfile/GrpcBinaryFileServerDownloader.java
@@ -46,7 +46,10 @@ import lombok.extern.log4j.Log4j2;
 @Log4j2
 public class GrpcBinaryFileServerDownloader<T> {
 
-	private static final int CHUNK_SIZE = 255 * 1024;
+	static final int CHUNK_SIZE = 255 * 1024;
+
+	private static final int END_OF_STREAM = -1;
+	private static final int NOTHING_READ = 0;
 
 	private final CallStreamObserver<T> callObserver;
 	private final Function<ByteString, T> chunkBuilder;
@@ -55,7 +58,6 @@ public class GrpcBinaryFileServerDownloader<T> {
 
 	private final byte[] buffer = new byte[CHUNK_SIZE];
 	private final AtomicBoolean started = new AtomicBoolean(false);
-	private final AtomicBoolean downloadFinished = new AtomicBoolean(false);
 	private final AtomicBoolean requestFinished = new AtomicBoolean(false);
 	private final AtomicReference<TechnicalException> downloadError = new AtomicReference<>();
 
@@ -118,7 +120,6 @@ public class GrpcBinaryFileServerDownloader<T> {
 		LOG.debug("Downloading file content...");
 		downloadConsumer.accept(outputStream);
 		LOG.debug("Download completed.");
-		downloadFinished.set(true);
 	}
 
 	synchronized void sendChunks() {
@@ -130,28 +131,33 @@ public class GrpcBinaryFileServerDownloader<T> {
 	}
 
 	void doSendChunks() throws IOException {
-		if (requestFinished.get()) {
-			return;
-		}
-		int bytesRead;
-		while (isReady()) {
-			if ((bytesRead = inputStream.read(buffer)) == -1) {
-				tryCompleteRequest();
-				break;
-			}
-			callObserver.onNext(chunkBuilder.apply(ByteString.copyFrom(buffer, 0, bytesRead)));
-			LOG.debug("Sent {} bytes", bytesRead);
+		while (canSendChunks()) {
+			processDataFromInputStream();
 		}
 	}
 
-	private boolean isReady() {
-		return callObserver.isReady();
+	boolean canSendChunks() {
+		return !requestFinished.get() && callObserver.isReady();
+	}
+
+	void processDataFromInputStream() throws IOException {
+		var bytesRead = inputStream.read(buffer);
+		switch (bytesRead) {
+			case END_OF_STREAM:
+				completeRequest();
+				break;
+			case NOTHING_READ:
+				break;
+			default:
+				callObserver.onNext(chunkBuilder.apply(ByteString.copyFrom(buffer, 0, bytesRead)));
+				LOG.debug("Sent {} bytes", bytesRead);
+		}
 	}
 
-	void tryCompleteRequest() {
+	void completeRequest() {
 		if (Objects.nonNull(downloadError.get())) {
 			throw downloadError.get();
-		} else if (downloadFinished.get()) {
+		} else {
 			completeRequestNormally();
 		}
 	}
diff --git a/ozgcloud-common-lib/src/test/java/de/ozgcloud/common/binaryfile/GrpcBinaryFileServerDownloaderITCase.java b/ozgcloud-common-lib/src/test/java/de/ozgcloud/common/binaryfile/GrpcBinaryFileServerDownloaderITCase.java
new file mode 100644
index 0000000..039f873
--- /dev/null
+++ b/ozgcloud-common-lib/src/test/java/de/ozgcloud/common/binaryfile/GrpcBinaryFileServerDownloaderITCase.java
@@ -0,0 +1,185 @@
+/*
+ * Copyright (C) 2025 Das Land Schleswig-Holstein vertreten durch den
+ * Ministerpräsidenten des Landes Schleswig-Holstein
+ * Staatskanzlei
+ * Abteilung Digitalisierung und zentrales IT-Management der Landesregierung
+ *
+ * Lizenziert unter der EUPL, Version 1.2 oder - sobald
+ * diese von der Europäischen Kommission genehmigt wurden -
+ * Folgeversionen der EUPL ("Lizenz");
+ * Sie dürfen dieses Werk ausschließlich gemäß
+ * dieser Lizenz nutzen.
+ * Eine Kopie der Lizenz finden Sie hier:
+ *
+ * https://joinup.ec.europa.eu/collection/eupl/eupl-text-eupl-12
+ *
+ * Sofern nicht durch anwendbare Rechtsvorschriften
+ * gefordert oder in schriftlicher Form vereinbart, wird
+ * die unter der Lizenz verbreitete Software "so wie sie
+ * ist", OHNE JEGLICHE GEWÄHRLEISTUNG ODER BEDINGUNGEN -
+ * ausdrücklich oder stillschweigend - verbreitet.
+ * Die sprachspezifischen Genehmigungen und Beschränkungen
+ * unter der Lizenz sind dem Lizenztext zu entnehmen.
+ */
+package de.ozgcloud.common.binaryfile;
+
+import static org.assertj.core.api.Assertions.*;
+import static org.mockito.Mockito.*;
+
+import java.io.OutputStream;
+import java.util.Arrays;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Nested;
+import org.junit.jupiter.api.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Captor;
+import org.mockito.Mock;
+import org.springframework.core.task.SimpleAsyncTaskExecutor;
+
+import com.google.protobuf.ByteString;
+
+import de.ozgcloud.common.errorhandling.TechnicalException;
+import io.grpc.stub.CallStreamObserver;
+import lombok.SneakyThrows;
+
+class GrpcBinaryFileServerDownloaderITCase {
+
+	@Mock
+	private CallStreamObserver<GrpcResponseDummy> callObserver;
+	private SimpleAsyncTaskExecutor taskExecutor;
+	private GrpcBinaryFileServerDownloader<GrpcResponseDummy> downloader;
+
+	@BeforeEach
+	void init() {
+		taskExecutor = new SimpleAsyncTaskExecutor();
+		when(callObserver.isReady()).thenReturn(true);
+	}
+
+	@AfterEach
+	void cleanup() {
+		taskExecutor.close();
+	}
+
+	@Nested
+	class OnNoError {
+
+		private static final int DOWNLOAD_DATA_LENGTH = Double.valueOf(GrpcBinaryFileServerDownloader.CHUNK_SIZE * 1.5).intValue();
+		@Captor
+		private ArgumentCaptor<GrpcResponseDummy> captor;
+
+		@BeforeEach
+		void init() {
+			downloader = spy(downloaderBuilder()
+					.downloadConsumer(this::downloadData)
+					.build());
+		}
+
+		@SneakyThrows
+		@Test
+		void shouldCallOnCompleted() {
+			start();
+
+			verify(callObserver).onCompleted();
+		}
+
+		@Test
+		void shouldReadAllData() {
+			start();
+
+			verify(callObserver, times(2)).onNext(captor.capture());
+			var totalBytesRead = captor.getAllValues().stream().mapToInt(GrpcResponseDummy::bytesRead).sum();
+			assertThat(totalBytesRead).isEqualTo(DOWNLOAD_DATA_LENGTH);
+		}
+
+		@Test
+		void shouldCloseStreams() {
+			start();
+
+			verify(downloader).closeInputStream();
+			verify(downloader).closeOutputStream();
+		}
+
+		@Test
+		void shouldCompleteIfDownloadConsumerClosedOutputStream() {
+			downloader = spy(downloaderBuilder()
+					.downloadConsumer(this::downloadDataAndCloseStream)
+					.build());
+
+			start();
+
+			verify(callObserver).onCompleted();
+		}
+
+		@SneakyThrows
+		private void downloadData(OutputStream outputStream) {
+			byte[] bytes = new byte[DOWNLOAD_DATA_LENGTH];
+			Arrays.fill(bytes, (byte) 1);
+			outputStream.write(bytes);
+		}
+
+		@SneakyThrows
+		private void downloadDataAndCloseStream(OutputStream outputStream) {
+			downloadData(outputStream);
+			outputStream.close();
+			Thread.sleep(100); // delay, so that the onReadyHandler gets end of stream before this method returns
+		}
+	}
+
+	@Nested
+	class OnError {
+
+		private final Throwable error = new TechnicalException("error");
+
+		@BeforeEach
+		void init() {
+			downloader = spy(downloaderBuilder()
+					.downloadConsumer(this::downloadData)
+					.build());
+		}
+
+		@Test
+		void shouldThrowException() {
+			assertThatThrownBy(GrpcBinaryFileServerDownloaderITCase.this::start).isInstanceOf(TechnicalException.class);
+		}
+
+		@Test
+		void shouldNotCallOnCompleted() {
+			catchException(GrpcBinaryFileServerDownloaderITCase.this::start);
+
+			verify(callObserver, never()).onCompleted();
+		}
+
+		@Test
+		void shouldCloseStreams() {
+			catchException(GrpcBinaryFileServerDownloaderITCase.this::start);
+
+			verify(downloader).closeInputStream();
+			verify(downloader).closeOutputStream();
+		}
+
+		@SneakyThrows
+		private void downloadData(OutputStream outputStream) {
+			throw error;
+		}
+	}
+
+	private GrpcBinaryFileServerDownloader.GrpcBinaryFileServerDownloaderBuilder<GrpcResponseDummy> downloaderBuilder() {
+		return GrpcBinaryFileServerDownloader.<GrpcResponseDummy>builder()
+				.taskExecutor(taskExecutor)
+				.callObserver(callObserver)
+				.chunkBuilder(this::buildChunk);
+	}
+
+	private GrpcResponseDummy buildChunk(ByteString data) {
+		return new GrpcResponseDummy(data.size());
+	}
+
+	private void start() {
+		downloader.start();
+		downloader.sendChunks();
+	}
+
+	private record GrpcResponseDummy(int bytesRead) {}
+}
diff --git a/ozgcloud-common-lib/src/test/java/de/ozgcloud/common/binaryfile/GrpcBinaryFileServerDownloaderTest.java b/ozgcloud-common-lib/src/test/java/de/ozgcloud/common/binaryfile/GrpcBinaryFileServerDownloaderTest.java
index f2f2ef4..e2f860e 100644
--- a/ozgcloud-common-lib/src/test/java/de/ozgcloud/common/binaryfile/GrpcBinaryFileServerDownloaderTest.java
+++ b/ozgcloud-common-lib/src/test/java/de/ozgcloud/common/binaryfile/GrpcBinaryFileServerDownloaderTest.java
@@ -27,6 +27,7 @@ import static org.assertj.core.api.Assertions.*;
 import static org.mockito.ArgumentMatchers.*;
 import static org.mockito.Mockito.*;
 
+import java.io.FileInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
@@ -37,11 +38,15 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Consumer;
 import java.util.function.Function;
+import java.util.stream.Stream;
 
+import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.DisplayName;
 import org.junit.jupiter.api.Nested;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
 import org.mockito.ArgumentCaptor;
 import org.mockito.Captor;
 import org.mockito.Mock;
@@ -300,18 +305,6 @@ class GrpcBinaryFileServerDownloaderTest {
 
 			verify(downloadConsumer).accept(outputStream);
 		}
-
-		@Test
-		void shouldDownloadFinishedBeInitiallyFalse() {
-			assertThat(getDownloadFinished()).isFalse();
-		}
-
-		@Test
-		void shouldSetDownloadFinished() {
-			downloader.doDownload();
-
-			assertThat(getDownloadFinished()).isTrue();
-		}
 	}
 
 	@Nested
@@ -362,103 +355,157 @@ class GrpcBinaryFileServerDownloaderTest {
 	@Nested
 	class TestDoSendChunks {
 
+		@SneakyThrows
+		@Test
+		void shouldCheckIfCanSendChunks() {
+			doReturn(false).when(downloader).canSendChunks();
+
+			downloader.doSendChunks();
+
+			verify(downloader).canSendChunks();
+		}
+
+		@SneakyThrows
+		@Test
+		void shouldNotProcessIfCannotSendChunks() {
+			doReturn(false).when(downloader).canSendChunks();
+
+			downloader.doSendChunks();
+
+			verify(downloader, never()).processDataFromInputStream();
+		}
+
+		@SneakyThrows
+		@Test
+		void shouldProcessAsLongAsCanSendChunks() {
+			doReturn(true, true, false).when(downloader).canSendChunks();
+			doNothing().when(downloader).processDataFromInputStream();
+
+			downloader.doSendChunks();
+
+			verify(downloader, times(2)).processDataFromInputStream();
+		}
+	}
+
+	@Nested
+	class TestProcessDataFromInputStream {
+
+		@Mock
+		private PipedInputStream inputStream;
+
 		@Nested
-		class OnRequestFinished {
+		class OnEndOfStreamReached {
 
+			@SneakyThrows
 			@BeforeEach
 			void init() {
-				setRequestFinishedField(true);
+				doNothing().when(downloader).completeRequest();
+				when(inputStream.read(any())).thenReturn(-1);
+				setInputStreamField(inputStream);
 			}
 
+			@SneakyThrows
 			@Test
-			void shouldNotInteractWithCallObserver() {
-				doSendChunks();
+			void shouldCompleteRequest() {
+				downloader.processDataFromInputStream();
+
+				verify(downloader).completeRequest();
+			}
+
+			@SneakyThrows
+			@Test
+			void shouldNotCallCallObserver() {
+				downloader.processDataFromInputStream();
 
 				verifyNoInteractions(callObserver);
 			}
 		}
 
 		@Nested
-		class OnRequestNotFinished {
+		class OnNoBytesWereReceived {
 
-			@Nested
-			class OnNotReady {
-
-				@BeforeEach
-				void init() {
-					when(callObserver.isReady()).thenReturn(false);
-				}
+			@SneakyThrows
+			@BeforeEach
+			void init() {
+				when(inputStream.read(any())).thenReturn(0);
+				setInputStreamField(inputStream);
+			}
 
-				@Test
-				void shouldOnlyCallIsReadyOnObserver() {
-					doSendChunks();
+			@SneakyThrows
+			@Test
+			void shouldNotCallCallObserver() {
+				downloader.processDataFromInputStream();
 
-					verify(callObserver).isReady();
-					verifyNoMoreInteractions(callObserver);
-				}
+				verifyNoInteractions(callObserver);
 			}
+		}
 
-			@Nested
-			class OnReady {
+		@Nested
+		class OnBytesWereReceived {
 
-				@Mock
-				private PipedInputStream inputStream;
-				@Captor
-				private ArgumentCaptor<ByteString> byteStringCaptor;
+			@Captor
+			private ArgumentCaptor<ByteString> byteStringCaptor;
 
-				private final int readBytes = 20;
-				private final byte[] buffer = new byte[readBytes];
-				private final GrpcResponseDummy grpcResponseDummy = new GrpcResponseDummy();
+			private final int readBytes = 20;
+			private final byte[] buffer = new byte[readBytes];
+			private final GrpcResponseDummy grpcResponseDummy = new GrpcResponseDummy();
 
-				@SneakyThrows
-				@BeforeEach
-				void mock() {
-					doNothing().when(downloader).tryCompleteRequest();
-					when(callObserver.isReady()).thenReturn(true);
-					when(inputStream.read(any())).thenReturn(readBytes, -1);
-					setInputStreamField(inputStream);
-					new Random().nextBytes(buffer);
-					ReflectionTestUtils.setField(downloader, "buffer", buffer);
-				}
+			@SneakyThrows
+			@BeforeEach
+			void mock() {
+				when(inputStream.read(any())).thenReturn(readBytes);
+				setInputStreamField(inputStream);
+				new Random().nextBytes(buffer);
+				ReflectionTestUtils.setField(downloader, "buffer", buffer);
+			}
 
-				@Test
-				void shouldCallChunkBuilder() {
-					doSendChunks();
+			@SneakyThrows
+			@Test
+			void shouldCallChunkBuilder() {
+				downloader.processDataFromInputStream();
 
-					verify(chunkBuilder).apply(byteStringCaptor.capture());
-					assertThat(byteStringCaptor.getValue().toByteArray()).isEqualTo(buffer);
-				}
+				verify(chunkBuilder).apply(byteStringCaptor.capture());
+				assertThat(byteStringCaptor.getValue().toByteArray()).isEqualTo(buffer);
+			}
 
-				@DisplayName("should send next chunk if callObserver is ready and stream already received data")
-				@Test
-				void shouldCallOnNext() {
-					when(chunkBuilder.apply(any())).thenReturn(grpcResponseDummy);
+			@SneakyThrows
+			@Test
+			void shouldCallOnNext() {
+				when(chunkBuilder.apply(any())).thenReturn(grpcResponseDummy);
 
-					doSendChunks();
+				downloader.processDataFromInputStream();
 
-					verify(callObserver).onNext(grpcResponseDummy);
-				}
+				verify(callObserver).onNext(grpcResponseDummy);
+			}
+		}
+	}
 
-				@DisplayName("should call complete grpc stream if download has finished and stream has no data left")
-				@Test
-				void shouldTryCompleteRequest() {
-					setDownloadFinishedField(true);
+	@Nested
+	class TestCanSendChunks {
 
-					doSendChunks();
+		@ParameterizedTest
+		@MethodSource("provideArguments")
+		void shouldReturnValue(boolean requestFinished, boolean ready, boolean expected) {
+			ReflectionTestUtils.setField(downloader, "requestFinished", new AtomicBoolean(requestFinished));
+			lenient().when(callObserver.isReady()).thenReturn(ready);
 
-					verify(downloader).tryCompleteRequest();
-				}
-			}
+			var canSendChunks = downloader.canSendChunks();
+
+			assertThat(canSendChunks).isEqualTo(expected);
 		}
 
-		@SneakyThrows
-		private void doSendChunks() {
-			downloader.doSendChunks();
+		private static Stream<Arguments> provideArguments() {
+			return Stream.of(
+				Arguments.of(false,  false, false),
+				Arguments.of(false, true, true),
+				Arguments.of(true, false, false),
+				Arguments.of(true, true, false)
+			);
 		}
 	}
 
 	@Nested
-	class TestTryCompleteRequest {
+	class TestCompleteRequest {
 
 		@Nested
 		class OnError {
@@ -472,56 +519,32 @@ class GrpcBinaryFileServerDownloaderTest {
 
 			@Test
 			void shouldThrowException() {
-				assertThatThrownBy(downloader::tryCompleteRequest).isSameAs(exception);
+				assertThatThrownBy(downloader::completeRequest).isSameAs(exception);
 			}
 		}
 
 		@Nested
-		class OnDownloadFinished {
+		class OnNoError {
 
 			@BeforeEach
 			void init() {
-				setDownloadFinishedField(true);
 				doNothing().when(downloader).completeRequestNormally();
 			}
 
 			@Test
 			void shouldNotCompleteRequestWithError() {
-				downloader.tryCompleteRequest();
+				downloader.completeRequest();
 
 				verify(downloader, never()).completeRequestWithError(any());
 			}
 
 			@Test
 			void shouldCompleteRequestNormally() {
-				downloader.tryCompleteRequest();
+				downloader.completeRequest();
 
 				verify(downloader).completeRequestNormally();
 			}
 		}
-
-		@Nested
-		class OnDownloadNotFinished {
-
-			@BeforeEach
-			void init() {
-				setDownloadFinishedField(false);
-			}
-
-			@Test
-			void shouldNotCompleteRequestNormally() {
-				downloader.tryCompleteRequest();
-
-				verify(downloader, never()).completeRequestNormally();
-			}
-
-			@Test
-			void shouldNotCompleteRequestWithError() {
-				downloader.tryCompleteRequest();
-
-				verify(downloader, never()).completeRequestWithError(any());
-			}
-		}
 	}
 
 	@Nested
@@ -622,4 +645,62 @@ class GrpcBinaryFileServerDownloaderTest {
 
 	private static class GrpcResponseDummy {
 	}
+
+	@Nested
+	class TestStreams {
+
+		private static final int CHUNK_SIZE = 255 * 1024;
+
+		private PipedInputStream inputStream;
+		private PipedOutputStream outputStream;
+
+		@SneakyThrows
+		@BeforeEach
+		void init() {
+			outputStream = new PipedOutputStream();
+			inputStream = new PipedInputStream(CHUNK_SIZE);
+			outputStream.connect(inputStream);
+		}
+
+		@SneakyThrows
+		@Test
+		void shouldReadIncompleteFile() {
+			var fileBuffer = new byte[CHUNK_SIZE];
+			var readBuffer = new byte[CHUNK_SIZE];
+			try (FileInputStream fileInputStream = new FileInputStream("/Users/kwitukiewicz/Documents/books/__Debt__The_First_5_000_Years.pdf")) {
+				fileInputStream.read(fileBuffer, 0, 255);
+				outputStream.write(fileBuffer, 0, 1);
+
+				var read = inputStream.read(readBuffer, 0, CHUNK_SIZE);
+
+				assertThat(read).isEqualTo(1);
+			}
+		}
+
+		@SneakyThrows
+		@Test
+		void shouldReadAfterOutputStreamWasClosed() {
+			var fileBuffer = new byte[CHUNK_SIZE];
+			var readBuffer = new byte[CHUNK_SIZE * 2];
+			try (FileInputStream fileInputStream = new FileInputStream("/Users/kwitukiewicz/Documents/books/__Debt__The_First_5_000_Years.pdf")) {
+				fileInputStream.read(fileBuffer, 0, fileBuffer.length);
+				outputStream.write(fileBuffer);
+				outputStream.close();
+
+				var read = inputStream.read(readBuffer);
+
+				assertThat(read).isEqualTo(CHUNK_SIZE);
+
+				read = inputStream.read(readBuffer);
+				assertThat(read).isEqualTo(-1);
+			}
+		}
+
+		@SneakyThrows
+		@AfterEach
+		void cleanup() {
+			outputStream.close();
+			inputStream.close();
+		}
+	}
 }
\ No newline at end of file
-- 
GitLab