diff --git a/aggregation-manager-job/src/main/java/de/ozgcloud/aggregation/extern/AggregationDataRemoteService.java b/aggregation-manager-job/src/main/java/de/ozgcloud/aggregation/extern/AggregationDataRemoteService.java index c10dbbbcfe39cdbc86257f44ca6f146f4287cd03..643bbeb734187f5e3f23d8b43f47d7548192efba 100644 --- a/aggregation-manager-job/src/main/java/de/ozgcloud/aggregation/extern/AggregationDataRemoteService.java +++ b/aggregation-manager-job/src/main/java/de/ozgcloud/aggregation/extern/AggregationDataRemoteService.java @@ -31,6 +31,7 @@ import java.util.stream.Stream; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.stereotype.Service; +import de.ozgcloud.aggregation.AggregationManagerConfiguration; import de.ozgcloud.aggregation.AggregationManagerProperties; import de.ozgcloud.aggregation.data.AggregationDataServiceGrpc; import de.ozgcloud.aggregation.data.GrpcAggregationData; @@ -49,11 +50,10 @@ import net.devh.boot.grpc.client.inject.GrpcClient; @RequiredArgsConstructor class AggregationDataRemoteService { - static final int BATCH_SIZE = 100; - @GrpcClient("aggregation-manager") private final AggregationDataServiceGrpc.AggregationDataServiceStub serviceStub; private final AggregationManagerProperties properties; + private final AggregationManagerConfiguration configuration; private final GrpcAggregationDataMapper grpcAggregationDataMapper; public Future<Void> sendAggregationData(Aggregation aggregation) { @@ -65,7 +65,7 @@ class AggregationDataRemoteService { SendAggregationDataResponseObserver buildSendAggregationDataResponseObserver(Aggregation aggregation) { var requestData = new RequestData(properties.getMandant(), aggregation.aggregationName, toGrpcAggregationDataStream(aggregation.documentEntries).iterator()); - return new SendAggregationDataResponseObserver(requestData); + return new SendAggregationDataResponseObserver(configuration.getFetchingBatchSize(), requestData); } Stream<GrpcAggregationData> toGrpcAggregationDataStream(Stream<DocumentEntry> documentEntries) { @@ -76,6 +76,7 @@ class AggregationDataRemoteService { static class SendAggregationDataResponseObserver implements ClientResponseObserver<GrpcSendAggregationDataRequest, GrpcSendAggregationDataResponse> { + private final int batchSize; private final RequestData requestData; @Getter private final CompletableFuture<Void> responseFuture = new CompletableFuture<>(); @@ -101,7 +102,7 @@ class AggregationDataRemoteService { } SendAggregationDataOnReadyHandler buildOnReadyHandler(ClientCallStreamObserver<GrpcSendAggregationDataRequest> requestObserver) { - return new SendAggregationDataOnReadyHandler(requestObserver, BATCH_SIZE, requestData); + return new SendAggregationDataOnReadyHandler(requestObserver, batchSize, requestData); } } diff --git a/aggregation-manager-job/src/test/java/de/ozgcloud/aggregation/extern/AggregationDataRemoteServiceTest.java b/aggregation-manager-job/src/test/java/de/ozgcloud/aggregation/extern/AggregationDataRemoteServiceTest.java index 7836cdff057dc530e788c0e46582fd4b39f8385a..2023e0c4107b471a74de899a3c5ea7a60662e540 100644 --- a/aggregation-manager-job/src/test/java/de/ozgcloud/aggregation/extern/AggregationDataRemoteServiceTest.java +++ b/aggregation-manager-job/src/test/java/de/ozgcloud/aggregation/extern/AggregationDataRemoteServiceTest.java @@ -27,6 +27,7 @@ import static org.assertj.core.api.Assertions.*; import static org.mockito.Mockito.*; import java.util.List; +import java.util.Random; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Future; import java.util.stream.IntStream; @@ -44,6 +45,7 @@ import org.mockito.Spy; import com.thedeanda.lorem.LoremIpsum; +import de.ozgcloud.aggregation.AggregationManagerConfiguration; import de.ozgcloud.aggregation.AggregationManagerProperties; import de.ozgcloud.aggregation.data.AggregationDataServiceGrpc; import de.ozgcloud.aggregation.data.GrpcAggregationData; @@ -61,6 +63,8 @@ class AggregationDataRemoteServiceTest { @Mock private AggregationManagerProperties properties; @Mock + private AggregationManagerConfiguration configuration; + @Mock private GrpcAggregationDataMapper grpcAggregationDataMapper; @InjectMocks @Spy @@ -111,12 +115,14 @@ class AggregationDataRemoteServiceTest { @Nested class TestBuildSendAggregationDataResponseObserver { + private static final int BATCH_SIZE = new Random().nextInt(100); @Captor private ArgumentCaptor<Stream<DocumentEntry>> documentEntriesCaptor; @BeforeEach void init() { doReturn(Stream.of(GrpcAggregationDataTestFactory.create())).when(service).toGrpcAggregationDataStream(any()); + when(configuration.getFetchingBatchSize()).thenReturn(BATCH_SIZE); } @Test @@ -134,6 +140,13 @@ class AggregationDataRemoteServiceTest { assertThat(documentEntriesCaptor.getValue()).containsExactly(AggregationTestFactory.DOCUMENT_ENTRY); } + @Test + void shouldGetBatchSize() { + service.buildSendAggregationDataResponseObserver(AggregationTestFactory.create()); + + verify(configuration).getFetchingBatchSize(); + } + @Test void shouldReturnResponseObserver() { var builtResponseObserver = service.buildSendAggregationDataResponseObserver(AggregationTestFactory.create()); @@ -151,6 +164,13 @@ class AggregationDataRemoteServiceTest { when(properties.getMandant()).thenReturn(MANDANT); } + @Test + void shouldHaveBatchSize() { + var builtResponseObserver = service.buildSendAggregationDataResponseObserver(AggregationTestFactory.create()); + + assertThat(getBatchSize(builtResponseObserver)).isEqualTo(BATCH_SIZE); + } + @Test void shouldHaveMandant() { var builtResponseObserver = service.buildSendAggregationDataResponseObserver(AggregationTestFactory.create()); @@ -174,6 +194,10 @@ class AggregationDataRemoteServiceTest { assertThat(aggregationData).containsExactly(GrpcAggregationDataTestFactory.create()); } + private int getBatchSize(SendAggregationDataResponseObserver responseObserver) { + return ReflectionTestUtils.getField(responseObserver, "batchSize", Integer.class); + } + private AggregationDataRemoteService.RequestData getRequestData(SendAggregationDataResponseObserver responseObserver) { return ReflectionTestUtils.getField(responseObserver, "requestData", AggregationDataRemoteService.RequestData.class); } @@ -207,11 +231,12 @@ class AggregationDataRemoteServiceTest { @Nested class SendAggregationDataResponseObserverTest { + private static final int BATCH_SIZE = 2; + @Mock private AggregationDataRemoteService.RequestData requestData; @Spy - @InjectMocks - private SendAggregationDataResponseObserver responseObserver; + private SendAggregationDataResponseObserver responseObserver = new SendAggregationDataResponseObserver(BATCH_SIZE, requestData); @Nested class TestBeforeStart {