From 0cf5dd0e163633de7196bf368d21d3429089530e Mon Sep 17 00:00:00 2001
From: Felix Reichenbach <felix.reichenbach@mgm-tp.com>
Date: Thu, 20 Mar 2025 10:57:37 +0100
Subject: [PATCH] OZG-7573 add error handler

---
 .../EingangStubReceiverStreamObserver.java    |  5 ++-
 .../forwarder/RouteForwardingGrpcService.java |  5 +++
 ...EingangStubReceiverStreamObserverTest.java | 12 ++++++
 .../RouteForwardingGrpcServiceTest.java       | 41 ++++++++++++++++---
 4 files changed, 56 insertions(+), 7 deletions(-)

diff --git a/forwarder/src/main/java/de/ozgcloud/eingang/forwarder/EingangStubReceiverStreamObserver.java b/forwarder/src/main/java/de/ozgcloud/eingang/forwarder/EingangStubReceiverStreamObserver.java
index 1d355f5fd..0d174825d 100644
--- a/forwarder/src/main/java/de/ozgcloud/eingang/forwarder/EingangStubReceiverStreamObserver.java
+++ b/forwarder/src/main/java/de/ozgcloud/eingang/forwarder/EingangStubReceiverStreamObserver.java
@@ -67,17 +67,19 @@ public class EingangStubReceiverStreamObserver implements StreamObserver<GrpcRou
 	private final Function<InputStream, CompletableFuture<File>> fileSaver;
 	private final Consumer<FormData> formDataConsumer;
 	private final Consumer<GrpcRouteForwardingResponse> responseConsumer;
+	private final Consumer<Throwable> onErrorHandler;
 
 	@Builder
 	public EingangStubReceiverStreamObserver(RouteForwardingMapper routeForwardingMapper, IncomingFileMapper incomingFileMapper,
 			IncomingFileGroupMapper incomingFileGroupMapper, Function<InputStream, CompletableFuture<File>> fileSaver,
-			Consumer<FormData> formDataConsumer, Consumer<GrpcRouteForwardingResponse> responseConsumer) {
+			Consumer<FormData> formDataConsumer, Consumer<GrpcRouteForwardingResponse> responseConsumer, Consumer<Throwable> onErrorHandler) {
 		this.routeForwardingMapper = routeForwardingMapper;
 		this.incomingFileMapper = incomingFileMapper;
 		this.incomingFileGroupMapper = incomingFileGroupMapper;
 		this.fileSaver = fileSaver;
 		this.formDataConsumer = formDataConsumer;
 		this.responseConsumer = responseConsumer;
+		this.onErrorHandler = onErrorHandler;
 	}
 
 	private FormData formData;
@@ -202,6 +204,7 @@ public class EingangStubReceiverStreamObserver implements StreamObserver<GrpcRou
 		LOG.error("Error happened. Receiving stream closed.", t);
 		closeOutputPipe();
 		closeInputPipe();
+		onErrorHandler.accept(t);
 	}
 
 	void closeOutputPipe() {
diff --git a/forwarder/src/main/java/de/ozgcloud/eingang/forwarder/RouteForwardingGrpcService.java b/forwarder/src/main/java/de/ozgcloud/eingang/forwarder/RouteForwardingGrpcService.java
index 322e7361f..d08d98e8e 100644
--- a/forwarder/src/main/java/de/ozgcloud/eingang/forwarder/RouteForwardingGrpcService.java
+++ b/forwarder/src/main/java/de/ozgcloud/eingang/forwarder/RouteForwardingGrpcService.java
@@ -52,6 +52,7 @@ public class RouteForwardingGrpcService extends RouteForwardingServiceGrpc.Route
 				.incomingFileGroupMapper(incomingFileGroupMapper)
 				.formDataConsumer(routeForwardingService::route)
 				.responseConsumer(repsonse -> respondWith(responseObserver, repsonse))
+				.onErrorHandler(error -> handleError(error, responseObserver))
 				.build();
 	}
 
@@ -64,4 +65,8 @@ public class RouteForwardingGrpcService extends RouteForwardingServiceGrpc.Route
 		responseObserver.onCompleted();
 	}
 
+	public void handleError(Throwable error, StreamObserver<GrpcRouteForwardingResponse> responseObserver) {
+		responseObserver.onError(error);
+	}
+
 }
diff --git a/forwarder/src/test/java/de/ozgcloud/eingang/forwarder/EingangStubReceiverStreamObserverTest.java b/forwarder/src/test/java/de/ozgcloud/eingang/forwarder/EingangStubReceiverStreamObserverTest.java
index d14d99362..fc502fe69 100644
--- a/forwarder/src/test/java/de/ozgcloud/eingang/forwarder/EingangStubReceiverStreamObserverTest.java
+++ b/forwarder/src/test/java/de/ozgcloud/eingang/forwarder/EingangStubReceiverStreamObserverTest.java
@@ -76,6 +76,8 @@ class EingangStubReceiverStreamObserverTest {
 	private Consumer<FormData> formDataConsumer;
 	@Mock
 	private Consumer<GrpcRouteForwardingResponse> responseConsumer;
+	@Mock
+	private Consumer<Throwable> onErrorHandler;
 
 	@BeforeEach
 	void setUp() {
@@ -86,6 +88,7 @@ class EingangStubReceiverStreamObserverTest {
 				.incomingFileGroupMapper(incomingFileGroupMapper)
 				.formDataConsumer(formDataConsumer)
 				.responseConsumer(responseConsumer)
+				.onErrorHandler(onErrorHandler)
 				.build());
 	}
 
@@ -827,6 +830,15 @@ class EingangStubReceiverStreamObserverTest {
 
 			verify(observer).closeInputPipe();
 		}
+
+		@Test
+		void shouldCallOnErrorHandler() {
+			var exception = new Exception();
+
+			observer.onError(exception);
+
+			verify(onErrorHandler).accept(exception);
+		}
 	}
 
 	@Nested
diff --git a/forwarder/src/test/java/de/ozgcloud/eingang/forwarder/RouteForwardingGrpcServiceTest.java b/forwarder/src/test/java/de/ozgcloud/eingang/forwarder/RouteForwardingGrpcServiceTest.java
index 35ac8140c..5c94fb60b 100644
--- a/forwarder/src/test/java/de/ozgcloud/eingang/forwarder/RouteForwardingGrpcServiceTest.java
+++ b/forwarder/src/test/java/de/ozgcloud/eingang/forwarder/RouteForwardingGrpcServiceTest.java
@@ -122,6 +122,16 @@ class RouteForwardingGrpcServiceTest {
 			verify(service).respondWith(responseObserver, GrpcRouteForwardingResponse.getDefaultInstance());
 		}
 
+		@Test
+		void shouldSetOnErrorHandler() {
+			var error = new Throwable();
+
+			var observer = service.routeForwarding(responseObserver);
+
+			callOnErrorHandler(observer, error);
+			verify(service).handleError(error, responseObserver);
+		}
+
 		@SuppressWarnings("unchecked")
 		private void callFileSaver(StreamObserver<GrpcRouteForwardingRequest> uploadObserver) {
 			var fileSaver = (Function<InputStream, CompletableFuture<File>>) ReflectionTestUtils.getField(uploadObserver, "fileSaver");
@@ -129,18 +139,15 @@ class RouteForwardingGrpcServiceTest {
 		}
 
 		private RouteForwardingMapper getRouteForwardingMapper(StreamObserver<GrpcRouteForwardingRequest> uploadObserver) {
-			var routeForwardingMapper = (RouteForwardingMapper) ReflectionTestUtils.getField(uploadObserver, "routeForwardingMapper");
-			return routeForwardingMapper;
+			return (RouteForwardingMapper) ReflectionTestUtils.getField(uploadObserver, "routeForwardingMapper");
 		}
 
 		private IncomingFileMapper getIncomingFileMapper(StreamObserver<GrpcRouteForwardingRequest> uploadObserver) {
-			var incomingFileMapper = (IncomingFileMapper) ReflectionTestUtils.getField(uploadObserver, "incomingFileMapper");
-			return incomingFileMapper;
+			return (IncomingFileMapper) ReflectionTestUtils.getField(uploadObserver, "incomingFileMapper");
 		}
 
 		private IncomingFileGroupMapper getIncomingFileGroupMapper(StreamObserver<GrpcRouteForwardingRequest> uploadObserver) {
-			var incomingFileGroupMapper = (IncomingFileGroupMapper) ReflectionTestUtils.getField(uploadObserver, "incomingFileGroupMapper");
-			return incomingFileGroupMapper;
+			return (IncomingFileGroupMapper) ReflectionTestUtils.getField(uploadObserver, "incomingFileGroupMapper");
 		}
 
 		@SuppressWarnings("unchecked")
@@ -154,6 +161,12 @@ class RouteForwardingGrpcServiceTest {
 			var responseConsumer = (Consumer<GrpcRouteForwardingResponse>) ReflectionTestUtils.getField(uploadObserver, "responseConsumer");
 			responseConsumer.accept(response);
 		}
+
+		@SuppressWarnings("unchecked")
+		private void callOnErrorHandler(StreamObserver<GrpcRouteForwardingRequest> uploadObserver, Throwable throwable) {
+			var onErrorHandler = (Consumer<Throwable>) ReflectionTestUtils.getField(uploadObserver, "onErrorHandler");
+			onErrorHandler.accept(throwable);
+		}
 	}
 
 	@Nested
@@ -205,4 +218,20 @@ class RouteForwardingGrpcServiceTest {
 			inOrder.verify(responseObserver).onCompleted();
 		}
 	}
+
+	@Nested
+	class TestHandleError {
+
+		@Mock
+		private StreamObserver<GrpcRouteForwardingResponse> responseObserver;
+
+		@Test
+		void shouldDoNothing() {
+			var error = new Throwable();
+
+			service.handleError(error, responseObserver);
+
+			verify(responseObserver).onError(error);
+		}
+	}
 }
-- 
GitLab