From 6352b83cdfee7fe1972344e1dfe236e511800016 Mon Sep 17 00:00:00 2001 From: Felix Reichenbach <felix.reichenbach@mgm-tp.com> Date: Mon, 17 Mar 2025 10:32:43 +0100 Subject: [PATCH] OZG-7573 refactor EingangStubReceiverStreamObserver --- .../EingangStubReceiverStreamObserver.java | 23 +++++++++++-------- 1 file changed, 14 insertions(+), 9 deletions(-) diff --git a/forwarder/src/main/java/de/ozgcloud/eingang/forwarder/EingangStubReceiverStreamObserver.java b/forwarder/src/main/java/de/ozgcloud/eingang/forwarder/EingangStubReceiverStreamObserver.java index 13044c3f5..202ab2ec4 100644 --- a/forwarder/src/main/java/de/ozgcloud/eingang/forwarder/EingangStubReceiverStreamObserver.java +++ b/forwarder/src/main/java/de/ozgcloud/eingang/forwarder/EingangStubReceiverStreamObserver.java @@ -28,6 +28,7 @@ import de.ozgcloud.eingang.forwarding.GrpcFileContent; import de.ozgcloud.eingang.forwarding.GrpcRepresentation; import de.ozgcloud.eingang.forwarding.GrpcRouteForwarding; import de.ozgcloud.eingang.forwarding.GrpcRouteForwardingRequest; +import de.ozgcloud.eingang.forwarding.GrpcRouteForwardingResponse; import io.grpc.stub.StreamObserver; import lombok.Builder; import lombok.extern.log4j.Log4j2; @@ -42,16 +43,18 @@ public class EingangStubReceiverStreamObserver implements StreamObserver<GrpcRou private final IncomingFileGroupMapper incomingFileGroupMapper; private final Function<InputStream, CompletableFuture<File>> fileSaver; private final Consumer<FormData> formDataConsumer; + private final Consumer<GrpcRouteForwardingResponse> responseConsumer; @Builder public EingangStubReceiverStreamObserver(RouteForwardingMapper routeForwardingMapper, IncomingFileMapper incomingFileMapper, IncomingFileGroupMapper incomingFileGroupMapper, Function<InputStream, CompletableFuture<File>> fileSaver, - Consumer<FormData> formDataConsumer) { + Consumer<FormData> formDataConsumer, Consumer<GrpcRouteForwardingResponse> responseConsumer) { this.routeForwardingMapper = routeForwardingMapper; this.incomingFileMapper = incomingFileMapper; this.incomingFileGroupMapper = incomingFileGroupMapper; this.fileSaver = fileSaver; this.formDataConsumer = formDataConsumer; + this.responseConsumer = responseConsumer; } private FormData formData; @@ -89,10 +92,7 @@ public class EingangStubReceiverStreamObserver implements StreamObserver<GrpcRou setCurrentMetadata(incomingFileMapper.fromGrpcAttachmentFile(attachment.getFile())); groupName = Optional.of(attachment.getFile().getGroupName()); } else { - if (Objects.isNull(receivingFileContent)) { - initContentReceiving(); - } - storeFileContent(attachment.getContent()); + handleFileContent(attachment.getContent()); } } @@ -100,10 +100,7 @@ public class EingangStubReceiverStreamObserver implements StreamObserver<GrpcRou if (representation.hasFile()) { setCurrentMetadata(incomingFileMapper.fromGrpcRepresentationFile(representation.getFile())); } else { - if (Objects.isNull(receivingFileContent)) { - initContentReceiving(); - } - storeFileContent(representation.getContent()); + handleFileContent(representation.getContent()); } } @@ -115,6 +112,13 @@ public class EingangStubReceiverStreamObserver implements StreamObserver<GrpcRou currentFile = metaData; } + private void handleFileContent(GrpcFileContent fileContent) { + if (Objects.isNull(receivingFileContent)) { + initContentReceiving(); + } + storeFileContent(fileContent); + } + private void initContentReceiving() { try { pipedInput = new PipedInputStream(CHUNK_SIZE); @@ -185,6 +189,7 @@ public class EingangStubReceiverStreamObserver implements StreamObserver<GrpcRou @Override public void onCompleted() { formDataConsumer.accept(assembleFormData()); + responseConsumer.accept(GrpcRouteForwardingResponse.getDefaultInstance()); } private FormData assembleFormData() { -- GitLab