From d1187a9ec9c5a4c3a9f0e6c5a6dd0f2e29fba6e3 Mon Sep 17 00:00:00 2001
From: Krzysztof Witukiewicz <krzysztof.witukiewicz@mgm-tp.com>
Date: Mon, 27 Jan 2025 11:11:27 +0100
Subject: [PATCH] OZG-7262 OZG-7566 Fix error handling

---
 .../BinaryFileDownloadException.java          |  33 +++
 .../binaryfile/CallStreamObserverWrapper.java |  78 ++++++
 .../GrpcBinaryFileServerDownloader.java       |  49 ++--
 .../CallStreamObserverWrapperTest.java        | 254 ++++++++++++++++++
 .../GrpcBinaryFileServerDownloaderTest.java   | 223 +++++++++------
 5 files changed, 535 insertions(+), 102 deletions(-)
 create mode 100644 ozgcloud-common-lib/src/main/java/de/ozgcloud/common/binaryfile/BinaryFileDownloadException.java
 create mode 100644 ozgcloud-common-lib/src/main/java/de/ozgcloud/common/binaryfile/CallStreamObserverWrapper.java
 create mode 100644 ozgcloud-common-lib/src/test/java/de/ozgcloud/common/binaryfile/CallStreamObserverWrapperTest.java

diff --git a/ozgcloud-common-lib/src/main/java/de/ozgcloud/common/binaryfile/BinaryFileDownloadException.java b/ozgcloud-common-lib/src/main/java/de/ozgcloud/common/binaryfile/BinaryFileDownloadException.java
new file mode 100644
index 0000000..9a5d462
--- /dev/null
+++ b/ozgcloud-common-lib/src/main/java/de/ozgcloud/common/binaryfile/BinaryFileDownloadException.java
@@ -0,0 +1,33 @@
+/*
+ * Copyright (C) 2025 Das Land Schleswig-Holstein vertreten durch den
+ * Ministerpräsidenten des Landes Schleswig-Holstein
+ * Staatskanzlei
+ * Abteilung Digitalisierung und zentrales IT-Management der Landesregierung
+ *
+ * Lizenziert unter der EUPL, Version 1.2 oder - sobald
+ * diese von der Europäischen Kommission genehmigt wurden -
+ * Folgeversionen der EUPL ("Lizenz");
+ * Sie dürfen dieses Werk ausschließlich gemäß
+ * dieser Lizenz nutzen.
+ * Eine Kopie der Lizenz finden Sie hier:
+ *
+ * https://joinup.ec.europa.eu/collection/eupl/eupl-text-eupl-12
+ *
+ * Sofern nicht durch anwendbare Rechtsvorschriften
+ * gefordert oder in schriftlicher Form vereinbart, wird
+ * die unter der Lizenz verbreitete Software "so wie sie
+ * ist", OHNE JEGLICHE GEWÄHRLEISTUNG ODER BEDINGUNGEN -
+ * ausdrücklich oder stillschweigend - verbreitet.
+ * Die sprachspezifischen Genehmigungen und Beschränkungen
+ * unter der Lizenz sind dem Lizenztext zu entnehmen.
+ */
+package de.ozgcloud.common.binaryfile;
+
+import de.ozgcloud.common.errorhandling.TechnicalException;
+
+class BinaryFileDownloadException extends TechnicalException {
+
+	public BinaryFileDownloadException(Throwable cause) {
+		super("Error occurred during downloading file content", cause);
+	}
+}
diff --git a/ozgcloud-common-lib/src/main/java/de/ozgcloud/common/binaryfile/CallStreamObserverWrapper.java b/ozgcloud-common-lib/src/main/java/de/ozgcloud/common/binaryfile/CallStreamObserverWrapper.java
new file mode 100644
index 0000000..a0462c6
--- /dev/null
+++ b/ozgcloud-common-lib/src/main/java/de/ozgcloud/common/binaryfile/CallStreamObserverWrapper.java
@@ -0,0 +1,78 @@
+/*
+ * Copyright (C) 2025 Das Land Schleswig-Holstein vertreten durch den
+ * Ministerpräsidenten des Landes Schleswig-Holstein
+ * Staatskanzlei
+ * Abteilung Digitalisierung und zentrales IT-Management der Landesregierung
+ *
+ * Lizenziert unter der EUPL, Version 1.2 oder - sobald
+ * diese von der Europäischen Kommission genehmigt wurden -
+ * Folgeversionen der EUPL ("Lizenz");
+ * Sie dürfen dieses Werk ausschließlich gemäß
+ * dieser Lizenz nutzen.
+ * Eine Kopie der Lizenz finden Sie hier:
+ *
+ * https://joinup.ec.europa.eu/collection/eupl/eupl-text-eupl-12
+ *
+ * Sofern nicht durch anwendbare Rechtsvorschriften
+ * gefordert oder in schriftlicher Form vereinbart, wird
+ * die unter der Lizenz verbreitete Software "so wie sie
+ * ist", OHNE JEGLICHE GEWÄHRLEISTUNG ODER BEDINGUNGEN -
+ * ausdrücklich oder stillschweigend - verbreitet.
+ * Die sprachspezifischen Genehmigungen und Beschränkungen
+ * unter der Lizenz sind dem Lizenztext zu entnehmen.
+ */
+package de.ozgcloud.common.binaryfile;
+
+import java.util.function.Supplier;
+
+import de.ozgcloud.common.errorhandling.TechnicalException;
+import io.grpc.stub.CallStreamObserver;
+import lombok.RequiredArgsConstructor;
+
+@RequiredArgsConstructor
+	class CallStreamObserverWrapper<V> {
+
+	private final CallStreamObserver<V> callStreamObserver;
+	private boolean failed;
+
+	public void setOnReadyHandler(Runnable onReadyHandler) {
+		callStreamObserver.setOnReadyHandler(onReadyHandler);
+	}
+
+	public synchronized boolean isReady() {
+		return ifNotFailed(callStreamObserver::isReady);
+	}
+
+	public synchronized void onNext(V value) {
+		ifNotFailed(() -> callStreamObserver.onNext(value));
+	}
+
+	public synchronized void onError(Throwable t) {
+		if (!failed) {
+			callStreamObserver.onError(new BinaryFileDownloadException(t));
+			failed = true;
+		} else {
+			handleIllegalCallAfterError();
+		}
+	}
+
+	public synchronized void onCompleted() {
+		ifNotFailed(callStreamObserver::onCompleted);
+	}
+
+	private void ifNotFailed(Runnable runnable) {
+		if (!failed) {
+			runnable.run();
+		} else {
+			handleIllegalCallAfterError();
+		}
+	}
+
+	private <T> T ifNotFailed(Supplier<T> supplier) {
+		return !failed ? supplier.get() : handleIllegalCallAfterError();
+	}
+
+	private <T> T handleIllegalCallAfterError() {
+		throw new TechnicalException("CallStreamObserver called after error");
+	}
+}
diff --git a/ozgcloud-common-lib/src/main/java/de/ozgcloud/common/binaryfile/GrpcBinaryFileServerDownloader.java b/ozgcloud-common-lib/src/main/java/de/ozgcloud/common/binaryfile/GrpcBinaryFileServerDownloader.java
index cb4a208..44f3fae 100644
--- a/ozgcloud-common-lib/src/main/java/de/ozgcloud/common/binaryfile/GrpcBinaryFileServerDownloader.java
+++ b/ozgcloud-common-lib/src/main/java/de/ozgcloud/common/binaryfile/GrpcBinaryFileServerDownloader.java
@@ -23,14 +23,6 @@
  */
 package de.ozgcloud.common.binaryfile;
 
-import com.google.protobuf.ByteString;
-import de.ozgcloud.common.errorhandling.TechnicalException;
-import io.grpc.stub.CallStreamObserver;
-import lombok.Builder;
-import lombok.extern.log4j.Log4j2;
-import org.apache.commons.io.IOUtils;
-import org.springframework.core.task.TaskExecutor;
-
 import java.io.IOException;
 import java.io.OutputStream;
 import java.io.PipedInputStream;
@@ -39,12 +31,21 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.Consumer;
 import java.util.function.Function;
 
+import org.apache.commons.io.IOUtils;
+import org.springframework.core.task.TaskExecutor;
+
+import com.google.protobuf.ByteString;
+
+import io.grpc.stub.CallStreamObserver;
+import lombok.Builder;
+import lombok.extern.log4j.Log4j2;
+
 @Log4j2
 public class GrpcBinaryFileServerDownloader<T> {
 
 	private static final int CHUNK_SIZE = 255 * 1024;
 
-	private final CallStreamObserver<T> callObserver;
+	private final CallStreamObserverWrapper<T> callObserver;
 	private final Function<ByteString, T> chunkBuilder;
 	private final Consumer<OutputStream> downloadConsumer;
 	private final TaskExecutor taskExecutor;
@@ -60,7 +61,7 @@ public class GrpcBinaryFileServerDownloader<T> {
 	@Builder
 	public GrpcBinaryFileServerDownloader(CallStreamObserver<T> callObserver, Function<ByteString, T> chunkBuilder,
 			Consumer<OutputStream> downloadConsumer, TaskExecutor taskExecutor) {
-		this.callObserver = callObserver;
+		this.callObserver = new CallStreamObserverWrapper<>(callObserver);
 		this.chunkBuilder = chunkBuilder;
 		this.downloadConsumer = downloadConsumer;
 		this.taskExecutor = taskExecutor;
@@ -78,11 +79,20 @@ public class GrpcBinaryFileServerDownloader<T> {
 
 	void doStart() {
 		LOG.debug("Starting download.");
-		handleSafety(this::setupStreams);
+		safelySetupStreams();
 		taskExecutor.execute(this::startDownload);
 		callObserver.setOnReadyHandler(this::sendChunks);
 	}
 
+	void safelySetupStreams() {
+		try {
+			setupStreams();
+		} catch (Exception e) {
+			closeStreams();
+			throw new BinaryFileDownloadException(e);
+		}
+	}
+
 	void setupStreams() throws IOException {
 		outputStream = new PipedOutputStream();
 		inputStream = new PipedInputStream(GrpcBinaryFileServerDownloader.CHUNK_SIZE);
@@ -90,7 +100,7 @@ public class GrpcBinaryFileServerDownloader<T> {
 	}
 
 	void startDownload() {
-		handleSafety(this::doDownload);
+		withDownloadErrorHandling(this::doDownload);
 	}
 
 	void doDownload() {
@@ -103,7 +113,7 @@ public class GrpcBinaryFileServerDownloader<T> {
 	}
 
 	synchronized void sendChunks() {
-		handleSafety(this::doSendChunks);
+		withDownloadErrorHandling(this::doSendChunks);
 	}
 
 	void doSendChunks() throws IOException {
@@ -137,16 +147,20 @@ public class GrpcBinaryFileServerDownloader<T> {
 		callObserver.onCompleted();
 	}
 
-	void handleSafety(ExceptionalRunnable runnable) {
+	void withDownloadErrorHandling(ExceptionalRunnable runnable) {
 		try {
 			runnable.run();
 		} catch (Exception e) {
-			closeOutputStream();
-			closeInputStream();
-			callObserver.onError(new TechnicalException("Error occurred during downloading file content download.", e));
+			closeStreams();
+			callObserver.onError(e);
 		}
 	}
 
+	private void closeStreams() {
+		closeOutputStream();
+		closeInputStream();
+	}
+
 	void closeOutputStream() {
 		IOUtils.closeQuietly(outputStream, e -> LOG.error("OutputStream cannot be closed.", e));
 	}
@@ -154,5 +168,4 @@ public class GrpcBinaryFileServerDownloader<T> {
 	void closeInputStream() {
 		IOUtils.closeQuietly(inputStream, e -> LOG.error("InputStream cannot be closed.", e));
 	}
-
 }
\ No newline at end of file
diff --git a/ozgcloud-common-lib/src/test/java/de/ozgcloud/common/binaryfile/CallStreamObserverWrapperTest.java b/ozgcloud-common-lib/src/test/java/de/ozgcloud/common/binaryfile/CallStreamObserverWrapperTest.java
new file mode 100644
index 0000000..c1d5999
--- /dev/null
+++ b/ozgcloud-common-lib/src/test/java/de/ozgcloud/common/binaryfile/CallStreamObserverWrapperTest.java
@@ -0,0 +1,254 @@
+/*
+ * Copyright (C) 2025 Das Land Schleswig-Holstein vertreten durch den
+ * Ministerpräsidenten des Landes Schleswig-Holstein
+ * Staatskanzlei
+ * Abteilung Digitalisierung und zentrales IT-Management der Landesregierung
+ *
+ * Lizenziert unter der EUPL, Version 1.2 oder - sobald
+ * diese von der Europäischen Kommission genehmigt wurden -
+ * Folgeversionen der EUPL ("Lizenz");
+ * Sie dürfen dieses Werk ausschließlich gemäß
+ * dieser Lizenz nutzen.
+ * Eine Kopie der Lizenz finden Sie hier:
+ *
+ * https://joinup.ec.europa.eu/collection/eupl/eupl-text-eupl-12
+ *
+ * Sofern nicht durch anwendbare Rechtsvorschriften
+ * gefordert oder in schriftlicher Form vereinbart, wird
+ * die unter der Lizenz verbreitete Software "so wie sie
+ * ist", OHNE JEGLICHE GEWÄHRLEISTUNG ODER BEDINGUNGEN -
+ * ausdrücklich oder stillschweigend - verbreitet.
+ * Die sprachspezifischen Genehmigungen und Beschränkungen
+ * unter der Lizenz sind dem Lizenztext zu entnehmen.
+ */
+package de.ozgcloud.common.binaryfile;
+
+import static org.assertj.core.api.Assertions.*;
+import static org.mockito.Mockito.*;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Nested;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+import org.mockito.ArgumentCaptor;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+
+import de.ozgcloud.common.errorhandling.TechnicalException;
+import de.ozgcloud.common.test.ReflectionTestUtils;
+import io.grpc.stub.CallStreamObserver;
+
+class CallStreamObserverWrapperTest {
+
+	@Mock
+	private CallStreamObserver<GrpcResponseDummy> callObserver;
+	@InjectMocks
+	private CallStreamObserverWrapper<GrpcResponseDummy> wrapper;
+
+	@Nested
+	class TestSetOnReadyHandler {
+
+		@Test
+		void shouldForwardToObserver() {
+			var onReadyHandler = mock(Runnable.class);
+
+			wrapper.setOnReadyHandler(onReadyHandler);
+
+			verify(callObserver).setOnReadyHandler(onReadyHandler);
+		}
+	}
+
+	@Nested
+	class TestIsReady {
+
+		@Nested
+		class OnFailed {
+
+			@BeforeEach
+			void init() {
+				ReflectionTestUtils.setField(wrapper, "failed", true);
+			}
+
+			@Test
+			void shouldNotCallObserver() {
+				catchThrowable(wrapper::isReady);
+
+				verifyNoInteractions(callObserver);
+			}
+
+			@Test
+			void shouldThrowException() {
+				assertThatThrownBy(wrapper::isReady).isInstanceOf(TechnicalException.class);
+			}
+		}
+
+		@Nested
+		class OnNotFailed {
+
+			@BeforeEach
+			void init() {
+				ReflectionTestUtils.setField(wrapper, "failed", false);
+			}
+
+			@Test
+			void shouldForwardToObserver() {
+				wrapper.isReady();
+
+				verify(callObserver).isReady();
+			}
+
+			@ParameterizedTest
+			@ValueSource(booleans = { true, false })
+			void shouldReturnObserverResult(boolean observerResult) {
+				when(callObserver.isReady()).thenReturn(observerResult);
+
+				var ready = wrapper.isReady();
+
+				assertThat(ready).isEqualTo(observerResult);
+			}
+		}
+	}
+
+	@Nested
+	class TestOnNext {
+
+		private final GrpcResponseDummy grpccResponse = new GrpcResponseDummy();
+
+		@Nested
+		class OnFailed {
+
+			@BeforeEach
+			void init() {
+				ReflectionTestUtils.setField(wrapper, "failed", true);
+			}
+
+			@Test
+			void shouldNotCallObserver() {
+				catchThrowable(() -> wrapper.onNext(grpccResponse));
+
+				verifyNoInteractions(callObserver);
+			}
+
+			@Test
+			void shouldThrowException() {
+				assertThatThrownBy(() -> wrapper.onNext(grpccResponse)).isInstanceOf(TechnicalException.class);
+			}
+		}
+
+		@Nested
+		class OnNotFailed {
+
+			@BeforeEach
+			void init() {
+				ReflectionTestUtils.setField(wrapper, "failed", false);
+			}
+
+			@Test
+			void shouldForwardToObserver() {
+				wrapper.onNext(grpccResponse);
+
+				verify(callObserver).onNext(grpccResponse);
+			}
+		}
+	}
+
+	@Nested
+	class TestOnError {
+
+		private final TechnicalException exception = new TechnicalException("dummy");
+
+		@Nested
+		class OnFailed {
+
+			@BeforeEach
+			void init() {
+				ReflectionTestUtils.setField(wrapper, "failed", true);
+			}
+
+			@Test
+			void shouldNotCallObserver() {
+				catchThrowable(() -> wrapper.onError(exception));
+
+				verifyNoInteractions(callObserver);
+			}
+
+			@Test
+			void shouldThrowException() {
+				assertThatThrownBy(() -> wrapper.onError(exception)).isInstanceOf(TechnicalException.class);
+			}
+		}
+
+		@Nested
+		class OnNotFailed {
+
+			private ArgumentCaptor<BinaryFileDownloadException> captor = ArgumentCaptor.forClass(BinaryFileDownloadException.class);
+
+			@BeforeEach
+			void init() {
+				ReflectionTestUtils.setField(wrapper, "failed", false);
+			}
+
+			@Test
+			void shouldForwardBinaryFileDownloadExceptionToObserver() {
+				wrapper.onError(exception);
+
+				verify(callObserver).onError(captor.capture());
+				assertThat(captor.getValue()).isInstanceOf(BinaryFileDownloadException.class);
+				assertThat(captor.getValue().getCause()).isEqualTo(exception);
+			}
+
+			@Test
+			void shouldSetFailedToTrue() {
+				wrapper.onError(exception);
+
+				assertThat(wrapper).extracting("failed").isEqualTo(true);
+			}
+		}
+	}
+
+	@Nested
+	class TestOnCompleted {
+
+		@Nested
+		class OnFailed {
+
+			@BeforeEach
+			void init() {
+				ReflectionTestUtils.setField(wrapper, "failed", true);
+			}
+
+			@Test
+			void shouldNotCallObserver() {
+				catchThrowable(wrapper::onCompleted);
+
+				verifyNoInteractions(callObserver);
+			}
+
+			@Test
+			void shouldThrowException() {
+				assertThatThrownBy(wrapper::onCompleted).isInstanceOf(TechnicalException.class);
+			}
+		}
+
+		@Nested
+		class OnNotFailed {
+
+			@BeforeEach
+			void init() {
+				ReflectionTestUtils.setField(wrapper, "failed", false);
+			}
+
+			@Test
+			void shouldForwardToObserver() {
+				wrapper.onCompleted();
+
+				verify(callObserver).onCompleted();
+			}
+		}
+	}
+
+	private static class GrpcResponseDummy {
+
+	}
+}
diff --git a/ozgcloud-common-lib/src/test/java/de/ozgcloud/common/binaryfile/GrpcBinaryFileServerDownloaderTest.java b/ozgcloud-common-lib/src/test/java/de/ozgcloud/common/binaryfile/GrpcBinaryFileServerDownloaderTest.java
index 483e38d..e4f265d 100644
--- a/ozgcloud-common-lib/src/test/java/de/ozgcloud/common/binaryfile/GrpcBinaryFileServerDownloaderTest.java
+++ b/ozgcloud-common-lib/src/test/java/de/ozgcloud/common/binaryfile/GrpcBinaryFileServerDownloaderTest.java
@@ -23,19 +23,9 @@
  */
 package de.ozgcloud.common.binaryfile;
 
-import com.google.protobuf.ByteString;
-import de.ozgcloud.common.errorhandling.TechnicalException;
-import de.ozgcloud.common.test.ReflectionTestUtils;
-import io.grpc.stub.CallStreamObserver;
-import lombok.SneakyThrows;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.DisplayName;
-import org.junit.jupiter.api.Nested;
-import org.junit.jupiter.api.Test;
-import org.mockito.ArgumentCaptor;
-import org.mockito.Captor;
-import org.mockito.Mock;
-import org.springframework.core.task.TaskExecutor;
+import static org.assertj.core.api.Assertions.*;
+import static org.mockito.ArgumentMatchers.*;
+import static org.mockito.Mockito.*;
 
 import java.io.IOException;
 import java.io.InputStream;
@@ -47,14 +37,25 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.Consumer;
 import java.util.function.Function;
 
-import static org.assertj.core.api.Assertions.*;
-import static org.mockito.ArgumentMatchers.*;
-import static org.mockito.Mockito.*;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.Nested;
+import org.junit.jupiter.api.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Captor;
+import org.mockito.Mock;
+import org.springframework.core.task.TaskExecutor;
+
+import com.google.protobuf.ByteString;
+
+import de.ozgcloud.common.errorhandling.TechnicalException;
+import de.ozgcloud.common.test.ReflectionTestUtils;
+import lombok.SneakyThrows;
 
 class GrpcBinaryFileServerDownloaderTest {
 
 	@Mock
-	private CallStreamObserver<GrpcResponseDummy> callObserver;
+	private CallStreamObserverWrapper<GrpcResponseDummy> callObserverWrapper;
 	@Mock
 	private Function<ByteString, GrpcResponseDummy> chunkBuilder;
 	@Mock
@@ -66,11 +67,11 @@ class GrpcBinaryFileServerDownloaderTest {
 
 	@BeforeEach
 	void init() {
-		downloader = spy(GrpcBinaryFileServerDownloader.<GrpcResponseDummy>builder().callObserver(callObserver).downloadConsumer(downloadConsumer)
+		downloader = spy(GrpcBinaryFileServerDownloader.<GrpcResponseDummy>builder().downloadConsumer(downloadConsumer)
 				.chunkBuilder(chunkBuilder).taskExecutor(taskExecutor).build());
+		ReflectionTestUtils.setField(downloader, "callObserver", callObserverWrapper);
 	}
 
-	@DisplayName("Start")
 	@Nested
 	class TestStart {
 
@@ -116,25 +117,23 @@ class GrpcBinaryFileServerDownloaderTest {
 		}
 	}
 
-	@DisplayName("do start")
 	@Nested
 	class TestDoStart {
 
 		@Captor
 		private ArgumentCaptor<Runnable> runnableCaptor;
-		@Captor
-		private ArgumentCaptor<ExceptionalRunnable> setupStreamCaptor;
+
+		@BeforeEach
+		void init() {
+			doNothing().when(downloader).safelySetupStreams();
+		}
 
 		@SneakyThrows
 		@Test
-		void shouldCallSetupStreams() {
-			doNothing().when(downloader).handleSafety(any());
-
+		void shouldSafelySetupStreams() {
 			downloader.doStart();
 
-			verify(downloader).handleSafety(setupStreamCaptor.capture());
-			setupStreamCaptor.getValue().run();
-			verify(downloader).setupStreams();
+			verify(downloader).safelySetupStreams();
 		}
 
 		@Test
@@ -153,12 +152,69 @@ class GrpcBinaryFileServerDownloaderTest {
 		void shouldSetOnReadyHandler() {
 			downloader.doStart();
 
-			verify(callObserver).setOnReadyHandler(runnableCaptor.capture());
+			verify(callObserverWrapper).setOnReadyHandler(runnableCaptor.capture());
 			assertThat(runnableCaptor.getValue()).isNotNull();
 		}
 	}
 
-	@DisplayName("Start download")
+	@Nested
+	class TestSafelySetupStreams {
+
+		@SneakyThrows
+		@Test
+		void shouldSetupStreams() {
+			doNothing().when(downloader).setupStreams();
+
+			safelySetupStreams();
+
+			verify(downloader).setupStreams();
+		}
+
+		@Nested
+		class OnException {
+
+			@Mock
+			private PipedOutputStream outputStream;
+			@Mock
+			private PipedInputStream inputStream;
+
+			private final IOException exception = new IOException();
+
+			@SneakyThrows
+			@BeforeEach
+			void init() {
+				setInputStreamField(inputStream);
+				setOutputStreamField(outputStream);
+				doThrow(exception).when(downloader).setupStreams();
+			}
+
+			@Test
+			void shouldThrowBinaryFileDownloadException() {
+				assertThatThrownBy(() -> downloader.safelySetupStreams()).isInstanceOf(BinaryFileDownloadException.class).hasCause(exception);
+			}
+
+			@SneakyThrows
+			@Test
+			void shouldCloseOutputStream() {
+				catchThrowable(TestSafelySetupStreams.this::safelySetupStreams);
+
+				verify(outputStream).close();
+			}
+
+			@SneakyThrows
+			@Test
+			void shouldCloseInputStream() {
+				catchThrowable(TestSafelySetupStreams.this::safelySetupStreams);
+
+				verify(inputStream).close();
+			}
+		}
+
+		private void safelySetupStreams() {
+			downloader.safelySetupStreams();
+		}
+	}
+
 	@Nested
 	class TestStartDownload {
 
@@ -168,71 +224,64 @@ class GrpcBinaryFileServerDownloaderTest {
 		@SneakyThrows
 		@Test
 		void shouldCallDoDownload() {
-			doNothing().when(downloader).handleSafety(any());
+			doNothing().when(downloader).withDownloadErrorHandling(any());
 			doNothing().when(downloader).doDownload();
 
 			downloader.startDownload();
 
-			verify(downloader).handleSafety(runnableCaptor.capture());
+			verify(downloader).withDownloadErrorHandling(runnableCaptor.capture());
 			runnableCaptor.getValue().run();
 			verify(downloader).doDownload();
 		}
-	}
 
-	@DisplayName("do")
-	@Nested
-	class TestDoDownload {
+		@Nested
+		class TestDoDownload {
 
-		@Mock
-		private PipedOutputStream outputStream;
+			@Mock
+			private PipedOutputStream outputStream;
 
-		@BeforeEach
-		void mock() {
-			setOutputStreamField(outputStream);
-		}
+			@BeforeEach
+			void mock() {
+				setOutputStreamField(outputStream);
+			}
 
-		@SneakyThrows
-		@Test
-		void shouldCallDownloadConsumer() {
-			downloader.doDownload();
+			@SneakyThrows
+			@Test
+			void shouldCallDownloadConsumer() {
+				downloader.doDownload();
 
-			verify(downloadConsumer).accept(outputStream);
-		}
+				verify(downloadConsumer).accept(outputStream);
+			}
 
-		@SneakyThrows
-		@Test
-		void shouldCloseOutputStream() {
-			downloader.doDownload();
+			@SneakyThrows
+			@Test
+			void shouldCloseOutputStream() {
+				downloader.doDownload();
 
-			verify(outputStream).close();
+				verify(outputStream).close();
+			}
 		}
 	}
 
-	@DisplayName("Send chunks")
 	@Nested
 	class TestSendChunks {
 
-		@SneakyThrows
-		@Test
-		void shouldCallHandleSafety() {
-			doNothing().when(downloader).doSendChunks();
-
-			downloader.sendChunks();
-
-			verify(downloader).handleSafety(any(ExceptionalRunnable.class));
-		}
+		@Captor
+		private ArgumentCaptor<ExceptionalRunnable> runnableCaptor;
 
 		@SneakyThrows
 		@Test
-		void shouldCallDoDownload() {
+		void shouldCallWithDownloadErrorHandling() {
+			doNothing().when(downloader).withDownloadErrorHandling(any());
 			doNothing().when(downloader).doSendChunks();
 
 			downloader.sendChunks();
 
+			verify(downloader).withDownloadErrorHandling(runnableCaptor.capture());
+			runnableCaptor.getValue().run();
 			verify(downloader).doSendChunks();
 		}
 
-		@DisplayName("do")
 		@Nested
 		class TestDoSendChunks {
 
@@ -249,7 +298,7 @@ class GrpcBinaryFileServerDownloaderTest {
 			@BeforeEach
 			void mock() {
 				doNothing().when(downloader).tryCompleteRequest();
-				when(callObserver.isReady()).thenReturn(true);
+				when(callObserverWrapper.isReady()).thenReturn(true);
 				when(inputStream.read(any())).thenReturn(readBytes, -1);
 				setInputStreamField(inputStream);
 				new Random().nextBytes(buffer);
@@ -271,7 +320,7 @@ class GrpcBinaryFileServerDownloaderTest {
 
 				doSendChunks();
 
-				verify(callObserver).onNext(grpcResponseDummy);
+				verify(callObserverWrapper).onNext(grpcResponseDummy);
 			}
 
 			@DisplayName("should call complete grpc stream if download has finished and stream has no data left")
@@ -404,28 +453,35 @@ class GrpcBinaryFileServerDownloaderTest {
 		void shouldCallOnCompleted() {
 			downloader.completeRequest();
 
-			verify(callObserver).onCompleted();
+			verify(callObserverWrapper).onCompleted();
 		}
 	}
 
-	@DisplayName("Handle safety")
 	@Nested
-	class TestHandleSafety {
+	class TestWithDownloadErrorHandling {
+
+		@SneakyThrows
+		@Test
+		void shouldRunRunnable() {
+			var runnable = mock(ExceptionalRunnable.class);
+
+			downloader.withDownloadErrorHandling(runnable);
+
+			verify(runnable).run();
+		}
 
-		@DisplayName("on exception")
 		@Nested
-		class TestOnException {
+		class OnException {
 
 			@Mock
 			private PipedOutputStream outputStream;
 			@Mock
 			private PipedInputStream inputStream;
 
-			private final IOException exception = new IOException();
+			private final TechnicalException exception = new TechnicalException("dummy");
 
-			@SneakyThrows
 			@BeforeEach
-			void mock() {
+			void init() {
 				setInputStreamField(inputStream);
 				setOutputStreamField(outputStream);
 			}
@@ -433,7 +489,7 @@ class GrpcBinaryFileServerDownloaderTest {
 			@SneakyThrows
 			@Test
 			void shouldCloseOutputStream() {
-				handleSafety();
+				withDownloadErrorHandling();
 
 				verify(outputStream).close();
 			}
@@ -441,27 +497,26 @@ class GrpcBinaryFileServerDownloaderTest {
 			@SneakyThrows
 			@Test
 			void shouldCloseInputStream() {
-				handleSafety();
+				withDownloadErrorHandling();
 
 				verify(inputStream).close();
 			}
 
 			@Test
 			void shouldNotifyCallObserver() {
-				handleSafety();
+				withDownloadErrorHandling();
 
-				verify(callObserver).onError(argThat(TechnicalException.class::isInstance));
+				verify(callObserverWrapper).onError(argThat(TechnicalException.class::isInstance));
 			}
 
-			private void handleSafety() {
-				downloader.handleSafety(this::dummyMethodThrowingException);
+			private void withDownloadErrorHandling() {
+				downloader.withDownloadErrorHandling(this::dummyMethodThrowingException);
 			}
 
-			private void dummyMethodThrowingException() throws IOException {
+			private void dummyMethodThrowingException() throws TechnicalException {
 				throw exception;
 			}
 		}
-
 	}
 
 	private void setOutputStreamField(OutputStream outputStream) {
@@ -484,6 +539,6 @@ class GrpcBinaryFileServerDownloaderTest {
 		return ReflectionTestUtils.getField(downloader, "requestFinished", AtomicBoolean.class).get();
 	}
 
-	static class GrpcResponseDummy {
+	private static class GrpcResponseDummy {
 	}
 }
\ No newline at end of file
-- 
GitLab