From a6b106c90cea7979565cfd965e2da84fe5c1ddfc Mon Sep 17 00:00:00 2001
From: OZGCloud <ozgcloud@mgm-tp.com>
Date: Mon, 13 Nov 2023 08:25:55 +0100
Subject: [PATCH] allow cancelation grpc file transfer on error/timeout

---
 .../BinaryFileUploadStreamObserver.java         |  3 +++
 .../common/binaryfile/GrpcFileUploadUtils.java  | 17 +++++++++++++----
 .../binaryfile/GrpcFileUploadUtilsTest.java     |  4 ++--
 3 files changed, 18 insertions(+), 6 deletions(-)

diff --git a/kop-common-lib/src/main/java/de/itvsh/kop/common/binaryfile/BinaryFileUploadStreamObserver.java b/kop-common-lib/src/main/java/de/itvsh/kop/common/binaryfile/BinaryFileUploadStreamObserver.java
index 26a62a2..ae9c6ad 100644
--- a/kop-common-lib/src/main/java/de/itvsh/kop/common/binaryfile/BinaryFileUploadStreamObserver.java
+++ b/kop-common-lib/src/main/java/de/itvsh/kop/common/binaryfile/BinaryFileUploadStreamObserver.java
@@ -30,7 +30,9 @@ import io.grpc.stub.ClientResponseObserver;
 import lombok.AccessLevel;
 import lombok.Getter;
 import lombok.RequiredArgsConstructor;
+import lombok.extern.log4j.Log4j2;
 
+@Log4j2
 @RequiredArgsConstructor(access = AccessLevel.PRIVATE)
 public class BinaryFileUploadStreamObserver<ReqT, R> implements ClientResponseObserver<ReqT, R> {
 
@@ -61,6 +63,7 @@ public class BinaryFileUploadStreamObserver<ReqT, R> implements ClientResponseOb
 
 	@Override
 	public void onCompleted() {
+		LOG.debug("Complete future...");
 		future.complete(response);
 	}
 
diff --git a/kop-common-lib/src/main/java/de/itvsh/kop/common/binaryfile/GrpcFileUploadUtils.java b/kop-common-lib/src/main/java/de/itvsh/kop/common/binaryfile/GrpcFileUploadUtils.java
index 846b2a0..66798e2 100644
--- a/kop-common-lib/src/main/java/de/itvsh/kop/common/binaryfile/GrpcFileUploadUtils.java
+++ b/kop-common-lib/src/main/java/de/itvsh/kop/common/binaryfile/GrpcFileUploadUtils.java
@@ -62,6 +62,7 @@ public class GrpcFileUploadUtils {
 		private final BiFunction<byte[], Integer, Q> chunkBuilder;
 		private final InputStream inputStream;
 
+		@Getter
 		private final CompletableFuture<S> resultFuture = new CompletableFuture<>();
 		private final Function<StreamObserver<S>, CallStreamObserver<Q>> reqObserverBuilder;
 		private CallStreamObserver<Q> requestObserver;
@@ -86,12 +87,22 @@ public class GrpcFileUploadUtils {
 			return this;
 		}
 
-		public CompletableFuture<S> send() {
+		public FileSender<Q, S> send() {
 			LOG.debug("Start sending File.");
 			var responseObserver = BinaryFileUploadStreamObserver.create(resultFuture, this::sendNext);
 			requestObserver = reqObserverBuilder.apply(responseObserver);
 
-			return resultFuture;
+			return this;
+		}
+
+		public void cancelOnTimeout() {
+			resultFuture.cancel(true);
+			requestObserver.onError(new TechnicalException("Timeout on waiting for upload."));
+		}
+
+		public void cancelOnError(Throwable t) {
+			resultFuture.cancel(true);
+			requestObserver.onError(t);
 		}
 
 		void sendNext() {
@@ -100,9 +111,7 @@ public class GrpcFileUploadUtils {
 				sendMetaData();
 				do {
 					LOG.debug("Sending next chunk.");
-//					long sentSize = 
 					sendNextChunk();
-//					checkForEndOfStream(sentSize);
 				} while (!done.get() && isReady());
 				LOG.debug("Finished or waiting to become ready.");
 			}
diff --git a/kop-common-lib/src/test/java/de/itvsh/kop/common/binaryfile/GrpcFileUploadUtilsTest.java b/kop-common-lib/src/test/java/de/itvsh/kop/common/binaryfile/GrpcFileUploadUtilsTest.java
index 31ed08c..2beb26a 100644
--- a/kop-common-lib/src/test/java/de/itvsh/kop/common/binaryfile/GrpcFileUploadUtilsTest.java
+++ b/kop-common-lib/src/test/java/de/itvsh/kop/common/binaryfile/GrpcFileUploadUtilsTest.java
@@ -49,7 +49,6 @@ import de.itvsh.kop.common.errorhandling.TechnicalException;
 import io.grpc.stub.CallStreamObserver;
 import io.grpc.stub.StreamObserver;
 
-@Disabled // FIXME
 class GrpcFileUploadUtilsTest {
 
 	@Spy
@@ -80,7 +79,7 @@ class GrpcFileUploadUtilsTest {
 
 		@Test
 		void shouldCreateRequestObserver() {
-			GrpcFileUploadUtils.createSender(chunkBuilder, inputStream, reqObserverBuilder);
+			GrpcFileUploadUtils.createSender(chunkBuilder, inputStream, reqObserverBuilder).send();
 
 			verify(reqObserverBuilder, atLeastOnce()).apply(notNull());
 		}
@@ -237,6 +236,7 @@ class GrpcFileUploadUtilsTest {
 		}
 	}
 
+	@Disabled("unused")
 	@Nested
 	class TestReadFromStream {
 		@Test
-- 
GitLab