Skip to content
Snippets Groups Projects
Commit 6352b83c authored by Felix Reichenbach's avatar Felix Reichenbach
Browse files

OZG-7573 refactor EingangStubReceiverStreamObserver

parent aee93170
No related branches found
No related tags found
1 merge request!9Ozg 7573 forward vorgang
...@@ -28,6 +28,7 @@ import de.ozgcloud.eingang.forwarding.GrpcFileContent; ...@@ -28,6 +28,7 @@ import de.ozgcloud.eingang.forwarding.GrpcFileContent;
import de.ozgcloud.eingang.forwarding.GrpcRepresentation; import de.ozgcloud.eingang.forwarding.GrpcRepresentation;
import de.ozgcloud.eingang.forwarding.GrpcRouteForwarding; import de.ozgcloud.eingang.forwarding.GrpcRouteForwarding;
import de.ozgcloud.eingang.forwarding.GrpcRouteForwardingRequest; import de.ozgcloud.eingang.forwarding.GrpcRouteForwardingRequest;
import de.ozgcloud.eingang.forwarding.GrpcRouteForwardingResponse;
import io.grpc.stub.StreamObserver; import io.grpc.stub.StreamObserver;
import lombok.Builder; import lombok.Builder;
import lombok.extern.log4j.Log4j2; import lombok.extern.log4j.Log4j2;
...@@ -42,16 +43,18 @@ public class EingangStubReceiverStreamObserver implements StreamObserver<GrpcRou ...@@ -42,16 +43,18 @@ public class EingangStubReceiverStreamObserver implements StreamObserver<GrpcRou
private final IncomingFileGroupMapper incomingFileGroupMapper; private final IncomingFileGroupMapper incomingFileGroupMapper;
private final Function<InputStream, CompletableFuture<File>> fileSaver; private final Function<InputStream, CompletableFuture<File>> fileSaver;
private final Consumer<FormData> formDataConsumer; private final Consumer<FormData> formDataConsumer;
private final Consumer<GrpcRouteForwardingResponse> responseConsumer;
@Builder @Builder
public EingangStubReceiverStreamObserver(RouteForwardingMapper routeForwardingMapper, IncomingFileMapper incomingFileMapper, public EingangStubReceiverStreamObserver(RouteForwardingMapper routeForwardingMapper, IncomingFileMapper incomingFileMapper,
IncomingFileGroupMapper incomingFileGroupMapper, Function<InputStream, CompletableFuture<File>> fileSaver, IncomingFileGroupMapper incomingFileGroupMapper, Function<InputStream, CompletableFuture<File>> fileSaver,
Consumer<FormData> formDataConsumer) { Consumer<FormData> formDataConsumer, Consumer<GrpcRouteForwardingResponse> responseConsumer) {
this.routeForwardingMapper = routeForwardingMapper; this.routeForwardingMapper = routeForwardingMapper;
this.incomingFileMapper = incomingFileMapper; this.incomingFileMapper = incomingFileMapper;
this.incomingFileGroupMapper = incomingFileGroupMapper; this.incomingFileGroupMapper = incomingFileGroupMapper;
this.fileSaver = fileSaver; this.fileSaver = fileSaver;
this.formDataConsumer = formDataConsumer; this.formDataConsumer = formDataConsumer;
this.responseConsumer = responseConsumer;
} }
private FormData formData; private FormData formData;
...@@ -89,10 +92,7 @@ public class EingangStubReceiverStreamObserver implements StreamObserver<GrpcRou ...@@ -89,10 +92,7 @@ public class EingangStubReceiverStreamObserver implements StreamObserver<GrpcRou
setCurrentMetadata(incomingFileMapper.fromGrpcAttachmentFile(attachment.getFile())); setCurrentMetadata(incomingFileMapper.fromGrpcAttachmentFile(attachment.getFile()));
groupName = Optional.of(attachment.getFile().getGroupName()); groupName = Optional.of(attachment.getFile().getGroupName());
} else { } else {
if (Objects.isNull(receivingFileContent)) { handleFileContent(attachment.getContent());
initContentReceiving();
}
storeFileContent(attachment.getContent());
} }
} }
...@@ -100,10 +100,7 @@ public class EingangStubReceiverStreamObserver implements StreamObserver<GrpcRou ...@@ -100,10 +100,7 @@ public class EingangStubReceiverStreamObserver implements StreamObserver<GrpcRou
if (representation.hasFile()) { if (representation.hasFile()) {
setCurrentMetadata(incomingFileMapper.fromGrpcRepresentationFile(representation.getFile())); setCurrentMetadata(incomingFileMapper.fromGrpcRepresentationFile(representation.getFile()));
} else { } else {
if (Objects.isNull(receivingFileContent)) { handleFileContent(representation.getContent());
initContentReceiving();
}
storeFileContent(representation.getContent());
} }
} }
...@@ -115,6 +112,13 @@ public class EingangStubReceiverStreamObserver implements StreamObserver<GrpcRou ...@@ -115,6 +112,13 @@ public class EingangStubReceiverStreamObserver implements StreamObserver<GrpcRou
currentFile = metaData; currentFile = metaData;
} }
private void handleFileContent(GrpcFileContent fileContent) {
if (Objects.isNull(receivingFileContent)) {
initContentReceiving();
}
storeFileContent(fileContent);
}
private void initContentReceiving() { private void initContentReceiving() {
try { try {
pipedInput = new PipedInputStream(CHUNK_SIZE); pipedInput = new PipedInputStream(CHUNK_SIZE);
...@@ -185,6 +189,7 @@ public class EingangStubReceiverStreamObserver implements StreamObserver<GrpcRou ...@@ -185,6 +189,7 @@ public class EingangStubReceiverStreamObserver implements StreamObserver<GrpcRou
@Override @Override
public void onCompleted() { public void onCompleted() {
formDataConsumer.accept(assembleFormData()); formDataConsumer.accept(assembleFormData());
responseConsumer.accept(GrpcRouteForwardingResponse.getDefaultInstance());
} }
private FormData assembleFormData() { private FormData assembleFormData() {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment