From 18ec16871e41279509743f869fbdca2145fbe293 Mon Sep 17 00:00:00 2001
From: Krzysztof <krzysztof.witukiewicz@mgm-tp.com>
Date: Tue, 1 Apr 2025 13:03:24 +0200
Subject: [PATCH 01/12] OZG-7573 OZG-7991 File streaming using existing stream

---
 .../binaryfile/GrpcFileUploadUtils.java       | 191 +------------
 .../binaryfile/StreamExclusiveFileSender.java |  81 ++++++
 .../binaryfile/StreamSharingFileSender.java   |  67 +++++
 .../binaryfile/StreamingFileSender.java       | 176 ++++++++++++
 .../binaryfile/GrpcFileUploadUtilsTest.java   | 256 ++++-------------
 .../StreamExclusiveFileSenderTest.java        | 195 +++++++++++++
 .../StreamSharingFileSenderTest.java          | 105 +++++++
 .../binaryfile/StreamingFileSenderTest.java   | 257 ++++++++++++++++++
 8 files changed, 949 insertions(+), 379 deletions(-)
 create mode 100644 ozgcloud-common-lib/src/main/java/de/ozgcloud/common/binaryfile/StreamExclusiveFileSender.java
 create mode 100644 ozgcloud-common-lib/src/main/java/de/ozgcloud/common/binaryfile/StreamSharingFileSender.java
 create mode 100644 ozgcloud-common-lib/src/main/java/de/ozgcloud/common/binaryfile/StreamingFileSender.java
 create mode 100644 ozgcloud-common-lib/src/test/java/de/ozgcloud/common/binaryfile/StreamExclusiveFileSenderTest.java
 create mode 100644 ozgcloud-common-lib/src/test/java/de/ozgcloud/common/binaryfile/StreamSharingFileSenderTest.java
 create mode 100644 ozgcloud-common-lib/src/test/java/de/ozgcloud/common/binaryfile/StreamingFileSenderTest.java

diff --git a/ozgcloud-common-lib/src/main/java/de/ozgcloud/common/binaryfile/GrpcFileUploadUtils.java b/ozgcloud-common-lib/src/main/java/de/ozgcloud/common/binaryfile/GrpcFileUploadUtils.java
index 1cbd50d..1b7f1c1 100644
--- a/ozgcloud-common-lib/src/main/java/de/ozgcloud/common/binaryfile/GrpcFileUploadUtils.java
+++ b/ozgcloud-common-lib/src/main/java/de/ozgcloud/common/binaryfile/GrpcFileUploadUtils.java
@@ -23,225 +23,60 @@
  */
 package de.ozgcloud.common.binaryfile;
 
-import java.io.IOException;
 import java.io.InputStream;
-import java.util.Objects;
-import java.util.Optional;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.BiFunction;
+import java.util.function.Consumer;
 import java.util.function.Function;
 
-import org.apache.commons.io.IOUtils;
-
-import de.ozgcloud.common.errorhandling.TechnicalException;
 import io.grpc.stub.CallStreamObserver;
 import io.grpc.stub.StreamObserver;
 import lombok.AccessLevel;
-import lombok.Getter;
 import lombok.NoArgsConstructor;
 import lombok.NonNull;
-import lombok.RequiredArgsConstructor;
 import lombok.extern.log4j.Log4j2;
 
 @Log4j2
 @NoArgsConstructor(access = AccessLevel.PRIVATE)
 public class GrpcFileUploadUtils {
 
-	static final int CHUNK_SIZE = 4 * 1024;
-
 	/*
 	 * Q = Request Type; S = Response Type
 	 */
 	public static <Q, S> FileSender<Q, S> createSender(BiFunction<byte[], Integer, Q> chunkBuilder, InputStream inputStream,
 			Function<StreamObserver<S>, CallStreamObserver<Q>> reqObserverBuilder) {
-		return createSender(chunkBuilder, inputStream, reqObserverBuilder, true);
+		return new FileSender<>(chunkBuilder, inputStream, reqObserverBuilder);
 	}
 
-	public static <Q, S> FileSender<Q, S> createSender(BiFunction<byte[], Integer, Q> chunkBuilder, InputStream inputStream,
-			Function<StreamObserver<S>, CallStreamObserver<Q>> reqObserverBuilder, boolean completeOnFileSent) {
-		return new FileSender<>(chunkBuilder, reqObserverBuilder, inputStream, completeOnFileSent);
+	public static <Q, S> StreamingFileSender<Q, S> createStreamSharingSender(BiFunction<byte[], Integer, Q> chunkBuilder, InputStream inputStream,
+			CallStreamObserver<Q> requestObserver, Consumer<Runnable> onReadyHandlerRegistrar) {
+		return new StreamSharingFileSender<>(chunkBuilder, inputStream, requestObserver, onReadyHandlerRegistrar);
 	}
 
 	public static class FileSender<Q, S> {
-		private final BiFunction<byte[], Integer, Q> chunkBuilder;
-		private final InputStream inputStream;
 
-		@Getter
-		private final CompletableFuture<S> resultFuture = new CompletableFuture<>();
-		private final Function<StreamObserver<S>, CallStreamObserver<Q>> reqObserverBuilder;
-		private CallStreamObserver<Q> requestObserver;
+		private final StreamExclusiveFileSender<Q, S> sender;
 
-		private Optional<Q> metaData = Optional.empty();
-		private final AtomicBoolean metaDataSent = new AtomicBoolean(false);
-		private final AtomicBoolean done = new AtomicBoolean(false);
-
-		private final StreamReader streamReader;
-		private final boolean completeOnFileSent;
-
-		FileSender(BiFunction<byte[], Integer, Q> chunkBuilder, Function<StreamObserver<S>, CallStreamObserver<Q>> reqObserverBuilder,
-				InputStream inputStream) {
-			this(chunkBuilder, reqObserverBuilder, inputStream, true);
-		}
-
-		FileSender(BiFunction<byte[], Integer, Q> chunkBuilder, Function<StreamObserver<S>, CallStreamObserver<Q>> reqObserverBuilder,
-				InputStream inputStream, boolean completeOnFileSent) {
-			this.chunkBuilder = chunkBuilder;
-			this.inputStream = inputStream;
-			this.reqObserverBuilder = reqObserverBuilder;
-			this.completeOnFileSent = completeOnFileSent;
-
-			this.streamReader = new StreamReader(this.inputStream);
+		FileSender(BiFunction<byte[], Integer, Q> chunkBuilder, InputStream inputStream,
+				Function<StreamObserver<S>, CallStreamObserver<Q>> reqObserverBuilder) {
+			this.sender = new StreamExclusiveFileSender<>(chunkBuilder, inputStream, reqObserverBuilder);
 		}
 
 		public FileSender<Q, S> withMetaData(@NonNull Q metaData) {
-			this.metaData = Optional.of(metaData);
+			sender.withMetaData(metaData);
 			return this;
 		}
 
 		public FileSender<Q, S> send() {
-			LOG.debug("Start sending File.");
-			var responseObserver = BinaryFileUploadStreamObserver.create(resultFuture, this::sendNext);
-			requestObserver = reqObserverBuilder.apply(responseObserver);
-
+			sender.send();
 			return this;
 		}
 
 		public void cancelOnTimeout() {
-			LOG.warn("File transfer canceled on timeout");
-			resultFuture.cancel(true);
-			requestObserver.onError(new TechnicalException("Timeout on waiting for upload."));
-			closeStreams();
+			sender.cancelOnTimeout();
 		}
 
 		public void cancelOnError(Throwable t) {
-			LOG.error("File tranfer canceled on error.", t);
-			resultFuture.cancel(true);
-			requestObserver.onError(t);
-			closeStreams();
-		}
-
-		void sendNext() {
-			if (!done.get()) {
-				waitForOberver();
-				sendMetaData();
-				do {
-					LOG.debug("Sending next chunk.");
-					sendNextChunk();
-				} while (!done.get() && isReady());
-				LOG.debug("Finished or waiting to become ready.");
-			}
-		}
-
-		private boolean isReady() {
-			return requestObserver.isReady();
-		}
-
-		private void waitForOberver() {
-			synchronized (this) {
-				while (Objects.isNull(requestObserver)) {
-					try {
-						LOG.debug("wait for observer");
-						wait(300);
-					} catch (InterruptedException e) {
-						LOG.error("Error on waiting for request Observer.", e);
-						Thread.currentThread().interrupt();
-					}
-				}
-			}
-
-		}
-
-		void sendNextChunk() {
-			byte[] contentToSend = streamReader.getNextData();
-
-			if (streamReader.getLastReadSize() > 0) {
-				sendChunk(contentToSend, streamReader.getLastReadSize());
-			} else {
-				endTransfer();
-			}
-		}
-
-		private void endTransfer() {
-			if (completeOnFileSent)
-				requestObserver.onCompleted();
-			else
-				sendEndOfFile();
-			done.set(true);
-			LOG.debug("File Transfer done.");
-			closeStreams();
-
-		}
-
-		private void sendEndOfFile() {
-			sendChunk(new byte[0], streamReader.getLastReadSize());
-		}
-
-		void closeStreams() {
-			LOG.debug("Closing streams");
-			streamReader.close();
-		}
-
-		void sendChunk(byte[] content, int length) {
-			LOG.debug("Sending {} byte Data.", length);
-			var chunk = chunkBuilder.apply(content, length);
-			requestObserver.onNext(chunk);
-		}
-
-		byte[] readFromStream() {
-			try {
-				return inputStream.readNBytes(CHUNK_SIZE);
-			} catch (IOException e) {
-				throw new TechnicalException("Error on sending a single chunk", e);
-			}
-		}
-
-		void sendMetaData() {
-			metaData.filter(md -> !metaDataSent.get()).ifPresent(this::doSendMetaData);
-		}
-
-		private void doSendMetaData(Q metadata) {
-			LOG.debug("Sending Metadata.");
-			requestObserver.onNext(metadata);
-			metaDataSent.set(true);
-		}
-
-		void checkForEndOfStream(long sentSize) {
-			if (sentSize < CHUNK_SIZE) {
-				LOG.debug("File Transfer done. Closing stream.");
-				IOUtils.closeQuietly(inputStream);
-				requestObserver.onCompleted();
-				done.set(true);
-			} else {
-				LOG.debug("File Transfer not jet done - need to tranfer another chunk.");
-			}
-		}
-
-		@RequiredArgsConstructor
-		private class StreamReader {
-			private final InputStream inStream;
-			private final byte[] buffer = new byte[CHUNK_SIZE];
-			@Getter
-			private int lastReadSize = 0;
-			@Getter
-			private final AtomicBoolean done = new AtomicBoolean(false);
-
-			byte[] getNextData() {
-				readNext();
-				return buffer;
-			}
-
-			void close() {
-				IOUtils.closeQuietly(inStream);
-			}
-
-			void readNext() {
-				try {
-					lastReadSize = inStream.read(buffer, 0, CHUNK_SIZE);
-				} catch (IOException e) {
-					throw new TechnicalException("Error on reading a single chunk", e);
-				}
-			}
+			sender.cancelOnError(t);
 		}
 	}
 
diff --git a/ozgcloud-common-lib/src/main/java/de/ozgcloud/common/binaryfile/StreamExclusiveFileSender.java b/ozgcloud-common-lib/src/main/java/de/ozgcloud/common/binaryfile/StreamExclusiveFileSender.java
new file mode 100644
index 0000000..8bcf0b3
--- /dev/null
+++ b/ozgcloud-common-lib/src/main/java/de/ozgcloud/common/binaryfile/StreamExclusiveFileSender.java
@@ -0,0 +1,81 @@
+/*
+ * Copyright (C) 2025 Das Land Schleswig-Holstein vertreten durch den
+ * Ministerpräsidenten des Landes Schleswig-Holstein
+ * Staatskanzlei
+ * Abteilung Digitalisierung und zentrales IT-Management der Landesregierung
+ *
+ * Lizenziert unter der EUPL, Version 1.2 oder - sobald
+ * diese von der Europäischen Kommission genehmigt wurden -
+ * Folgeversionen der EUPL ("Lizenz");
+ * Sie dürfen dieses Werk ausschließlich gemäß
+ * dieser Lizenz nutzen.
+ * Eine Kopie der Lizenz finden Sie hier:
+ *
+ * https://joinup.ec.europa.eu/collection/eupl/eupl-text-eupl-12
+ *
+ * Sofern nicht durch anwendbare Rechtsvorschriften
+ * gefordert oder in schriftlicher Form vereinbart, wird
+ * die unter der Lizenz verbreitete Software "so wie sie
+ * ist", OHNE JEGLICHE GEWÄHRLEISTUNG ODER BEDINGUNGEN -
+ * ausdrücklich oder stillschweigend - verbreitet.
+ * Die sprachspezifischen Genehmigungen und Beschränkungen
+ * unter der Lizenz sind dem Lizenztext zu entnehmen.
+ */
+package de.ozgcloud.common.binaryfile;
+
+import java.io.InputStream;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+
+import de.ozgcloud.common.errorhandling.TechnicalException;
+import io.grpc.stub.CallStreamObserver;
+import io.grpc.stub.StreamObserver;
+import lombok.extern.log4j.Log4j2;
+
+@Log4j2
+class StreamExclusiveFileSender<Q, S> extends StreamingFileSender<Q, S> {
+
+	private final Function<StreamObserver<S>, CallStreamObserver<Q>> reqObserverBuilder;
+	private CallStreamObserver<Q> requestObserver;
+
+	StreamExclusiveFileSender(BiFunction<byte[], Integer, Q> chunkBuilder, InputStream inputStream, Function<StreamObserver<S>, CallStreamObserver<Q>> reqObserverBuilder) {
+		super(chunkBuilder, inputStream);
+
+		this.reqObserverBuilder = reqObserverBuilder;
+	}
+
+	@Override
+	public StreamExclusiveFileSender<Q, S> send() {
+		LOG.debug("Start sending File.");
+
+		// this responseObserver registers also onReadyHandler
+		var responseObserver = BinaryFileUploadStreamObserver.create(getResultFuture(), this::sendNext);
+		requestObserver = reqObserverBuilder.apply(responseObserver);
+
+		return this;
+	}
+
+	@Override
+	void communicateEndOfTransfer() {
+		requestObserver.onCompleted();
+	}
+
+	@Override
+	protected CallStreamObserver<Q> getRequestObserver() {
+		return requestObserver;
+	}
+
+	public void cancelOnTimeout() {
+		LOG.warn("File transfer canceled on timeout");
+		getResultFuture().cancel(true);
+		requestObserver.onError(new TechnicalException("Timeout on waiting for upload."));
+		closeStreams();
+	}
+
+	public void cancelOnError(Throwable t) {
+		LOG.error("File tranfer canceled on error.", t);
+		getResultFuture().cancel(true);
+		requestObserver.onError(t);
+		closeStreams();
+	}
+}
diff --git a/ozgcloud-common-lib/src/main/java/de/ozgcloud/common/binaryfile/StreamSharingFileSender.java b/ozgcloud-common-lib/src/main/java/de/ozgcloud/common/binaryfile/StreamSharingFileSender.java
new file mode 100644
index 0000000..c5c993a
--- /dev/null
+++ b/ozgcloud-common-lib/src/main/java/de/ozgcloud/common/binaryfile/StreamSharingFileSender.java
@@ -0,0 +1,67 @@
+/*
+ * Copyright (C) 2025 Das Land Schleswig-Holstein vertreten durch den
+ * Ministerpräsidenten des Landes Schleswig-Holstein
+ * Staatskanzlei
+ * Abteilung Digitalisierung und zentrales IT-Management der Landesregierung
+ *
+ * Lizenziert unter der EUPL, Version 1.2 oder - sobald
+ * diese von der Europäischen Kommission genehmigt wurden -
+ * Folgeversionen der EUPL ("Lizenz");
+ * Sie dürfen dieses Werk ausschließlich gemäß
+ * dieser Lizenz nutzen.
+ * Eine Kopie der Lizenz finden Sie hier:
+ *
+ * https://joinup.ec.europa.eu/collection/eupl/eupl-text-eupl-12
+ *
+ * Sofern nicht durch anwendbare Rechtsvorschriften
+ * gefordert oder in schriftlicher Form vereinbart, wird
+ * die unter der Lizenz verbreitete Software "so wie sie
+ * ist", OHNE JEGLICHE GEWÄHRLEISTUNG ODER BEDINGUNGEN -
+ * ausdrücklich oder stillschweigend - verbreitet.
+ * Die sprachspezifischen Genehmigungen und Beschränkungen
+ * unter der Lizenz sind dem Lizenztext zu entnehmen.
+ */
+package de.ozgcloud.common.binaryfile;
+
+import java.io.InputStream;
+import java.util.function.BiFunction;
+import java.util.function.Consumer;
+
+import io.grpc.stub.CallStreamObserver;
+import lombok.extern.log4j.Log4j2;
+
+@Log4j2
+class StreamSharingFileSender<Q, S> extends StreamingFileSender<Q, S> {
+
+	private final CallStreamObserver<Q> requestObserver;
+	private final Consumer<Runnable> onReadyHandlerRegistrar;
+
+	StreamSharingFileSender(BiFunction<byte[], Integer, Q> chunkBuilder, InputStream inputStream, CallStreamObserver<Q> requestObserver, Consumer<Runnable> onReadyHandlerRegistrar) {
+		super(chunkBuilder, inputStream);
+
+		this.requestObserver = requestObserver;
+		this.onReadyHandlerRegistrar = onReadyHandlerRegistrar;
+	}
+
+	@Override
+	public StreamSharingFileSender<Q, S> send() {
+		LOG.debug("Register onReadyHandler and start sending File.");
+		onReadyHandlerRegistrar.accept(this::sendNext);
+		return this;
+	}
+
+	@Override
+	void communicateEndOfTransfer() {
+		sendEndOfFile();
+		getResultFuture().complete(null);
+	}
+
+	private void sendEndOfFile() {
+		sendChunk(new byte[0], -1);
+	}
+
+	@Override
+	protected CallStreamObserver<Q> getRequestObserver() {
+		return requestObserver;
+	}
+}
diff --git a/ozgcloud-common-lib/src/main/java/de/ozgcloud/common/binaryfile/StreamingFileSender.java b/ozgcloud-common-lib/src/main/java/de/ozgcloud/common/binaryfile/StreamingFileSender.java
new file mode 100644
index 0000000..926ca00
--- /dev/null
+++ b/ozgcloud-common-lib/src/main/java/de/ozgcloud/common/binaryfile/StreamingFileSender.java
@@ -0,0 +1,176 @@
+/*
+ * Copyright (C) 2025 Das Land Schleswig-Holstein vertreten durch den
+ * Ministerpräsidenten des Landes Schleswig-Holstein
+ * Staatskanzlei
+ * Abteilung Digitalisierung und zentrales IT-Management der Landesregierung
+ *
+ * Lizenziert unter der EUPL, Version 1.2 oder - sobald
+ * diese von der Europäischen Kommission genehmigt wurden -
+ * Folgeversionen der EUPL ("Lizenz");
+ * Sie dürfen dieses Werk ausschließlich gemäß
+ * dieser Lizenz nutzen.
+ * Eine Kopie der Lizenz finden Sie hier:
+ *
+ * https://joinup.ec.europa.eu/collection/eupl/eupl-text-eupl-12
+ *
+ * Sofern nicht durch anwendbare Rechtsvorschriften
+ * gefordert oder in schriftlicher Form vereinbart, wird
+ * die unter der Lizenz verbreitete Software "so wie sie
+ * ist", OHNE JEGLICHE GEWÄHRLEISTUNG ODER BEDINGUNGEN -
+ * ausdrücklich oder stillschweigend - verbreitet.
+ * Die sprachspezifischen Genehmigungen und Beschränkungen
+ * unter der Lizenz sind dem Lizenztext zu entnehmen.
+ */
+package de.ozgcloud.common.binaryfile;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.BiFunction;
+
+import org.apache.commons.io.IOUtils;
+
+import de.ozgcloud.common.errorhandling.TechnicalException;
+import io.grpc.stub.CallStreamObserver;
+import lombok.AccessLevel;
+import lombok.Getter;
+import lombok.NonNull;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.log4j.Log4j2;
+
+/*
+ * Q = Request Type; S = Response Type
+ */
+@Log4j2
+public abstract class StreamingFileSender<Q, S> {
+
+	static final int CHUNK_SIZE = 4 * 1024;
+	
+	private final BiFunction<byte[], Integer, Q> chunkBuilder;
+
+	@Getter
+	private final CompletableFuture<S> resultFuture = new CompletableFuture<>();
+
+	private Q metaData;
+	private final AtomicBoolean metaDataSent = new AtomicBoolean(false);
+	private final AtomicBoolean done = new AtomicBoolean(false);
+
+	@Getter(AccessLevel.PROTECTED)
+	private final StreamReader streamReader;
+
+	StreamingFileSender(BiFunction<byte[], Integer, Q> chunkBuilder, InputStream inputStream) {
+		this.chunkBuilder = chunkBuilder;
+		this.streamReader = new StreamReader(inputStream);
+	}
+
+	public StreamingFileSender<Q, S> withMetaData(@NonNull Q metaData) {
+		this.metaData = metaData;
+		return this;
+	}
+
+	public abstract StreamingFileSender<Q, S> send();
+
+	void sendNext() {
+		if (!done.get()) {
+			waitForOberver();
+			sendMetaData();
+			do {
+				LOG.debug("Sending next chunk.");
+				sendNextChunk();
+			} while (!done.get() && isReady());
+			LOG.debug("Finished or waiting to become ready.");
+		}
+	}
+
+	private boolean isReady() {
+		return getRequestObserver().isReady();
+	}
+
+	private void waitForOberver() {
+		synchronized (this) {
+			while (Objects.isNull(getRequestObserver())) {
+				try {
+					LOG.debug("wait for observer");
+					wait(300);
+				} catch (InterruptedException e) {
+					LOG.error("Error on waiting for request Observer.", e);
+					Thread.currentThread().interrupt();
+				}
+			}
+		}
+
+	}
+
+	void sendNextChunk() {
+		byte[] contentToSend = streamReader.getNextData();
+
+		if (streamReader.getLastReadSize() > 0) {
+			sendChunk(contentToSend, streamReader.getLastReadSize());
+		} else {
+			endTransfer();
+		}
+	}
+
+	protected void endTransfer() {
+		communicateEndOfTransfer();
+		done.set(true);
+		LOG.debug("File Transfer done.");
+		closeStreams();
+	}
+
+	abstract void communicateEndOfTransfer();
+
+	void closeStreams() {
+		LOG.debug("Closing streams");
+		streamReader.close();
+	}
+
+	void sendChunk(byte[] content, int length) {
+		LOG.debug("Sending {} byte Data.", length);
+		var chunk = chunkBuilder.apply(content, length);
+		getRequestObserver().onNext(chunk);
+	}
+
+	void sendMetaData() {
+		if (metaData != null && !metaDataSent.get()) {
+			doSendMetaData(metaData);
+		}
+	}
+
+	private void doSendMetaData(Q metadata) {
+		LOG.debug("Sending Metadata.");
+		getRequestObserver().onNext(metadata);
+		metaDataSent.set(true);
+	}
+
+	protected abstract CallStreamObserver<Q> getRequestObserver();
+
+	@RequiredArgsConstructor
+	protected static class StreamReader {
+		private final InputStream inStream;
+		private final byte[] buffer = new byte[CHUNK_SIZE];
+		@Getter
+		private int lastReadSize = 0;
+		@Getter
+		private final AtomicBoolean done = new AtomicBoolean(false);
+
+		byte[] getNextData() {
+			readNext();
+			return buffer;
+		}
+
+		void close() {
+			IOUtils.closeQuietly(inStream);
+		}
+
+		void readNext() {
+			try {
+				lastReadSize = inStream.read(buffer, 0, CHUNK_SIZE);
+			} catch (IOException e) {
+				throw new TechnicalException("Error on reading a single chunk", e);
+			}
+		}
+	}
+}
diff --git a/ozgcloud-common-lib/src/test/java/de/ozgcloud/common/binaryfile/GrpcFileUploadUtilsTest.java b/ozgcloud-common-lib/src/test/java/de/ozgcloud/common/binaryfile/GrpcFileUploadUtilsTest.java
index fb77f3c..82c4698 100644
--- a/ozgcloud-common-lib/src/test/java/de/ozgcloud/common/binaryfile/GrpcFileUploadUtilsTest.java
+++ b/ozgcloud-common-lib/src/test/java/de/ozgcloud/common/binaryfile/GrpcFileUploadUtilsTest.java
@@ -24,289 +24,143 @@
 package de.ozgcloud.common.binaryfile;
 
 import static org.assertj.core.api.Assertions.*;
-import static org.mockito.ArgumentMatchers.*;
 import static org.mockito.Mockito.*;
 
-import java.io.ByteArrayInputStream;
-import java.io.IOException;
 import java.io.InputStream;
 import java.util.function.BiFunction;
+import java.util.function.Consumer;
 import java.util.function.Function;
 
-import org.apache.commons.lang3.RandomUtils;
 import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Nested;
 import org.junit.jupiter.api.Test;
-import org.mockito.ArgumentCaptor;
-import org.mockito.Captor;
-import org.mockito.InjectMocks;
 import org.mockito.Mock;
 
 import de.ozgcloud.common.binaryfile.BinaryFileTestFactory.TestRequestType;
 import de.ozgcloud.common.binaryfile.BinaryFileTestFactory.TestResponseType;
 import de.ozgcloud.common.binaryfile.GrpcFileUploadUtils.FileSender;
-import de.ozgcloud.common.errorhandling.TechnicalException;
+import de.ozgcloud.common.test.ReflectionTestUtils;
 import io.grpc.stub.CallStreamObserver;
 import io.grpc.stub.StreamObserver;
-import lombok.SneakyThrows;
 
 class GrpcFileUploadUtilsTest {
 
-	@InjectMocks
-	private GrpcFileUploadUtils service;
-
 	@Mock
 	private BiFunction<byte[], Integer, TestRequestType> chunkBuilder;
 	@Mock
+	private InputStream inputStream;
+	@Mock
 	private Function<StreamObserver<TestResponseType>, CallStreamObserver<TestRequestType>> reqObserverBuilder;
 	@Mock
 	private CallStreamObserver<TestRequestType> requestObserver;
 	@Mock
-	private InputStream inputStream;
-
-	private FileSender<TestRequestType, TestResponseType> fileSender;
-
-	@Mock
-	private TestRequestType metaData;
-
-	@BeforeEach
-	void init() {
-		when(reqObserverBuilder.apply(any())).thenReturn(requestObserver);
-		fileSender = spy(GrpcFileUploadUtils.createSender(chunkBuilder, inputStream, reqObserverBuilder));
-	}
+	private Consumer<Runnable> onReadyHandlerRegistrar;
 
 	@Nested
-	class TestCreateFileSender {
+	class TestCreateSender {
 
 		@Test
-		void shouldCreateRequestObserver() {
-			GrpcFileUploadUtils.createSender(chunkBuilder, inputStream, reqObserverBuilder).send();
+		void shouldReturnInstanceOfFileSender() {
+			var createdSender = GrpcFileUploadUtils.createSender(chunkBuilder, inputStream, reqObserverBuilder);
 
-			verify(reqObserverBuilder, atLeastOnce()).apply(notNull());
+			assertThat(createdSender).isInstanceOf(FileSender.class);
 		}
 	}
 
 	@Nested
-	class TestSendBinaryFile {
-
-		@Captor
-		private ArgumentCaptor<Runnable> runnableCaptor;
+	class TestCreateStreamSharingSender {
 
 		@Test
-		void shouldReturnSenderWithFuture() {
-			var result = fileSender.send();
+		void shouldReturnInstanceOfStreamSharingSender() {
+			var createdSender = GrpcFileUploadUtils.createStreamSharingSender(chunkBuilder, inputStream, requestObserver, onReadyHandlerRegistrar);
 
-			assertThat(result).isNotNull().extracting(FileSender::getResultFuture).isNotNull();
+			assertThat(createdSender).isInstanceOf(StreamSharingFileSender.class);
 		}
 	}
 
 	@Nested
-	class TestSendNext {
-
-		@BeforeEach
-		void initObserver() {
-			fileSender.send();
-		}
-
-		@Test
-		void shouldCallSendMetaData() {
-			fileSender.sendNext();
+	class TestFileSender {
 
-			verify(fileSender).sendMetaData();
-		}
+		private final FileSender<TestRequestType, TestResponseType> fileSender = new FileSender<>(chunkBuilder, inputStream, reqObserverBuilder);
+		@Mock
+		private StreamExclusiveFileSender<TestRequestType, TestResponseType> streamExclusiveFileSender;
 
 		@Test
-		void shouldSendNextChunk() {
-			fileSender.sendNext();
+		void shouldCreateStreamExclusiveFileSender() {
+			var internalFileSender = ReflectionTestUtils.getField(fileSender, "sender", StreamExclusiveFileSender.class);
 
-			verify(fileSender).sendNextChunk();
+			assertThat(internalFileSender).isInstanceOf(StreamExclusiveFileSender.class);
 		}
 
-	}
-
-	@Nested
-	class TestSendNextChunk {
-
-		private final byte[] content = RandomUtils.insecure().randomBytes(GrpcFileUploadUtils.CHUNK_SIZE / 2);
-		private ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(content);
-
-		@Captor
-		private ArgumentCaptor<byte[]> chunkCaptor;
-
 		@Nested
-		class TestOnDataAvailable {
-			@BeforeEach
-			void initObserver() {
-				fileSender = spy(GrpcFileUploadUtils.createSender(chunkBuilder, byteArrayInputStream, reqObserverBuilder));
-				fileSender.send();
-			}
-
-			@Test
-			void shouldCallSendChunk() {
-				fileSender.sendNextChunk();
+		class TestMethods {
 
-				verify(fileSender).sendChunk(chunkCaptor.capture(), eq(content.length));
-				assertThat(chunkCaptor.getValue()).contains(content);
+			@BeforeEach
+			void init() {
+				ReflectionTestUtils.setField(fileSender, "sender", streamExclusiveFileSender);
 			}
-		}
-
-		@Nested
-		class TestOnNoBytesLeftToRead {
 
 			@Nested
-			class TestOnCompleteOnFileSent {
-				private static final boolean COMPLETE_ON_FILE_SENT = true;
-
-				@BeforeEach
-				void initialize() {
-					var buffer = new byte[GrpcFileUploadUtils.CHUNK_SIZE];
-					byteArrayInputStream.read(buffer, 0, GrpcFileUploadUtils.CHUNK_SIZE);
-					fileSender = spy(GrpcFileUploadUtils.createSender(chunkBuilder, byteArrayInputStream, reqObserverBuilder, COMPLETE_ON_FILE_SENT));
-					fileSender.send();
-				}
+			class TestWithMetaData {
 
-				@Test
-				void shouldCallOnCompleted() {
-					fileSender.sendNextChunk();
-
-					verify(requestObserver).onCompleted();
-				}
+				private final TestRequestType request = new TestRequestType();
 
 				@Test
-				void shouldNotCallSendChunk() {
-					fileSender.sendNextChunk();
+				void shouldDelegate() {
+					fileSender.withMetaData(request);
 
-					verify(fileSender, never()).sendChunk(any(), anyInt());
+					verify(streamExclusiveFileSender).withMetaData(request);
 				}
 
 				@Test
-				@SneakyThrows
-				void shouldCallCloseStreams() {
-					fileSender.sendNextChunk();
+				void shouldReturnItself() {
+					var senderWithMetaData = fileSender.withMetaData(request);
 
-					verify(fileSender).closeStreams();
+					assertThat(senderWithMetaData).isSameAs(fileSender);
 				}
 			}
 
 			@Nested
-			class TestOnNotCompleteOnFileSent {
-				private static final boolean COMPLETE_ON_FILE_SENT = false;
-
-				@BeforeEach
-				void initialize() {
-					var buffer = new byte[GrpcFileUploadUtils.CHUNK_SIZE];
-					byteArrayInputStream.read(buffer, 0, GrpcFileUploadUtils.CHUNK_SIZE);
-					fileSender = spy(GrpcFileUploadUtils.createSender(chunkBuilder, byteArrayInputStream, reqObserverBuilder, COMPLETE_ON_FILE_SENT));
-					fileSender.send();
-				}
+			class TestSend {
 
 				@Test
-				void shouldNotCallOnCompleted() {
-					fileSender.sendNextChunk();
+				void shouldDelegate() {
+					fileSender.send();
 
-					verify(requestObserver, never()).onCompleted();
+					verify(streamExclusiveFileSender).send();
 				}
 
 				@Test
-				void shouldCallSendChunk() {
-					fileSender.sendNextChunk();
+				void shouldReturnItself() {
+					var returnedSender = fileSender.send();
 
-					verify(fileSender).sendChunk(chunkCaptor.capture(), eq(-1));
-					assertThat(chunkCaptor.getValue()).isEmpty();
+					assertThat(returnedSender).isSameAs(fileSender);
 				}
+			}
+
+			@Nested
+			class TestCancelOnTimeout {
 
 				@Test
-				@SneakyThrows
-				void shouldCallCloseStreams() {
-					fileSender.sendNextChunk();
+				void shouldDelegate() {
+					fileSender.cancelOnTimeout();
 
-					verify(fileSender).closeStreams();
+					verify(streamExclusiveFileSender).cancelOnTimeout();
 				}
 			}
-		}
-
-	}
-
-	@Nested
-	class TestCloseStreams {
-
-		@Test
-		@SneakyThrows
-		void shouldCloseInputStream() {
-			fileSender.send();
-
-			fileSender.closeStreams();
-
-			verify(inputStream).close();
-		}
-	}
-
-	@Nested
-	class TestSendChunk {
-
-		private static final byte[] CHUNK_PART = "ChunkPartContent".getBytes();
-
-		@BeforeEach
-		void initObserver() {
-			fileSender.send();
-		}
-
-		@Test
-		void shouldApplyBuildChunk() throws IOException {
-			fileSender.sendChunk(CHUNK_PART, 5);
 
-			verify(chunkBuilder).apply(CHUNK_PART, 5);
-		}
-
-		@Test
-		void shouldCallOnNext() throws IOException {
-			fileSender.sendChunk(CHUNK_PART, 5);
-
-			verify(requestObserver).onNext(any());
-		}
-	}
-
-	@Nested
-	class TestSendMetaData {
-
-		@BeforeEach
-		void initObserver() {
-			fileSender.send();
-		}
-
-		@Test
-		void shouldNotSendWithoutMetadata() {
-			fileSender.sendMetaData();
-
-			verify(requestObserver, never()).onNext(any());
-		}
-
-		@Test
-		void shouldSendMetadata() {
-			fileSender.withMetaData(metaData).sendMetaData();
-
-			verify(requestObserver).onNext(metaData);
-		}
-
-		@Test
-		void shouldSendMetadataOnlyOnce() {
-			fileSender.withMetaData(metaData).sendMetaData();
-			fileSender.sendMetaData();
+			@Nested
+			class TestCancelOnError {
 
-			verify(requestObserver).onNext(metaData);
-		}
-	}
+				@Test
+				void shouldDelegate() {
+					var error = new Throwable();
 
-	@Disabled("unused")
-	@Nested
-	class TestReadFromStream {
-		@Test
-		void shouldThrowException() throws IOException {
-			doThrow(IOException.class).when(inputStream).read(any(), anyInt(), anyInt());
+					fileSender.cancelOnError(error);
 
-			assertThatThrownBy(() -> fileSender.readFromStream()).isInstanceOf(TechnicalException.class);
+					verify(streamExclusiveFileSender).cancelOnError(error);
+				}
+			}
 		}
 	}
-
 }
\ No newline at end of file
diff --git a/ozgcloud-common-lib/src/test/java/de/ozgcloud/common/binaryfile/StreamExclusiveFileSenderTest.java b/ozgcloud-common-lib/src/test/java/de/ozgcloud/common/binaryfile/StreamExclusiveFileSenderTest.java
new file mode 100644
index 0000000..04b347c
--- /dev/null
+++ b/ozgcloud-common-lib/src/test/java/de/ozgcloud/common/binaryfile/StreamExclusiveFileSenderTest.java
@@ -0,0 +1,195 @@
+/*
+ * Copyright (C) 2025 Das Land Schleswig-Holstein vertreten durch den
+ * Ministerpräsidenten des Landes Schleswig-Holstein
+ * Staatskanzlei
+ * Abteilung Digitalisierung und zentrales IT-Management der Landesregierung
+ *
+ * Lizenziert unter der EUPL, Version 1.2 oder - sobald
+ * diese von der Europäischen Kommission genehmigt wurden -
+ * Folgeversionen der EUPL ("Lizenz");
+ * Sie dürfen dieses Werk ausschließlich gemäß
+ * dieser Lizenz nutzen.
+ * Eine Kopie der Lizenz finden Sie hier:
+ *
+ * https://joinup.ec.europa.eu/collection/eupl/eupl-text-eupl-12
+ *
+ * Sofern nicht durch anwendbare Rechtsvorschriften
+ * gefordert oder in schriftlicher Form vereinbart, wird
+ * die unter der Lizenz verbreitete Software "so wie sie
+ * ist", OHNE JEGLICHE GEWÄHRLEISTUNG ODER BEDINGUNGEN -
+ * ausdrücklich oder stillschweigend - verbreitet.
+ * Die sprachspezifischen Genehmigungen und Beschränkungen
+ * unter der Lizenz sind dem Lizenztext zu entnehmen.
+ */
+package de.ozgcloud.common.binaryfile;
+
+import static org.mockito.Mockito.*;
+
+import java.io.InputStream;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Nested;
+import org.junit.jupiter.api.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Captor;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.MockedStatic;
+import org.mockito.Spy;
+
+import de.ozgcloud.common.binaryfile.BinaryFileTestFactory.TestRequestType;
+import de.ozgcloud.common.binaryfile.BinaryFileTestFactory.TestResponseType;
+import de.ozgcloud.common.errorhandling.TechnicalException;
+import de.ozgcloud.common.test.ReflectionTestUtils;
+import io.grpc.stub.CallStreamObserver;
+import io.grpc.stub.StreamObserver;
+
+class StreamExclusiveFileSenderTest {
+
+	@Mock
+	private BiFunction<byte[], Integer, TestRequestType> chunkBuilder;
+	@Mock
+	private InputStream inputStream;
+	@Mock
+	private Function<StreamObserver<TestResponseType>, CallStreamObserver<TestRequestType>> reqObserverBuilder;
+	@Spy
+	@InjectMocks
+	private StreamExclusiveFileSender<TestRequestType, TestResponseType> fileSender;
+
+	@Nested
+	class TestSend {
+
+		private final CompletableFuture<TestResponseType> resultFuture = CompletableFuture.completedFuture(new TestResponseType());
+		@Captor
+		private ArgumentCaptor<Runnable> onReadyHandlerCaptor;
+		@Mock
+		private BinaryFileUploadStreamObserver<TestRequestType, TestResponseType> responseObserver;
+
+		@BeforeEach
+		void init() {
+			doReturn(resultFuture).when(fileSender).getResultFuture();
+		}
+
+		@SuppressWarnings("rawtypes")
+		@Test
+		void shouldCreateResponseObserver() {
+			try (MockedStatic<BinaryFileUploadStreamObserver> mocked = mockStatic(BinaryFileUploadStreamObserver.class)) {
+				doNothing().when(fileSender).sendNext();
+
+				fileSender.send();
+
+				mocked.verify(() -> BinaryFileUploadStreamObserver.create(same(resultFuture), onReadyHandlerCaptor.capture()));
+				verifyCallsSendNext(onReadyHandlerCaptor.getValue());
+			}
+		}
+
+		@SuppressWarnings("rawtypes")
+		@Test
+		void shouldBuildRequestObserver() {
+			try (MockedStatic<BinaryFileUploadStreamObserver> mocked = mockStatic(BinaryFileUploadStreamObserver.class)) {
+				mocked.when(() -> BinaryFileUploadStreamObserver.create(any(), any())).thenReturn(responseObserver);
+
+				fileSender.send();
+
+				verify(reqObserverBuilder).apply(responseObserver);
+			}
+		}
+
+		private void verifyCallsSendNext(Runnable runnable) {
+			runnable.run();
+			verify(fileSender).sendNext();
+		}
+	}
+
+	@Nested
+	class TestCommunicateEndOfTransfer {
+
+		@Mock
+		private CallStreamObserver<TestRequestType> requestObserver;
+
+		@Test
+		void shouldCallOnCompleted() {
+			ReflectionTestUtils.setField(fileSender, "requestObserver", requestObserver);
+
+			fileSender.communicateEndOfTransfer();
+
+			verify(requestObserver).onCompleted();
+		}
+	}
+
+	@Nested
+	class TestCancelOnTimeout {
+
+		@Mock
+		private CompletableFuture<TestResponseType> resultFuture;
+		@Mock
+		private CallStreamObserver<TestRequestType> requestObserver;
+
+		@BeforeEach
+		void init() {
+			doReturn(resultFuture).when(fileSender).getResultFuture();
+			ReflectionTestUtils.setField(fileSender, "requestObserver", requestObserver);
+		}
+
+		@Test
+		void shouldCancelResultFuture() {
+			fileSender.cancelOnTimeout();
+
+			verify(resultFuture).cancel(true);
+		}
+
+		@Test
+		void shouldCallOnError() {
+			fileSender.cancelOnTimeout();
+
+			verify(requestObserver).onError(any(TechnicalException.class));
+		}
+
+		@Test
+		void shouldCloseStreams() {
+			fileSender.cancelOnTimeout();
+
+			verify(fileSender).closeStreams();
+		}
+	}
+
+	@Nested
+	class TestCancelOnError {
+
+		@Mock
+		private CompletableFuture<TestResponseType> resultFuture;
+		@Mock
+		private CallStreamObserver<TestRequestType> requestObserver;
+		private final Throwable error = new Throwable();
+
+		@BeforeEach
+		void init() {
+			doReturn(resultFuture).when(fileSender).getResultFuture();
+			ReflectionTestUtils.setField(fileSender, "requestObserver", requestObserver);
+		}
+
+		@Test
+		void shouldCancelResultFuture() {
+			fileSender.cancelOnError(error);
+
+			verify(resultFuture).cancel(true);
+		}
+
+		@Test
+		void shouldCallOnError() {
+			fileSender.cancelOnError(error);
+
+			verify(requestObserver).onError(error);
+		}
+
+		@Test
+		void shouldCloseStreams() {
+			fileSender.cancelOnError(error);
+
+			verify(fileSender).closeStreams();
+		}
+	}
+}
diff --git a/ozgcloud-common-lib/src/test/java/de/ozgcloud/common/binaryfile/StreamSharingFileSenderTest.java b/ozgcloud-common-lib/src/test/java/de/ozgcloud/common/binaryfile/StreamSharingFileSenderTest.java
new file mode 100644
index 0000000..6c2225c
--- /dev/null
+++ b/ozgcloud-common-lib/src/test/java/de/ozgcloud/common/binaryfile/StreamSharingFileSenderTest.java
@@ -0,0 +1,105 @@
+/*
+ * Copyright (C) 2025 Das Land Schleswig-Holstein vertreten durch den
+ * Ministerpräsidenten des Landes Schleswig-Holstein
+ * Staatskanzlei
+ * Abteilung Digitalisierung und zentrales IT-Management der Landesregierung
+ *
+ * Lizenziert unter der EUPL, Version 1.2 oder - sobald
+ * diese von der Europäischen Kommission genehmigt wurden -
+ * Folgeversionen der EUPL ("Lizenz");
+ * Sie dürfen dieses Werk ausschließlich gemäß
+ * dieser Lizenz nutzen.
+ * Eine Kopie der Lizenz finden Sie hier:
+ *
+ * https://joinup.ec.europa.eu/collection/eupl/eupl-text-eupl-12
+ *
+ * Sofern nicht durch anwendbare Rechtsvorschriften
+ * gefordert oder in schriftlicher Form vereinbart, wird
+ * die unter der Lizenz verbreitete Software "so wie sie
+ * ist", OHNE JEGLICHE GEWÄHRLEISTUNG ODER BEDINGUNGEN -
+ * ausdrücklich oder stillschweigend - verbreitet.
+ * Die sprachspezifischen Genehmigungen und Beschränkungen
+ * unter der Lizenz sind dem Lizenztext zu entnehmen.
+ */
+package de.ozgcloud.common.binaryfile;
+
+import static org.assertj.core.api.Assertions.*;
+import static org.mockito.Mockito.*;
+
+import java.io.InputStream;
+import java.util.function.BiFunction;
+import java.util.function.Consumer;
+
+import org.junit.jupiter.api.Nested;
+import org.junit.jupiter.api.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Captor;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.Spy;
+
+import de.ozgcloud.common.binaryfile.BinaryFileTestFactory.TestRequestType;
+import de.ozgcloud.common.binaryfile.BinaryFileTestFactory.TestResponseType;
+import io.grpc.stub.CallStreamObserver;
+
+class StreamSharingFileSenderTest {
+
+	@Mock
+	private BiFunction<byte[], Integer, TestRequestType> chunkBuilder;
+	@Mock
+	private InputStream inputStream;
+	@Mock
+	private CallStreamObserver<TestRequestType> requestObserver;
+	@Mock
+	private Consumer<Runnable> onReadyHandlerRegistrar;
+	@Spy
+	@InjectMocks
+	private StreamSharingFileSender<TestRequestType, TestResponseType> fileSender;
+
+	@Nested
+	class TestSend {
+
+		@Captor
+		private ArgumentCaptor<Runnable> onReadyHandlerCaptor;
+
+		@Test
+		void shouldRegisterOnReadyHandler() {
+			doNothing().when(fileSender).sendNext();
+
+			fileSender.send();
+
+			verify(onReadyHandlerRegistrar).accept(onReadyHandlerCaptor.capture());
+			verifyCallsSendNext(onReadyHandlerCaptor.getValue());
+		}
+
+		@Test
+		void shouldReturnThis() {
+			var obj = fileSender.send();
+
+			assertThat(obj).isSameAs(fileSender);
+		}
+
+		private void verifyCallsSendNext(Runnable runnable) {
+			runnable.run();
+			verify(fileSender).sendNext();
+		}
+	}
+
+	@Nested
+	class TestCommunicateEndOfTransfer {
+
+		@Test
+		void shouldSendEmptyChunk() {
+			fileSender.communicateEndOfTransfer();
+
+			verify(fileSender).sendChunk(new byte[0], -1);
+		}
+
+		@Test
+		void shouldCompleteResultFuture() {
+			fileSender.communicateEndOfTransfer();
+
+			assertThat(fileSender.getResultFuture().isDone()).isTrue();
+		}
+	}
+}
diff --git a/ozgcloud-common-lib/src/test/java/de/ozgcloud/common/binaryfile/StreamingFileSenderTest.java b/ozgcloud-common-lib/src/test/java/de/ozgcloud/common/binaryfile/StreamingFileSenderTest.java
new file mode 100644
index 0000000..711461a
--- /dev/null
+++ b/ozgcloud-common-lib/src/test/java/de/ozgcloud/common/binaryfile/StreamingFileSenderTest.java
@@ -0,0 +1,257 @@
+/*
+ * Copyright (C) 2025 Das Land Schleswig-Holstein vertreten durch den
+ * Ministerpräsidenten des Landes Schleswig-Holstein
+ * Staatskanzlei
+ * Abteilung Digitalisierung und zentrales IT-Management der Landesregierung
+ *
+ * Lizenziert unter der EUPL, Version 1.2 oder - sobald
+ * diese von der Europäischen Kommission genehmigt wurden -
+ * Folgeversionen der EUPL ("Lizenz");
+ * Sie dürfen dieses Werk ausschließlich gemäß
+ * dieser Lizenz nutzen.
+ * Eine Kopie der Lizenz finden Sie hier:
+ *
+ * https://joinup.ec.europa.eu/collection/eupl/eupl-text-eupl-12
+ *
+ * Sofern nicht durch anwendbare Rechtsvorschriften
+ * gefordert oder in schriftlicher Form vereinbart, wird
+ * die unter der Lizenz verbreitete Software "so wie sie
+ * ist", OHNE JEGLICHE GEWÄHRLEISTUNG ODER BEDINGUNGEN -
+ * ausdrücklich oder stillschweigend - verbreitet.
+ * Die sprachspezifischen Genehmigungen und Beschränkungen
+ * unter der Lizenz sind dem Lizenztext zu entnehmen.
+ */
+package de.ozgcloud.common.binaryfile;
+
+import static org.assertj.core.api.Assertions.*;
+import static org.mockito.Mockito.*;
+
+import java.io.ByteArrayInputStream;
+import java.io.InputStream;
+import java.util.function.BiFunction;
+
+import org.apache.commons.lang3.RandomUtils;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Nested;
+import org.junit.jupiter.api.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Captor;
+import org.mockito.Mock;
+
+import de.ozgcloud.common.binaryfile.BinaryFileTestFactory.TestRequestType;
+import de.ozgcloud.common.binaryfile.BinaryFileTestFactory.TestResponseType;
+import io.grpc.stub.CallStreamObserver;
+import lombok.AccessLevel;
+import lombok.Getter;
+import lombok.SneakyThrows;
+
+class StreamingFileSenderTest {
+
+	@Mock
+	private BiFunction<byte[], Integer, TestRequestType> chunkBuilder;
+	@Mock
+	private InputStream inputStream;
+	@Mock
+	private CallStreamObserver<TestRequestType> requestObserver;
+
+	private TestFileSender fileSender;
+
+	@BeforeEach
+	void init() {
+		fileSender = spy(new TestFileSender(chunkBuilder, inputStream, requestObserver));
+	}
+
+	@Nested
+	class TestSendNext {
+
+		@BeforeEach
+		void init() {
+			fileSender.send();
+		}
+
+		@Test
+		void shouldCallSendMetaData() {
+			fileSender.sendNext();
+
+			verify(fileSender).sendMetaData();
+		}
+
+		@Test
+		void shouldSendNextChunk() {
+			fileSender.sendNext();
+
+			verify(fileSender).sendNextChunk();
+		}
+	}
+
+	@Nested
+	class TestSendNextChunk {
+
+		private final byte[] content = RandomUtils.insecure().randomBytes(StreamingFileSender.CHUNK_SIZE / 2);
+		private final ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(content);
+
+		@Captor
+		private ArgumentCaptor<byte[]> chunkCaptor;
+
+		@Nested
+		class TestOnDataAvailable {
+
+			@BeforeEach
+			void init() {
+				fileSender = spy(new TestFileSender(chunkBuilder, byteArrayInputStream, requestObserver));
+			}
+
+			@Test
+			void shouldCallSendChunk() {
+				fileSender.sendNextChunk();
+
+				verify(fileSender).sendChunk(chunkCaptor.capture(), eq(content.length));
+				assertThat(chunkCaptor.getValue()).contains(content);
+			}
+		}
+
+		@Nested
+		class TestOnNoBytesLeftToRead {
+
+			@BeforeEach
+			void init() {
+				var buffer = new byte[StreamingFileSender.CHUNK_SIZE];
+				byteArrayInputStream.read(buffer, 0, StreamingFileSender.CHUNK_SIZE);
+				fileSender = spy(new TestFileSender(chunkBuilder, byteArrayInputStream, requestObserver));
+			}
+
+			@Test
+			void shouldNotCallSendChunk() {
+				fileSender.sendNextChunk();
+
+				verify(fileSender, never()).sendChunk(any(), anyInt());
+			}
+
+			@Test
+			@SneakyThrows
+			void shouldCallCloseStreams() {
+				fileSender.sendNextChunk();
+
+				verify(fileSender).closeStreams();
+			}
+		}
+	}
+
+	@Nested
+	class TestEndTransfer {
+
+		@Test
+		void shouldCommunicateEndOfTransfer() {
+			fileSender.endTransfer();
+
+			verify(fileSender).communicateEndOfTransfer();
+		}
+
+		@Test
+		void shouldSetDoneToTrue() {
+			fileSender.endTransfer();
+
+			fileSender.sendNext();
+			verify(fileSender, never()).sendNextChunk();
+		}
+
+		@Test
+		void shouldCloseStreams() {
+			fileSender.endTransfer();
+
+			verify(fileSender).closeStreams();
+		}
+	}
+
+	@Nested
+	class TestCloseStreams {
+
+		@Test
+		@SneakyThrows
+		void shouldCloseInputStream() {
+			fileSender.closeStreams();
+
+			verify(inputStream).close();
+		}
+	}
+
+	@Nested
+	class TestSendChunk {
+
+		private static final byte[] CHUNK_PART = "ChunkPartContent".getBytes();
+
+		@BeforeEach
+		void initObserver() {
+			fileSender.send();
+		}
+
+		@Test
+		void shouldApplyBuildChunk() {
+			fileSender.sendChunk(CHUNK_PART, 5);
+
+			verify(chunkBuilder).apply(CHUNK_PART, 5);
+		}
+
+		@Test
+		void shouldCallOnNext() {
+			fileSender.sendChunk(CHUNK_PART, 5);
+
+			verify(requestObserver).onNext(any());
+		}
+	}
+
+	@Nested
+	class TestSendMetaData {
+
+		@Mock
+		private TestRequestType metaData;
+
+		@BeforeEach
+		void initObserver() {
+			fileSender.send();
+		}
+
+		@Test
+		void shouldNotSendWithoutMetadata() {
+			fileSender.sendMetaData();
+
+			verify(requestObserver, never()).onNext(any());
+		}
+
+		@Test
+		void shouldSendMetadata() {
+			fileSender.withMetaData(metaData).sendMetaData();
+
+			verify(requestObserver).onNext(metaData);
+		}
+
+		@Test
+		void shouldSendMetadataOnlyOnce() {
+			fileSender.withMetaData(metaData).sendMetaData();
+			fileSender.sendMetaData();
+
+			verify(requestObserver).onNext(metaData);
+		}
+	}
+
+	static class TestFileSender extends StreamingFileSender<TestRequestType, TestResponseType> {
+
+		@Getter(AccessLevel.PROTECTED)
+		private final CallStreamObserver<TestRequestType> requestObserver;
+
+		TestFileSender(BiFunction<byte[], Integer, TestRequestType> chunkBuilder, InputStream inputStream,
+				CallStreamObserver<TestRequestType> requestObserver) {
+			super(chunkBuilder, inputStream);
+			this.requestObserver = requestObserver;
+		}
+
+		@Override
+		public StreamingFileSender<TestRequestType, TestResponseType> send() {
+			return this;
+		}
+
+		@Override
+		void communicateEndOfTransfer() {
+		}
+	}
+}
-- 
GitLab


From 158e157be4ca026b280f44b4518a41770e441bb3 Mon Sep 17 00:00:00 2001
From: Krzysztof <krzysztof.witukiewicz@mgm-tp.com>
Date: Tue, 1 Apr 2025 13:40:10 +0200
Subject: [PATCH 02/12] OZG-7573 OZG-7991 Make FileSender extend (not delegate
 to) StreamExclusiveFileSender

---
 .../binaryfile/GrpcFileUploadUtils.java       | 46 ++++-----
 .../binaryfile/GrpcFileUploadUtilsTest.java   | 99 ++-----------------
 2 files changed, 31 insertions(+), 114 deletions(-)

diff --git a/ozgcloud-common-lib/src/main/java/de/ozgcloud/common/binaryfile/GrpcFileUploadUtils.java b/ozgcloud-common-lib/src/main/java/de/ozgcloud/common/binaryfile/GrpcFileUploadUtils.java
index 1b7f1c1..f66eb63 100644
--- a/ozgcloud-common-lib/src/main/java/de/ozgcloud/common/binaryfile/GrpcFileUploadUtils.java
+++ b/ozgcloud-common-lib/src/main/java/de/ozgcloud/common/binaryfile/GrpcFileUploadUtils.java
@@ -32,51 +32,47 @@ import io.grpc.stub.CallStreamObserver;
 import io.grpc.stub.StreamObserver;
 import lombok.AccessLevel;
 import lombok.NoArgsConstructor;
-import lombok.NonNull;
 import lombok.extern.log4j.Log4j2;
 
 @Log4j2
 @NoArgsConstructor(access = AccessLevel.PRIVATE)
 public class GrpcFileUploadUtils {
 
-	/*
-	 * Q = Request Type; S = Response Type
+	/**
+	 * @param <Q> Request Type
+	 * @param <S> Response Type
+	 * @deprecated use {@link #createStreamExclusiveFileSender(BiFunction, InputStream, Function)} instead
 	 */
+	@Deprecated(since = "4.13.0")
 	public static <Q, S> FileSender<Q, S> createSender(BiFunction<byte[], Integer, Q> chunkBuilder, InputStream inputStream,
 			Function<StreamObserver<S>, CallStreamObserver<Q>> reqObserverBuilder) {
 		return new FileSender<>(chunkBuilder, inputStream, reqObserverBuilder);
 	}
 
+	/**
+	 * @param <Q> Request Type
+	 * @param <S> Response Type
+	 */
+	public static <Q, S> StreamingFileSender<Q, S> createStreamExclusiveFileSender(BiFunction<byte[], Integer, Q> chunkBuilder, InputStream inputStream,
+			Function<StreamObserver<S>, CallStreamObserver<Q>> reqObserverBuilder) {
+		return new StreamExclusiveFileSender<>(chunkBuilder, inputStream, reqObserverBuilder);
+	}
+
+	/**
+	 * @param <Q> Request Type
+	 * @param <S> Response Type
+	 */
 	public static <Q, S> StreamingFileSender<Q, S> createStreamSharingSender(BiFunction<byte[], Integer, Q> chunkBuilder, InputStream inputStream,
 			CallStreamObserver<Q> requestObserver, Consumer<Runnable> onReadyHandlerRegistrar) {
 		return new StreamSharingFileSender<>(chunkBuilder, inputStream, requestObserver, onReadyHandlerRegistrar);
 	}
 
-	public static class FileSender<Q, S> {
-
-		private final StreamExclusiveFileSender<Q, S> sender;
+	// for backwards compatibility
+	public static class FileSender<Q, S> extends StreamExclusiveFileSender<Q, S> {
 
 		FileSender(BiFunction<byte[], Integer, Q> chunkBuilder, InputStream inputStream,
 				Function<StreamObserver<S>, CallStreamObserver<Q>> reqObserverBuilder) {
-			this.sender = new StreamExclusiveFileSender<>(chunkBuilder, inputStream, reqObserverBuilder);
-		}
-
-		public FileSender<Q, S> withMetaData(@NonNull Q metaData) {
-			sender.withMetaData(metaData);
-			return this;
-		}
-
-		public FileSender<Q, S> send() {
-			sender.send();
-			return this;
-		}
-
-		public void cancelOnTimeout() {
-			sender.cancelOnTimeout();
-		}
-
-		public void cancelOnError(Throwable t) {
-			sender.cancelOnError(t);
+			super(chunkBuilder, inputStream, reqObserverBuilder);
 		}
 	}
 
diff --git a/ozgcloud-common-lib/src/test/java/de/ozgcloud/common/binaryfile/GrpcFileUploadUtilsTest.java b/ozgcloud-common-lib/src/test/java/de/ozgcloud/common/binaryfile/GrpcFileUploadUtilsTest.java
index 82c4698..c94b69e 100644
--- a/ozgcloud-common-lib/src/test/java/de/ozgcloud/common/binaryfile/GrpcFileUploadUtilsTest.java
+++ b/ozgcloud-common-lib/src/test/java/de/ozgcloud/common/binaryfile/GrpcFileUploadUtilsTest.java
@@ -24,22 +24,18 @@
 package de.ozgcloud.common.binaryfile;
 
 import static org.assertj.core.api.Assertions.*;
-import static org.mockito.Mockito.*;
 
 import java.io.InputStream;
 import java.util.function.BiFunction;
 import java.util.function.Consumer;
 import java.util.function.Function;
 
-import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Nested;
 import org.junit.jupiter.api.Test;
 import org.mockito.Mock;
 
 import de.ozgcloud.common.binaryfile.BinaryFileTestFactory.TestRequestType;
 import de.ozgcloud.common.binaryfile.BinaryFileTestFactory.TestResponseType;
-import de.ozgcloud.common.binaryfile.GrpcFileUploadUtils.FileSender;
-import de.ozgcloud.common.test.ReflectionTestUtils;
 import io.grpc.stub.CallStreamObserver;
 import io.grpc.stub.StreamObserver;
 
@@ -60,107 +56,32 @@ class GrpcFileUploadUtilsTest {
 	class TestCreateSender {
 
 		@Test
-		void shouldReturnInstanceOfFileSender() {
+		void shouldReturnInstanceOfStreamExclusiveFileSender() {
 			var createdSender = GrpcFileUploadUtils.createSender(chunkBuilder, inputStream, reqObserverBuilder);
 
-			assertThat(createdSender).isInstanceOf(FileSender.class);
+			assertThat(createdSender).isInstanceOf(StreamExclusiveFileSender.class);
 		}
 	}
 
 	@Nested
-	class TestCreateStreamSharingSender {
+	class TestCreateStreamExclusiveFileSender {
 
 		@Test
-		void shouldReturnInstanceOfStreamSharingSender() {
-			var createdSender = GrpcFileUploadUtils.createStreamSharingSender(chunkBuilder, inputStream, requestObserver, onReadyHandlerRegistrar);
+		void shouldReturnInstanceOfStreamExclusiveFileSender() {
+			var createdSender = GrpcFileUploadUtils.createStreamExclusiveFileSender(chunkBuilder, inputStream, reqObserverBuilder);
 
-			assertThat(createdSender).isInstanceOf(StreamSharingFileSender.class);
+			assertThat(createdSender).isInstanceOf(StreamExclusiveFileSender.class);
 		}
 	}
 
 	@Nested
-	class TestFileSender {
-
-		private final FileSender<TestRequestType, TestResponseType> fileSender = new FileSender<>(chunkBuilder, inputStream, reqObserverBuilder);
-		@Mock
-		private StreamExclusiveFileSender<TestRequestType, TestResponseType> streamExclusiveFileSender;
+	class TestCreateStreamSharingSender {
 
 		@Test
-		void shouldCreateStreamExclusiveFileSender() {
-			var internalFileSender = ReflectionTestUtils.getField(fileSender, "sender", StreamExclusiveFileSender.class);
-
-			assertThat(internalFileSender).isInstanceOf(StreamExclusiveFileSender.class);
-		}
-
-		@Nested
-		class TestMethods {
-
-			@BeforeEach
-			void init() {
-				ReflectionTestUtils.setField(fileSender, "sender", streamExclusiveFileSender);
-			}
-
-			@Nested
-			class TestWithMetaData {
-
-				private final TestRequestType request = new TestRequestType();
-
-				@Test
-				void shouldDelegate() {
-					fileSender.withMetaData(request);
-
-					verify(streamExclusiveFileSender).withMetaData(request);
-				}
-
-				@Test
-				void shouldReturnItself() {
-					var senderWithMetaData = fileSender.withMetaData(request);
-
-					assertThat(senderWithMetaData).isSameAs(fileSender);
-				}
-			}
-
-			@Nested
-			class TestSend {
-
-				@Test
-				void shouldDelegate() {
-					fileSender.send();
-
-					verify(streamExclusiveFileSender).send();
-				}
-
-				@Test
-				void shouldReturnItself() {
-					var returnedSender = fileSender.send();
-
-					assertThat(returnedSender).isSameAs(fileSender);
-				}
-			}
-
-			@Nested
-			class TestCancelOnTimeout {
-
-				@Test
-				void shouldDelegate() {
-					fileSender.cancelOnTimeout();
-
-					verify(streamExclusiveFileSender).cancelOnTimeout();
-				}
-			}
-
-			@Nested
-			class TestCancelOnError {
-
-				@Test
-				void shouldDelegate() {
-					var error = new Throwable();
-
-					fileSender.cancelOnError(error);
+		void shouldReturnInstanceOfStreamSharingSender() {
+			var createdSender = GrpcFileUploadUtils.createStreamSharingSender(chunkBuilder, inputStream, requestObserver, onReadyHandlerRegistrar);
 
-					verify(streamExclusiveFileSender).cancelOnError(error);
-				}
-			}
+			assertThat(createdSender).isInstanceOf(StreamSharingFileSender.class);
 		}
 	}
 }
\ No newline at end of file
-- 
GitLab


From 4d3043cbd427093b7d3c2f2412fbb8fc21a216bd Mon Sep 17 00:00:00 2001
From: Krzysztof <krzysztof.witukiewicz@mgm-tp.com>
Date: Tue, 1 Apr 2025 14:01:57 +0200
Subject: [PATCH 03/12] OZG-7573 OZG-7991 Make FileSender override methods that
 return this

---
 .../common/binaryfile/GrpcFileUploadUtils.java      | 13 +++++++++++++
 1 file changed, 13 insertions(+)

diff --git a/ozgcloud-common-lib/src/main/java/de/ozgcloud/common/binaryfile/GrpcFileUploadUtils.java b/ozgcloud-common-lib/src/main/java/de/ozgcloud/common/binaryfile/GrpcFileUploadUtils.java
index f66eb63..74cd1c2 100644
--- a/ozgcloud-common-lib/src/main/java/de/ozgcloud/common/binaryfile/GrpcFileUploadUtils.java
+++ b/ozgcloud-common-lib/src/main/java/de/ozgcloud/common/binaryfile/GrpcFileUploadUtils.java
@@ -32,6 +32,7 @@ import io.grpc.stub.CallStreamObserver;
 import io.grpc.stub.StreamObserver;
 import lombok.AccessLevel;
 import lombok.NoArgsConstructor;
+import lombok.NonNull;
 import lombok.extern.log4j.Log4j2;
 
 @Log4j2
@@ -74,6 +75,18 @@ public class GrpcFileUploadUtils {
 				Function<StreamObserver<S>, CallStreamObserver<Q>> reqObserverBuilder) {
 			super(chunkBuilder, inputStream, reqObserverBuilder);
 		}
+
+		@Override
+		public FileSender<Q, S> send() {
+			super.send();
+			return this;
+		}
+
+		@Override
+		public FileSender<Q, S> withMetaData(@NonNull Q metaData) {
+			super.withMetaData(metaData);
+			return this;
+		}
 	}
 
 }
\ No newline at end of file
-- 
GitLab


From 0317820ed5df3af8d5d231d881e7754133e87e8c Mon Sep 17 00:00:00 2001
From: Krzysztof <krzysztof.witukiewicz@mgm-tp.com>
Date: Tue, 1 Apr 2025 17:58:41 +0200
Subject: [PATCH 04/12] OZG-7573 OZG-7991 Mark FileSender-class deprecated

---
 .../java/de/ozgcloud/common/binaryfile/GrpcFileUploadUtils.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/ozgcloud-common-lib/src/main/java/de/ozgcloud/common/binaryfile/GrpcFileUploadUtils.java b/ozgcloud-common-lib/src/main/java/de/ozgcloud/common/binaryfile/GrpcFileUploadUtils.java
index 74cd1c2..b021ed1 100644
--- a/ozgcloud-common-lib/src/main/java/de/ozgcloud/common/binaryfile/GrpcFileUploadUtils.java
+++ b/ozgcloud-common-lib/src/main/java/de/ozgcloud/common/binaryfile/GrpcFileUploadUtils.java
@@ -68,7 +68,7 @@ public class GrpcFileUploadUtils {
 		return new StreamSharingFileSender<>(chunkBuilder, inputStream, requestObserver, onReadyHandlerRegistrar);
 	}
 
-	// for backwards compatibility
+	@Deprecated(since = "4.13.0")
 	public static class FileSender<Q, S> extends StreamExclusiveFileSender<Q, S> {
 
 		FileSender(BiFunction<byte[], Integer, Q> chunkBuilder, InputStream inputStream,
-- 
GitLab


From a0d677b3bff35dd101f28016fad733737f338a7c Mon Sep 17 00:00:00 2001
From: Krzysztof <krzysztof.witukiewicz@mgm-tp.com>
Date: Tue, 1 Apr 2025 17:59:15 +0200
Subject: [PATCH 05/12] OZG-7573 OZG-7991 Move cancelOnTimeout() and
 cancelOnError() to StreamingFileSender

---
 .../binaryfile/StreamExclusiveFileSender.java |  8 +--
 .../binaryfile/StreamingFileSender.java       | 12 ++++
 .../StreamExclusiveFileSenderTest.java        |  6 +-
 .../binaryfile/StreamingFileSenderTest.java   | 55 +++++++++++++++++++
 4 files changed, 72 insertions(+), 9 deletions(-)

diff --git a/ozgcloud-common-lib/src/main/java/de/ozgcloud/common/binaryfile/StreamExclusiveFileSender.java b/ozgcloud-common-lib/src/main/java/de/ozgcloud/common/binaryfile/StreamExclusiveFileSender.java
index 8bcf0b3..92f9170 100644
--- a/ozgcloud-common-lib/src/main/java/de/ozgcloud/common/binaryfile/StreamExclusiveFileSender.java
+++ b/ozgcloud-common-lib/src/main/java/de/ozgcloud/common/binaryfile/StreamExclusiveFileSender.java
@@ -66,16 +66,12 @@ class StreamExclusiveFileSender<Q, S> extends StreamingFileSender<Q, S> {
 	}
 
 	public void cancelOnTimeout() {
-		LOG.warn("File transfer canceled on timeout");
-		getResultFuture().cancel(true);
+		super.cancelOnTimeout();
 		requestObserver.onError(new TechnicalException("Timeout on waiting for upload."));
-		closeStreams();
 	}
 
 	public void cancelOnError(Throwable t) {
-		LOG.error("File tranfer canceled on error.", t);
-		getResultFuture().cancel(true);
+		super.cancelOnError(t);
 		requestObserver.onError(t);
-		closeStreams();
 	}
 }
diff --git a/ozgcloud-common-lib/src/main/java/de/ozgcloud/common/binaryfile/StreamingFileSender.java b/ozgcloud-common-lib/src/main/java/de/ozgcloud/common/binaryfile/StreamingFileSender.java
index 926ca00..f70efbc 100644
--- a/ozgcloud-common-lib/src/main/java/de/ozgcloud/common/binaryfile/StreamingFileSender.java
+++ b/ozgcloud-common-lib/src/main/java/de/ozgcloud/common/binaryfile/StreamingFileSender.java
@@ -72,6 +72,18 @@ public abstract class StreamingFileSender<Q, S> {
 
 	public abstract StreamingFileSender<Q, S> send();
 
+	public void cancelOnTimeout() {
+		LOG.warn("File transfer canceled on timeout");
+		resultFuture.cancel(true);
+		closeStreams();
+	}
+
+	public void cancelOnError(Throwable t) {
+		LOG.error("File tranfer canceled on error.", t);
+		resultFuture.cancel(true);
+		closeStreams();
+	}
+
 	void sendNext() {
 		if (!done.get()) {
 			waitForOberver();
diff --git a/ozgcloud-common-lib/src/test/java/de/ozgcloud/common/binaryfile/StreamExclusiveFileSenderTest.java b/ozgcloud-common-lib/src/test/java/de/ozgcloud/common/binaryfile/StreamExclusiveFileSenderTest.java
index 04b347c..880ac38 100644
--- a/ozgcloud-common-lib/src/test/java/de/ozgcloud/common/binaryfile/StreamExclusiveFileSenderTest.java
+++ b/ozgcloud-common-lib/src/test/java/de/ozgcloud/common/binaryfile/StreamExclusiveFileSenderTest.java
@@ -39,11 +39,11 @@ import org.mockito.InjectMocks;
 import org.mockito.Mock;
 import org.mockito.MockedStatic;
 import org.mockito.Spy;
+import org.springframework.test.util.ReflectionTestUtils;
 
 import de.ozgcloud.common.binaryfile.BinaryFileTestFactory.TestRequestType;
 import de.ozgcloud.common.binaryfile.BinaryFileTestFactory.TestResponseType;
 import de.ozgcloud.common.errorhandling.TechnicalException;
-import de.ozgcloud.common.test.ReflectionTestUtils;
 import io.grpc.stub.CallStreamObserver;
 import io.grpc.stub.StreamObserver;
 
@@ -130,7 +130,7 @@ class StreamExclusiveFileSenderTest {
 
 		@BeforeEach
 		void init() {
-			doReturn(resultFuture).when(fileSender).getResultFuture();
+			ReflectionTestUtils.setField(fileSender, "resultFuture", resultFuture);
 			ReflectionTestUtils.setField(fileSender, "requestObserver", requestObserver);
 		}
 
@@ -167,7 +167,7 @@ class StreamExclusiveFileSenderTest {
 
 		@BeforeEach
 		void init() {
-			doReturn(resultFuture).when(fileSender).getResultFuture();
+			ReflectionTestUtils.setField(fileSender, "resultFuture", resultFuture);
 			ReflectionTestUtils.setField(fileSender, "requestObserver", requestObserver);
 		}
 
diff --git a/ozgcloud-common-lib/src/test/java/de/ozgcloud/common/binaryfile/StreamingFileSenderTest.java b/ozgcloud-common-lib/src/test/java/de/ozgcloud/common/binaryfile/StreamingFileSenderTest.java
index 711461a..47ddd15 100644
--- a/ozgcloud-common-lib/src/test/java/de/ozgcloud/common/binaryfile/StreamingFileSenderTest.java
+++ b/ozgcloud-common-lib/src/test/java/de/ozgcloud/common/binaryfile/StreamingFileSenderTest.java
@@ -28,6 +28,7 @@ import static org.mockito.Mockito.*;
 
 import java.io.ByteArrayInputStream;
 import java.io.InputStream;
+import java.util.concurrent.CompletableFuture;
 import java.util.function.BiFunction;
 
 import org.apache.commons.lang3.RandomUtils;
@@ -37,6 +38,7 @@ import org.junit.jupiter.api.Test;
 import org.mockito.ArgumentCaptor;
 import org.mockito.Captor;
 import org.mockito.Mock;
+import org.springframework.test.util.ReflectionTestUtils;
 
 import de.ozgcloud.common.binaryfile.BinaryFileTestFactory.TestRequestType;
 import de.ozgcloud.common.binaryfile.BinaryFileTestFactory.TestResponseType;
@@ -61,6 +63,59 @@ class StreamingFileSenderTest {
 		fileSender = spy(new TestFileSender(chunkBuilder, inputStream, requestObserver));
 	}
 
+	@Nested
+	class TestCancelOnTimeout {
+
+		@Mock
+		private CompletableFuture<TestResponseType> resultFuture;
+
+		@BeforeEach
+		void init() {
+			ReflectionTestUtils.setField(fileSender, "resultFuture", resultFuture);
+		}
+
+		@Test
+		void shouldCancelResultFuture() {
+			fileSender.cancelOnTimeout();
+
+			verify(resultFuture).cancel(true);
+		}
+
+		@Test
+		void shouldCloseStreams() {
+			fileSender.cancelOnTimeout();
+
+			verify(fileSender).closeStreams();
+		}
+	}
+
+	@Nested
+	class TestCancelOnError {
+
+		@Mock
+		private CompletableFuture<TestResponseType> resultFuture;
+		private final Throwable error = new Throwable();
+
+		@BeforeEach
+		void init() {
+			ReflectionTestUtils.setField(fileSender, "resultFuture", resultFuture);
+		}
+
+		@Test
+		void shouldCancelResultFuture() {
+			fileSender.cancelOnError(error);
+
+			verify(resultFuture).cancel(true);
+		}
+
+		@Test
+		void shouldCloseStreams() {
+			fileSender.cancelOnError(error);
+
+			verify(fileSender).closeStreams();
+		}
+	}
+
 	@Nested
 	class TestSendNext {
 
-- 
GitLab


From fbceb90140a6507afca8081a5478a2997153bb38 Mon Sep 17 00:00:00 2001
From: Krzysztof <krzysztof.witukiewicz@mgm-tp.com>
Date: Wed, 2 Apr 2025 09:27:44 +0200
Subject: [PATCH 06/12] OZG-7573 OZG-7991 Rename methode closeStreams()

---
 .../common/binaryfile/StreamingFileSender.java       |  8 ++++----
 .../binaryfile/StreamExclusiveFileSenderTest.java    |  8 ++++----
 .../common/binaryfile/StreamingFileSenderTest.java   | 12 ++++++------
 3 files changed, 14 insertions(+), 14 deletions(-)

diff --git a/ozgcloud-common-lib/src/main/java/de/ozgcloud/common/binaryfile/StreamingFileSender.java b/ozgcloud-common-lib/src/main/java/de/ozgcloud/common/binaryfile/StreamingFileSender.java
index f70efbc..9d0440e 100644
--- a/ozgcloud-common-lib/src/main/java/de/ozgcloud/common/binaryfile/StreamingFileSender.java
+++ b/ozgcloud-common-lib/src/main/java/de/ozgcloud/common/binaryfile/StreamingFileSender.java
@@ -75,13 +75,13 @@ public abstract class StreamingFileSender<Q, S> {
 	public void cancelOnTimeout() {
 		LOG.warn("File transfer canceled on timeout");
 		resultFuture.cancel(true);
-		closeStreams();
+		closeStreamReader();
 	}
 
 	public void cancelOnError(Throwable t) {
 		LOG.error("File tranfer canceled on error.", t);
 		resultFuture.cancel(true);
-		closeStreams();
+		closeStreamReader();
 	}
 
 	void sendNext() {
@@ -129,12 +129,12 @@ public abstract class StreamingFileSender<Q, S> {
 		communicateEndOfTransfer();
 		done.set(true);
 		LOG.debug("File Transfer done.");
-		closeStreams();
+		closeStreamReader();
 	}
 
 	abstract void communicateEndOfTransfer();
 
-	void closeStreams() {
+	void closeStreamReader() {
 		LOG.debug("Closing streams");
 		streamReader.close();
 	}
diff --git a/ozgcloud-common-lib/src/test/java/de/ozgcloud/common/binaryfile/StreamExclusiveFileSenderTest.java b/ozgcloud-common-lib/src/test/java/de/ozgcloud/common/binaryfile/StreamExclusiveFileSenderTest.java
index 880ac38..6ac54ef 100644
--- a/ozgcloud-common-lib/src/test/java/de/ozgcloud/common/binaryfile/StreamExclusiveFileSenderTest.java
+++ b/ozgcloud-common-lib/src/test/java/de/ozgcloud/common/binaryfile/StreamExclusiveFileSenderTest.java
@@ -149,10 +149,10 @@ class StreamExclusiveFileSenderTest {
 		}
 
 		@Test
-		void shouldCloseStreams() {
+		void shouldCloseStreamReader() {
 			fileSender.cancelOnTimeout();
 
-			verify(fileSender).closeStreams();
+			verify(fileSender).closeStreamReader();
 		}
 	}
 
@@ -186,10 +186,10 @@ class StreamExclusiveFileSenderTest {
 		}
 
 		@Test
-		void shouldCloseStreams() {
+		void shouldCloseStreamReader() {
 			fileSender.cancelOnError(error);
 
-			verify(fileSender).closeStreams();
+			verify(fileSender).closeStreamReader();
 		}
 	}
 }
diff --git a/ozgcloud-common-lib/src/test/java/de/ozgcloud/common/binaryfile/StreamingFileSenderTest.java b/ozgcloud-common-lib/src/test/java/de/ozgcloud/common/binaryfile/StreamingFileSenderTest.java
index 47ddd15..f369aab 100644
--- a/ozgcloud-common-lib/src/test/java/de/ozgcloud/common/binaryfile/StreamingFileSenderTest.java
+++ b/ozgcloud-common-lib/src/test/java/de/ozgcloud/common/binaryfile/StreamingFileSenderTest.java
@@ -85,7 +85,7 @@ class StreamingFileSenderTest {
 		void shouldCloseStreams() {
 			fileSender.cancelOnTimeout();
 
-			verify(fileSender).closeStreams();
+			verify(fileSender).closeStreamReader();
 		}
 	}
 
@@ -112,7 +112,7 @@ class StreamingFileSenderTest {
 		void shouldCloseStreams() {
 			fileSender.cancelOnError(error);
 
-			verify(fileSender).closeStreams();
+			verify(fileSender).closeStreamReader();
 		}
 	}
 
@@ -187,7 +187,7 @@ class StreamingFileSenderTest {
 			void shouldCallCloseStreams() {
 				fileSender.sendNextChunk();
 
-				verify(fileSender).closeStreams();
+				verify(fileSender).closeStreamReader();
 			}
 		}
 	}
@@ -214,17 +214,17 @@ class StreamingFileSenderTest {
 		void shouldCloseStreams() {
 			fileSender.endTransfer();
 
-			verify(fileSender).closeStreams();
+			verify(fileSender).closeStreamReader();
 		}
 	}
 
 	@Nested
-	class TestCloseStreams {
+	class TestCloseStreamReader {
 
 		@Test
 		@SneakyThrows
 		void shouldCloseInputStream() {
-			fileSender.closeStreams();
+			fileSender.closeStreamReader();
 
 			verify(inputStream).close();
 		}
-- 
GitLab


From 83a363dd3b995a9b53b0017eb22e59014532879d Mon Sep 17 00:00:00 2001
From: Krzysztof <krzysztof.witukiewicz@mgm-tp.com>
Date: Wed, 2 Apr 2025 09:35:10 +0200
Subject: [PATCH 07/12] OZG-7573 OZG-7991 Remove comment

---
 .../de/ozgcloud/common/binaryfile/StreamExclusiveFileSender.java | 1 -
 1 file changed, 1 deletion(-)

diff --git a/ozgcloud-common-lib/src/main/java/de/ozgcloud/common/binaryfile/StreamExclusiveFileSender.java b/ozgcloud-common-lib/src/main/java/de/ozgcloud/common/binaryfile/StreamExclusiveFileSender.java
index 92f9170..8051e11 100644
--- a/ozgcloud-common-lib/src/main/java/de/ozgcloud/common/binaryfile/StreamExclusiveFileSender.java
+++ b/ozgcloud-common-lib/src/main/java/de/ozgcloud/common/binaryfile/StreamExclusiveFileSender.java
@@ -48,7 +48,6 @@ class StreamExclusiveFileSender<Q, S> extends StreamingFileSender<Q, S> {
 	public StreamExclusiveFileSender<Q, S> send() {
 		LOG.debug("Start sending File.");
 
-		// this responseObserver registers also onReadyHandler
 		var responseObserver = BinaryFileUploadStreamObserver.create(getResultFuture(), this::sendNext);
 		requestObserver = reqObserverBuilder.apply(responseObserver);
 
-- 
GitLab


From 36622779b59dd5df9e0a76e240ba35796d991afa Mon Sep 17 00:00:00 2001
From: Krzysztof <krzysztof.witukiewicz@mgm-tp.com>
Date: Wed, 2 Apr 2025 09:42:29 +0200
Subject: [PATCH 08/12] OZG-7573 OZG-7991 Verify calling endTransfer() in test

---
 .../common/binaryfile/StreamingFileSender.java      |  2 +-
 .../common/binaryfile/StreamingFileSenderTest.java  | 13 ++++++-------
 2 files changed, 7 insertions(+), 8 deletions(-)

diff --git a/ozgcloud-common-lib/src/main/java/de/ozgcloud/common/binaryfile/StreamingFileSender.java b/ozgcloud-common-lib/src/main/java/de/ozgcloud/common/binaryfile/StreamingFileSender.java
index 9d0440e..dc2af67 100644
--- a/ozgcloud-common-lib/src/main/java/de/ozgcloud/common/binaryfile/StreamingFileSender.java
+++ b/ozgcloud-common-lib/src/main/java/de/ozgcloud/common/binaryfile/StreamingFileSender.java
@@ -125,7 +125,7 @@ public abstract class StreamingFileSender<Q, S> {
 		}
 	}
 
-	protected void endTransfer() {
+	void endTransfer() {
 		communicateEndOfTransfer();
 		done.set(true);
 		LOG.debug("File Transfer done.");
diff --git a/ozgcloud-common-lib/src/test/java/de/ozgcloud/common/binaryfile/StreamingFileSenderTest.java b/ozgcloud-common-lib/src/test/java/de/ozgcloud/common/binaryfile/StreamingFileSenderTest.java
index f369aab..32bd933 100644
--- a/ozgcloud-common-lib/src/test/java/de/ozgcloud/common/binaryfile/StreamingFileSenderTest.java
+++ b/ozgcloud-common-lib/src/test/java/de/ozgcloud/common/binaryfile/StreamingFileSenderTest.java
@@ -125,7 +125,7 @@ class StreamingFileSenderTest {
 		}
 
 		@Test
-		void shouldCallSendMetaData() {
+		void shouldSendMetaData() {
 			fileSender.sendNext();
 
 			verify(fileSender).sendMetaData();
@@ -157,7 +157,7 @@ class StreamingFileSenderTest {
 			}
 
 			@Test
-			void shouldCallSendChunk() {
+			void shouldSendChunk() {
 				fileSender.sendNextChunk();
 
 				verify(fileSender).sendChunk(chunkCaptor.capture(), eq(content.length));
@@ -176,18 +176,17 @@ class StreamingFileSenderTest {
 			}
 
 			@Test
-			void shouldNotCallSendChunk() {
+			void shouldNotSendChunk() {
 				fileSender.sendNextChunk();
 
 				verify(fileSender, never()).sendChunk(any(), anyInt());
 			}
 
 			@Test
-			@SneakyThrows
-			void shouldCallCloseStreams() {
+			void shouldEndTransfer() {
 				fileSender.sendNextChunk();
 
-				verify(fileSender).closeStreamReader();
+				verify(fileSender).endTransfer();
 			}
 		}
 	}
@@ -211,7 +210,7 @@ class StreamingFileSenderTest {
 		}
 
 		@Test
-		void shouldCloseStreams() {
+		void shouldCloseStreamReader() {
 			fileSender.endTransfer();
 
 			verify(fileSender).closeStreamReader();
-- 
GitLab


From 7b7c478dfe4f9c2f6f4d06c1c6312aadf2b11951 Mon Sep 17 00:00:00 2001
From: Krzysztof <krzysztof.witukiewicz@mgm-tp.com>
Date: Wed, 2 Apr 2025 09:43:40 +0200
Subject: [PATCH 09/12] OZG-7573 OZG-7991 Improve formatting

---
 .../de/ozgcloud/common/binaryfile/StreamingFileSenderTest.java   | 1 +
 1 file changed, 1 insertion(+)

diff --git a/ozgcloud-common-lib/src/test/java/de/ozgcloud/common/binaryfile/StreamingFileSenderTest.java b/ozgcloud-common-lib/src/test/java/de/ozgcloud/common/binaryfile/StreamingFileSenderTest.java
index 32bd933..ba58775 100644
--- a/ozgcloud-common-lib/src/test/java/de/ozgcloud/common/binaryfile/StreamingFileSenderTest.java
+++ b/ozgcloud-common-lib/src/test/java/de/ozgcloud/common/binaryfile/StreamingFileSenderTest.java
@@ -282,6 +282,7 @@ class StreamingFileSenderTest {
 		@Test
 		void shouldSendMetadataOnlyOnce() {
 			fileSender.withMetaData(metaData).sendMetaData();
+
 			fileSender.sendMetaData();
 
 			verify(requestObserver).onNext(metaData);
-- 
GitLab


From 8f8039e7c46b37400ee166f4b6f0ec3dce431447 Mon Sep 17 00:00:00 2001
From: Krzysztof <krzysztof.witukiewicz@mgm-tp.com>
Date: Wed, 2 Apr 2025 10:28:44 +0200
Subject: [PATCH 10/12] OZG-7573 OZG-7991 Fix spelling

---
 .../de/ozgcloud/common/binaryfile/StreamingFileSender.java    | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/ozgcloud-common-lib/src/main/java/de/ozgcloud/common/binaryfile/StreamingFileSender.java b/ozgcloud-common-lib/src/main/java/de/ozgcloud/common/binaryfile/StreamingFileSender.java
index dc2af67..89661ba 100644
--- a/ozgcloud-common-lib/src/main/java/de/ozgcloud/common/binaryfile/StreamingFileSender.java
+++ b/ozgcloud-common-lib/src/main/java/de/ozgcloud/common/binaryfile/StreamingFileSender.java
@@ -86,7 +86,7 @@ public abstract class StreamingFileSender<Q, S> {
 
 	void sendNext() {
 		if (!done.get()) {
-			waitForOberver();
+			waitForObserver();
 			sendMetaData();
 			do {
 				LOG.debug("Sending next chunk.");
@@ -100,7 +100,7 @@ public abstract class StreamingFileSender<Q, S> {
 		return getRequestObserver().isReady();
 	}
 
-	private void waitForOberver() {
+	private void waitForObserver() {
 		synchronized (this) {
 			while (Objects.isNull(getRequestObserver())) {
 				try {
-- 
GitLab


From 8e5eefbde6274091c01e2c16ce2e48e2e8b716bc Mon Sep 17 00:00:00 2001
From: Krzysztof <krzysztof.witukiewicz@mgm-tp.com>
Date: Wed, 2 Apr 2025 15:13:21 +0200
Subject: [PATCH 11/12] OZG-7573 OZG-7991 Stop if cancelled

---
 .../binaryfile/StreamingFileSender.java       | 18 ++++-
 .../binaryfile/StreamingFileSenderTest.java   | 69 ++++++++++++++++++-
 2 files changed, 82 insertions(+), 5 deletions(-)

diff --git a/ozgcloud-common-lib/src/main/java/de/ozgcloud/common/binaryfile/StreamingFileSender.java b/ozgcloud-common-lib/src/main/java/de/ozgcloud/common/binaryfile/StreamingFileSender.java
index 89661ba..de78cf1 100644
--- a/ozgcloud-common-lib/src/main/java/de/ozgcloud/common/binaryfile/StreamingFileSender.java
+++ b/ozgcloud-common-lib/src/main/java/de/ozgcloud/common/binaryfile/StreamingFileSender.java
@@ -79,21 +79,33 @@ public abstract class StreamingFileSender<Q, S> {
 	}
 
 	public void cancelOnError(Throwable t) {
-		LOG.error("File tranfer canceled on error.", t);
+		LOG.error("File transfer canceled on error.", t);
 		resultFuture.cancel(true);
 		closeStreamReader();
 	}
 
 	void sendNext() {
-		if (!done.get()) {
+		if (notFinished()) {
 			waitForObserver();
 			sendMetaData();
 			do {
 				LOG.debug("Sending next chunk.");
 				sendNextChunk();
-			} while (!done.get() && isReady());
+			} while (notFinished() && isReady());
 			LOG.debug("Finished or waiting to become ready.");
 		}
+		checkIfFinishedForcefully();
+	}
+
+	private boolean notFinished() {
+		return !done.get() && !resultFuture.isCancelled();
+	}
+
+	private void checkIfFinishedForcefully() {
+		if (resultFuture.isCancelled()) {
+			LOG.warn("File transfer was cancelled");
+			closeStreamReader();
+		}
 	}
 
 	private boolean isReady() {
diff --git a/ozgcloud-common-lib/src/test/java/de/ozgcloud/common/binaryfile/StreamingFileSenderTest.java b/ozgcloud-common-lib/src/test/java/de/ozgcloud/common/binaryfile/StreamingFileSenderTest.java
index ba58775..ce12c67 100644
--- a/ozgcloud-common-lib/src/test/java/de/ozgcloud/common/binaryfile/StreamingFileSenderTest.java
+++ b/ozgcloud-common-lib/src/test/java/de/ozgcloud/common/binaryfile/StreamingFileSenderTest.java
@@ -29,6 +29,7 @@ import static org.mockito.Mockito.*;
 import java.io.ByteArrayInputStream;
 import java.io.InputStream;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.BiFunction;
 
 import org.apache.commons.lang3.RandomUtils;
@@ -71,7 +72,7 @@ class StreamingFileSenderTest {
 
 		@BeforeEach
 		void init() {
-			ReflectionTestUtils.setField(fileSender, "resultFuture", resultFuture);
+			setResultFutureInFileSender(resultFuture);
 		}
 
 		@Test
@@ -98,7 +99,7 @@ class StreamingFileSenderTest {
 
 		@BeforeEach
 		void init() {
-			ReflectionTestUtils.setField(fileSender, "resultFuture", resultFuture);
+			setResultFutureInFileSender(resultFuture);
 		}
 
 		@Test
@@ -119,6 +120,9 @@ class StreamingFileSenderTest {
 	@Nested
 	class TestSendNext {
 
+		@Mock
+		private CompletableFuture<TestResponseType> resultFuture;
+
 		@BeforeEach
 		void init() {
 			fileSender.send();
@@ -137,6 +141,59 @@ class StreamingFileSenderTest {
 
 			verify(fileSender).sendNextChunk();
 		}
+
+		@Test
+		void shouldNotSendMetaDataIfDone() {
+			setDoneInFileSender(true);
+
+			fileSender.sendNext();
+
+			verify(fileSender, never()).sendMetaData();
+		}
+
+		@Test
+		void shouldNotSendMetaDataIfCancelled() {
+			fileSender.getResultFuture().cancel(true);
+
+			fileSender.sendNext();
+
+			verify(fileSender, never()).sendMetaData();
+		}
+
+		@Test
+		void shouldSendNextChunkUntilDone() {
+			lenient().when(requestObserver.isReady()).thenReturn(true);
+			doAnswer(invocation -> {
+				setDoneInFileSender(true);
+				return null;
+			}).when(fileSender).sendNextChunk();
+
+			fileSender.sendNext();
+
+			verify(fileSender, times(1)).sendNextChunk();
+		}
+
+		@Test
+		void shouldSendNextChunkUntilCancelled() {
+			lenient().when(requestObserver.isReady()).thenReturn(true);
+			doAnswer(invocation -> {
+				fileSender.getResultFuture().cancel(true);
+				return null;
+			}).when(fileSender).sendNextChunk();
+
+			fileSender.sendNext();
+
+			verify(fileSender, times(1)).sendNextChunk();
+		}
+
+		@Test
+		void closeStreamReaderIfCancelled() {
+			fileSender.getResultFuture().cancel(true);
+
+			fileSender.sendNext();
+
+			verify(fileSender).closeStreamReader();
+		}
 	}
 
 	@Nested
@@ -289,6 +346,14 @@ class StreamingFileSenderTest {
 		}
 	}
 
+	private void setResultFutureInFileSender(CompletableFuture<TestResponseType> resultFuture) {
+		ReflectionTestUtils.setField(fileSender, "resultFuture", resultFuture);
+	}
+
+	private void setDoneInFileSender(boolean done) {
+		((AtomicBoolean) ReflectionTestUtils.getField(fileSender, null, "done")).set(done);
+	}
+
 	static class TestFileSender extends StreamingFileSender<TestRequestType, TestResponseType> {
 
 		@Getter(AccessLevel.PROTECTED)
-- 
GitLab


From bea6456cc191b87f7b2f9c466199fb2a1be779c6 Mon Sep 17 00:00:00 2001
From: Krzysztof <krzysztof.witukiewicz@mgm-tp.com>
Date: Wed, 2 Apr 2025 15:14:55 +0200
Subject: [PATCH 12/12] OZG-7573 OZG-7991 Fix sonarqube warnings

---
 .../ozgcloud/common/binaryfile/StreamExclusiveFileSender.java   | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/ozgcloud-common-lib/src/main/java/de/ozgcloud/common/binaryfile/StreamExclusiveFileSender.java b/ozgcloud-common-lib/src/main/java/de/ozgcloud/common/binaryfile/StreamExclusiveFileSender.java
index 8051e11..4223d4d 100644
--- a/ozgcloud-common-lib/src/main/java/de/ozgcloud/common/binaryfile/StreamExclusiveFileSender.java
+++ b/ozgcloud-common-lib/src/main/java/de/ozgcloud/common/binaryfile/StreamExclusiveFileSender.java
@@ -64,11 +64,13 @@ class StreamExclusiveFileSender<Q, S> extends StreamingFileSender<Q, S> {
 		return requestObserver;
 	}
 
+	@Override
 	public void cancelOnTimeout() {
 		super.cancelOnTimeout();
 		requestObserver.onError(new TechnicalException("Timeout on waiting for upload."));
 	}
 
+	@Override
 	public void cancelOnError(Throwable t) {
 		super.cancelOnError(t);
 		requestObserver.onError(t);
-- 
GitLab