From da68d4a4899bd1822aaf24c641b2c35a6603c732 Mon Sep 17 00:00:00 2001
From: Krzysztof Witukiewicz <krzysztof.witukiewicz@mgm-tp.com>
Date: Wed, 29 Jan 2025 15:55:51 +0100
Subject: [PATCH] OZG-7262 OZG-7627 Remove CallStreamObserverWrapper

---
 .../binaryfile/CallStreamObserverWrapper.java |  78 ----
 .../GrpcBinaryFileServerDownloader.java       |  55 ++-
 .../CallStreamObserverWrapperTest.java        | 254 ------------
 .../GrpcBinaryFileServerDownloaderTest.java   | 390 ++++++++++++------
 4 files changed, 303 insertions(+), 474 deletions(-)
 delete mode 100644 ozgcloud-common-lib/src/main/java/de/ozgcloud/common/binaryfile/CallStreamObserverWrapper.java
 delete mode 100644 ozgcloud-common-lib/src/test/java/de/ozgcloud/common/binaryfile/CallStreamObserverWrapperTest.java

diff --git a/ozgcloud-common-lib/src/main/java/de/ozgcloud/common/binaryfile/CallStreamObserverWrapper.java b/ozgcloud-common-lib/src/main/java/de/ozgcloud/common/binaryfile/CallStreamObserverWrapper.java
deleted file mode 100644
index a0462c6..0000000
--- a/ozgcloud-common-lib/src/main/java/de/ozgcloud/common/binaryfile/CallStreamObserverWrapper.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Copyright (C) 2025 Das Land Schleswig-Holstein vertreten durch den
- * Ministerpräsidenten des Landes Schleswig-Holstein
- * Staatskanzlei
- * Abteilung Digitalisierung und zentrales IT-Management der Landesregierung
- *
- * Lizenziert unter der EUPL, Version 1.2 oder - sobald
- * diese von der Europäischen Kommission genehmigt wurden -
- * Folgeversionen der EUPL ("Lizenz");
- * Sie dürfen dieses Werk ausschließlich gemäß
- * dieser Lizenz nutzen.
- * Eine Kopie der Lizenz finden Sie hier:
- *
- * https://joinup.ec.europa.eu/collection/eupl/eupl-text-eupl-12
- *
- * Sofern nicht durch anwendbare Rechtsvorschriften
- * gefordert oder in schriftlicher Form vereinbart, wird
- * die unter der Lizenz verbreitete Software "so wie sie
- * ist", OHNE JEGLICHE GEWÄHRLEISTUNG ODER BEDINGUNGEN -
- * ausdrücklich oder stillschweigend - verbreitet.
- * Die sprachspezifischen Genehmigungen und Beschränkungen
- * unter der Lizenz sind dem Lizenztext zu entnehmen.
- */
-package de.ozgcloud.common.binaryfile;
-
-import java.util.function.Supplier;
-
-import de.ozgcloud.common.errorhandling.TechnicalException;
-import io.grpc.stub.CallStreamObserver;
-import lombok.RequiredArgsConstructor;
-
-@RequiredArgsConstructor
-	class CallStreamObserverWrapper<V> {
-
-	private final CallStreamObserver<V> callStreamObserver;
-	private boolean failed;
-
-	public void setOnReadyHandler(Runnable onReadyHandler) {
-		callStreamObserver.setOnReadyHandler(onReadyHandler);
-	}
-
-	public synchronized boolean isReady() {
-		return ifNotFailed(callStreamObserver::isReady);
-	}
-
-	public synchronized void onNext(V value) {
-		ifNotFailed(() -> callStreamObserver.onNext(value));
-	}
-
-	public synchronized void onError(Throwable t) {
-		if (!failed) {
-			callStreamObserver.onError(new BinaryFileDownloadException(t));
-			failed = true;
-		} else {
-			handleIllegalCallAfterError();
-		}
-	}
-
-	public synchronized void onCompleted() {
-		ifNotFailed(callStreamObserver::onCompleted);
-	}
-
-	private void ifNotFailed(Runnable runnable) {
-		if (!failed) {
-			runnable.run();
-		} else {
-			handleIllegalCallAfterError();
-		}
-	}
-
-	private <T> T ifNotFailed(Supplier<T> supplier) {
-		return !failed ? supplier.get() : handleIllegalCallAfterError();
-	}
-
-	private <T> T handleIllegalCallAfterError() {
-		throw new TechnicalException("CallStreamObserver called after error");
-	}
-}
diff --git a/ozgcloud-common-lib/src/main/java/de/ozgcloud/common/binaryfile/GrpcBinaryFileServerDownloader.java b/ozgcloud-common-lib/src/main/java/de/ozgcloud/common/binaryfile/GrpcBinaryFileServerDownloader.java
index 44f3fae..090a53d 100644
--- a/ozgcloud-common-lib/src/main/java/de/ozgcloud/common/binaryfile/GrpcBinaryFileServerDownloader.java
+++ b/ozgcloud-common-lib/src/main/java/de/ozgcloud/common/binaryfile/GrpcBinaryFileServerDownloader.java
@@ -28,6 +28,7 @@ import java.io.OutputStream;
 import java.io.PipedInputStream;
 import java.io.PipedOutputStream;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Consumer;
 import java.util.function.Function;
 
@@ -45,15 +46,16 @@ public class GrpcBinaryFileServerDownloader<T> {
 
 	private static final int CHUNK_SIZE = 255 * 1024;
 
-	private final CallStreamObserverWrapper<T> callObserver;
+	private final CallStreamObserver<T> callObserver;
 	private final Function<ByteString, T> chunkBuilder;
 	private final Consumer<OutputStream> downloadConsumer;
 	private final TaskExecutor taskExecutor;
 
-	private final byte[] buffer = new byte[GrpcBinaryFileServerDownloader.CHUNK_SIZE];
+	private final byte[] buffer = new byte[CHUNK_SIZE];
 	private final AtomicBoolean started = new AtomicBoolean(false);
 	private final AtomicBoolean downloadFinished = new AtomicBoolean(false);
 	private final AtomicBoolean requestFinished = new AtomicBoolean(false);
+	private final AtomicReference<BinaryFileDownloadException> error = new AtomicReference<>();
 
 	private PipedInputStream inputStream;
 	private PipedOutputStream outputStream;
@@ -61,7 +63,7 @@ public class GrpcBinaryFileServerDownloader<T> {
 	@Builder
 	public GrpcBinaryFileServerDownloader(CallStreamObserver<T> callObserver, Function<ByteString, T> chunkBuilder,
 			Consumer<OutputStream> downloadConsumer, TaskExecutor taskExecutor) {
-		this.callObserver = new CallStreamObserverWrapper<>(callObserver);
+		this.callObserver = callObserver;
 		this.chunkBuilder = chunkBuilder;
 		this.downloadConsumer = downloadConsumer;
 		this.taskExecutor = taskExecutor;
@@ -88,19 +90,27 @@ public class GrpcBinaryFileServerDownloader<T> {
 		try {
 			setupStreams();
 		} catch (Exception e) {
-			closeStreams();
+			closeOutputStream();
+			closeInputStream();
 			throw new BinaryFileDownloadException(e);
 		}
 	}
 
 	void setupStreams() throws IOException {
 		outputStream = new PipedOutputStream();
-		inputStream = new PipedInputStream(GrpcBinaryFileServerDownloader.CHUNK_SIZE);
+		inputStream = new PipedInputStream(CHUNK_SIZE);
 		outputStream.connect(inputStream);
 	}
 
 	void startDownload() {
-		withDownloadErrorHandling(this::doDownload);
+		try {
+			doDownload();
+		} catch (Exception e) {
+			error.set(new BinaryFileDownloadException(e));
+		} finally {
+			closeOutputStream();
+			sendChunks();
+		}
 	}
 
 	void doDownload() {
@@ -108,12 +118,14 @@ public class GrpcBinaryFileServerDownloader<T> {
 		downloadConsumer.accept(outputStream);
 		LOG.debug("Download completed.");
 		downloadFinished.set(true);
-		closeOutputStream();
-		sendChunks();
 	}
 
 	synchronized void sendChunks() {
-		withDownloadErrorHandling(this::doSendChunks);
+		try {
+			doSendChunks();
+		} catch (Exception e) {
+			completeRequestWithError(e);
+		}
 	}
 
 	void doSendChunks() throws IOException {
@@ -121,7 +133,11 @@ public class GrpcBinaryFileServerDownloader<T> {
 			return;
 		}
 		int bytesRead;
-		while (callObserver.isReady()) {
+		while (isReady()) {
+			if (error.get() != null) {
+				completeRequestWithError(error.get());
+				break;
+			}
 			if ((bytesRead = inputStream.read(buffer)) == -1) {
 				tryCompleteRequest();
 				break;
@@ -131,6 +147,10 @@ public class GrpcBinaryFileServerDownloader<T> {
 		}
 	}
 
+	private boolean isReady() {
+		return callObserver.isReady();
+	}
+
 	void tryCompleteRequest() {
 		if (shouldCompleteRequest()) {
 			completeRequest();
@@ -147,18 +167,11 @@ public class GrpcBinaryFileServerDownloader<T> {
 		callObserver.onCompleted();
 	}
 
-	void withDownloadErrorHandling(ExceptionalRunnable runnable) {
-		try {
-			runnable.run();
-		} catch (Exception e) {
-			closeStreams();
-			callObserver.onError(e);
-		}
-	}
-
-	private void closeStreams() {
-		closeOutputStream();
+	void completeRequestWithError(Throwable t) {
+		LOG.debug("Complete download request with error", t);
+		requestFinished.set(true);
 		closeInputStream();
+		callObserver.onError(t);
 	}
 
 	void closeOutputStream() {
diff --git a/ozgcloud-common-lib/src/test/java/de/ozgcloud/common/binaryfile/CallStreamObserverWrapperTest.java b/ozgcloud-common-lib/src/test/java/de/ozgcloud/common/binaryfile/CallStreamObserverWrapperTest.java
deleted file mode 100644
index c1d5999..0000000
--- a/ozgcloud-common-lib/src/test/java/de/ozgcloud/common/binaryfile/CallStreamObserverWrapperTest.java
+++ /dev/null
@@ -1,254 +0,0 @@
-/*
- * Copyright (C) 2025 Das Land Schleswig-Holstein vertreten durch den
- * Ministerpräsidenten des Landes Schleswig-Holstein
- * Staatskanzlei
- * Abteilung Digitalisierung und zentrales IT-Management der Landesregierung
- *
- * Lizenziert unter der EUPL, Version 1.2 oder - sobald
- * diese von der Europäischen Kommission genehmigt wurden -
- * Folgeversionen der EUPL ("Lizenz");
- * Sie dürfen dieses Werk ausschließlich gemäß
- * dieser Lizenz nutzen.
- * Eine Kopie der Lizenz finden Sie hier:
- *
- * https://joinup.ec.europa.eu/collection/eupl/eupl-text-eupl-12
- *
- * Sofern nicht durch anwendbare Rechtsvorschriften
- * gefordert oder in schriftlicher Form vereinbart, wird
- * die unter der Lizenz verbreitete Software "so wie sie
- * ist", OHNE JEGLICHE GEWÄHRLEISTUNG ODER BEDINGUNGEN -
- * ausdrücklich oder stillschweigend - verbreitet.
- * Die sprachspezifischen Genehmigungen und Beschränkungen
- * unter der Lizenz sind dem Lizenztext zu entnehmen.
- */
-package de.ozgcloud.common.binaryfile;
-
-import static org.assertj.core.api.Assertions.*;
-import static org.mockito.Mockito.*;
-
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Nested;
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.ValueSource;
-import org.mockito.ArgumentCaptor;
-import org.mockito.InjectMocks;
-import org.mockito.Mock;
-
-import de.ozgcloud.common.errorhandling.TechnicalException;
-import de.ozgcloud.common.test.ReflectionTestUtils;
-import io.grpc.stub.CallStreamObserver;
-
-class CallStreamObserverWrapperTest {
-
-	@Mock
-	private CallStreamObserver<GrpcResponseDummy> callObserver;
-	@InjectMocks
-	private CallStreamObserverWrapper<GrpcResponseDummy> wrapper;
-
-	@Nested
-	class TestSetOnReadyHandler {
-
-		@Test
-		void shouldForwardToObserver() {
-			var onReadyHandler = mock(Runnable.class);
-
-			wrapper.setOnReadyHandler(onReadyHandler);
-
-			verify(callObserver).setOnReadyHandler(onReadyHandler);
-		}
-	}
-
-	@Nested
-	class TestIsReady {
-
-		@Nested
-		class OnFailed {
-
-			@BeforeEach
-			void init() {
-				ReflectionTestUtils.setField(wrapper, "failed", true);
-			}
-
-			@Test
-			void shouldNotCallObserver() {
-				catchThrowable(wrapper::isReady);
-
-				verifyNoInteractions(callObserver);
-			}
-
-			@Test
-			void shouldThrowException() {
-				assertThatThrownBy(wrapper::isReady).isInstanceOf(TechnicalException.class);
-			}
-		}
-
-		@Nested
-		class OnNotFailed {
-
-			@BeforeEach
-			void init() {
-				ReflectionTestUtils.setField(wrapper, "failed", false);
-			}
-
-			@Test
-			void shouldForwardToObserver() {
-				wrapper.isReady();
-
-				verify(callObserver).isReady();
-			}
-
-			@ParameterizedTest
-			@ValueSource(booleans = { true, false })
-			void shouldReturnObserverResult(boolean observerResult) {
-				when(callObserver.isReady()).thenReturn(observerResult);
-
-				var ready = wrapper.isReady();
-
-				assertThat(ready).isEqualTo(observerResult);
-			}
-		}
-	}
-
-	@Nested
-	class TestOnNext {
-
-		private final GrpcResponseDummy grpccResponse = new GrpcResponseDummy();
-
-		@Nested
-		class OnFailed {
-
-			@BeforeEach
-			void init() {
-				ReflectionTestUtils.setField(wrapper, "failed", true);
-			}
-
-			@Test
-			void shouldNotCallObserver() {
-				catchThrowable(() -> wrapper.onNext(grpccResponse));
-
-				verifyNoInteractions(callObserver);
-			}
-
-			@Test
-			void shouldThrowException() {
-				assertThatThrownBy(() -> wrapper.onNext(grpccResponse)).isInstanceOf(TechnicalException.class);
-			}
-		}
-
-		@Nested
-		class OnNotFailed {
-
-			@BeforeEach
-			void init() {
-				ReflectionTestUtils.setField(wrapper, "failed", false);
-			}
-
-			@Test
-			void shouldForwardToObserver() {
-				wrapper.onNext(grpccResponse);
-
-				verify(callObserver).onNext(grpccResponse);
-			}
-		}
-	}
-
-	@Nested
-	class TestOnError {
-
-		private final TechnicalException exception = new TechnicalException("dummy");
-
-		@Nested
-		class OnFailed {
-
-			@BeforeEach
-			void init() {
-				ReflectionTestUtils.setField(wrapper, "failed", true);
-			}
-
-			@Test
-			void shouldNotCallObserver() {
-				catchThrowable(() -> wrapper.onError(exception));
-
-				verifyNoInteractions(callObserver);
-			}
-
-			@Test
-			void shouldThrowException() {
-				assertThatThrownBy(() -> wrapper.onError(exception)).isInstanceOf(TechnicalException.class);
-			}
-		}
-
-		@Nested
-		class OnNotFailed {
-
-			private ArgumentCaptor<BinaryFileDownloadException> captor = ArgumentCaptor.forClass(BinaryFileDownloadException.class);
-
-			@BeforeEach
-			void init() {
-				ReflectionTestUtils.setField(wrapper, "failed", false);
-			}
-
-			@Test
-			void shouldForwardBinaryFileDownloadExceptionToObserver() {
-				wrapper.onError(exception);
-
-				verify(callObserver).onError(captor.capture());
-				assertThat(captor.getValue()).isInstanceOf(BinaryFileDownloadException.class);
-				assertThat(captor.getValue().getCause()).isEqualTo(exception);
-			}
-
-			@Test
-			void shouldSetFailedToTrue() {
-				wrapper.onError(exception);
-
-				assertThat(wrapper).extracting("failed").isEqualTo(true);
-			}
-		}
-	}
-
-	@Nested
-	class TestOnCompleted {
-
-		@Nested
-		class OnFailed {
-
-			@BeforeEach
-			void init() {
-				ReflectionTestUtils.setField(wrapper, "failed", true);
-			}
-
-			@Test
-			void shouldNotCallObserver() {
-				catchThrowable(wrapper::onCompleted);
-
-				verifyNoInteractions(callObserver);
-			}
-
-			@Test
-			void shouldThrowException() {
-				assertThatThrownBy(wrapper::onCompleted).isInstanceOf(TechnicalException.class);
-			}
-		}
-
-		@Nested
-		class OnNotFailed {
-
-			@BeforeEach
-			void init() {
-				ReflectionTestUtils.setField(wrapper, "failed", false);
-			}
-
-			@Test
-			void shouldForwardToObserver() {
-				wrapper.onCompleted();
-
-				verify(callObserver).onCompleted();
-			}
-		}
-	}
-
-	private static class GrpcResponseDummy {
-
-	}
-}
diff --git a/ozgcloud-common-lib/src/test/java/de/ozgcloud/common/binaryfile/GrpcBinaryFileServerDownloaderTest.java b/ozgcloud-common-lib/src/test/java/de/ozgcloud/common/binaryfile/GrpcBinaryFileServerDownloaderTest.java
index e4f265d..2e45282 100644
--- a/ozgcloud-common-lib/src/test/java/de/ozgcloud/common/binaryfile/GrpcBinaryFileServerDownloaderTest.java
+++ b/ozgcloud-common-lib/src/test/java/de/ozgcloud/common/binaryfile/GrpcBinaryFileServerDownloaderTest.java
@@ -34,6 +34,7 @@ import java.io.PipedInputStream;
 import java.io.PipedOutputStream;
 import java.util.Random;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Consumer;
 import java.util.function.Function;
 
@@ -50,12 +51,13 @@ import com.google.protobuf.ByteString;
 
 import de.ozgcloud.common.errorhandling.TechnicalException;
 import de.ozgcloud.common.test.ReflectionTestUtils;
+import io.grpc.stub.CallStreamObserver;
 import lombok.SneakyThrows;
 
 class GrpcBinaryFileServerDownloaderTest {
 
 	@Mock
-	private CallStreamObserverWrapper<GrpcResponseDummy> callObserverWrapper;
+	private CallStreamObserver<GrpcResponseDummy> callObserver;
 	@Mock
 	private Function<ByteString, GrpcResponseDummy> chunkBuilder;
 	@Mock
@@ -67,9 +69,8 @@ class GrpcBinaryFileServerDownloaderTest {
 
 	@BeforeEach
 	void init() {
-		downloader = spy(GrpcBinaryFileServerDownloader.<GrpcResponseDummy>builder().downloadConsumer(downloadConsumer)
+		downloader = spy(GrpcBinaryFileServerDownloader.<GrpcResponseDummy>builder().callObserver(callObserver).downloadConsumer(downloadConsumer)
 				.chunkBuilder(chunkBuilder).taskExecutor(taskExecutor).build());
-		ReflectionTestUtils.setField(downloader, "callObserver", callObserverWrapper);
 	}
 
 	@Nested
@@ -152,7 +153,7 @@ class GrpcBinaryFileServerDownloaderTest {
 		void shouldSetOnReadyHandler() {
 			downloader.doStart();
 
-			verify(callObserverWrapper).setOnReadyHandler(runnableCaptor.capture());
+			verify(callObserver).setOnReadyHandler(runnableCaptor.capture());
 			assertThat(runnableCaptor.getValue()).isNotNull();
 		}
 	}
@@ -218,126 +219,289 @@ class GrpcBinaryFileServerDownloaderTest {
 	@Nested
 	class TestStartDownload {
 
-		@Captor
-		private ArgumentCaptor<ExceptionalRunnable> runnableCaptor;
+		@Mock
+		private PipedOutputStream outputStream;
+
+		@BeforeEach
+		void init() {
+			setOutputStreamField(outputStream);
+		}
 
-		@SneakyThrows
 		@Test
-		void shouldCallDoDownload() {
-			doNothing().when(downloader).withDownloadErrorHandling(any());
-			doNothing().when(downloader).doDownload();
+		void shouldErrorBeInitiallyNull() {
+			assertThat(getError()).isNull();
+		}
 
-			downloader.startDownload();
+		@Nested
+		class OnNoException {
 
-			verify(downloader).withDownloadErrorHandling(runnableCaptor.capture());
-			runnableCaptor.getValue().run();
-			verify(downloader).doDownload();
+			@BeforeEach
+			void init() {
+				doNothing().when(downloader).doDownload();
+			}
+
+			@Test
+			void shouldDoDownload() {
+				downloader.startDownload();
+
+				verify(downloader).doDownload();
+			}
+
+			@SneakyThrows
+			@Test
+			void shouldCloseOutputStream() {
+				downloader.startDownload();
+
+				verify(outputStream).close();
+			}
+
+			@Test
+			@DisplayName("should send chunks here to not wait for callObserver to change its ready status")
+			void shouldSendChunks() {
+				downloader.startDownload();
+
+				verify(downloader).sendChunks();
+			}
 		}
 
 		@Nested
-		class TestDoDownload {
+		class OnException {
 
-			@Mock
-			private PipedOutputStream outputStream;
+			private final TechnicalException exception = new TechnicalException("error");
 
 			@BeforeEach
-			void mock() {
-				setOutputStreamField(outputStream);
+			void init() {
+				doThrow(exception).when(downloader).doDownload();
 			}
 
-			@SneakyThrows
 			@Test
-			void shouldCallDownloadConsumer() {
-				downloader.doDownload();
+			void shouldSetError() {
+				downloader.startDownload();
 
-				verify(downloadConsumer).accept(outputStream);
+				assertThat(getError()).isInstanceOf(BinaryFileDownloadException.class).hasCause(exception);
 			}
 
 			@SneakyThrows
 			@Test
 			void shouldCloseOutputStream() {
-				downloader.doDownload();
+				downloader.startDownload();
 
 				verify(outputStream).close();
 			}
+
+			@Test
+			@DisplayName("should send chunks here to communicate error to callObserver")
+			void shouldSendChunks() {
+				downloader.startDownload();
+
+				verify(downloader).sendChunks();
+			}
 		}
 	}
 
 	@Nested
-	class TestSendChunks {
+	class TestDoDownload {
 
-		@Captor
-		private ArgumentCaptor<ExceptionalRunnable> runnableCaptor;
+		@Mock
+		private PipedOutputStream outputStream;
+
+		@BeforeEach
+		void mock() {
+			setOutputStreamField(outputStream);
+		}
 
-		@SneakyThrows
 		@Test
-		void shouldCallWithDownloadErrorHandling() {
-			doNothing().when(downloader).withDownloadErrorHandling(any());
-			doNothing().when(downloader).doSendChunks();
+		void shouldCallDownloadConsumer() {
+			downloader.doDownload();
 
-			downloader.sendChunks();
+			verify(downloadConsumer).accept(outputStream);
+		}
 
-			verify(downloader).withDownloadErrorHandling(runnableCaptor.capture());
-			runnableCaptor.getValue().run();
-			verify(downloader).doSendChunks();
+		@Test
+		void shouldDownloadFinishedBeInitiallyFalse() {
+			assertThat(getDownloadFinished()).isFalse();
 		}
 
-		@Nested
-		class TestDoSendChunks {
+		@Test
+		void shouldSetDownloadFinished() {
+			downloader.doDownload();
 
-			@Mock
-			private PipedInputStream inputStream;
-			@Captor
-			private ArgumentCaptor<ByteString> byteStringCaptor;
+			assertThat(getDownloadFinished()).isTrue();
+		}
+	}
+
+	@Nested
+	class TestSendChunks {
 
-			private final int readBytes = 20;
-			private final byte[] buffer = new byte[readBytes];
-			private final GrpcResponseDummy grpcResponseDummy = new GrpcResponseDummy();
+		@Nested
+		class OnNoException {
 
 			@SneakyThrows
 			@BeforeEach
-			void mock() {
-				doNothing().when(downloader).tryCompleteRequest();
-				when(callObserverWrapper.isReady()).thenReturn(true);
-				when(inputStream.read(any())).thenReturn(readBytes, -1);
-				setInputStreamField(inputStream);
-				new Random().nextBytes(buffer);
-				ReflectionTestUtils.setField(downloader, "buffer", buffer);
+			void init() {
+				doNothing().when(downloader).doSendChunks();
 			}
 
+			@SneakyThrows
 			@Test
-			void shouldCallChunkBuilder() {
-				doSendChunks();
+			void shouldDoSendChunks() {
+				downloader.sendChunks();
 
-				verify(chunkBuilder).apply(byteStringCaptor.capture());
-				assertThat(byteStringCaptor.getValue().toByteArray()).isEqualTo(buffer);
+				verify(downloader).doSendChunks();
 			}
+		}
 
-			@DisplayName("should send next chunk if callObserver is ready and stream already received data")
-			@Test
-			void shouldCallOnNext() {
-				when(chunkBuilder.apply(any())).thenReturn(grpcResponseDummy);
+		@Nested
+		class OnException {
 
-				doSendChunks();
+			private final TechnicalException exception = new TechnicalException("error");
 
-				verify(callObserverWrapper).onNext(grpcResponseDummy);
+			@SneakyThrows
+			@BeforeEach
+			void init() {
+				doThrow(exception).when(downloader).doSendChunks();
 			}
 
-			@DisplayName("should call complete grpc stream if download has finished and stream has no data left")
 			@Test
-			void shouldCallCompleteDownload() {
-				setDownloadFinishedField(true);
+			void shouldCompleteRequestWithError() {
+				downloader.sendChunks();
 
+				verify(downloader).completeRequestWithError(exception);
+			}
+		}
+	}
+
+	@Nested
+	class TestDoSendChunks {
+
+		@Nested
+		class OnRequestFinished {
+
+			@BeforeEach
+			void init() {
+				setRequestFinishedField(true);
+			}
+
+			@Test
+			void shouldNotInteractWithCallObserver() {
 				doSendChunks();
 
-				verify(downloader).tryCompleteRequest();
+				verifyNoInteractions(callObserver);
 			}
+		}
 
-			@SneakyThrows
-			private void doSendChunks() {
-				downloader.doSendChunks();
+		@Nested
+		class OnRequestNotFinished {
+
+			@Nested
+			class OnNotReady {
+
+				@BeforeEach
+				void init() {
+					when(callObserver.isReady()).thenReturn(false);
+				}
+
+				@Test
+				void shouldOnlyCallIsReadyOnObserver() {
+					doSendChunks();
+
+					verify(callObserver).isReady();
+					verifyNoMoreInteractions(callObserver);
+				}
+			}
+
+			@Nested
+			class OnReady {
+
+				@BeforeEach
+				void init() {
+					when(callObserver.isReady()).thenReturn(true);
+				}
+
+				@Nested
+				class OnHasError {
+
+					private final BinaryFileDownloadException exception = new BinaryFileDownloadException(new TechnicalException("error"));
+
+					@BeforeEach
+					void init() {
+						setErrorField(exception);
+						doNothing().when(downloader).completeRequestWithError(any());
+					}
+
+					@Test
+					void shouldOnlyCallIsReadyOnObserver() {
+						doSendChunks();
+
+						verify(callObserver).isReady();
+						verifyNoMoreInteractions(callObserver);
+					}
+
+					@Test
+					void shouldCompleteRequestWithError() {
+						doSendChunks();
+
+						verify(downloader).completeRequestWithError(exception);
+					}
+				}
+
+				@Nested
+				class OnHasNoError {
+					@Mock
+					private PipedInputStream inputStream;
+					@Captor
+					private ArgumentCaptor<ByteString> byteStringCaptor;
+
+					private final int readBytes = 20;
+					private final byte[] buffer = new byte[readBytes];
+					private final GrpcResponseDummy grpcResponseDummy = new GrpcResponseDummy();
+
+					@SneakyThrows
+					@BeforeEach
+					void mock() {
+						doNothing().when(downloader).tryCompleteRequest();
+						when(callObserver.isReady()).thenReturn(true);
+						when(inputStream.read(any())).thenReturn(readBytes, -1);
+						setInputStreamField(inputStream);
+						new Random().nextBytes(buffer);
+						ReflectionTestUtils.setField(downloader, "buffer", buffer);
+					}
+
+					@Test
+					void shouldCallChunkBuilder() {
+						doSendChunks();
+
+						verify(chunkBuilder).apply(byteStringCaptor.capture());
+						assertThat(byteStringCaptor.getValue().toByteArray()).isEqualTo(buffer);
+					}
+
+					@DisplayName("should send next chunk if callObserver is ready and stream already received data")
+					@Test
+					void shouldCallOnNext() {
+						when(chunkBuilder.apply(any())).thenReturn(grpcResponseDummy);
+
+						doSendChunks();
+
+						verify(callObserver).onNext(grpcResponseDummy);
+					}
+
+					@DisplayName("should call complete grpc stream if download has finished and stream has no data left")
+					@Test
+					void shouldTryCompleteRequest() {
+						setDownloadFinishedField(true);
+
+						doSendChunks();
+
+						verify(downloader).tryCompleteRequest();
+					}
+				}
 			}
 		}
+
+		@SneakyThrows
+		private void doSendChunks() {
+			downloader.doSendChunks();
+		}
 	}
 
 	@Nested
@@ -453,69 +617,41 @@ class GrpcBinaryFileServerDownloaderTest {
 		void shouldCallOnCompleted() {
 			downloader.completeRequest();
 
-			verify(callObserverWrapper).onCompleted();
+			verify(callObserver).onCompleted();
 		}
 	}
 
 	@Nested
-	class TestWithDownloadErrorHandling {
-
-		@SneakyThrows
-		@Test
-		void shouldRunRunnable() {
-			var runnable = mock(ExceptionalRunnable.class);
+	class TestCompleteRequestWithError {
 
-			downloader.withDownloadErrorHandling(runnable);
+		private final Throwable error = new Throwable();
 
-			verify(runnable).run();
+		@BeforeEach
+		void init() {
+			doNothing().when(downloader).closeInputStream();
 		}
 
-		@Nested
-		class OnException {
-
-			@Mock
-			private PipedOutputStream outputStream;
-			@Mock
-			private PipedInputStream inputStream;
-
-			private final TechnicalException exception = new TechnicalException("dummy");
-
-			@BeforeEach
-			void init() {
-				setInputStreamField(inputStream);
-				setOutputStreamField(outputStream);
-			}
-
-			@SneakyThrows
-			@Test
-			void shouldCloseOutputStream() {
-				withDownloadErrorHandling();
+		@Test
+		void shouldSetRequestFinished() {
+			assertThat(getRequestFinished()).isFalse();
 
-				verify(outputStream).close();
-			}
+			downloader.completeRequestWithError(error);
 
-			@SneakyThrows
-			@Test
-			void shouldCloseInputStream() {
-				withDownloadErrorHandling();
-
-				verify(inputStream).close();
-			}
+			assertThat(getRequestFinished()).isTrue();
+		}
 
-			@Test
-			void shouldNotifyCallObserver() {
-				withDownloadErrorHandling();
+		@Test
+		void shouldCloseInputStream() {
+			downloader.completeRequestWithError(error);
 
-				verify(callObserverWrapper).onError(argThat(TechnicalException.class::isInstance));
-			}
+			verify(downloader).closeInputStream();
+		}
 
-			private void withDownloadErrorHandling() {
-				downloader.withDownloadErrorHandling(this::dummyMethodThrowingException);
-			}
+		@Test
+		void shouldNotifyCallObserver() {
+			downloader.completeRequestWithError(error);
 
-			private void dummyMethodThrowingException() throws TechnicalException {
-				throw exception;
-			}
+			verify(callObserver).onError(error);
 		}
 	}
 
@@ -527,10 +663,6 @@ class GrpcBinaryFileServerDownloaderTest {
 		ReflectionTestUtils.setField(downloader, "inputStream", inputStream);
 	}
 
-	private void setDownloadFinishedField(boolean downloadFinished) {
-		ReflectionTestUtils.setField(downloader, "downloadFinished", new AtomicBoolean(downloadFinished));
-	}
-
 	private void setRequestFinishedField(boolean requestFinished) {
 		ReflectionTestUtils.setField(downloader, "requestFinished", new AtomicBoolean(requestFinished));
 	}
@@ -539,6 +671,22 @@ class GrpcBinaryFileServerDownloaderTest {
 		return ReflectionTestUtils.getField(downloader, "requestFinished", AtomicBoolean.class).get();
 	}
 
+	private void setErrorField(BinaryFileDownloadException error) {
+		ReflectionTestUtils.setField(downloader, "error", new AtomicReference<>(error));
+	}
+
+	private BinaryFileDownloadException getError() {
+		return (BinaryFileDownloadException) ReflectionTestUtils.getField(downloader, "error", AtomicReference.class).get();
+	}
+
+	private void setDownloadFinishedField(boolean downloadFinished) {
+		ReflectionTestUtils.setField(downloader, "downloadFinished", new AtomicBoolean(downloadFinished));
+	}
+
+	private boolean getDownloadFinished() {
+		return ReflectionTestUtils.getField(downloader, "downloadFinished", AtomicBoolean.class).get();
+	}
+
 	private static class GrpcResponseDummy {
 	}
 }
\ No newline at end of file
-- 
GitLab