From adfe531b216b2fb9edf5a4be1bd6ae80838130f6 Mon Sep 17 00:00:00 2001
From: Krzysztof <krzysztof.witukiewicz@mgm-tp.com>
Date: Fri, 28 Mar 2025 11:09:47 +0100
Subject: [PATCH] OZG-7573 OZG-7991 Copy GrpcFileUploadUtils from common-lib
 (only temporarily)

---
 .../BinaryFileUploadStreamObserver.java       |  81 ++++++
 .../binaryfile/GrpcFileUploadUtils.java       | 257 ++++++++++++++++++
 2 files changed, 338 insertions(+)
 create mode 100644 vorgang-manager-server/src/main/java/de/ozgcloud/common/binaryfile/BinaryFileUploadStreamObserver.java
 create mode 100644 vorgang-manager-server/src/main/java/de/ozgcloud/common/binaryfile/GrpcFileUploadUtils.java

diff --git a/vorgang-manager-server/src/main/java/de/ozgcloud/common/binaryfile/BinaryFileUploadStreamObserver.java b/vorgang-manager-server/src/main/java/de/ozgcloud/common/binaryfile/BinaryFileUploadStreamObserver.java
new file mode 100644
index 000000000..44635a997
--- /dev/null
+++ b/vorgang-manager-server/src/main/java/de/ozgcloud/common/binaryfile/BinaryFileUploadStreamObserver.java
@@ -0,0 +1,81 @@
+/*
+ * Copyright (C) 2023 Das Land Schleswig-Holstein vertreten durch den
+ * Ministerpräsidenten des Landes Schleswig-Holstein
+ * Staatskanzlei
+ * Abteilung Digitalisierung und zentrales IT-Management der Landesregierung
+ *
+ * Lizenziert unter der EUPL, Version 1.2 oder - sobald
+ * diese von der Europäischen Kommission genehmigt wurden -
+ * Folgeversionen der EUPL ("Lizenz");
+ * Sie dürfen dieses Werk ausschließlich gemäß
+ * dieser Lizenz nutzen.
+ * Eine Kopie der Lizenz finden Sie hier:
+ *
+ * https://joinup.ec.europa.eu/collection/eupl/eupl-text-eupl-12
+ *
+ * Sofern nicht durch anwendbare Rechtsvorschriften
+ * gefordert oder in schriftlicher Form vereinbart, wird
+ * die unter der Lizenz verbreitete Software "so wie sie
+ * ist", OHNE JEGLICHE GEWÄHRLEISTUNG ODER BEDINGUNGEN -
+ * ausdrücklich oder stillschweigend - verbreitet.
+ * Die sprachspezifischen Genehmigungen und Beschränkungen
+ * unter der Lizenz sind dem Lizenztext zu entnehmen.
+ */
+package de.ozgcloud.common.binaryfile;
+
+import java.util.concurrent.CompletableFuture;
+
+import io.grpc.stub.ClientCallStreamObserver;
+import io.grpc.stub.ClientResponseObserver;
+import lombok.AccessLevel;
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.log4j.Log4j2;
+
+@Log4j2
+@RequiredArgsConstructor(access = AccessLevel.PRIVATE)
+public class BinaryFileUploadStreamObserver<ReqT, R> implements ClientResponseObserver<ReqT, R> {
+
+	private final CompletableFuture<R> future;
+	private Runnable onReadyHandler;
+
+	public static <ReqT, R> BinaryFileUploadStreamObserver<ReqT, R> create(CompletableFuture<R> future, Runnable onReadyHandler) {
+		BinaryFileUploadStreamObserver<ReqT, R> instance = create(future);
+		instance.onReadyHandler = onReadyHandler;
+		return instance;
+	}
+
+	public static <ReqT, R> BinaryFileUploadStreamObserver<ReqT, R> create(CompletableFuture<R> future) {
+		return new BinaryFileUploadStreamObserver<>(future);
+	}
+
+	@Getter
+	private R response;
+
+	/*
+	requestStream is CallStreamObserver - received from Grpc-framework. onReadyHandler calls onNext on this observer
+	 */
+	@Override
+	public void beforeStart(ClientCallStreamObserver<ReqT> requestStreamObserver) {
+		requestStreamObserver.setOnReadyHandler(onReadyHandler);
+	}
+
+	@Override
+	public void onNext(R response) {
+		this.response = response;
+	}
+
+	@Override
+	public void onError(Throwable t) {
+		LOG.error("Error on uploading file. Completing Future.", t);
+		future.completeExceptionally(t);
+	}
+
+	// will it even get called? requestStreamObserver.onCompleted() would need to be called first
+	@Override
+	public void onCompleted() {
+		LOG.debug("Complete future...");
+		future.complete(response);
+	}
+
+}
\ No newline at end of file
diff --git a/vorgang-manager-server/src/main/java/de/ozgcloud/common/binaryfile/GrpcFileUploadUtils.java b/vorgang-manager-server/src/main/java/de/ozgcloud/common/binaryfile/GrpcFileUploadUtils.java
new file mode 100644
index 000000000..56b96476d
--- /dev/null
+++ b/vorgang-manager-server/src/main/java/de/ozgcloud/common/binaryfile/GrpcFileUploadUtils.java
@@ -0,0 +1,257 @@
+/*
+ * Copyright (C) 2023 Das Land Schleswig-Holstein vertreten durch den
+ * Ministerpräsidenten des Landes Schleswig-Holstein
+ * Staatskanzlei
+ * Abteilung Digitalisierung und zentrales IT-Management der Landesregierung
+ *
+ * Lizenziert unter der EUPL, Version 1.2 oder - sobald
+ * diese von der Europäischen Kommission genehmigt wurden -
+ * Folgeversionen der EUPL ("Lizenz");
+ * Sie dürfen dieses Werk ausschließlich gemäß
+ * dieser Lizenz nutzen.
+ * Eine Kopie der Lizenz finden Sie hier:
+ *
+ * https://joinup.ec.europa.eu/collection/eupl/eupl-text-eupl-12
+ *
+ * Sofern nicht durch anwendbare Rechtsvorschriften
+ * gefordert oder in schriftlicher Form vereinbart, wird
+ * die unter der Lizenz verbreitete Software "so wie sie
+ * ist", OHNE JEGLICHE GEWÄHRLEISTUNG ODER BEDINGUNGEN -
+ * ausdrücklich oder stillschweigend - verbreitet.
+ * Die sprachspezifischen Genehmigungen und Beschränkungen
+ * unter der Lizenz sind dem Lizenztext zu entnehmen.
+ */
+package de.ozgcloud.common.binaryfile;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.BiFunction;
+import java.util.function.Consumer;
+import java.util.function.Function;
+
+import org.apache.commons.io.IOUtils;
+
+import de.ozgcloud.common.errorhandling.TechnicalException;
+import io.grpc.stub.CallStreamObserver;
+import io.grpc.stub.StreamObserver;
+import lombok.AccessLevel;
+import lombok.Getter;
+import lombok.NoArgsConstructor;
+import lombok.NonNull;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.log4j.Log4j2;
+
+@Log4j2
+@NoArgsConstructor(access = AccessLevel.PRIVATE)
+public class GrpcFileUploadUtils {
+
+	static final int CHUNK_SIZE = 4 * 1024;
+
+	/*
+	 * Q = Request Type; S = Response Type
+	 */
+	public static <Q, S> FileSender<Q, S> createSender(BiFunction<byte[], Integer, Q> chunkBuilder, InputStream inputStream,
+			Function<StreamObserver<S>, CallStreamObserver<Q>> reqObserverBuilder) {
+		return createSender(chunkBuilder, inputStream, reqObserverBuilder, true);
+	}
+
+	public static <Q, S> FileSender<Q, S> createSender(BiFunction<byte[], Integer, Q> chunkBuilder, InputStream inputStream,
+			Function<StreamObserver<S>, CallStreamObserver<Q>> reqObserverBuilder, boolean completeOnFileSent) {
+		return new FileSender<>(chunkBuilder, reqObserverBuilder, inputStream, completeOnFileSent);
+	}
+
+	public static class FileSender<Q, S> {
+		private final BiFunction<byte[], Integer, Q> chunkBuilder;
+		private final InputStream inputStream;
+
+		@Getter
+		private final CompletableFuture<S> resultFuture = new CompletableFuture<>();
+		private final Function<StreamObserver<S>, CallStreamObserver<Q>> reqObserverBuilder;
+		private CallStreamObserver<Q> requestObserver;
+
+		private Optional<Q> metaData = Optional.empty();
+		private final AtomicBoolean metaDataSent = new AtomicBoolean(false);
+		private final AtomicBoolean done = new AtomicBoolean(false);
+
+		private final StreamReader streamReader;
+		private final boolean completeOnFileSent;
+
+		FileSender(BiFunction<byte[], Integer, Q> chunkBuilder, Function<StreamObserver<S>, CallStreamObserver<Q>> reqObserverBuilder,
+				InputStream inputStream, boolean completeOnFileSent) {
+			this.chunkBuilder = chunkBuilder;
+			this.inputStream = inputStream;
+			this.reqObserverBuilder = reqObserverBuilder;
+			this.completeOnFileSent = completeOnFileSent;
+
+			this.streamReader = new StreamReader(this.inputStream);
+		}
+
+		public FileSender<Q, S> withMetaData(@NonNull Q metaData) {
+			this.metaData = Optional.of(metaData);
+			return this;
+		}
+
+		public FileSender<Q, S> send(Consumer<Runnable> registerOnReadyHandler) {
+			LOG.debug("Start sending File.");
+
+			registerOnReadyHandler.accept(this::sendNext);
+			requestObserver = reqObserverBuilder.apply(null);
+
+			return this;
+		}
+
+		public FileSender<Q, S> send() {
+			LOG.debug("Start sending File.");
+
+			// this responseObserver registers also onReadyHandler
+			var responseObserver = BinaryFileUploadStreamObserver.create(resultFuture, this::sendNext);
+			requestObserver = reqObserverBuilder.apply(responseObserver);
+
+			return this;
+		}
+
+		public void cancelOnTimeout() {
+			LOG.warn("File transfer canceled on timeout");
+			resultFuture.cancel(true);
+			requestObserver.onError(new TechnicalException("Timeout on waiting for upload."));
+			closeStreams();
+		}
+
+		public void cancelOnError(Throwable t) {
+			LOG.error("File tranfer canceled on error.", t);
+			resultFuture.cancel(true);
+			requestObserver.onError(t);
+			closeStreams();
+		}
+
+		void sendNext() {
+			if (!done.get()) {
+				waitForOberver();
+				sendMetaData();
+				do {
+					LOG.debug("Sending next chunk.");
+					sendNextChunk();
+				} while (!done.get() && isReady());
+				LOG.debug("Finished or waiting to become ready.");
+			}
+		}
+
+		private boolean isReady() {
+			return requestObserver.isReady();
+		}
+
+		private void waitForOberver() {
+			synchronized (this) {
+				while (Objects.isNull(requestObserver)) {
+					try {
+						LOG.debug("wait for observer");
+						wait(300);
+					} catch (InterruptedException e) {
+						LOG.error("Error on waiting for request Observer.", e);
+						Thread.currentThread().interrupt();
+					}
+				}
+			}
+
+		}
+
+		void sendNextChunk() {
+			byte[] contentToSend = streamReader.getNextData();
+
+			if (streamReader.getLastReadSize() > 0) {
+				sendChunk(contentToSend, streamReader.getLastReadSize());
+			} else {
+				endTransfer();
+			}
+		}
+
+		private void endTransfer() {
+			if (completeOnFileSent) {
+				requestObserver.onCompleted();
+			} else {
+				sendEndOfFile();
+				resultFuture.complete(null);
+			}
+			done.set(true);
+			LOG.debug("File Transfer done.");
+			closeStreams();
+
+		}
+
+		private void sendEndOfFile() {
+			sendChunk(new byte[0], streamReader.getLastReadSize());
+		}
+
+		void closeStreams() {
+			LOG.debug("Closing streams");
+			streamReader.close();
+		}
+
+		void sendChunk(byte[] content, int length) {
+			LOG.debug("Sending {} byte Data.", length);
+			var chunk = chunkBuilder.apply(content, length);
+			requestObserver.onNext(chunk);
+		}
+
+		byte[] readFromStream() {
+			try {
+				return inputStream.readNBytes(CHUNK_SIZE);
+			} catch (IOException e) {
+				throw new TechnicalException("Error on sending a single chunk", e);
+			}
+		}
+
+		void sendMetaData() {
+			metaData.filter(md -> !metaDataSent.get()).ifPresent(this::doSendMetaData);
+		}
+
+		private void doSendMetaData(Q metadata) {
+			LOG.debug("Sending Metadata.");
+			requestObserver.onNext(metadata);
+			metaDataSent.set(true);
+		}
+
+		void checkForEndOfStream(long sentSize) {
+			if (sentSize < CHUNK_SIZE) {
+				LOG.debug("File Transfer done. Closing stream.");
+				IOUtils.closeQuietly(inputStream);
+				requestObserver.onCompleted();
+				done.set(true);
+			} else {
+				LOG.debug("File Transfer not jet done - need to tranfer another chunk.");
+			}
+		}
+
+		@RequiredArgsConstructor
+		private class StreamReader {
+			private final InputStream inStream;
+			private final byte[] buffer = new byte[CHUNK_SIZE];
+			@Getter
+			private int lastReadSize = 0;
+			@Getter
+			private final AtomicBoolean done = new AtomicBoolean(false);
+
+			byte[] getNextData() {
+				readNext();
+				return buffer;
+			}
+
+			void close() {
+				IOUtils.closeQuietly(inStream);
+			}
+
+			void readNext() {
+				try {
+					lastReadSize = inStream.read(buffer, 0, CHUNK_SIZE);
+				} catch (IOException e) {
+					throw new TechnicalException("Error on reading a single chunk", e);
+				}
+			}
+		}
+	}
+
+}
\ No newline at end of file
-- 
GitLab