From 2e2783c67666a12a66640049a4113b7991155650 Mon Sep 17 00:00:00 2001
From: Evgeny Bardin <evgeny.bardin@external.mgm-cp.com>
Date: Thu, 30 Jan 2025 15:10:01 +0100
Subject: [PATCH] OZG-7426 [POC] try parallel upload

---
 .../eingang/router/VorgangRemoteService.java  | 35 +++++++++++++++----
 1 file changed, 28 insertions(+), 7 deletions(-)

diff --git a/router/src/main/java/de/ozgcloud/eingang/router/VorgangRemoteService.java b/router/src/main/java/de/ozgcloud/eingang/router/VorgangRemoteService.java
index 260cdc381..f185f2660 100644
--- a/router/src/main/java/de/ozgcloud/eingang/router/VorgangRemoteService.java
+++ b/router/src/main/java/de/ozgcloud/eingang/router/VorgangRemoteService.java
@@ -27,6 +27,9 @@ import java.io.InputStream;
 import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
@@ -57,6 +60,7 @@ import io.grpc.stub.StreamObserver;
 import lombok.AllArgsConstructor;
 import lombok.Getter;
 import lombok.RequiredArgsConstructor;
+import lombok.SneakyThrows;
 import lombok.extern.log4j.Log4j2;
 
 @Log4j2
@@ -68,6 +72,8 @@ public class VorgangRemoteService {
 
 	private final GrpcEingangMapper grpcEingangMapper;
 
+	private final ExecutorService taskExecutor = Executors.newVirtualThreadPerTaskExecutor();
+
 	public String createVorgang(FormData formData, Optional<String> organisationsEinheitenId) {
 		var vorgangServiceStub = getVorgangServiceStub(organisationsEinheitenId);
 		var binaryFileServiceStub = getBinaryFileServiceStub(organisationsEinheitenId);
@@ -121,11 +127,15 @@ public class VorgangRemoteService {
 		@Getter
 		private List<IncomingFile> uploadedRepresentations;
 
+		@SneakyThrows
 		String create() {
 			vorgangId = startCreation();
 
-			uploadedAttachments = uploadAttachments();
-			uploadedRepresentations = uploadRepresentations();
+			var f1 = taskExecutor.submit(() -> uploadedAttachments = uploadAttachments());
+			var f2 = taskExecutor.submit(() -> uploadedRepresentations = uploadRepresentations());
+			for (var future : List.of(f1, f2)) {
+				future.get(10, TimeUnit.MINUTES);
+			}
 
 			finishCreation();
 			return vorgangId;
@@ -145,17 +155,27 @@ public class VorgangRemoteService {
 		}
 
 		private IncomingFileGroup uploadAttachment(IncomingFileGroup attachment) {
-			var filesWithId = attachment.getFiles().stream().map(this::addIncomingFileId).toList();
+			var filesWithIdFutures = attachment.getFiles().stream().map(this::addIncomingFileId).toList();
+			var fileGroupBuilder = IncomingFileGroup.builder().name(attachment.getName());
+			filesWithIdFutures.stream().map(this::getUploadResult).forEach(fileGroupBuilder::file);
+			return fileGroupBuilder.build();
+		}
 
-			return IncomingFileGroup.builder().name(attachment.getName()).files(filesWithId).build();
+		IncomingFile getUploadResult(Future<IncomingFile> future) {
+			try {
+				return future.get();
+			} catch (InterruptedException | ExecutionException e) {
+				throw new TechnicalException("Error on uploading file.", e);
+			}
 		}
 
 		List<IncomingFile> uploadRepresentations() {
-			return formData.getRepresentations().stream().map(this::addIncomingFileId).toList();
+			var fileWithIdFutures = formData.getRepresentations().stream().map(this::addIncomingFileId).toList();
+			return fileWithIdFutures.stream().map(this::getUploadResult).toList();
 		}
 
-		private IncomingFile addIncomingFileId(IncomingFile file) {
-			return file.toBuilder().id(uploadIncomingFile(file)).build();
+		private Future<IncomingFile> addIncomingFileId(IncomingFile file) {
+			return taskExecutor.submit(() -> file.toBuilder().id(uploadIncomingFile(file)).build());
 		}
 
 		String uploadIncomingFile(IncomingFile incomingFile) {
@@ -199,6 +219,7 @@ public class VorgangRemoteService {
 				fileSender.cancelOnError(e);
 				throw new TechnicalException("Waiting for finishing upload was interrupted.", e);
 			} catch (ExecutionException | TimeoutException e) {
+				LOG.error("Timeout exception '{}'", e.getClass().getSimpleName());
 				fileSender.cancelOnTimeout();
 				throw new TechnicalException("Error / Timeout on uploading data.", e);
 			} finally {
-- 
GitLab