From aee9317056988bf957670e61ac25ab7226166efe Mon Sep 17 00:00:00 2001 From: Felix Reichenbach <felix.reichenbach@mgm-tp.com> Date: Fri, 14 Mar 2025 16:13:31 +0100 Subject: [PATCH] OZG-7573 add poc version for EingangStubReceiverStreamObserver --- forwarder/pom.xml | 4 + .../EingangStubReceiverStreamObserver.java | 200 ++++++++++++++++++ .../forwarder/IncomingFileGroupMapper.java | 15 ++ .../eingang/forwarder/IncomingFileMapper.java | 22 ++ .../eingang/forwarder/RouteCriteria.java | 2 - .../forwarder/RouteForwardingGrpcService.java | 8 +- .../forwarder/RouteForwardingMapper.java | 54 +++++ ...GrpcRouteForwardingRequestTestFactory.java | 4 +- .../forwarder/RouteCriteriaTestFactory.java | 9 +- .../RouteForwardingGrpcServiceTest.java | 34 +-- pom.xml | 10 +- router/pom.xml | 5 + 12 files changed, 335 insertions(+), 32 deletions(-) create mode 100644 forwarder/src/main/java/de/ozgcloud/eingang/forwarder/EingangStubReceiverStreamObserver.java create mode 100644 forwarder/src/main/java/de/ozgcloud/eingang/forwarder/IncomingFileGroupMapper.java create mode 100644 forwarder/src/main/java/de/ozgcloud/eingang/forwarder/IncomingFileMapper.java create mode 100644 forwarder/src/main/java/de/ozgcloud/eingang/forwarder/RouteForwardingMapper.java diff --git a/forwarder/pom.xml b/forwarder/pom.xml index d79e4343e..4fabecd6a 100644 --- a/forwarder/pom.xml +++ b/forwarder/pom.xml @@ -49,6 +49,10 @@ <groupId>de.ozgcloud.eingang</groupId> <artifactId>common</artifactId> </dependency> + <dependency> + <groupId>de.ozgcloud.eingang</groupId> + <artifactId>eingang-manager-interface</artifactId> + </dependency> <dependency> <groupId>de.ozgcloud.eingang</groupId> <artifactId>router</artifactId> diff --git a/forwarder/src/main/java/de/ozgcloud/eingang/forwarder/EingangStubReceiverStreamObserver.java b/forwarder/src/main/java/de/ozgcloud/eingang/forwarder/EingangStubReceiverStreamObserver.java new file mode 100644 index 000000000..13044c3f5 --- /dev/null +++ b/forwarder/src/main/java/de/ozgcloud/eingang/forwarder/EingangStubReceiverStreamObserver.java @@ -0,0 +1,200 @@ +package de.ozgcloud.eingang.forwarder; + +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.io.PipedInputStream; +import java.io.PipedOutputStream; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.function.Consumer; +import java.util.function.Function; + +import org.apache.commons.io.IOUtils; + +import de.ozgcloud.common.errorhandling.TechnicalException; +import de.ozgcloud.eingang.common.formdata.FormData; +import de.ozgcloud.eingang.common.formdata.IncomingFile; +import de.ozgcloud.eingang.forwarding.GrpcAttachment; +import de.ozgcloud.eingang.forwarding.GrpcFileContent; +import de.ozgcloud.eingang.forwarding.GrpcRepresentation; +import de.ozgcloud.eingang.forwarding.GrpcRouteForwarding; +import de.ozgcloud.eingang.forwarding.GrpcRouteForwardingRequest; +import io.grpc.stub.StreamObserver; +import lombok.Builder; +import lombok.extern.log4j.Log4j2; + +@Log4j2 +public class EingangStubReceiverStreamObserver implements StreamObserver<GrpcRouteForwardingRequest> { + + private static final int CHUNK_SIZE = 1024 * 64; + private static final long TIMEOUT_MINUTES = 10; + private final RouteForwardingMapper routeForwardingMapper; + private final IncomingFileMapper incomingFileMapper; + private final IncomingFileGroupMapper incomingFileGroupMapper; + private final Function<InputStream, CompletableFuture<File>> fileSaver; + private final Consumer<FormData> formDataConsumer; + + @Builder + public EingangStubReceiverStreamObserver(RouteForwardingMapper routeForwardingMapper, IncomingFileMapper incomingFileMapper, + IncomingFileGroupMapper incomingFileGroupMapper, Function<InputStream, CompletableFuture<File>> fileSaver, + Consumer<FormData> formDataConsumer) { + this.routeForwardingMapper = routeForwardingMapper; + this.incomingFileMapper = incomingFileMapper; + this.incomingFileGroupMapper = incomingFileGroupMapper; + this.fileSaver = fileSaver; + this.formDataConsumer = formDataConsumer; + } + + private FormData formData; + private List<IncomingFile> representations = new ArrayList<>(); + private Map<String, List<IncomingFile>> attachments = new HashMap<>(); + + private IncomingFile currentFile; + private Optional<String> groupName = Optional.empty(); + private PipedOutputStream pipedOutput; + private PipedInputStream pipedInput; + private CompletableFuture<File> receivingFileContent; + + @Override + public synchronized void onNext(GrpcRouteForwardingRequest request) { + if (request.hasRouteForwarding()) { + handleRouteForwarding(request.getRouteForwarding()); + } + if (request.hasAttachment()) { + handleAttachment(request.getAttachment()); + } + if (request.hasRepresentation()) { + handleRepresentation(request.getRepresentation()); + } + } + + private void handleRouteForwarding(GrpcRouteForwarding routeForwarding) { + if (Objects.nonNull(formData)) { + throw new IllegalStateException("Received second RouteForwarding. Send only one per request."); + } + formData = routeForwardingMapper.toFormData(routeForwarding.getEingangStub(), routeForwarding.getRouteCriteria()); + } + + private void handleAttachment(GrpcAttachment attachment) { + if (attachment.hasFile()) { + setCurrentMetadata(incomingFileMapper.fromGrpcAttachmentFile(attachment.getFile())); + groupName = Optional.of(attachment.getFile().getGroupName()); + } else { + if (Objects.isNull(receivingFileContent)) { + initContentReceiving(); + } + storeFileContent(attachment.getContent()); + } + } + + private void handleRepresentation(GrpcRepresentation representation) { + if (representation.hasFile()) { + setCurrentMetadata(incomingFileMapper.fromGrpcRepresentationFile(representation.getFile())); + } else { + if (Objects.isNull(receivingFileContent)) { + initContentReceiving(); + } + storeFileContent(representation.getContent()); + } + + } + + private void setCurrentMetadata(IncomingFile metaData) { + if (Objects.nonNull(currentFile)) { + throw new TechnicalException("Received additional file before previos file reached the end."); + } + currentFile = metaData; + } + + private void initContentReceiving() { + try { + pipedInput = new PipedInputStream(CHUNK_SIZE); + pipedOutput = new PipedOutputStream(pipedInput); + receivingFileContent = fileSaver.apply(pipedInput); + } catch (IOException e) { + throw new TechnicalException("Upload initialization failed", e); + } + } + + private void storeFileContent(GrpcFileContent content) { + if (Objects.isNull(currentFile)) { + throw new TechnicalException("File content received before metadata."); + } + try { + pipedOutput.write(content.getContent().toByteArray()); + if (content.getIsEndOfFile()) { + handleEndOfFile(); + } + } catch (IOException e) { + throw new TechnicalException("Error when writing file content.", e); + } + } + + private void handleEndOfFile() { + closeOutputPipe(); + var completedIncomingFile = currentFile.toBuilder().file(getSavedFileContent()).build(); + groupName.map(group -> attachments.get(group)).orElse(representations).add(completedIncomingFile); + resetFileReceiving(); + } + + private File getSavedFileContent() { + try { + return receivingFileContent.get(TIMEOUT_MINUTES, TimeUnit.MINUTES); + } catch (ExecutionException | TimeoutException e) { + throw new TechnicalException("Receiving file failed.", e); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new TechnicalException("Upload was interrupted.", e); + } finally { + closeInputPipe(); + } + } + + private void resetFileReceiving() { + currentFile = null; + groupName = Optional.empty(); + pipedOutput = null; + pipedInput = null; + receivingFileContent = null; + } + + @Override + public void onError(Throwable t) { + LOG.error("Error happened. Receiving stream closed.", t); + closeOutputPipe(); + closeInputPipe(); + } + + private void closeOutputPipe() { + IOUtils.closeQuietly(pipedOutput, e -> LOG.error("Cannot close output stream.", e)); + } + + private void closeInputPipe() { + IOUtils.closeQuietly(pipedInput, e -> LOG.error("Cannot close input stream.", e)); + } + + @Override + public void onCompleted() { + formDataConsumer.accept(assembleFormData()); + } + + private FormData assembleFormData() { + if (Objects.isNull(formData)) { + throw new IllegalStateException("Never received RouteForwarding containing EingangStub and RouteCriteria."); + } + return formData.toBuilder() + .representations(representations) + .attachments(attachments.entrySet().stream().map(incomingFileGroupMapper::fromMapEntry).toList()) + .build(); + } + +} diff --git a/forwarder/src/main/java/de/ozgcloud/eingang/forwarder/IncomingFileGroupMapper.java b/forwarder/src/main/java/de/ozgcloud/eingang/forwarder/IncomingFileGroupMapper.java new file mode 100644 index 000000000..7f35ef41d --- /dev/null +++ b/forwarder/src/main/java/de/ozgcloud/eingang/forwarder/IncomingFileGroupMapper.java @@ -0,0 +1,15 @@ +package de.ozgcloud.eingang.forwarder; + +import java.util.List; +import java.util.Map; + +import org.mapstruct.Mapper; + +import de.ozgcloud.eingang.common.formdata.IncomingFile; +import de.ozgcloud.eingang.common.formdata.IncomingFileGroup; + +@Mapper +public interface IncomingFileGroupMapper { + + IncomingFileGroup fromMapEntry(Map.Entry<String, List<IncomingFile>> entry); // TODO: Implement this method +} diff --git a/forwarder/src/main/java/de/ozgcloud/eingang/forwarder/IncomingFileMapper.java b/forwarder/src/main/java/de/ozgcloud/eingang/forwarder/IncomingFileMapper.java new file mode 100644 index 000000000..26e3ceddd --- /dev/null +++ b/forwarder/src/main/java/de/ozgcloud/eingang/forwarder/IncomingFileMapper.java @@ -0,0 +1,22 @@ +package de.ozgcloud.eingang.forwarder; + +import org.mapstruct.Mapper; +import org.mapstruct.Mapping; + +import de.ozgcloud.eingang.common.formdata.IncomingFile; +import de.ozgcloud.eingang.forwarding.GrpcAttachmentFile; +import de.ozgcloud.eingang.forwarding.GrpcRepresentationFile; + +@Mapper +interface IncomingFileMapper { + + @Mapping(target = "id", ignore = true) + @Mapping(target = "file", ignore = true) + @Mapping(target = "name", source = "fileName") + IncomingFile fromGrpcRepresentationFile(GrpcRepresentationFile representationFile); + + @Mapping(target = "id", ignore = true) + @Mapping(target = "file", ignore = true) + @Mapping(target = "name", source = "fileName") + IncomingFile fromGrpcAttachmentFile(GrpcAttachmentFile representationFile); +} diff --git a/forwarder/src/main/java/de/ozgcloud/eingang/forwarder/RouteCriteria.java b/forwarder/src/main/java/de/ozgcloud/eingang/forwarder/RouteCriteria.java index 3a36486c9..65583d7cf 100644 --- a/forwarder/src/main/java/de/ozgcloud/eingang/forwarder/RouteCriteria.java +++ b/forwarder/src/main/java/de/ozgcloud/eingang/forwarder/RouteCriteria.java @@ -32,7 +32,5 @@ import lombok.Getter; @Builder public class RouteCriteria { - private Optional<String> gemeindeSchluessel; - private Optional<String> webserviceUrl; private Optional<String> organisationEinheitId; } diff --git a/forwarder/src/main/java/de/ozgcloud/eingang/forwarder/RouteForwardingGrpcService.java b/forwarder/src/main/java/de/ozgcloud/eingang/forwarder/RouteForwardingGrpcService.java index 6827f8839..82f8ae332 100644 --- a/forwarder/src/main/java/de/ozgcloud/eingang/forwarder/RouteForwardingGrpcService.java +++ b/forwarder/src/main/java/de/ozgcloud/eingang/forwarder/RouteForwardingGrpcService.java @@ -40,7 +40,11 @@ public class RouteForwardingGrpcService extends RouteForwardingServiceGrpc.Route private final GrpcEingangMapper eingangMapper; @Override - public void routeForwarding(GrpcRouteForwardingRequest request, StreamObserver<GrpcRouteForwardingResponse> responseObserver) { - service.route(criteriaMapper.fromGrpc(request.getRouteCriteria()), eingangMapper.toFormData(request.getEingang())); + public StreamObserver<GrpcRouteForwardingRequest> routeForwarding(StreamObserver<GrpcRouteForwardingResponse> responseObserver) { + // public void routeForwarding(GrpcRouteForwardingRequest request, + // StreamObserver<GrpcRouteForwardingResponse> responseObserver) { + // service.route(criteriaMapper.fromGrpc(request.getRouteCriteria()), + // eingangMapper.toFormData(request.getEingang())); + return null; } } diff --git a/forwarder/src/main/java/de/ozgcloud/eingang/forwarder/RouteForwardingMapper.java b/forwarder/src/main/java/de/ozgcloud/eingang/forwarder/RouteForwardingMapper.java new file mode 100644 index 000000000..315ba8131 --- /dev/null +++ b/forwarder/src/main/java/de/ozgcloud/eingang/forwarder/RouteForwardingMapper.java @@ -0,0 +1,54 @@ +/* + * Copyright (C) 2023 Das Land Schleswig-Holstein vertreten durch den + * Ministerpräsidenten des Landes Schleswig-Holstein + * Staatskanzlei + * Abteilung Digitalisierung und zentrales IT-Management der Landesregierung + * + * Lizenziert unter der EUPL, Version 1.2 oder - sobald + * diese von der Europäischen Kommission genehmigt wurden - + * Folgeversionen der EUPL ("Lizenz"); + * Sie dürfen dieses Werk ausschließlich gemäß + * dieser Lizenz nutzen. + * Eine Kopie der Lizenz finden Sie hier: + * + * https://joinup.ec.europa.eu/collection/eupl/eupl-text-eupl-12 + * + * Sofern nicht durch anwendbare Rechtsvorschriften + * gefordert oder in schriftlicher Form vereinbart, wird + * die unter der Lizenz verbreitete Software "so wie sie + * ist", OHNE JEGLICHE GEWÄHRLEISTUNG ODER BEDINGUNGEN - + * ausdrücklich oder stillschweigend - verbreitet. + * Die sprachspezifischen Genehmigungen und Beschränkungen + * unter der Lizenz sind dem Lizenztext zu entnehmen. + */ +package de.ozgcloud.eingang.forwarder; + +import java.util.Map; + +import org.mapstruct.CollectionMappingStrategy; +import org.mapstruct.Mapper; +import org.mapstruct.Mapping; +import org.mapstruct.NullValueCheckStrategy; +import org.mapstruct.NullValuePropertyMappingStrategy; +import org.mapstruct.ReportingPolicy; + +import de.ozgcloud.eingang.common.formdata.FormData; +import de.ozgcloud.eingang.forwarding.GrpcEingangStub; +import de.ozgcloud.eingang.forwarding.GrpcFormData; +import de.ozgcloud.eingang.forwarding.GrpcRouteCriteria; + +@Mapper(unmappedTargetPolicy = ReportingPolicy.WARN, // + nullValuePropertyMappingStrategy = NullValuePropertyMappingStrategy.IGNORE, // + nullValueCheckStrategy = NullValueCheckStrategy.ALWAYS, // + collectionMappingStrategy = CollectionMappingStrategy.ADDER_PREFERRED) +public interface RouteForwardingMapper { + + @Mapping(target = "attachments", ignore = true) + @Mapping(target = "representations", ignore = true) + @Mapping(target = "control", ignore = true) + FormData toFormData(GrpcEingangStub eingangStub, GrpcRouteCriteria routeCriteria); // TODO: Test + + default Map<String, Object> map(GrpcFormData value) { + return null; // TODO: Implement + }; +} diff --git a/forwarder/src/test/java/de/ozgcloud/eingang/forwarder/GrpcRouteForwardingRequestTestFactory.java b/forwarder/src/test/java/de/ozgcloud/eingang/forwarder/GrpcRouteForwardingRequestTestFactory.java index 37f758e1d..ddd6a44d2 100644 --- a/forwarder/src/test/java/de/ozgcloud/eingang/forwarder/GrpcRouteForwardingRequestTestFactory.java +++ b/forwarder/src/test/java/de/ozgcloud/eingang/forwarder/GrpcRouteForwardingRequestTestFactory.java @@ -38,8 +38,6 @@ public class GrpcRouteForwardingRequestTestFactory { } public static GrpcRouteForwardingRequest.Builder createBuilder() { - return GrpcRouteForwardingRequest.newBuilder() - .setEingang(EINGANG) - .setRouteCriteria(CRITERIA); + return GrpcRouteForwardingRequest.newBuilder(); } } diff --git a/forwarder/src/test/java/de/ozgcloud/eingang/forwarder/RouteCriteriaTestFactory.java b/forwarder/src/test/java/de/ozgcloud/eingang/forwarder/RouteCriteriaTestFactory.java index 82a9dc490..49c265a75 100644 --- a/forwarder/src/test/java/de/ozgcloud/eingang/forwarder/RouteCriteriaTestFactory.java +++ b/forwarder/src/test/java/de/ozgcloud/eingang/forwarder/RouteCriteriaTestFactory.java @@ -24,14 +24,13 @@ package de.ozgcloud.eingang.forwarder; import java.util.Optional; +import java.util.UUID; import de.ozgcloud.eingang.forwarding.GrpcRouteCriteria; public class RouteCriteriaTestFactory { - public static final String GEMEINDE_SCHLUSSEL = "0815"; - public static final String WEBSERVICE_URL = "http://nimmerland.by.kop-cloud.de/ws"; - public static final String ORGANISATION_EINHEIT_ID = "4711"; + public static final String ORGANISATION_EINHEIT_ID = UUID.randomUUID().toString(); public static RouteCriteria create() { return createBuilder().build(); @@ -39,8 +38,6 @@ public class RouteCriteriaTestFactory { public static RouteCriteria.RouteCriteriaBuilder createBuilder() { return RouteCriteria.builder() - .gemeindeSchluessel(Optional.of(GEMEINDE_SCHLUSSEL)) - .webserviceUrl(Optional.of(WEBSERVICE_URL)) .organisationEinheitId(Optional.of(ORGANISATION_EINHEIT_ID)); } @@ -50,8 +47,6 @@ public class RouteCriteriaTestFactory { public static GrpcRouteCriteria.Builder createGrpcBuilder() { return GrpcRouteCriteria.newBuilder() - .setGemeindeSchluessel(GEMEINDE_SCHLUSSEL) - .setWebserviceUrl(WEBSERVICE_URL) .setOrganisationseinheitenId(ORGANISATION_EINHEIT_ID); } } diff --git a/forwarder/src/test/java/de/ozgcloud/eingang/forwarder/RouteForwardingGrpcServiceTest.java b/forwarder/src/test/java/de/ozgcloud/eingang/forwarder/RouteForwardingGrpcServiceTest.java index e165091ee..f4f0b4f64 100644 --- a/forwarder/src/test/java/de/ozgcloud/eingang/forwarder/RouteForwardingGrpcServiceTest.java +++ b/forwarder/src/test/java/de/ozgcloud/eingang/forwarder/RouteForwardingGrpcServiceTest.java @@ -28,7 +28,6 @@ import static org.mockito.Mockito.*; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Nested; -import org.junit.jupiter.api.Test; import org.mockito.InjectMocks; import org.mockito.Mock; @@ -65,26 +64,29 @@ class RouteForwardingGrpcServiceTest { when(eingangMapper.toFormData(any())).thenReturn(formData); } - @Test - void shouldMapCriteria() { - service.routeForwarding(GrpcRouteForwardingRequestTestFactory.create(), responseObserver); + // @Test + // void shouldMapCriteria() { + // service.routeForwarding(GrpcRouteForwardingRequestTestFactory.create(), + // responseObserver); - verify(criteriaMapper).fromGrpc(GrpcRouteForwardingRequestTestFactory.CRITERIA); - } + // verify(criteriaMapper).fromGrpc(GrpcRouteForwardingRequestTestFactory.CRITERIA); + // } - @Test - void shouldMapEingang() { - service.routeForwarding(GrpcRouteForwardingRequestTestFactory.create(), responseObserver); + // @Test + // void shouldMapEingang() { + // service.routeForwarding(GrpcRouteForwardingRequestTestFactory.create(), + // responseObserver); - verify(eingangMapper).toFormData(GrpcRouteForwardingRequestTestFactory.EINGANG); - } + // verify(eingangMapper).toFormData(GrpcRouteForwardingRequestTestFactory.EINGANG); + // } - @Test - void shouldCallService() { - service.routeForwarding(GrpcRouteForwardingRequestTestFactory.create(), responseObserver); + // @Test + // void shouldCallService() { + // service.routeForwarding(GrpcRouteForwardingRequestTestFactory.create(), + // responseObserver); - verify(routeService).route(routeCriteria, formData); - } + // verify(routeService).route(routeCriteria, formData); + // } } } diff --git a/pom.xml b/pom.xml index 4eb2c2578..82d197342 100644 --- a/pom.xml +++ b/pom.xml @@ -33,7 +33,7 @@ <parent> <groupId>de.ozgcloud.common</groupId> <artifactId>ozgcloud-common-parent</artifactId> - <version>4.9.0</version> + <version>4.11.0</version> <relativePath /> <!-- lookup parent from repository --> </parent> @@ -45,6 +45,7 @@ <modules> <module>common</module> + <module>eingang-manager-interface</module> <module>router</module> <module>forwarder</module> <module>semantik-adapter</module> @@ -54,7 +55,7 @@ </modules> <properties> - <vorgang-manager.version>2.10.0</vorgang-manager.version> + <vorgang-manager.version>2.24.0-OZG-7573-forwarding-interface-SNAPSHOT</vorgang-manager.version> <zufi-manager.version>1.7.0</zufi-manager.version> <jsoup.version>1.14.3</jsoup.version> @@ -75,6 +76,11 @@ <artifactId>common</artifactId> <version>${project.version}</version> </dependency> + <dependency> + <groupId>de.ozgcloud.eingang</groupId> + <artifactId>eingang-manager-interface</artifactId> + <version>${project.version}</version> + </dependency> <dependency> <groupId>de.ozgcloud.eingang</groupId> <artifactId>router</artifactId> diff --git a/router/pom.xml b/router/pom.xml index 5964dfb91..95e980d63 100644 --- a/router/pom.xml +++ b/router/pom.xml @@ -73,6 +73,11 @@ <artifactId>commons-lang3</artifactId> </dependency> + <dependency> + <groupId>org.mapstruct</groupId> + <artifactId>mapstruct</artifactId> + </dependency> + <!-- Dev --> <dependency> <groupId>org.projectlombok</groupId> -- GitLab