diff --git a/forwarder/src/main/java/de/ozgcloud/eingang/forwarder/EingangStubReceiverStreamObserver.java b/forwarder/src/main/java/de/ozgcloud/eingang/forwarder/EingangStubReceiverStreamObserver.java index 13044c3f530f580dd2f7445086351bb04de4cdff..202ab2ec4357961c7d542bdae613eba26d35a4fc 100644 --- a/forwarder/src/main/java/de/ozgcloud/eingang/forwarder/EingangStubReceiverStreamObserver.java +++ b/forwarder/src/main/java/de/ozgcloud/eingang/forwarder/EingangStubReceiverStreamObserver.java @@ -28,6 +28,7 @@ import de.ozgcloud.eingang.forwarding.GrpcFileContent; import de.ozgcloud.eingang.forwarding.GrpcRepresentation; import de.ozgcloud.eingang.forwarding.GrpcRouteForwarding; import de.ozgcloud.eingang.forwarding.GrpcRouteForwardingRequest; +import de.ozgcloud.eingang.forwarding.GrpcRouteForwardingResponse; import io.grpc.stub.StreamObserver; import lombok.Builder; import lombok.extern.log4j.Log4j2; @@ -42,16 +43,18 @@ public class EingangStubReceiverStreamObserver implements StreamObserver<GrpcRou private final IncomingFileGroupMapper incomingFileGroupMapper; private final Function<InputStream, CompletableFuture<File>> fileSaver; private final Consumer<FormData> formDataConsumer; + private final Consumer<GrpcRouteForwardingResponse> responseConsumer; @Builder public EingangStubReceiverStreamObserver(RouteForwardingMapper routeForwardingMapper, IncomingFileMapper incomingFileMapper, IncomingFileGroupMapper incomingFileGroupMapper, Function<InputStream, CompletableFuture<File>> fileSaver, - Consumer<FormData> formDataConsumer) { + Consumer<FormData> formDataConsumer, Consumer<GrpcRouteForwardingResponse> responseConsumer) { this.routeForwardingMapper = routeForwardingMapper; this.incomingFileMapper = incomingFileMapper; this.incomingFileGroupMapper = incomingFileGroupMapper; this.fileSaver = fileSaver; this.formDataConsumer = formDataConsumer; + this.responseConsumer = responseConsumer; } private FormData formData; @@ -89,10 +92,7 @@ public class EingangStubReceiverStreamObserver implements StreamObserver<GrpcRou setCurrentMetadata(incomingFileMapper.fromGrpcAttachmentFile(attachment.getFile())); groupName = Optional.of(attachment.getFile().getGroupName()); } else { - if (Objects.isNull(receivingFileContent)) { - initContentReceiving(); - } - storeFileContent(attachment.getContent()); + handleFileContent(attachment.getContent()); } } @@ -100,10 +100,7 @@ public class EingangStubReceiverStreamObserver implements StreamObserver<GrpcRou if (representation.hasFile()) { setCurrentMetadata(incomingFileMapper.fromGrpcRepresentationFile(representation.getFile())); } else { - if (Objects.isNull(receivingFileContent)) { - initContentReceiving(); - } - storeFileContent(representation.getContent()); + handleFileContent(representation.getContent()); } } @@ -115,6 +112,13 @@ public class EingangStubReceiverStreamObserver implements StreamObserver<GrpcRou currentFile = metaData; } + private void handleFileContent(GrpcFileContent fileContent) { + if (Objects.isNull(receivingFileContent)) { + initContentReceiving(); + } + storeFileContent(fileContent); + } + private void initContentReceiving() { try { pipedInput = new PipedInputStream(CHUNK_SIZE); @@ -185,6 +189,7 @@ public class EingangStubReceiverStreamObserver implements StreamObserver<GrpcRou @Override public void onCompleted() { formDataConsumer.accept(assembleFormData()); + responseConsumer.accept(GrpcRouteForwardingResponse.getDefaultInstance()); } private FormData assembleFormData() {