diff --git a/vorgang-manager-interface/src/main/protobuf/vorgang.proto b/vorgang-manager-interface/src/main/protobuf/vorgang.proto index fe49a87a5e9bfb7ba504a043a94b871cbae38dd4..327a50e4133fdcf7b6a7c6386693bc6c5798fc06 100644 --- a/vorgang-manager-interface/src/main/protobuf/vorgang.proto +++ b/vorgang-manager-interface/src/main/protobuf/vorgang.proto @@ -48,7 +48,7 @@ service VorgangService { rpc CreateCollaborationVorgang(GrpcCreateCollaborationVorgangRequest) returns (GrpcCreateCollaborationVorgangResponse) { } - rpc FindDeletedVorgang(GrpcFindDeletedVorgangRequest) returns (stream GrpcVorgangHeader) { + rpc FindDeletedVorgang(GrpcFindDeletedVorgangRequest) returns (stream GrpcFindVorgangResponse) { } } diff --git a/vorgang-manager-server/src/main/java/de/ozgcloud/vorgang/common/grpc/GrpcResponseBatchStreamer.java b/vorgang-manager-server/src/main/java/de/ozgcloud/vorgang/common/grpc/GrpcResponseBatchStreamer.java new file mode 100644 index 0000000000000000000000000000000000000000..5d4e591bf082e99a9c911dd187deb9673e1d6a11 --- /dev/null +++ b/vorgang-manager-server/src/main/java/de/ozgcloud/vorgang/common/grpc/GrpcResponseBatchStreamer.java @@ -0,0 +1,79 @@ +/* + * Copyright (C) 2025 Das Land Schleswig-Holstein vertreten durch den + * Ministerpräsidenten des Landes Schleswig-Holstein + * Staatskanzlei + * Abteilung Digitalisierung und zentrales IT-Management der Landesregierung + * + * Lizenziert unter der EUPL, Version 1.2 oder - sobald + * diese von der Europäischen Kommission genehmigt wurden - + * Folgeversionen der EUPL ("Lizenz"); + * Sie dürfen dieses Werk ausschließlich gemäß + * dieser Lizenz nutzen. + * Eine Kopie der Lizenz finden Sie hier: + * + * https://joinup.ec.europa.eu/collection/eupl/eupl-text-eupl-12 + * + * Sofern nicht durch anwendbare Rechtsvorschriften + * gefordert oder in schriftlicher Form vereinbart, wird + * die unter der Lizenz verbreitete Software "so wie sie + * ist", OHNE JEGLICHE GEWÄHRLEISTUNG ODER BEDINGUNGEN - + * ausdrücklich oder stillschweigend - verbreitet. + * Die sprachspezifischen Genehmigungen und Beschränkungen + * unter der Lizenz sind dem Lizenztext zu entnehmen. + */ +package de.ozgcloud.vorgang.common.grpc; + +import java.util.function.Supplier; + +import io.grpc.stub.StreamObserver; +import lombok.Setter; + +public class GrpcResponseBatchStreamer<ElemType, ResType> { + + public static final int DEFAULT_BATCH_SIZE = 100; + + @Setter + private int batchSize = DEFAULT_BATCH_SIZE; + + private final Supplier<GrpcResponseBuilder<ElemType, ResType>> responseBuilderSupplier; + private final StreamObserver<ResType> responseObserver; + private GrpcResponseBuilder<ElemType, ResType> responseBuilder; + + public static <ElemType, ResType> GrpcResponseBatchStreamer<ElemType, ResType> create(Supplier<GrpcResponseBuilder<ElemType, ResType>> responseBuilderSupplier, + StreamObserver<ResType> responseObserver) { + return new GrpcResponseBatchStreamer<>(responseBuilderSupplier, responseObserver); + } + + private GrpcResponseBatchStreamer(Supplier<GrpcResponseBuilder<ElemType, ResType>> responseBuilderSupplier, + StreamObserver<ResType> responseObserver) { + this.responseBuilderSupplier = responseBuilderSupplier; + this.responseObserver = responseObserver; + responseBuilder = responseBuilderSupplier.get(); + } + + public void send(ElemType element) { + responseBuilder.addElement(element); + if (batchIsFull()) { + sendResponse(); + responseBuilder = responseBuilderSupplier.get(); + } + } + + private boolean batchIsFull() { + return responseBuilder.getElementCount() == batchSize; + } + + public void finish() { + if (!batchIsEmpty()) { + sendResponse(); + } + } + + private boolean batchIsEmpty() { + return responseBuilder.getElementCount() == 0; + } + + private void sendResponse() { + responseObserver.onNext(responseBuilder.build()); + } +} diff --git a/vorgang-manager-server/src/main/java/de/ozgcloud/vorgang/common/grpc/GrpcResponseBuilder.java b/vorgang-manager-server/src/main/java/de/ozgcloud/vorgang/common/grpc/GrpcResponseBuilder.java new file mode 100644 index 0000000000000000000000000000000000000000..0cfe4aeb41286868ab8c896095d167504360bd2c --- /dev/null +++ b/vorgang-manager-server/src/main/java/de/ozgcloud/vorgang/common/grpc/GrpcResponseBuilder.java @@ -0,0 +1,33 @@ +/* + * Copyright (C) 2025 Das Land Schleswig-Holstein vertreten durch den + * Ministerpräsidenten des Landes Schleswig-Holstein + * Staatskanzlei + * Abteilung Digitalisierung und zentrales IT-Management der Landesregierung + * + * Lizenziert unter der EUPL, Version 1.2 oder - sobald + * diese von der Europäischen Kommission genehmigt wurden - + * Folgeversionen der EUPL ("Lizenz"); + * Sie dürfen dieses Werk ausschließlich gemäß + * dieser Lizenz nutzen. + * Eine Kopie der Lizenz finden Sie hier: + * + * https://joinup.ec.europa.eu/collection/eupl/eupl-text-eupl-12 + * + * Sofern nicht durch anwendbare Rechtsvorschriften + * gefordert oder in schriftlicher Form vereinbart, wird + * die unter der Lizenz verbreitete Software "so wie sie + * ist", OHNE JEGLICHE GEWÄHRLEISTUNG ODER BEDINGUNGEN - + * ausdrücklich oder stillschweigend - verbreitet. + * Die sprachspezifischen Genehmigungen und Beschränkungen + * unter der Lizenz sind dem Lizenztext zu entnehmen. + */ +package de.ozgcloud.vorgang.common.grpc; + +public interface GrpcResponseBuilder<ElemType, ResType> { + + int getElementCount(); + + void addElement(ElemType elem); + + ResType build(); +} diff --git a/vorgang-manager-server/src/main/java/de/ozgcloud/vorgang/vorgang/GrpcFindVorgangResponseBuilder.java b/vorgang-manager-server/src/main/java/de/ozgcloud/vorgang/vorgang/GrpcFindVorgangResponseBuilder.java new file mode 100644 index 0000000000000000000000000000000000000000..8ed15a0370366e68db1830ef86f84d59132818f6 --- /dev/null +++ b/vorgang-manager-server/src/main/java/de/ozgcloud/vorgang/vorgang/GrpcFindVorgangResponseBuilder.java @@ -0,0 +1,46 @@ +/* + * Copyright (C) 2025 Das Land Schleswig-Holstein vertreten durch den + * Ministerpräsidenten des Landes Schleswig-Holstein + * Staatskanzlei + * Abteilung Digitalisierung und zentrales IT-Management der Landesregierung + * + * Lizenziert unter der EUPL, Version 1.2 oder - sobald + * diese von der Europäischen Kommission genehmigt wurden - + * Folgeversionen der EUPL ("Lizenz"); + * Sie dürfen dieses Werk ausschließlich gemäß + * dieser Lizenz nutzen. + * Eine Kopie der Lizenz finden Sie hier: + * + * https://joinup.ec.europa.eu/collection/eupl/eupl-text-eupl-12 + * + * Sofern nicht durch anwendbare Rechtsvorschriften + * gefordert oder in schriftlicher Form vereinbart, wird + * die unter der Lizenz verbreitete Software "so wie sie + * ist", OHNE JEGLICHE GEWÄHRLEISTUNG ODER BEDINGUNGEN - + * ausdrücklich oder stillschweigend - verbreitet. + * Die sprachspezifischen Genehmigungen und Beschränkungen + * unter der Lizenz sind dem Lizenztext zu entnehmen. + */ +package de.ozgcloud.vorgang.vorgang; + +import de.ozgcloud.vorgang.common.grpc.GrpcResponseBuilder; + +class GrpcFindVorgangResponseBuilder implements GrpcResponseBuilder<GrpcVorgangHeader, GrpcFindVorgangResponse> { + + private final GrpcFindVorgangResponse.Builder delegate = GrpcFindVorgangResponse.newBuilder(); + + @Override + public int getElementCount() { + return delegate.getVorgangCount(); + } + + @Override + public void addElement(GrpcVorgangHeader elem) { + delegate.addVorgang(elem); + } + + @Override + public GrpcFindVorgangResponse build() { + return delegate.build(); + } +} diff --git a/vorgang-manager-server/src/main/java/de/ozgcloud/vorgang/vorgang/VorgangGrpcService.java b/vorgang-manager-server/src/main/java/de/ozgcloud/vorgang/vorgang/VorgangGrpcService.java index bdb2c373dd0715bbf1f4b38e13749036e4a2b69d..245a1bcd97155f7661d45cc32fe0e5c46783e1a2 100644 --- a/vorgang-manager-server/src/main/java/de/ozgcloud/vorgang/vorgang/VorgangGrpcService.java +++ b/vorgang-manager-server/src/main/java/de/ozgcloud/vorgang/vorgang/VorgangGrpcService.java @@ -29,6 +29,7 @@ import de.ozgcloud.vorgang.collaboration.CollaborationService; import de.ozgcloud.vorgang.collaboration.CreateCollaborationVorgangBadRequestException; import de.ozgcloud.vorgang.collaboration.CreateCollaborationVorgangRequest; import de.ozgcloud.vorgang.collaboration.CreateCollaborationVorgangRequestMapper; +import de.ozgcloud.vorgang.common.grpc.GrpcResponseBatchStreamer; import io.grpc.stub.StreamObserver; import lombok.RequiredArgsConstructor; import net.devh.boot.grpc.server.service.GrpcService; @@ -141,8 +142,10 @@ class VorgangGrpcService extends VorgangServiceGrpc.VorgangServiceImplBase { } @Override - public void findDeletedVorgang(GrpcFindDeletedVorgangRequest request, StreamObserver<GrpcVorgangHeader> responseObserver) { - vorgangService.findDeleted().map(vorgangStubMapper::toGrpcVorgangHeader).forEach(responseObserver::onNext); + public void findDeletedVorgang(GrpcFindDeletedVorgangRequest request, StreamObserver<GrpcFindVorgangResponse> responseObserver) { + var responseStreamer = GrpcResponseBatchStreamer.create(GrpcFindVorgangResponseBuilder::new, responseObserver); + vorgangService.findDeleted().map(vorgangStubMapper::toGrpcVorgangHeader).forEach(responseStreamer::send); + responseStreamer.finish(); responseObserver.onCompleted(); } } \ No newline at end of file diff --git a/vorgang-manager-server/src/test/java/de/ozgcloud/vorgang/common/grpc/GrpcResponseBatchStreamerTest.java b/vorgang-manager-server/src/test/java/de/ozgcloud/vorgang/common/grpc/GrpcResponseBatchStreamerTest.java new file mode 100644 index 0000000000000000000000000000000000000000..44bb51f29a497bd9aa89f77de3237c3edd87cf11 --- /dev/null +++ b/vorgang-manager-server/src/test/java/de/ozgcloud/vorgang/common/grpc/GrpcResponseBatchStreamerTest.java @@ -0,0 +1,179 @@ +/* + * Copyright (C) 2025 Das Land Schleswig-Holstein vertreten durch den + * Ministerpräsidenten des Landes Schleswig-Holstein + * Staatskanzlei + * Abteilung Digitalisierung und zentrales IT-Management der Landesregierung + * + * Lizenziert unter der EUPL, Version 1.2 oder - sobald + * diese von der Europäischen Kommission genehmigt wurden - + * Folgeversionen der EUPL ("Lizenz"); + * Sie dürfen dieses Werk ausschließlich gemäß + * dieser Lizenz nutzen. + * Eine Kopie der Lizenz finden Sie hier: + * + * https://joinup.ec.europa.eu/collection/eupl/eupl-text-eupl-12 + * + * Sofern nicht durch anwendbare Rechtsvorschriften + * gefordert oder in schriftlicher Form vereinbart, wird + * die unter der Lizenz verbreitete Software "so wie sie + * ist", OHNE JEGLICHE GEWÄHRLEISTUNG ODER BEDINGUNGEN - + * ausdrücklich oder stillschweigend - verbreitet. + * Die sprachspezifischen Genehmigungen und Beschränkungen + * unter der Lizenz sind dem Lizenztext zu entnehmen. + */ +package de.ozgcloud.vorgang.common.grpc; + +import static org.mockito.Mockito.*; + +import java.util.function.Supplier; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Nested; +import org.junit.jupiter.api.Test; +import org.mockito.Mock; + +import io.grpc.stub.StreamObserver; + +class GrpcResponseBatchStreamerTest { + + private static final int BATCH_SIZE = 2; + + @Mock + private GrpcResponseBuilder<DummyElement, DummyResponse> responseBuilder; + @Mock + private GrpcResponseBuilder<DummyElement, DummyResponse> responseBuilder2; + @Mock + private Supplier<GrpcResponseBuilder<DummyElement, DummyResponse>> responseBuilderSupplier; + @Mock + private StreamObserver<DummyResponse> responseObserver; + private GrpcResponseBatchStreamer<DummyElement, DummyResponse> batchStreamer; + + @BeforeEach + void init() { + when(responseBuilderSupplier.get()).thenReturn(responseBuilder, responseBuilder2); + batchStreamer = GrpcResponseBatchStreamer.create(responseBuilderSupplier, responseObserver); + batchStreamer.setBatchSize(BATCH_SIZE); + } + + @Test + void shouldGetNewResponseBuilderDuringInitialization() { + verify(responseBuilderSupplier).get(); + } + + @Nested + class TestSend { + + private final DummyElement element = new DummyElement(); + + @Test + void shouldGetElementCount() { + batchStreamer.send(element); + + verify(responseBuilder).getElementCount(); + } + + @Nested + class BatchIsNotFull { + + @BeforeEach + void init() { + when(responseBuilder.getElementCount()).thenReturn(BATCH_SIZE - 1); + } + + @Test + void shouldAddElementToResponse() { + batchStreamer.send(element); + + verify(responseBuilder).addElement(element); + } + + @Test + void shouldNotSendResponse() { + batchStreamer.send(element); + + verifyNoInteractions(responseObserver); + } + } + + @Nested + class BatchIsFull { + + private final DummyResponse response = new DummyResponse(); + + @BeforeEach + void init() { + when(responseBuilder.getElementCount()).thenReturn(BATCH_SIZE); + when(responseBuilder.build()).thenReturn(response); + } + + @Test + void shouldAddElementToResponse() { + batchStreamer.send(element); + + verify(responseBuilder).addElement(element); + } + + @Test + void shouldSendResponse() { + batchStreamer.send(element); + + verify(responseObserver).onNext(response); + } + + @Test + void shouldGetNewResponseBuilder() { + batchStreamer.send(element); + + // first time during initialization, second time in send() + verify(responseBuilderSupplier, times(2)).get(); + } + + @Test + void shouldUseNewResponseBuilderNextTime() { + batchStreamer.send(element); + + batchStreamer.send(element); + + verify(responseBuilder2).addElement(element); + } + } + } + + @Nested + class TestFinish { + + private final DummyResponse response = new DummyResponse(); + + @Test + void shouldGetElementCount() { + batchStreamer.finish(); + + verify(responseBuilder).getElementCount(); + } + + @Test + void shouldNotSendResponseIfBatchIsEmpty() { + when(responseBuilder.getElementCount()).thenReturn(0); + + batchStreamer.finish(); + + verifyNoInteractions(responseObserver); + } + + @Test + void shouldSendResponseIfBatchIsNotEmpty() { + when(responseBuilder.getElementCount()).thenReturn(1); + when(responseBuilder.build()).thenReturn(response); + + batchStreamer.finish(); + + verify(responseObserver).onNext(response); + } + } + + private static class DummyResponse { + } + + private static class DummyElement { + } +} diff --git a/vorgang-manager-server/src/test/java/de/ozgcloud/vorgang/vorgang/VorgangGrpcServiceITCase.java b/vorgang-manager-server/src/test/java/de/ozgcloud/vorgang/vorgang/VorgangGrpcServiceITCase.java index 97e3b6f0a91e1b10f8afe785fc84c5f4ba95ebe9..604d4bf6e60876003f0663870ce8e2332cbfa892 100644 --- a/vorgang-manager-server/src/test/java/de/ozgcloud/vorgang/vorgang/VorgangGrpcServiceITCase.java +++ b/vorgang-manager-server/src/test/java/de/ozgcloud/vorgang/vorgang/VorgangGrpcServiceITCase.java @@ -23,6 +23,7 @@ */ package de.ozgcloud.vorgang.vorgang; +import static de.ozgcloud.vorgang.common.grpc.GrpcResponseBatchStreamer.*; import static org.assertj.core.api.Assertions.*; import static org.mockito.ArgumentMatchers.*; import static org.mockito.Mockito.*; @@ -31,6 +32,8 @@ import java.util.ArrayList; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.stream.IntStream; +import java.util.stream.Stream; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Nested; @@ -122,7 +125,7 @@ class VorgangGrpcServiceITCase { @Test void shouldKeepFieldsOrderInSubForm() { var grpcFormData = GrpcFormDataTestFactory.createBuilder().addForm( - GrpcSubFormTestFactory.createBuilder().clearField().clearSubForm().setTitle(TITLE_SUBFORM_1).addAllField(formFields)) + GrpcSubFormTestFactory.createBuilder().clearField().clearSubForm().setTitle(TITLE_SUBFORM_1).addAllField(formFields)) .build(); var formData = (Map<String, Object>) startCreation(grpcFormData).get(TITLE_SUBFORM_1); @@ -144,9 +147,9 @@ class VorgangGrpcServiceITCase { @Test void shouldKeepOrderInSubForm() { var grpcFormData = GrpcFormDataTestFactory.createBuilder().addForm( - GrpcSubFormTestFactory.createBuilder().setTitle(TITLE_SUBFORM_1).clearField().clearSubForm() - .addSubForm(GrpcSubFormTestFactory.createBuilder().setTitle(TITLE_SUBFORM_3)) - .addSubForm(GrpcSubFormTestFactory.createBuilder().setTitle(TITLE_SUBFORM_2))) + GrpcSubFormTestFactory.createBuilder().setTitle(TITLE_SUBFORM_1).clearField().clearSubForm() + .addSubForm(GrpcSubFormTestFactory.createBuilder().setTitle(TITLE_SUBFORM_3)) + .addSubForm(GrpcSubFormTestFactory.createBuilder().setTitle(TITLE_SUBFORM_2))) .build(); var formData = (Map<String, Object>) startCreation(grpcFormData).get(TITLE_SUBFORM_1); @@ -235,4 +238,34 @@ class VorgangGrpcServiceITCase { return findVorgangResponseCaptor.getValue().getVorgangWithEingang().getEingang().getFormData(); } } + + @Nested + class TestFindDeleteVorgang { + + private static final int INCOMPLETE_BATCH_SIZE = DEFAULT_BATCH_SIZE - 1; + private static final int DELETED_VORGANG_COUNT = DEFAULT_BATCH_SIZE + INCOMPLETE_BATCH_SIZE; + + @Mock + private StreamObserver<GrpcFindVorgangResponse> streamObserver; + @Captor + private ArgumentCaptor<GrpcFindVorgangResponse> findVorgangResponseCaptor; + + @BeforeEach + void init() { + when(service.findDeleted()).thenReturn(generateVorgangStubs()); + } + + @Test + void shouldSendResponses() { + grpcVorgangService.findDeletedVorgang(GrpcFindDeletedVorgangRequestTestFactory.create(), streamObserver); + + verify(streamObserver, times(2)).onNext(findVorgangResponseCaptor.capture()); + var responses = findVorgangResponseCaptor.getAllValues(); + assertThat(responses).extracting(response -> response.getVorgangList().size()).containsExactly(DEFAULT_BATCH_SIZE, INCOMPLETE_BATCH_SIZE); + } + + private Stream<VorgangStub> generateVorgangStubs() { + return IntStream.range(0, DELETED_VORGANG_COUNT).mapToObj(idx -> VorgangStubTestFactory.create()); + } + } } diff --git a/vorgang-manager-server/src/test/java/de/ozgcloud/vorgang/vorgang/VorgangGrpcServiceTest.java b/vorgang-manager-server/src/test/java/de/ozgcloud/vorgang/vorgang/VorgangGrpcServiceTest.java index 265b788a76b63d903bd050b4bd9b989a067e0698..d3a0f206e00fd7bb5d14bac9ad508afe4765a0ee 100644 --- a/vorgang-manager-server/src/test/java/de/ozgcloud/vorgang/vorgang/VorgangGrpcServiceTest.java +++ b/vorgang-manager-server/src/test/java/de/ozgcloud/vorgang/vorgang/VorgangGrpcServiceTest.java @@ -31,6 +31,7 @@ import java.util.Collections; import java.util.List; import java.util.stream.Stream; +import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.DisplayName; @@ -40,6 +41,7 @@ import org.mockito.ArgumentCaptor; import org.mockito.Captor; import org.mockito.InjectMocks; import org.mockito.Mock; +import org.mockito.MockedStatic; import org.mockito.Spy; import org.springframework.data.domain.Page; @@ -51,6 +53,7 @@ import de.ozgcloud.vorgang.collaboration.CreateCollaborationVorgangRequestMapper import de.ozgcloud.vorgang.collaboration.CreateCollaborationVorgangRequestTestFactory; import de.ozgcloud.vorgang.collaboration.GrpcCreateCollaborationRequestDataTestFactory; import de.ozgcloud.vorgang.collaboration.GrpcCreateCollaborationVorgangResponseTestFactory; +import de.ozgcloud.vorgang.common.grpc.GrpcResponseBatchStreamer; import io.grpc.stub.StreamObserver; class VorgangGrpcServiceTest { @@ -519,18 +522,28 @@ class VorgangGrpcServiceTest { class TestFindDeleteVorgang { private final GrpcFindDeletedVorgangRequest request = GrpcFindDeletedVorgangRequestTestFactory.create(); + private MockedStatic<GrpcResponseBatchStreamer> mockedStatic; @Mock - private StreamObserver<GrpcVorgangHeader> responseObserver; + private GrpcResponseBatchStreamer<GrpcVorgangHeader, GrpcFindVorgangResponse> batchStreamer; + @Mock + private StreamObserver<GrpcFindVorgangResponse> responseObserver; private final VorgangStub vorgangStub = VorgangStubTestFactory.create(); private final GrpcVorgangHeader grpcVorgangHeader = GrpcVorgangHeaderTestFactory.create(); @BeforeEach void init() { + mockedStatic = mockStatic(GrpcResponseBatchStreamer.class); + mockedStatic.when(() -> GrpcResponseBatchStreamer.create(any(), any())).thenReturn(batchStreamer); when(vorgangService.findDeleted()).thenReturn(Stream.of(vorgangStub)); when(vorgangStubMapper.toGrpcVorgangHeader(vorgangStub)).thenReturn(grpcVorgangHeader); } + @AfterEach + void cleanup() { + mockedStatic.close(); + } + @Test void shouldFindDeleted() { findDeletedVorgang(); @@ -546,10 +559,17 @@ class VorgangGrpcServiceTest { } @Test - void shouldCallOnNext() { + void shouldSendToBatchStreamer() { + findDeletedVorgang(); + + verify(batchStreamer).send(grpcVorgangHeader); + } + + @Test + void shouldCallFinish() { findDeletedVorgang(); - verify(responseObserver).onNext(grpcVorgangHeader); + verify(batchStreamer).finish(); } @Test