From 5c7df16827d3cc5f809f5945c900e4ae9b5356c8 Mon Sep 17 00:00:00 2001
From: Krzysztof <krzysztof.witukiewicz@mgm-tp.com>
Date: Wed, 23 Apr 2025 19:20:27 +0200
Subject: [PATCH] OZG-7811 OZG-8099 Implement sendAggregationData

---
 .../extern/AggregationDataRemoteService.java  |  67 ++-
 .../AggregationDataRemoteServiceTest.java     | 383 ++++++++++++++++--
 2 files changed, 412 insertions(+), 38 deletions(-)

diff --git a/aggregation-manager-job/src/main/java/de/ozgcloud/aggregation/extern/AggregationDataRemoteService.java b/aggregation-manager-job/src/main/java/de/ozgcloud/aggregation/extern/AggregationDataRemoteService.java
index 25d4979..547475d 100644
--- a/aggregation-manager-job/src/main/java/de/ozgcloud/aggregation/extern/AggregationDataRemoteService.java
+++ b/aggregation-manager-job/src/main/java/de/ozgcloud/aggregation/extern/AggregationDataRemoteService.java
@@ -24,6 +24,8 @@
 package de.ozgcloud.aggregation.extern;
 
 import java.util.Iterator;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Future;
 import java.util.stream.Stream;
 
 import de.ozgcloud.aggregation.AggregationManagerProperties;
@@ -35,19 +37,28 @@ import de.ozgcloud.aggregation.warehouse.DocumentEntry;
 import io.grpc.stub.ClientCallStreamObserver;
 import io.grpc.stub.ClientResponseObserver;
 import lombok.Builder;
+import lombok.Getter;
 import lombok.RequiredArgsConstructor;
 
 @RequiredArgsConstructor
 class AggregationDataRemoteService {
 
+	static final int BATCH_SIZE = 100;
+
 	private final AggregationDataServiceGrpc.AggregationDataServiceStub serviceStub;
 	private final AggregationManagerProperties properties;
 	private final GrpcAggregationDataMapper grpcAggregationDataMapper;
 
-	public void sendAggregationData(Aggregation aggregation) {
-		var requestObserver = serviceStub.sendAggregationData(new SendAggregationDataResponseObserver(
-				new RequestData(properties.getMandant(), aggregation.aggregationName,
-						toGrpcAggregationDataStream(aggregation.documentEntries).iterator())));
+	public Future<Void> sendAggregationData(Aggregation aggregation) {
+		var responseObserver = buildSendAggregationDataResponseObserver(aggregation);
+		serviceStub.sendAggregationData(responseObserver);
+		return responseObserver.getResponseFuture();
+	}
+
+	SendAggregationDataResponseObserver buildSendAggregationDataResponseObserver(Aggregation aggregation) {
+		var requestData = new RequestData(properties.getMandant(), aggregation.aggregationName,
+				toGrpcAggregationDataStream(aggregation.documentEntries).iterator());
+		return new SendAggregationDataResponseObserver(requestData);
 	}
 
 	Stream<GrpcAggregationData> toGrpcAggregationDataStream(Stream<DocumentEntry> documentEntries) {
@@ -59,14 +70,12 @@ class AggregationDataRemoteService {
 			implements ClientResponseObserver<GrpcSendAggregationDataRequest, GrpcSendAggregationDataResponse> {
 
 		private final RequestData requestData;
+		@Getter
+		private final CompletableFuture<Void> responseFuture = new CompletableFuture<>();
 
 		@Override
 		public void beforeStart(ClientCallStreamObserver<GrpcSendAggregationDataRequest> requestObserver) {
-			requestObserver.setOnReadyHandler(() -> {
-				while (requestObserver.isReady() && requestData.aggregationDataIterator.hasNext()) {
-					// call onNext()?
-				}
-			});
+			requestObserver.setOnReadyHandler(buildOnReadyHandler(requestObserver));
 		}
 
 		@Override
@@ -76,12 +85,49 @@ class AggregationDataRemoteService {
 
 		@Override
 		public void onError(Throwable t) {
-
+			responseFuture.completeExceptionally(t);
 		}
 
 		@Override
 		public void onCompleted() {
+			responseFuture.complete(null);
+		}
+
+		SendAggregationDataOnReadyHandler buildOnReadyHandler(ClientCallStreamObserver<GrpcSendAggregationDataRequest> requestObserver) {
+			return new SendAggregationDataOnReadyHandler(requestObserver, BATCH_SIZE, requestData);
+		}
+	}
+
+	@RequiredArgsConstructor
+	static class SendAggregationDataOnReadyHandler implements Runnable {
+
+		private final ClientCallStreamObserver<GrpcSendAggregationDataRequest> requestObserver;
+		private final int batchSize;
+		private final RequestData requestData;
 
+		@Override
+		public void run() {
+			while (requestObserver.isReady() && requestData.aggregationDataIterator.hasNext()) {
+				requestObserver.onNext(buildRequest());
+			}
+			if (!requestData.aggregationDataIterator.hasNext()) {
+				requestObserver.onCompleted();
+			}
+		}
+
+		GrpcSendAggregationDataRequest buildRequest() {
+			var builder = GrpcSendAggregationDataRequest.newBuilder()
+					.setName(requestData.aggregationName())
+					.setMandant(requestData.mandant());
+			addAggregationData(builder);
+			return builder.build();
+		}
+
+		private void addAggregationData(GrpcSendAggregationDataRequest.Builder builder) {
+			var elementsAdded = 0;
+			while (requestData.aggregationDataIterator.hasNext() && elementsAdded++ < batchSize) {
+				builder.addAggregationData(requestData.aggregationDataIterator.next());
+			}
 		}
 	}
 
@@ -89,6 +135,7 @@ class AggregationDataRemoteService {
 	public record Aggregation(String aggregationName, Stream<DocumentEntry> documentEntries) {
 	}
 
+	@Builder
 	record RequestData(String mandant, String aggregationName, Iterator<GrpcAggregationData> aggregationDataIterator) {
 	}
 }
diff --git a/aggregation-manager-job/src/test/java/de/ozgcloud/aggregation/extern/AggregationDataRemoteServiceTest.java b/aggregation-manager-job/src/test/java/de/ozgcloud/aggregation/extern/AggregationDataRemoteServiceTest.java
index 46e411f..7836cdf 100644
--- a/aggregation-manager-job/src/test/java/de/ozgcloud/aggregation/extern/AggregationDataRemoteServiceTest.java
+++ b/aggregation-manager-job/src/test/java/de/ozgcloud/aggregation/extern/AggregationDataRemoteServiceTest.java
@@ -26,6 +26,12 @@ package de.ozgcloud.aggregation.extern;
 import static org.assertj.core.api.Assertions.*;
 import static org.mockito.Mockito.*;
 
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Future;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
+
 import org.apache.commons.collections.IteratorUtils;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Nested;
@@ -40,8 +46,13 @@ import com.thedeanda.lorem.LoremIpsum;
 
 import de.ozgcloud.aggregation.AggregationManagerProperties;
 import de.ozgcloud.aggregation.data.AggregationDataServiceGrpc;
+import de.ozgcloud.aggregation.data.GrpcAggregationData;
+import de.ozgcloud.aggregation.data.GrpcSendAggregationDataRequest;
 import de.ozgcloud.aggregation.extern.AggregationDataRemoteService.SendAggregationDataResponseObserver;
+import de.ozgcloud.aggregation.warehouse.DocumentEntry;
 import de.ozgcloud.common.test.ReflectionTestUtils;
+import io.grpc.stub.ClientCallStreamObserver;
+import lombok.SneakyThrows;
 
 class AggregationDataRemoteServiceTest {
 
@@ -58,71 +69,387 @@ class AggregationDataRemoteServiceTest {
 	@Nested
 	class TestSendAggregationData {
 
-		private static final String MANDANT = LoremIpsum.getInstance().getWords(1);
+		private final AggregationDataRemoteService.Aggregation aggregation = AggregationTestFactory.create();
 
-		@Captor
-		private ArgumentCaptor<SendAggregationDataResponseObserver> observerCaptor;
+		@Mock
+		private SendAggregationDataResponseObserver responseObserver;
+		@Mock
+		private CompletableFuture<Void> responseFuture;
 
 		@BeforeEach
 		void init() {
-			when(properties.getMandant()).thenReturn(MANDANT);
-			// only called, after stream was consumed (is lazy)
-			lenient().when(grpcAggregationDataMapper.toGrpcAggregationData(AggregationTestFactory.DOCUMENT_ENTRY)).thenReturn(
-					GrpcAggregationDataTestFactory.create());
+			doReturn(responseObserver).when(service).buildSendAggregationDataResponseObserver(any());
+			when(responseObserver.getResponseFuture()).thenReturn(responseFuture);
 		}
 
 		@Test
-		void shouldGetMandant() {
+		void shouldBuildResponseObserver() {
 			sendAggregationData();
 
-			verify(properties).getMandant();
+			verify(service).buildSendAggregationDataResponseObserver(aggregation);
 		}
 
 		@Test
 		void shouldCallServiceStub() {
 			sendAggregationData();
 
-			verify(serviceStub).sendAggregationData(any(SendAggregationDataResponseObserver.class));
+			verify(serviceStub).sendAggregationData(responseObserver);
 		}
 
 		@Test
-		void shouldResponseObserverHaveMandant() {
-			sendAggregationData();
+		void shouldReturnFutureFromResponseObserver() {
+			var future = sendAggregationData();
 
-			assertThat(getRequestDataFromCapturedResponseObserver().mandant()).isEqualTo(MANDANT);
+			assertThat(future).isSameAs(responseFuture);
+		}
+
+		private Future<Void> sendAggregationData() {
+			return service.sendAggregationData(aggregation);
+		}
+	}
+
+	@Nested
+	class TestBuildSendAggregationDataResponseObserver {
+
+		@Captor
+		private ArgumentCaptor<Stream<DocumentEntry>> documentEntriesCaptor;
+
+		@BeforeEach
+		void init() {
+			doReturn(Stream.of(GrpcAggregationDataTestFactory.create())).when(service).toGrpcAggregationDataStream(any());
 		}
 
 		@Test
-		void shouldResponseObserverHaveAggregationName() {
-			sendAggregationData();
+		void shouldGetMandant() {
+			service.buildSendAggregationDataResponseObserver(AggregationTestFactory.create());
+
+			verify(properties).getMandant();
+		}
+
+		@Test
+		void shouldCreateGrpcAggregationDataStream() {
+			service.buildSendAggregationDataResponseObserver(AggregationTestFactory.create());
 
-			assertThat(getRequestDataFromCapturedResponseObserver().aggregationName()).isEqualTo(AggregationTestFactory.AGGREGATION_NAME);
+			verify(service).toGrpcAggregationDataStream(documentEntriesCaptor.capture());
+			assertThat(documentEntriesCaptor.getValue()).containsExactly(AggregationTestFactory.DOCUMENT_ENTRY);
+		}
+
+		@Test
+		void shouldReturnResponseObserver() {
+			var builtResponseObserver = service.buildSendAggregationDataResponseObserver(AggregationTestFactory.create());
+
+			assertThat(builtResponseObserver).isNotNull();
+		}
+
+		@Nested
+		class TestBuiltResponseObserver {
+
+			private static final String MANDANT = LoremIpsum.getInstance().getWords(1);
+
+			@BeforeEach
+			void init() {
+				when(properties.getMandant()).thenReturn(MANDANT);
+			}
+
+			@Test
+			void shouldHaveMandant() {
+				var builtResponseObserver = service.buildSendAggregationDataResponseObserver(AggregationTestFactory.create());
+
+				assertThat(getRequestData(builtResponseObserver).mandant()).isEqualTo(MANDANT);
+			}
+
+			@Test
+			void shouldHaveAggregationName() {
+				var builtResponseObserver = service.buildSendAggregationDataResponseObserver(AggregationTestFactory.create());
+
+				assertThat(getRequestData(builtResponseObserver).aggregationName()).isEqualTo(AggregationTestFactory.AGGREGATION_NAME);
+			}
+
+			@Test
+			@SuppressWarnings("unchecked")
+			void shouldHaveGrpcAggregationData() {
+				var builtResponseObserver = service.buildSendAggregationDataResponseObserver(AggregationTestFactory.create());
+
+				var aggregationData = IteratorUtils.toList(getRequestData(builtResponseObserver).aggregationDataIterator());
+				assertThat(aggregationData).containsExactly(GrpcAggregationDataTestFactory.create());
+			}
+
+			private AggregationDataRemoteService.RequestData getRequestData(SendAggregationDataResponseObserver responseObserver) {
+				return ReflectionTestUtils.getField(responseObserver, "requestData", AggregationDataRemoteService.RequestData.class);
+			}
+		}
+	}
+
+	@Nested
+	class TestToGrpcAggregationDataStream {
+
+		@BeforeEach
+		void init() {
+			when(grpcAggregationDataMapper.toGrpcAggregationData(AggregationTestFactory.DOCUMENT_ENTRY)).thenReturn(
+					GrpcAggregationDataTestFactory.create());
 		}
 
 		@Test
 		void shouldMapDocumentEntry() {
-			sendAggregationData();
+			service.toGrpcAggregationDataStream(Stream.of(AggregationTestFactory.DOCUMENT_ENTRY)).toList();
 
-			IteratorUtils.toList(getRequestDataFromCapturedResponseObserver().aggregationDataIterator());
 			verify(grpcAggregationDataMapper).toGrpcAggregationData(AggregationTestFactory.DOCUMENT_ENTRY);
 		}
 
 		@Test
-		@SuppressWarnings("unchecked")
-		void shouldHaveGrpcAggregationData() {
-			sendAggregationData();
+		void shouldReturnMappedDocumentEntries() {
+			var mappedDocumentEntries = service.toGrpcAggregationDataStream(Stream.of(AggregationTestFactory.DOCUMENT_ENTRY));
+
+			assertThat(mappedDocumentEntries.toList()).containsExactly(GrpcAggregationDataTestFactory.create());
+		}
+	}
+
+	@Nested
+	class SendAggregationDataResponseObserverTest {
+
+		@Mock
+		private AggregationDataRemoteService.RequestData requestData;
+		@Spy
+		@InjectMocks
+		private SendAggregationDataResponseObserver responseObserver;
+
+		@Nested
+		class TestBeforeStart {
+
+			@Mock
+			private ClientCallStreamObserver<GrpcSendAggregationDataRequest> requestObserver;
+			@Mock
+			private AggregationDataRemoteService.SendAggregationDataOnReadyHandler onReadyHandler;
+
+			@BeforeEach
+			void init() {
+				doReturn(onReadyHandler).when(responseObserver).buildOnReadyHandler(any());
+			}
+
+			@Test
+			void shouldBuildOnReadyHandler() {
+				responseObserver.beforeStart(requestObserver);
+
+				verify(responseObserver).buildOnReadyHandler(requestObserver);
+			}
+
+			@Test
+			void shouldSetOnReadyHandler() {
+				responseObserver.beforeStart(requestObserver);
+
+				verify(requestObserver).setOnReadyHandler(onReadyHandler);
+			}
+		}
+
+		@Nested
+		class TestOnError {
+
+			@Test
+			@SneakyThrows
+			void shouldCompleteExceptionally() {
+				var error = new Throwable();
+
+				responseObserver.onError(error);
+
+				assertThatException().isThrownBy(() -> responseObserver.getResponseFuture().get()).withCause(error);
+			}
+		}
+
+		@Nested
+		class TestOnCompleted {
+
+			@Test
+			void shouldCompleteFuture() {
+				responseObserver.onCompleted();
+
+				assertThat(responseObserver.getResponseFuture().isDone()).isTrue();
+			}
+		}
+	}
+
+	@Nested
+	class SendAggregationDataOnReadyHandlerTest {
+
+		private static final int BATCH_SIZE = 2;
+		private static final String MANDANT = LoremIpsum.getInstance().getWords(1);
+
+		@Mock
+		private ClientCallStreamObserver<GrpcSendAggregationDataRequest> requestObserver;
+
+		@Nested
+		class TestRun {
+
+			@Mock
+			private GrpcSendAggregationDataRequest request;
+
+			@Test
+			void shouldCheckIfStreamIsReady() {
+				createOnReadyHandler(createAggregationData(1)).run();
+
+				verify(requestObserver).isReady();
+			}
+
+			@Test
+			void shouldNotCallOnNextWhenStreamIsNotReady() {
+				when(requestObserver.isReady()).thenReturn(false);
+
+				createOnReadyHandler(createAggregationData(1)).run();
+
+				verify(requestObserver, never()).onNext(any());
+			}
+
+			@Test
+			void shouldNotCallOnNextWhenHasNoMoreData() {
+				when(requestObserver.isReady()).thenReturn(true);
+
+				createOnReadyHandler(createAggregationData(0)).run();
+
+				verify(requestObserver, never()).onNext(any());
+			}
+
+			@Test
+			void shouldBuildRequest() {
+				when(requestObserver.isReady()).thenReturn(true);
+				var onReadyHandler = spy(createOnReadyHandler(createAggregationData(1)));
+
+				onReadyHandler.run();
+
+				verify(onReadyHandler).buildRequest();
+			}
+
+			@Test
+			void shouldCallOnNextWithBuiltRequest() {
+				when(requestObserver.isReady()).thenReturn(true).thenReturn(false);
+				var onReadyHandler = spy(createOnReadyHandler(createAggregationData(1)));
+				doReturn(request).when(onReadyHandler).buildRequest();
+
+				onReadyHandler.run();
+
+				verify(requestObserver).onNext(request);
+			}
+
+			@Test
+			void shouldCallOnNextUntilAllDataWasSent() {
+				when(requestObserver.isReady()).thenReturn(true);
+
+				createOnReadyHandler(createAggregationData(BATCH_SIZE + 1)).run();
+
+				verify(requestObserver, times(2)).onNext(any());
+			}
+
+			@Test
+			void shouldCompleteRequest() {
+				when(requestObserver.isReady()).thenReturn(true);
+
+				createOnReadyHandler(createAggregationData(BATCH_SIZE + 1)).run();
+
+				verify(requestObserver).onCompleted();
+			}
+
+			@Test
+			void shouldNotCompleteRequestIfNotAllDataWasSent() {
+				when(requestObserver.isReady()).thenReturn(true).thenReturn(false);
+
+				createOnReadyHandler(createAggregationData(BATCH_SIZE + 1)).run();
+
+				verify(requestObserver, never()).onCompleted();
+			}
+		}
+
+		@Nested
+		class TestBuildRequest {
+
+			@Test
+			void shouldSetName() {
+				var request = createOnReadyHandler().buildRequest();
+
+				assertThat(request.getName()).isEqualTo(AggregationTestFactory.AGGREGATION_NAME);
+			}
+
+			@Test
+			void shouldSetMandant() {
+				var request = createOnReadyHandler().buildRequest();
+
+				assertThat(request.getMandant()).isEqualTo(MANDANT);
+			}
+
+			@Nested
+			class OnHasNoAggregationData {
+
+				@Test
+				void shouldAggregationDataBeEmpty() {
+					var request = createOnReadyHandler(List.of()).buildRequest();
+
+					assertThat(request.getAggregationDataCount()).isEqualTo(0);
+				}
+			}
+
+			@Nested
+			class OnHasLessAggregationDataThenBatchSize {
+
+				@Test
+				void shouldAddAllAggregationData() {
+					var aggregationData = createAggregationData(BATCH_SIZE - 1);
+
+					var request = createOnReadyHandler(aggregationData).buildRequest();
+
+					assertThat(request.getAggregationDataList()).hasSameElementsAs(aggregationData);
+				}
+			}
+
+			@Nested
+			class OnHasBatchSizeOfAggregationData {
+
+				@Test
+				void shouldAddAllAggregationData() {
+					var aggregationData = createAggregationData(BATCH_SIZE);
+
+					var request = createOnReadyHandler(aggregationData).buildRequest();
+
+					assertThat(request.getAggregationDataList()).hasSameElementsAs(aggregationData);
+				}
+			}
+
+			@Nested
+			class OnHasMoreAggregationDataThenBatchSize {
+
+				@Test
+				void shouldHaveExactlyBatchSizeOfAggregationData() {
+					var aggregationData = createAggregationData(BATCH_SIZE + 1);
+
+					var request = createOnReadyHandler(aggregationData).buildRequest();
+
+					assertThat(request.getAggregationDataCount()).isEqualTo(BATCH_SIZE);
+				}
+
+				@Test
+				void shouldContainOnlyFirstBatchSizeOfElements() {
+					var aggregationData = createAggregationData(BATCH_SIZE + 1);
+
+					var request = createOnReadyHandler(aggregationData).buildRequest();
+
+					assertThat(request.getAggregationDataList()).containsExactlyElementsOf(aggregationData.subList(0, BATCH_SIZE));
+				}
+			}
+		}
 
-			var grpcAggregationDataList = IteratorUtils.toList(getRequestDataFromCapturedResponseObserver().aggregationDataIterator());
-			assertThat(grpcAggregationDataList).containsExactly(GrpcAggregationDataTestFactory.create());
+		private List<GrpcAggregationData> createAggregationData(int count) {
+			return IntStream.range(1, count + 1)
+					.mapToObj(idx -> GrpcAggregationDataTestFactory.createBuilder().setVorgangName("vorgang " + idx).build())
+					.toList();
 		}
 
-		private void sendAggregationData() {
-			service.sendAggregationData(AggregationTestFactory.create());
+		private AggregationDataRemoteService.SendAggregationDataOnReadyHandler createOnReadyHandler() {
+			return createOnReadyHandler(List.of());
 		}
 
-		private AggregationDataRemoteService.RequestData getRequestDataFromCapturedResponseObserver() {
-			verify(serviceStub).sendAggregationData(observerCaptor.capture());
-			return ReflectionTestUtils.getField(observerCaptor.getValue(), "requestData", AggregationDataRemoteService.RequestData.class);
+		private AggregationDataRemoteService.SendAggregationDataOnReadyHandler createOnReadyHandler(List<GrpcAggregationData> aggregationData) {
+			return new AggregationDataRemoteService.SendAggregationDataOnReadyHandler(requestObserver, BATCH_SIZE,
+					AggregationDataRemoteService.RequestData.builder()
+							.mandant(MANDANT)
+							.aggregationName(AggregationTestFactory.AGGREGATION_NAME)
+							.aggregationDataIterator(aggregationData.iterator())
+							.build());
 		}
 	}
 }
-- 
GitLab