diff --git a/ozgcloud-common-lib/src/main/java/de/ozgcloud/common/binaryfile/GrpcFileUploadUtils.java b/ozgcloud-common-lib/src/main/java/de/ozgcloud/common/binaryfile/GrpcFileUploadUtils.java
index 1cbd50d5e2368639f27448c8f61ba7a47740ecd8..b021ed18935637b737f9d473dea168af8b8b9ca3 100644
--- a/ozgcloud-common-lib/src/main/java/de/ozgcloud/common/binaryfile/GrpcFileUploadUtils.java
+++ b/ozgcloud-common-lib/src/main/java/de/ozgcloud/common/binaryfile/GrpcFileUploadUtils.java
@@ -23,225 +23,69 @@
  */
 package de.ozgcloud.common.binaryfile;
 
-import java.io.IOException;
 import java.io.InputStream;
-import java.util.Objects;
-import java.util.Optional;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.BiFunction;
+import java.util.function.Consumer;
 import java.util.function.Function;
 
-import org.apache.commons.io.IOUtils;
-
-import de.ozgcloud.common.errorhandling.TechnicalException;
 import io.grpc.stub.CallStreamObserver;
 import io.grpc.stub.StreamObserver;
 import lombok.AccessLevel;
-import lombok.Getter;
 import lombok.NoArgsConstructor;
 import lombok.NonNull;
-import lombok.RequiredArgsConstructor;
 import lombok.extern.log4j.Log4j2;
 
 @Log4j2
 @NoArgsConstructor(access = AccessLevel.PRIVATE)
 public class GrpcFileUploadUtils {
 
-	static final int CHUNK_SIZE = 4 * 1024;
-
-	/*
-	 * Q = Request Type; S = Response Type
+	/**
+	 * @param <Q> Request Type
+	 * @param <S> Response Type
+	 * @deprecated use {@link #createStreamExclusiveFileSender(BiFunction, InputStream, Function)} instead
 	 */
+	@Deprecated(since = "4.13.0")
 	public static <Q, S> FileSender<Q, S> createSender(BiFunction<byte[], Integer, Q> chunkBuilder, InputStream inputStream,
 			Function<StreamObserver<S>, CallStreamObserver<Q>> reqObserverBuilder) {
-		return createSender(chunkBuilder, inputStream, reqObserverBuilder, true);
+		return new FileSender<>(chunkBuilder, inputStream, reqObserverBuilder);
 	}
 
-	public static <Q, S> FileSender<Q, S> createSender(BiFunction<byte[], Integer, Q> chunkBuilder, InputStream inputStream,
-			Function<StreamObserver<S>, CallStreamObserver<Q>> reqObserverBuilder, boolean completeOnFileSent) {
-		return new FileSender<>(chunkBuilder, reqObserverBuilder, inputStream, completeOnFileSent);
+	/**
+	 * @param <Q> Request Type
+	 * @param <S> Response Type
+	 */
+	public static <Q, S> StreamingFileSender<Q, S> createStreamExclusiveFileSender(BiFunction<byte[], Integer, Q> chunkBuilder, InputStream inputStream,
+			Function<StreamObserver<S>, CallStreamObserver<Q>> reqObserverBuilder) {
+		return new StreamExclusiveFileSender<>(chunkBuilder, inputStream, reqObserverBuilder);
 	}
 
-	public static class FileSender<Q, S> {
-		private final BiFunction<byte[], Integer, Q> chunkBuilder;
-		private final InputStream inputStream;
-
-		@Getter
-		private final CompletableFuture<S> resultFuture = new CompletableFuture<>();
-		private final Function<StreamObserver<S>, CallStreamObserver<Q>> reqObserverBuilder;
-		private CallStreamObserver<Q> requestObserver;
-
-		private Optional<Q> metaData = Optional.empty();
-		private final AtomicBoolean metaDataSent = new AtomicBoolean(false);
-		private final AtomicBoolean done = new AtomicBoolean(false);
-
-		private final StreamReader streamReader;
-		private final boolean completeOnFileSent;
-
-		FileSender(BiFunction<byte[], Integer, Q> chunkBuilder, Function<StreamObserver<S>, CallStreamObserver<Q>> reqObserverBuilder,
-				InputStream inputStream) {
-			this(chunkBuilder, reqObserverBuilder, inputStream, true);
-		}
+	/**
+	 * @param <Q> Request Type
+	 * @param <S> Response Type
+	 */
+	public static <Q, S> StreamingFileSender<Q, S> createStreamSharingSender(BiFunction<byte[], Integer, Q> chunkBuilder, InputStream inputStream,
+			CallStreamObserver<Q> requestObserver, Consumer<Runnable> onReadyHandlerRegistrar) {
+		return new StreamSharingFileSender<>(chunkBuilder, inputStream, requestObserver, onReadyHandlerRegistrar);
+	}
 
-		FileSender(BiFunction<byte[], Integer, Q> chunkBuilder, Function<StreamObserver<S>, CallStreamObserver<Q>> reqObserverBuilder,
-				InputStream inputStream, boolean completeOnFileSent) {
-			this.chunkBuilder = chunkBuilder;
-			this.inputStream = inputStream;
-			this.reqObserverBuilder = reqObserverBuilder;
-			this.completeOnFileSent = completeOnFileSent;
+	@Deprecated(since = "4.13.0")
+	public static class FileSender<Q, S> extends StreamExclusiveFileSender<Q, S> {
 
-			this.streamReader = new StreamReader(this.inputStream);
-		}
-
-		public FileSender<Q, S> withMetaData(@NonNull Q metaData) {
-			this.metaData = Optional.of(metaData);
-			return this;
+		FileSender(BiFunction<byte[], Integer, Q> chunkBuilder, InputStream inputStream,
+				Function<StreamObserver<S>, CallStreamObserver<Q>> reqObserverBuilder) {
+			super(chunkBuilder, inputStream, reqObserverBuilder);
 		}
 
+		@Override
 		public FileSender<Q, S> send() {
-			LOG.debug("Start sending File.");
-			var responseObserver = BinaryFileUploadStreamObserver.create(resultFuture, this::sendNext);
-			requestObserver = reqObserverBuilder.apply(responseObserver);
-
+			super.send();
 			return this;
 		}
 
-		public void cancelOnTimeout() {
-			LOG.warn("File transfer canceled on timeout");
-			resultFuture.cancel(true);
-			requestObserver.onError(new TechnicalException("Timeout on waiting for upload."));
-			closeStreams();
-		}
-
-		public void cancelOnError(Throwable t) {
-			LOG.error("File tranfer canceled on error.", t);
-			resultFuture.cancel(true);
-			requestObserver.onError(t);
-			closeStreams();
-		}
-
-		void sendNext() {
-			if (!done.get()) {
-				waitForOberver();
-				sendMetaData();
-				do {
-					LOG.debug("Sending next chunk.");
-					sendNextChunk();
-				} while (!done.get() && isReady());
-				LOG.debug("Finished or waiting to become ready.");
-			}
-		}
-
-		private boolean isReady() {
-			return requestObserver.isReady();
-		}
-
-		private void waitForOberver() {
-			synchronized (this) {
-				while (Objects.isNull(requestObserver)) {
-					try {
-						LOG.debug("wait for observer");
-						wait(300);
-					} catch (InterruptedException e) {
-						LOG.error("Error on waiting for request Observer.", e);
-						Thread.currentThread().interrupt();
-					}
-				}
-			}
-
-		}
-
-		void sendNextChunk() {
-			byte[] contentToSend = streamReader.getNextData();
-
-			if (streamReader.getLastReadSize() > 0) {
-				sendChunk(contentToSend, streamReader.getLastReadSize());
-			} else {
-				endTransfer();
-			}
-		}
-
-		private void endTransfer() {
-			if (completeOnFileSent)
-				requestObserver.onCompleted();
-			else
-				sendEndOfFile();
-			done.set(true);
-			LOG.debug("File Transfer done.");
-			closeStreams();
-
-		}
-
-		private void sendEndOfFile() {
-			sendChunk(new byte[0], streamReader.getLastReadSize());
-		}
-
-		void closeStreams() {
-			LOG.debug("Closing streams");
-			streamReader.close();
-		}
-
-		void sendChunk(byte[] content, int length) {
-			LOG.debug("Sending {} byte Data.", length);
-			var chunk = chunkBuilder.apply(content, length);
-			requestObserver.onNext(chunk);
-		}
-
-		byte[] readFromStream() {
-			try {
-				return inputStream.readNBytes(CHUNK_SIZE);
-			} catch (IOException e) {
-				throw new TechnicalException("Error on sending a single chunk", e);
-			}
-		}
-
-		void sendMetaData() {
-			metaData.filter(md -> !metaDataSent.get()).ifPresent(this::doSendMetaData);
-		}
-
-		private void doSendMetaData(Q metadata) {
-			LOG.debug("Sending Metadata.");
-			requestObserver.onNext(metadata);
-			metaDataSent.set(true);
-		}
-
-		void checkForEndOfStream(long sentSize) {
-			if (sentSize < CHUNK_SIZE) {
-				LOG.debug("File Transfer done. Closing stream.");
-				IOUtils.closeQuietly(inputStream);
-				requestObserver.onCompleted();
-				done.set(true);
-			} else {
-				LOG.debug("File Transfer not jet done - need to tranfer another chunk.");
-			}
-		}
-
-		@RequiredArgsConstructor
-		private class StreamReader {
-			private final InputStream inStream;
-			private final byte[] buffer = new byte[CHUNK_SIZE];
-			@Getter
-			private int lastReadSize = 0;
-			@Getter
-			private final AtomicBoolean done = new AtomicBoolean(false);
-
-			byte[] getNextData() {
-				readNext();
-				return buffer;
-			}
-
-			void close() {
-				IOUtils.closeQuietly(inStream);
-			}
-
-			void readNext() {
-				try {
-					lastReadSize = inStream.read(buffer, 0, CHUNK_SIZE);
-				} catch (IOException e) {
-					throw new TechnicalException("Error on reading a single chunk", e);
-				}
-			}
+		@Override
+		public FileSender<Q, S> withMetaData(@NonNull Q metaData) {
+			super.withMetaData(metaData);
+			return this;
 		}
 	}
 
diff --git a/ozgcloud-common-lib/src/main/java/de/ozgcloud/common/binaryfile/StreamExclusiveFileSender.java b/ozgcloud-common-lib/src/main/java/de/ozgcloud/common/binaryfile/StreamExclusiveFileSender.java
new file mode 100644
index 0000000000000000000000000000000000000000..4223d4de1b383d657d105d305d11e2d10b35ba54
--- /dev/null
+++ b/ozgcloud-common-lib/src/main/java/de/ozgcloud/common/binaryfile/StreamExclusiveFileSender.java
@@ -0,0 +1,78 @@
+/*
+ * Copyright (C) 2025 Das Land Schleswig-Holstein vertreten durch den
+ * Ministerpräsidenten des Landes Schleswig-Holstein
+ * Staatskanzlei
+ * Abteilung Digitalisierung und zentrales IT-Management der Landesregierung
+ *
+ * Lizenziert unter der EUPL, Version 1.2 oder - sobald
+ * diese von der Europäischen Kommission genehmigt wurden -
+ * Folgeversionen der EUPL ("Lizenz");
+ * Sie dürfen dieses Werk ausschließlich gemäß
+ * dieser Lizenz nutzen.
+ * Eine Kopie der Lizenz finden Sie hier:
+ *
+ * https://joinup.ec.europa.eu/collection/eupl/eupl-text-eupl-12
+ *
+ * Sofern nicht durch anwendbare Rechtsvorschriften
+ * gefordert oder in schriftlicher Form vereinbart, wird
+ * die unter der Lizenz verbreitete Software "so wie sie
+ * ist", OHNE JEGLICHE GEWÄHRLEISTUNG ODER BEDINGUNGEN -
+ * ausdrücklich oder stillschweigend - verbreitet.
+ * Die sprachspezifischen Genehmigungen und Beschränkungen
+ * unter der Lizenz sind dem Lizenztext zu entnehmen.
+ */
+package de.ozgcloud.common.binaryfile;
+
+import java.io.InputStream;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+
+import de.ozgcloud.common.errorhandling.TechnicalException;
+import io.grpc.stub.CallStreamObserver;
+import io.grpc.stub.StreamObserver;
+import lombok.extern.log4j.Log4j2;
+
+@Log4j2
+class StreamExclusiveFileSender<Q, S> extends StreamingFileSender<Q, S> {
+
+	private final Function<StreamObserver<S>, CallStreamObserver<Q>> reqObserverBuilder;
+	private CallStreamObserver<Q> requestObserver;
+
+	StreamExclusiveFileSender(BiFunction<byte[], Integer, Q> chunkBuilder, InputStream inputStream, Function<StreamObserver<S>, CallStreamObserver<Q>> reqObserverBuilder) {
+		super(chunkBuilder, inputStream);
+
+		this.reqObserverBuilder = reqObserverBuilder;
+	}
+
+	@Override
+	public StreamExclusiveFileSender<Q, S> send() {
+		LOG.debug("Start sending File.");
+
+		var responseObserver = BinaryFileUploadStreamObserver.create(getResultFuture(), this::sendNext);
+		requestObserver = reqObserverBuilder.apply(responseObserver);
+
+		return this;
+	}
+
+	@Override
+	void communicateEndOfTransfer() {
+		requestObserver.onCompleted();
+	}
+
+	@Override
+	protected CallStreamObserver<Q> getRequestObserver() {
+		return requestObserver;
+	}
+
+	@Override
+	public void cancelOnTimeout() {
+		super.cancelOnTimeout();
+		requestObserver.onError(new TechnicalException("Timeout on waiting for upload."));
+	}
+
+	@Override
+	public void cancelOnError(Throwable t) {
+		super.cancelOnError(t);
+		requestObserver.onError(t);
+	}
+}
diff --git a/ozgcloud-common-lib/src/main/java/de/ozgcloud/common/binaryfile/StreamSharingFileSender.java b/ozgcloud-common-lib/src/main/java/de/ozgcloud/common/binaryfile/StreamSharingFileSender.java
new file mode 100644
index 0000000000000000000000000000000000000000..c5c993a0f14881912f39c3c069912b4027eb81d7
--- /dev/null
+++ b/ozgcloud-common-lib/src/main/java/de/ozgcloud/common/binaryfile/StreamSharingFileSender.java
@@ -0,0 +1,67 @@
+/*
+ * Copyright (C) 2025 Das Land Schleswig-Holstein vertreten durch den
+ * Ministerpräsidenten des Landes Schleswig-Holstein
+ * Staatskanzlei
+ * Abteilung Digitalisierung und zentrales IT-Management der Landesregierung
+ *
+ * Lizenziert unter der EUPL, Version 1.2 oder - sobald
+ * diese von der Europäischen Kommission genehmigt wurden -
+ * Folgeversionen der EUPL ("Lizenz");
+ * Sie dürfen dieses Werk ausschließlich gemäß
+ * dieser Lizenz nutzen.
+ * Eine Kopie der Lizenz finden Sie hier:
+ *
+ * https://joinup.ec.europa.eu/collection/eupl/eupl-text-eupl-12
+ *
+ * Sofern nicht durch anwendbare Rechtsvorschriften
+ * gefordert oder in schriftlicher Form vereinbart, wird
+ * die unter der Lizenz verbreitete Software "so wie sie
+ * ist", OHNE JEGLICHE GEWÄHRLEISTUNG ODER BEDINGUNGEN -
+ * ausdrücklich oder stillschweigend - verbreitet.
+ * Die sprachspezifischen Genehmigungen und Beschränkungen
+ * unter der Lizenz sind dem Lizenztext zu entnehmen.
+ */
+package de.ozgcloud.common.binaryfile;
+
+import java.io.InputStream;
+import java.util.function.BiFunction;
+import java.util.function.Consumer;
+
+import io.grpc.stub.CallStreamObserver;
+import lombok.extern.log4j.Log4j2;
+
+@Log4j2
+class StreamSharingFileSender<Q, S> extends StreamingFileSender<Q, S> {
+
+	private final CallStreamObserver<Q> requestObserver;
+	private final Consumer<Runnable> onReadyHandlerRegistrar;
+
+	StreamSharingFileSender(BiFunction<byte[], Integer, Q> chunkBuilder, InputStream inputStream, CallStreamObserver<Q> requestObserver, Consumer<Runnable> onReadyHandlerRegistrar) {
+		super(chunkBuilder, inputStream);
+
+		this.requestObserver = requestObserver;
+		this.onReadyHandlerRegistrar = onReadyHandlerRegistrar;
+	}
+
+	@Override
+	public StreamSharingFileSender<Q, S> send() {
+		LOG.debug("Register onReadyHandler and start sending File.");
+		onReadyHandlerRegistrar.accept(this::sendNext);
+		return this;
+	}
+
+	@Override
+	void communicateEndOfTransfer() {
+		sendEndOfFile();
+		getResultFuture().complete(null);
+	}
+
+	private void sendEndOfFile() {
+		sendChunk(new byte[0], -1);
+	}
+
+	@Override
+	protected CallStreamObserver<Q> getRequestObserver() {
+		return requestObserver;
+	}
+}
diff --git a/ozgcloud-common-lib/src/main/java/de/ozgcloud/common/binaryfile/StreamingFileSender.java b/ozgcloud-common-lib/src/main/java/de/ozgcloud/common/binaryfile/StreamingFileSender.java
new file mode 100644
index 0000000000000000000000000000000000000000..de78cf14b005300683dd1d63ed5c7cf4184da010
--- /dev/null
+++ b/ozgcloud-common-lib/src/main/java/de/ozgcloud/common/binaryfile/StreamingFileSender.java
@@ -0,0 +1,200 @@
+/*
+ * Copyright (C) 2025 Das Land Schleswig-Holstein vertreten durch den
+ * Ministerpräsidenten des Landes Schleswig-Holstein
+ * Staatskanzlei
+ * Abteilung Digitalisierung und zentrales IT-Management der Landesregierung
+ *
+ * Lizenziert unter der EUPL, Version 1.2 oder - sobald
+ * diese von der Europäischen Kommission genehmigt wurden -
+ * Folgeversionen der EUPL ("Lizenz");
+ * Sie dürfen dieses Werk ausschließlich gemäß
+ * dieser Lizenz nutzen.
+ * Eine Kopie der Lizenz finden Sie hier:
+ *
+ * https://joinup.ec.europa.eu/collection/eupl/eupl-text-eupl-12
+ *
+ * Sofern nicht durch anwendbare Rechtsvorschriften
+ * gefordert oder in schriftlicher Form vereinbart, wird
+ * die unter der Lizenz verbreitete Software "so wie sie
+ * ist", OHNE JEGLICHE GEWÄHRLEISTUNG ODER BEDINGUNGEN -
+ * ausdrücklich oder stillschweigend - verbreitet.
+ * Die sprachspezifischen Genehmigungen und Beschränkungen
+ * unter der Lizenz sind dem Lizenztext zu entnehmen.
+ */
+package de.ozgcloud.common.binaryfile;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.BiFunction;
+
+import org.apache.commons.io.IOUtils;
+
+import de.ozgcloud.common.errorhandling.TechnicalException;
+import io.grpc.stub.CallStreamObserver;
+import lombok.AccessLevel;
+import lombok.Getter;
+import lombok.NonNull;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.log4j.Log4j2;
+
+/*
+ * Q = Request Type; S = Response Type
+ */
+@Log4j2
+public abstract class StreamingFileSender<Q, S> {
+
+	static final int CHUNK_SIZE = 4 * 1024;
+	
+	private final BiFunction<byte[], Integer, Q> chunkBuilder;
+
+	@Getter
+	private final CompletableFuture<S> resultFuture = new CompletableFuture<>();
+
+	private Q metaData;
+	private final AtomicBoolean metaDataSent = new AtomicBoolean(false);
+	private final AtomicBoolean done = new AtomicBoolean(false);
+
+	@Getter(AccessLevel.PROTECTED)
+	private final StreamReader streamReader;
+
+	StreamingFileSender(BiFunction<byte[], Integer, Q> chunkBuilder, InputStream inputStream) {
+		this.chunkBuilder = chunkBuilder;
+		this.streamReader = new StreamReader(inputStream);
+	}
+
+	public StreamingFileSender<Q, S> withMetaData(@NonNull Q metaData) {
+		this.metaData = metaData;
+		return this;
+	}
+
+	public abstract StreamingFileSender<Q, S> send();
+
+	public void cancelOnTimeout() {
+		LOG.warn("File transfer canceled on timeout");
+		resultFuture.cancel(true);
+		closeStreamReader();
+	}
+
+	public void cancelOnError(Throwable t) {
+		LOG.error("File transfer canceled on error.", t);
+		resultFuture.cancel(true);
+		closeStreamReader();
+	}
+
+	void sendNext() {
+		if (notFinished()) {
+			waitForObserver();
+			sendMetaData();
+			do {
+				LOG.debug("Sending next chunk.");
+				sendNextChunk();
+			} while (notFinished() && isReady());
+			LOG.debug("Finished or waiting to become ready.");
+		}
+		checkIfFinishedForcefully();
+	}
+
+	private boolean notFinished() {
+		return !done.get() && !resultFuture.isCancelled();
+	}
+
+	private void checkIfFinishedForcefully() {
+		if (resultFuture.isCancelled()) {
+			LOG.warn("File transfer was cancelled");
+			closeStreamReader();
+		}
+	}
+
+	private boolean isReady() {
+		return getRequestObserver().isReady();
+	}
+
+	private void waitForObserver() {
+		synchronized (this) {
+			while (Objects.isNull(getRequestObserver())) {
+				try {
+					LOG.debug("wait for observer");
+					wait(300);
+				} catch (InterruptedException e) {
+					LOG.error("Error on waiting for request Observer.", e);
+					Thread.currentThread().interrupt();
+				}
+			}
+		}
+
+	}
+
+	void sendNextChunk() {
+		byte[] contentToSend = streamReader.getNextData();
+
+		if (streamReader.getLastReadSize() > 0) {
+			sendChunk(contentToSend, streamReader.getLastReadSize());
+		} else {
+			endTransfer();
+		}
+	}
+
+	void endTransfer() {
+		communicateEndOfTransfer();
+		done.set(true);
+		LOG.debug("File Transfer done.");
+		closeStreamReader();
+	}
+
+	abstract void communicateEndOfTransfer();
+
+	void closeStreamReader() {
+		LOG.debug("Closing streams");
+		streamReader.close();
+	}
+
+	void sendChunk(byte[] content, int length) {
+		LOG.debug("Sending {} byte Data.", length);
+		var chunk = chunkBuilder.apply(content, length);
+		getRequestObserver().onNext(chunk);
+	}
+
+	void sendMetaData() {
+		if (metaData != null && !metaDataSent.get()) {
+			doSendMetaData(metaData);
+		}
+	}
+
+	private void doSendMetaData(Q metadata) {
+		LOG.debug("Sending Metadata.");
+		getRequestObserver().onNext(metadata);
+		metaDataSent.set(true);
+	}
+
+	protected abstract CallStreamObserver<Q> getRequestObserver();
+
+	@RequiredArgsConstructor
+	protected static class StreamReader {
+		private final InputStream inStream;
+		private final byte[] buffer = new byte[CHUNK_SIZE];
+		@Getter
+		private int lastReadSize = 0;
+		@Getter
+		private final AtomicBoolean done = new AtomicBoolean(false);
+
+		byte[] getNextData() {
+			readNext();
+			return buffer;
+		}
+
+		void close() {
+			IOUtils.closeQuietly(inStream);
+		}
+
+		void readNext() {
+			try {
+				lastReadSize = inStream.read(buffer, 0, CHUNK_SIZE);
+			} catch (IOException e) {
+				throw new TechnicalException("Error on reading a single chunk", e);
+			}
+		}
+	}
+}
diff --git a/ozgcloud-common-lib/src/test/java/de/ozgcloud/common/binaryfile/GrpcFileUploadUtilsTest.java b/ozgcloud-common-lib/src/test/java/de/ozgcloud/common/binaryfile/GrpcFileUploadUtilsTest.java
index fb77f3ca04c0554238b03cff1f996028d4ae29f6..c94b69ec5df2eec920e0797589706ace0f09e6cb 100644
--- a/ozgcloud-common-lib/src/test/java/de/ozgcloud/common/binaryfile/GrpcFileUploadUtilsTest.java
+++ b/ozgcloud-common-lib/src/test/java/de/ozgcloud/common/binaryfile/GrpcFileUploadUtilsTest.java
@@ -24,289 +24,64 @@
 package de.ozgcloud.common.binaryfile;
 
 import static org.assertj.core.api.Assertions.*;
-import static org.mockito.ArgumentMatchers.*;
-import static org.mockito.Mockito.*;
 
-import java.io.ByteArrayInputStream;
-import java.io.IOException;
 import java.io.InputStream;
 import java.util.function.BiFunction;
+import java.util.function.Consumer;
 import java.util.function.Function;
 
-import org.apache.commons.lang3.RandomUtils;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Nested;
 import org.junit.jupiter.api.Test;
-import org.mockito.ArgumentCaptor;
-import org.mockito.Captor;
-import org.mockito.InjectMocks;
 import org.mockito.Mock;
 
 import de.ozgcloud.common.binaryfile.BinaryFileTestFactory.TestRequestType;
 import de.ozgcloud.common.binaryfile.BinaryFileTestFactory.TestResponseType;
-import de.ozgcloud.common.binaryfile.GrpcFileUploadUtils.FileSender;
-import de.ozgcloud.common.errorhandling.TechnicalException;
 import io.grpc.stub.CallStreamObserver;
 import io.grpc.stub.StreamObserver;
-import lombok.SneakyThrows;
 
 class GrpcFileUploadUtilsTest {
 
-	@InjectMocks
-	private GrpcFileUploadUtils service;
-
 	@Mock
 	private BiFunction<byte[], Integer, TestRequestType> chunkBuilder;
 	@Mock
+	private InputStream inputStream;
+	@Mock
 	private Function<StreamObserver<TestResponseType>, CallStreamObserver<TestRequestType>> reqObserverBuilder;
 	@Mock
 	private CallStreamObserver<TestRequestType> requestObserver;
 	@Mock
-	private InputStream inputStream;
-
-	private FileSender<TestRequestType, TestResponseType> fileSender;
-
-	@Mock
-	private TestRequestType metaData;
-
-	@BeforeEach
-	void init() {
-		when(reqObserverBuilder.apply(any())).thenReturn(requestObserver);
-		fileSender = spy(GrpcFileUploadUtils.createSender(chunkBuilder, inputStream, reqObserverBuilder));
-	}
+	private Consumer<Runnable> onReadyHandlerRegistrar;
 
 	@Nested
-	class TestCreateFileSender {
+	class TestCreateSender {
 
 		@Test
-		void shouldCreateRequestObserver() {
-			GrpcFileUploadUtils.createSender(chunkBuilder, inputStream, reqObserverBuilder).send();
+		void shouldReturnInstanceOfStreamExclusiveFileSender() {
+			var createdSender = GrpcFileUploadUtils.createSender(chunkBuilder, inputStream, reqObserverBuilder);
 
-			verify(reqObserverBuilder, atLeastOnce()).apply(notNull());
+			assertThat(createdSender).isInstanceOf(StreamExclusiveFileSender.class);
 		}
 	}
 
 	@Nested
-	class TestSendBinaryFile {
-
-		@Captor
-		private ArgumentCaptor<Runnable> runnableCaptor;
+	class TestCreateStreamExclusiveFileSender {
 
 		@Test
-		void shouldReturnSenderWithFuture() {
-			var result = fileSender.send();
+		void shouldReturnInstanceOfStreamExclusiveFileSender() {
+			var createdSender = GrpcFileUploadUtils.createStreamExclusiveFileSender(chunkBuilder, inputStream, reqObserverBuilder);
 
-			assertThat(result).isNotNull().extracting(FileSender::getResultFuture).isNotNull();
+			assertThat(createdSender).isInstanceOf(StreamExclusiveFileSender.class);
 		}
 	}
 
 	@Nested
-	class TestSendNext {
-
-		@BeforeEach
-		void initObserver() {
-			fileSender.send();
-		}
+	class TestCreateStreamSharingSender {
 
 		@Test
-		void shouldCallSendMetaData() {
-			fileSender.sendNext();
-
-			verify(fileSender).sendMetaData();
-		}
-
-		@Test
-		void shouldSendNextChunk() {
-			fileSender.sendNext();
-
-			verify(fileSender).sendNextChunk();
-		}
-
-	}
-
-	@Nested
-	class TestSendNextChunk {
-
-		private final byte[] content = RandomUtils.insecure().randomBytes(GrpcFileUploadUtils.CHUNK_SIZE / 2);
-		private ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(content);
+		void shouldReturnInstanceOfStreamSharingSender() {
+			var createdSender = GrpcFileUploadUtils.createStreamSharingSender(chunkBuilder, inputStream, requestObserver, onReadyHandlerRegistrar);
 
-		@Captor
-		private ArgumentCaptor<byte[]> chunkCaptor;
-
-		@Nested
-		class TestOnDataAvailable {
-			@BeforeEach
-			void initObserver() {
-				fileSender = spy(GrpcFileUploadUtils.createSender(chunkBuilder, byteArrayInputStream, reqObserverBuilder));
-				fileSender.send();
-			}
-
-			@Test
-			void shouldCallSendChunk() {
-				fileSender.sendNextChunk();
-
-				verify(fileSender).sendChunk(chunkCaptor.capture(), eq(content.length));
-				assertThat(chunkCaptor.getValue()).contains(content);
-			}
+			assertThat(createdSender).isInstanceOf(StreamSharingFileSender.class);
 		}
-
-		@Nested
-		class TestOnNoBytesLeftToRead {
-
-			@Nested
-			class TestOnCompleteOnFileSent {
-				private static final boolean COMPLETE_ON_FILE_SENT = true;
-
-				@BeforeEach
-				void initialize() {
-					var buffer = new byte[GrpcFileUploadUtils.CHUNK_SIZE];
-					byteArrayInputStream.read(buffer, 0, GrpcFileUploadUtils.CHUNK_SIZE);
-					fileSender = spy(GrpcFileUploadUtils.createSender(chunkBuilder, byteArrayInputStream, reqObserverBuilder, COMPLETE_ON_FILE_SENT));
-					fileSender.send();
-				}
-
-				@Test
-				void shouldCallOnCompleted() {
-					fileSender.sendNextChunk();
-
-					verify(requestObserver).onCompleted();
-				}
-
-				@Test
-				void shouldNotCallSendChunk() {
-					fileSender.sendNextChunk();
-
-					verify(fileSender, never()).sendChunk(any(), anyInt());
-				}
-
-				@Test
-				@SneakyThrows
-				void shouldCallCloseStreams() {
-					fileSender.sendNextChunk();
-
-					verify(fileSender).closeStreams();
-				}
-			}
-
-			@Nested
-			class TestOnNotCompleteOnFileSent {
-				private static final boolean COMPLETE_ON_FILE_SENT = false;
-
-				@BeforeEach
-				void initialize() {
-					var buffer = new byte[GrpcFileUploadUtils.CHUNK_SIZE];
-					byteArrayInputStream.read(buffer, 0, GrpcFileUploadUtils.CHUNK_SIZE);
-					fileSender = spy(GrpcFileUploadUtils.createSender(chunkBuilder, byteArrayInputStream, reqObserverBuilder, COMPLETE_ON_FILE_SENT));
-					fileSender.send();
-				}
-
-				@Test
-				void shouldNotCallOnCompleted() {
-					fileSender.sendNextChunk();
-
-					verify(requestObserver, never()).onCompleted();
-				}
-
-				@Test
-				void shouldCallSendChunk() {
-					fileSender.sendNextChunk();
-
-					verify(fileSender).sendChunk(chunkCaptor.capture(), eq(-1));
-					assertThat(chunkCaptor.getValue()).isEmpty();
-				}
-
-				@Test
-				@SneakyThrows
-				void shouldCallCloseStreams() {
-					fileSender.sendNextChunk();
-
-					verify(fileSender).closeStreams();
-				}
-			}
-		}
-
 	}
-
-	@Nested
-	class TestCloseStreams {
-
-		@Test
-		@SneakyThrows
-		void shouldCloseInputStream() {
-			fileSender.send();
-
-			fileSender.closeStreams();
-
-			verify(inputStream).close();
-		}
-	}
-
-	@Nested
-	class TestSendChunk {
-
-		private static final byte[] CHUNK_PART = "ChunkPartContent".getBytes();
-
-		@BeforeEach
-		void initObserver() {
-			fileSender.send();
-		}
-
-		@Test
-		void shouldApplyBuildChunk() throws IOException {
-			fileSender.sendChunk(CHUNK_PART, 5);
-
-			verify(chunkBuilder).apply(CHUNK_PART, 5);
-		}
-
-		@Test
-		void shouldCallOnNext() throws IOException {
-			fileSender.sendChunk(CHUNK_PART, 5);
-
-			verify(requestObserver).onNext(any());
-		}
-	}
-
-	@Nested
-	class TestSendMetaData {
-
-		@BeforeEach
-		void initObserver() {
-			fileSender.send();
-		}
-
-		@Test
-		void shouldNotSendWithoutMetadata() {
-			fileSender.sendMetaData();
-
-			verify(requestObserver, never()).onNext(any());
-		}
-
-		@Test
-		void shouldSendMetadata() {
-			fileSender.withMetaData(metaData).sendMetaData();
-
-			verify(requestObserver).onNext(metaData);
-		}
-
-		@Test
-		void shouldSendMetadataOnlyOnce() {
-			fileSender.withMetaData(metaData).sendMetaData();
-			fileSender.sendMetaData();
-
-			verify(requestObserver).onNext(metaData);
-		}
-	}
-
-	@Disabled("unused")
-	@Nested
-	class TestReadFromStream {
-		@Test
-		void shouldThrowException() throws IOException {
-			doThrow(IOException.class).when(inputStream).read(any(), anyInt(), anyInt());
-
-			assertThatThrownBy(() -> fileSender.readFromStream()).isInstanceOf(TechnicalException.class);
-		}
-	}
-
 }
\ No newline at end of file
diff --git a/ozgcloud-common-lib/src/test/java/de/ozgcloud/common/binaryfile/StreamExclusiveFileSenderTest.java b/ozgcloud-common-lib/src/test/java/de/ozgcloud/common/binaryfile/StreamExclusiveFileSenderTest.java
new file mode 100644
index 0000000000000000000000000000000000000000..6ac54ef835b35e685a9b67b89fdba6fc5ed482a0
--- /dev/null
+++ b/ozgcloud-common-lib/src/test/java/de/ozgcloud/common/binaryfile/StreamExclusiveFileSenderTest.java
@@ -0,0 +1,195 @@
+/*
+ * Copyright (C) 2025 Das Land Schleswig-Holstein vertreten durch den
+ * Ministerpräsidenten des Landes Schleswig-Holstein
+ * Staatskanzlei
+ * Abteilung Digitalisierung und zentrales IT-Management der Landesregierung
+ *
+ * Lizenziert unter der EUPL, Version 1.2 oder - sobald
+ * diese von der Europäischen Kommission genehmigt wurden -
+ * Folgeversionen der EUPL ("Lizenz");
+ * Sie dürfen dieses Werk ausschließlich gemäß
+ * dieser Lizenz nutzen.
+ * Eine Kopie der Lizenz finden Sie hier:
+ *
+ * https://joinup.ec.europa.eu/collection/eupl/eupl-text-eupl-12
+ *
+ * Sofern nicht durch anwendbare Rechtsvorschriften
+ * gefordert oder in schriftlicher Form vereinbart, wird
+ * die unter der Lizenz verbreitete Software "so wie sie
+ * ist", OHNE JEGLICHE GEWÄHRLEISTUNG ODER BEDINGUNGEN -
+ * ausdrücklich oder stillschweigend - verbreitet.
+ * Die sprachspezifischen Genehmigungen und Beschränkungen
+ * unter der Lizenz sind dem Lizenztext zu entnehmen.
+ */
+package de.ozgcloud.common.binaryfile;
+
+import static org.mockito.Mockito.*;
+
+import java.io.InputStream;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Nested;
+import org.junit.jupiter.api.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Captor;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.MockedStatic;
+import org.mockito.Spy;
+import org.springframework.test.util.ReflectionTestUtils;
+
+import de.ozgcloud.common.binaryfile.BinaryFileTestFactory.TestRequestType;
+import de.ozgcloud.common.binaryfile.BinaryFileTestFactory.TestResponseType;
+import de.ozgcloud.common.errorhandling.TechnicalException;
+import io.grpc.stub.CallStreamObserver;
+import io.grpc.stub.StreamObserver;
+
+class StreamExclusiveFileSenderTest {
+
+	@Mock
+	private BiFunction<byte[], Integer, TestRequestType> chunkBuilder;
+	@Mock
+	private InputStream inputStream;
+	@Mock
+	private Function<StreamObserver<TestResponseType>, CallStreamObserver<TestRequestType>> reqObserverBuilder;
+	@Spy
+	@InjectMocks
+	private StreamExclusiveFileSender<TestRequestType, TestResponseType> fileSender;
+
+	@Nested
+	class TestSend {
+
+		private final CompletableFuture<TestResponseType> resultFuture = CompletableFuture.completedFuture(new TestResponseType());
+		@Captor
+		private ArgumentCaptor<Runnable> onReadyHandlerCaptor;
+		@Mock
+		private BinaryFileUploadStreamObserver<TestRequestType, TestResponseType> responseObserver;
+
+		@BeforeEach
+		void init() {
+			doReturn(resultFuture).when(fileSender).getResultFuture();
+		}
+
+		@SuppressWarnings("rawtypes")
+		@Test
+		void shouldCreateResponseObserver() {
+			try (MockedStatic<BinaryFileUploadStreamObserver> mocked = mockStatic(BinaryFileUploadStreamObserver.class)) {
+				doNothing().when(fileSender).sendNext();
+
+				fileSender.send();
+
+				mocked.verify(() -> BinaryFileUploadStreamObserver.create(same(resultFuture), onReadyHandlerCaptor.capture()));
+				verifyCallsSendNext(onReadyHandlerCaptor.getValue());
+			}
+		}
+
+		@SuppressWarnings("rawtypes")
+		@Test
+		void shouldBuildRequestObserver() {
+			try (MockedStatic<BinaryFileUploadStreamObserver> mocked = mockStatic(BinaryFileUploadStreamObserver.class)) {
+				mocked.when(() -> BinaryFileUploadStreamObserver.create(any(), any())).thenReturn(responseObserver);
+
+				fileSender.send();
+
+				verify(reqObserverBuilder).apply(responseObserver);
+			}
+		}
+
+		private void verifyCallsSendNext(Runnable runnable) {
+			runnable.run();
+			verify(fileSender).sendNext();
+		}
+	}
+
+	@Nested
+	class TestCommunicateEndOfTransfer {
+
+		@Mock
+		private CallStreamObserver<TestRequestType> requestObserver;
+
+		@Test
+		void shouldCallOnCompleted() {
+			ReflectionTestUtils.setField(fileSender, "requestObserver", requestObserver);
+
+			fileSender.communicateEndOfTransfer();
+
+			verify(requestObserver).onCompleted();
+		}
+	}
+
+	@Nested
+	class TestCancelOnTimeout {
+
+		@Mock
+		private CompletableFuture<TestResponseType> resultFuture;
+		@Mock
+		private CallStreamObserver<TestRequestType> requestObserver;
+
+		@BeforeEach
+		void init() {
+			ReflectionTestUtils.setField(fileSender, "resultFuture", resultFuture);
+			ReflectionTestUtils.setField(fileSender, "requestObserver", requestObserver);
+		}
+
+		@Test
+		void shouldCancelResultFuture() {
+			fileSender.cancelOnTimeout();
+
+			verify(resultFuture).cancel(true);
+		}
+
+		@Test
+		void shouldCallOnError() {
+			fileSender.cancelOnTimeout();
+
+			verify(requestObserver).onError(any(TechnicalException.class));
+		}
+
+		@Test
+		void shouldCloseStreamReader() {
+			fileSender.cancelOnTimeout();
+
+			verify(fileSender).closeStreamReader();
+		}
+	}
+
+	@Nested
+	class TestCancelOnError {
+
+		@Mock
+		private CompletableFuture<TestResponseType> resultFuture;
+		@Mock
+		private CallStreamObserver<TestRequestType> requestObserver;
+		private final Throwable error = new Throwable();
+
+		@BeforeEach
+		void init() {
+			ReflectionTestUtils.setField(fileSender, "resultFuture", resultFuture);
+			ReflectionTestUtils.setField(fileSender, "requestObserver", requestObserver);
+		}
+
+		@Test
+		void shouldCancelResultFuture() {
+			fileSender.cancelOnError(error);
+
+			verify(resultFuture).cancel(true);
+		}
+
+		@Test
+		void shouldCallOnError() {
+			fileSender.cancelOnError(error);
+
+			verify(requestObserver).onError(error);
+		}
+
+		@Test
+		void shouldCloseStreamReader() {
+			fileSender.cancelOnError(error);
+
+			verify(fileSender).closeStreamReader();
+		}
+	}
+}
diff --git a/ozgcloud-common-lib/src/test/java/de/ozgcloud/common/binaryfile/StreamSharingFileSenderTest.java b/ozgcloud-common-lib/src/test/java/de/ozgcloud/common/binaryfile/StreamSharingFileSenderTest.java
new file mode 100644
index 0000000000000000000000000000000000000000..6c2225c9e742c6fa41e8390f386dc7f4bbad7bd7
--- /dev/null
+++ b/ozgcloud-common-lib/src/test/java/de/ozgcloud/common/binaryfile/StreamSharingFileSenderTest.java
@@ -0,0 +1,105 @@
+/*
+ * Copyright (C) 2025 Das Land Schleswig-Holstein vertreten durch den
+ * Ministerpräsidenten des Landes Schleswig-Holstein
+ * Staatskanzlei
+ * Abteilung Digitalisierung und zentrales IT-Management der Landesregierung
+ *
+ * Lizenziert unter der EUPL, Version 1.2 oder - sobald
+ * diese von der Europäischen Kommission genehmigt wurden -
+ * Folgeversionen der EUPL ("Lizenz");
+ * Sie dürfen dieses Werk ausschließlich gemäß
+ * dieser Lizenz nutzen.
+ * Eine Kopie der Lizenz finden Sie hier:
+ *
+ * https://joinup.ec.europa.eu/collection/eupl/eupl-text-eupl-12
+ *
+ * Sofern nicht durch anwendbare Rechtsvorschriften
+ * gefordert oder in schriftlicher Form vereinbart, wird
+ * die unter der Lizenz verbreitete Software "so wie sie
+ * ist", OHNE JEGLICHE GEWÄHRLEISTUNG ODER BEDINGUNGEN -
+ * ausdrücklich oder stillschweigend - verbreitet.
+ * Die sprachspezifischen Genehmigungen und Beschränkungen
+ * unter der Lizenz sind dem Lizenztext zu entnehmen.
+ */
+package de.ozgcloud.common.binaryfile;
+
+import static org.assertj.core.api.Assertions.*;
+import static org.mockito.Mockito.*;
+
+import java.io.InputStream;
+import java.util.function.BiFunction;
+import java.util.function.Consumer;
+
+import org.junit.jupiter.api.Nested;
+import org.junit.jupiter.api.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Captor;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.Spy;
+
+import de.ozgcloud.common.binaryfile.BinaryFileTestFactory.TestRequestType;
+import de.ozgcloud.common.binaryfile.BinaryFileTestFactory.TestResponseType;
+import io.grpc.stub.CallStreamObserver;
+
+class StreamSharingFileSenderTest {
+
+	@Mock
+	private BiFunction<byte[], Integer, TestRequestType> chunkBuilder;
+	@Mock
+	private InputStream inputStream;
+	@Mock
+	private CallStreamObserver<TestRequestType> requestObserver;
+	@Mock
+	private Consumer<Runnable> onReadyHandlerRegistrar;
+	@Spy
+	@InjectMocks
+	private StreamSharingFileSender<TestRequestType, TestResponseType> fileSender;
+
+	@Nested
+	class TestSend {
+
+		@Captor
+		private ArgumentCaptor<Runnable> onReadyHandlerCaptor;
+
+		@Test
+		void shouldRegisterOnReadyHandler() {
+			doNothing().when(fileSender).sendNext();
+
+			fileSender.send();
+
+			verify(onReadyHandlerRegistrar).accept(onReadyHandlerCaptor.capture());
+			verifyCallsSendNext(onReadyHandlerCaptor.getValue());
+		}
+
+		@Test
+		void shouldReturnThis() {
+			var obj = fileSender.send();
+
+			assertThat(obj).isSameAs(fileSender);
+		}
+
+		private void verifyCallsSendNext(Runnable runnable) {
+			runnable.run();
+			verify(fileSender).sendNext();
+		}
+	}
+
+	@Nested
+	class TestCommunicateEndOfTransfer {
+
+		@Test
+		void shouldSendEmptyChunk() {
+			fileSender.communicateEndOfTransfer();
+
+			verify(fileSender).sendChunk(new byte[0], -1);
+		}
+
+		@Test
+		void shouldCompleteResultFuture() {
+			fileSender.communicateEndOfTransfer();
+
+			assertThat(fileSender.getResultFuture().isDone()).isTrue();
+		}
+	}
+}
diff --git a/ozgcloud-common-lib/src/test/java/de/ozgcloud/common/binaryfile/StreamingFileSenderTest.java b/ozgcloud-common-lib/src/test/java/de/ozgcloud/common/binaryfile/StreamingFileSenderTest.java
new file mode 100644
index 0000000000000000000000000000000000000000..ce12c67d1ee3d157c1385d00f5bd5193382ac9ae
--- /dev/null
+++ b/ozgcloud-common-lib/src/test/java/de/ozgcloud/common/binaryfile/StreamingFileSenderTest.java
@@ -0,0 +1,377 @@
+/*
+ * Copyright (C) 2025 Das Land Schleswig-Holstein vertreten durch den
+ * Ministerpräsidenten des Landes Schleswig-Holstein
+ * Staatskanzlei
+ * Abteilung Digitalisierung und zentrales IT-Management der Landesregierung
+ *
+ * Lizenziert unter der EUPL, Version 1.2 oder - sobald
+ * diese von der Europäischen Kommission genehmigt wurden -
+ * Folgeversionen der EUPL ("Lizenz");
+ * Sie dürfen dieses Werk ausschließlich gemäß
+ * dieser Lizenz nutzen.
+ * Eine Kopie der Lizenz finden Sie hier:
+ *
+ * https://joinup.ec.europa.eu/collection/eupl/eupl-text-eupl-12
+ *
+ * Sofern nicht durch anwendbare Rechtsvorschriften
+ * gefordert oder in schriftlicher Form vereinbart, wird
+ * die unter der Lizenz verbreitete Software "so wie sie
+ * ist", OHNE JEGLICHE GEWÄHRLEISTUNG ODER BEDINGUNGEN -
+ * ausdrücklich oder stillschweigend - verbreitet.
+ * Die sprachspezifischen Genehmigungen und Beschränkungen
+ * unter der Lizenz sind dem Lizenztext zu entnehmen.
+ */
+package de.ozgcloud.common.binaryfile;
+
+import static org.assertj.core.api.Assertions.*;
+import static org.mockito.Mockito.*;
+
+import java.io.ByteArrayInputStream;
+import java.io.InputStream;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.BiFunction;
+
+import org.apache.commons.lang3.RandomUtils;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Nested;
+import org.junit.jupiter.api.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Captor;
+import org.mockito.Mock;
+import org.springframework.test.util.ReflectionTestUtils;
+
+import de.ozgcloud.common.binaryfile.BinaryFileTestFactory.TestRequestType;
+import de.ozgcloud.common.binaryfile.BinaryFileTestFactory.TestResponseType;
+import io.grpc.stub.CallStreamObserver;
+import lombok.AccessLevel;
+import lombok.Getter;
+import lombok.SneakyThrows;
+
+class StreamingFileSenderTest {
+
+	@Mock
+	private BiFunction<byte[], Integer, TestRequestType> chunkBuilder;
+	@Mock
+	private InputStream inputStream;
+	@Mock
+	private CallStreamObserver<TestRequestType> requestObserver;
+
+	private TestFileSender fileSender;
+
+	@BeforeEach
+	void init() {
+		fileSender = spy(new TestFileSender(chunkBuilder, inputStream, requestObserver));
+	}
+
+	@Nested
+	class TestCancelOnTimeout {
+
+		@Mock
+		private CompletableFuture<TestResponseType> resultFuture;
+
+		@BeforeEach
+		void init() {
+			setResultFutureInFileSender(resultFuture);
+		}
+
+		@Test
+		void shouldCancelResultFuture() {
+			fileSender.cancelOnTimeout();
+
+			verify(resultFuture).cancel(true);
+		}
+
+		@Test
+		void shouldCloseStreams() {
+			fileSender.cancelOnTimeout();
+
+			verify(fileSender).closeStreamReader();
+		}
+	}
+
+	@Nested
+	class TestCancelOnError {
+
+		@Mock
+		private CompletableFuture<TestResponseType> resultFuture;
+		private final Throwable error = new Throwable();
+
+		@BeforeEach
+		void init() {
+			setResultFutureInFileSender(resultFuture);
+		}
+
+		@Test
+		void shouldCancelResultFuture() {
+			fileSender.cancelOnError(error);
+
+			verify(resultFuture).cancel(true);
+		}
+
+		@Test
+		void shouldCloseStreams() {
+			fileSender.cancelOnError(error);
+
+			verify(fileSender).closeStreamReader();
+		}
+	}
+
+	@Nested
+	class TestSendNext {
+
+		@Mock
+		private CompletableFuture<TestResponseType> resultFuture;
+
+		@BeforeEach
+		void init() {
+			fileSender.send();
+		}
+
+		@Test
+		void shouldSendMetaData() {
+			fileSender.sendNext();
+
+			verify(fileSender).sendMetaData();
+		}
+
+		@Test
+		void shouldSendNextChunk() {
+			fileSender.sendNext();
+
+			verify(fileSender).sendNextChunk();
+		}
+
+		@Test
+		void shouldNotSendMetaDataIfDone() {
+			setDoneInFileSender(true);
+
+			fileSender.sendNext();
+
+			verify(fileSender, never()).sendMetaData();
+		}
+
+		@Test
+		void shouldNotSendMetaDataIfCancelled() {
+			fileSender.getResultFuture().cancel(true);
+
+			fileSender.sendNext();
+
+			verify(fileSender, never()).sendMetaData();
+		}
+
+		@Test
+		void shouldSendNextChunkUntilDone() {
+			lenient().when(requestObserver.isReady()).thenReturn(true);
+			doAnswer(invocation -> {
+				setDoneInFileSender(true);
+				return null;
+			}).when(fileSender).sendNextChunk();
+
+			fileSender.sendNext();
+
+			verify(fileSender, times(1)).sendNextChunk();
+		}
+
+		@Test
+		void shouldSendNextChunkUntilCancelled() {
+			lenient().when(requestObserver.isReady()).thenReturn(true);
+			doAnswer(invocation -> {
+				fileSender.getResultFuture().cancel(true);
+				return null;
+			}).when(fileSender).sendNextChunk();
+
+			fileSender.sendNext();
+
+			verify(fileSender, times(1)).sendNextChunk();
+		}
+
+		@Test
+		void closeStreamReaderIfCancelled() {
+			fileSender.getResultFuture().cancel(true);
+
+			fileSender.sendNext();
+
+			verify(fileSender).closeStreamReader();
+		}
+	}
+
+	@Nested
+	class TestSendNextChunk {
+
+		private final byte[] content = RandomUtils.insecure().randomBytes(StreamingFileSender.CHUNK_SIZE / 2);
+		private final ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(content);
+
+		@Captor
+		private ArgumentCaptor<byte[]> chunkCaptor;
+
+		@Nested
+		class TestOnDataAvailable {
+
+			@BeforeEach
+			void init() {
+				fileSender = spy(new TestFileSender(chunkBuilder, byteArrayInputStream, requestObserver));
+			}
+
+			@Test
+			void shouldSendChunk() {
+				fileSender.sendNextChunk();
+
+				verify(fileSender).sendChunk(chunkCaptor.capture(), eq(content.length));
+				assertThat(chunkCaptor.getValue()).contains(content);
+			}
+		}
+
+		@Nested
+		class TestOnNoBytesLeftToRead {
+
+			@BeforeEach
+			void init() {
+				var buffer = new byte[StreamingFileSender.CHUNK_SIZE];
+				byteArrayInputStream.read(buffer, 0, StreamingFileSender.CHUNK_SIZE);
+				fileSender = spy(new TestFileSender(chunkBuilder, byteArrayInputStream, requestObserver));
+			}
+
+			@Test
+			void shouldNotSendChunk() {
+				fileSender.sendNextChunk();
+
+				verify(fileSender, never()).sendChunk(any(), anyInt());
+			}
+
+			@Test
+			void shouldEndTransfer() {
+				fileSender.sendNextChunk();
+
+				verify(fileSender).endTransfer();
+			}
+		}
+	}
+
+	@Nested
+	class TestEndTransfer {
+
+		@Test
+		void shouldCommunicateEndOfTransfer() {
+			fileSender.endTransfer();
+
+			verify(fileSender).communicateEndOfTransfer();
+		}
+
+		@Test
+		void shouldSetDoneToTrue() {
+			fileSender.endTransfer();
+
+			fileSender.sendNext();
+			verify(fileSender, never()).sendNextChunk();
+		}
+
+		@Test
+		void shouldCloseStreamReader() {
+			fileSender.endTransfer();
+
+			verify(fileSender).closeStreamReader();
+		}
+	}
+
+	@Nested
+	class TestCloseStreamReader {
+
+		@Test
+		@SneakyThrows
+		void shouldCloseInputStream() {
+			fileSender.closeStreamReader();
+
+			verify(inputStream).close();
+		}
+	}
+
+	@Nested
+	class TestSendChunk {
+
+		private static final byte[] CHUNK_PART = "ChunkPartContent".getBytes();
+
+		@BeforeEach
+		void initObserver() {
+			fileSender.send();
+		}
+
+		@Test
+		void shouldApplyBuildChunk() {
+			fileSender.sendChunk(CHUNK_PART, 5);
+
+			verify(chunkBuilder).apply(CHUNK_PART, 5);
+		}
+
+		@Test
+		void shouldCallOnNext() {
+			fileSender.sendChunk(CHUNK_PART, 5);
+
+			verify(requestObserver).onNext(any());
+		}
+	}
+
+	@Nested
+	class TestSendMetaData {
+
+		@Mock
+		private TestRequestType metaData;
+
+		@BeforeEach
+		void initObserver() {
+			fileSender.send();
+		}
+
+		@Test
+		void shouldNotSendWithoutMetadata() {
+			fileSender.sendMetaData();
+
+			verify(requestObserver, never()).onNext(any());
+		}
+
+		@Test
+		void shouldSendMetadata() {
+			fileSender.withMetaData(metaData).sendMetaData();
+
+			verify(requestObserver).onNext(metaData);
+		}
+
+		@Test
+		void shouldSendMetadataOnlyOnce() {
+			fileSender.withMetaData(metaData).sendMetaData();
+
+			fileSender.sendMetaData();
+
+			verify(requestObserver).onNext(metaData);
+		}
+	}
+
+	private void setResultFutureInFileSender(CompletableFuture<TestResponseType> resultFuture) {
+		ReflectionTestUtils.setField(fileSender, "resultFuture", resultFuture);
+	}
+
+	private void setDoneInFileSender(boolean done) {
+		((AtomicBoolean) ReflectionTestUtils.getField(fileSender, null, "done")).set(done);
+	}
+
+	static class TestFileSender extends StreamingFileSender<TestRequestType, TestResponseType> {
+
+		@Getter(AccessLevel.PROTECTED)
+		private final CallStreamObserver<TestRequestType> requestObserver;
+
+		TestFileSender(BiFunction<byte[], Integer, TestRequestType> chunkBuilder, InputStream inputStream,
+				CallStreamObserver<TestRequestType> requestObserver) {
+			super(chunkBuilder, inputStream);
+			this.requestObserver = requestObserver;
+		}
+
+		@Override
+		public StreamingFileSender<TestRequestType, TestResponseType> send() {
+			return this;
+		}
+
+		@Override
+		void communicateEndOfTransfer() {
+		}
+	}
+}