Skip to content
Snippets Groups Projects
Commit 9d6f4703 authored by Krzysztof Witukiewicz's avatar Krzysztof Witukiewicz
Browse files

OZG-7573 OZG-7991 Rewrite ForwardingRemoteService

parent f643de35
No related branches found
No related tags found
1 merge request!27OZG-7573 Dateien Weiterleiten
package de.ozgcloud.vorgang.vorgang.redirect;
import java.io.InputStream;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;
import java.util.function.Function;
import com.google.protobuf.ByteString;
import de.ozgcloud.common.binaryfile.GrpcFileUploadUtils;
import de.ozgcloud.eingang.forwarder.RouteForwardingServiceGrpc;
import de.ozgcloud.eingang.forwarding.GrpcAttachment;
import de.ozgcloud.eingang.forwarding.GrpcFileContent;
import de.ozgcloud.eingang.forwarding.GrpcRepresentation;
import de.ozgcloud.eingang.forwarding.GrpcRouteForwarding;
import de.ozgcloud.eingang.forwarding.GrpcRouteForwardingRequest;
import de.ozgcloud.eingang.forwarding.GrpcRouteForwardingResponse;
import de.ozgcloud.vorgang.callcontext.VorgangManagerClientCallContextAttachingInterceptor;
import de.ozgcloud.vorgang.files.FileService;
import de.ozgcloud.vorgang.vorgang.IncomingFile;
import de.ozgcloud.vorgang.vorgang.IncomingFileGroup;
import de.ozgcloud.vorgang.vorgang.IncomingFileMapper;
import io.grpc.stub.ClientCallStreamObserver;
import io.grpc.stub.ClientResponseObserver;
import lombok.RequiredArgsConstructor;
@RequiredArgsConstructor
class EingangForwarder {
private final RouteForwardingServiceGrpc.RouteForwardingServiceStub serviceStub;
private final FileService fileService;
private final IncomingFileMapper incomingFileMapper;
private ForwardingResponseObserver responseObserver;
private ClientCallStreamObserver<GrpcRouteForwardingRequest> requestObserver;
public CompletableFuture<Void> forward(GrpcRouteForwarding grpcRouteForwarding, List<IncomingFileGroup> attachments,
List<IncomingFile> representations) {
return CompletableFuture.allOf(
callService(),
sendRouteForwarding(grpcRouteForwarding)
.thenCompose(ignored -> sendAttachments(attachments))
.thenCompose(ignored -> sendRepresentations(representations))
.whenComplete((result, ex) -> {
if (ex != null) {
responseObserver.onError(ex);
} else {
responseObserver.onCompleted();
}
})
);
}
CompletableFuture<GrpcRouteForwardingResponse> callService() {
CompletableFuture<GrpcRouteForwardingResponse> responseFuture = new CompletableFuture<>();
responseObserver = new ForwardingResponseObserver(responseFuture);
requestObserver = (ClientCallStreamObserver<GrpcRouteForwardingRequest>) serviceStub.withInterceptors(
new VorgangManagerClientCallContextAttachingInterceptor())
.routeForwarding(responseObserver);
return responseFuture;
}
CompletableFuture<GrpcRouteForwardingResponse> sendRouteForwarding(GrpcRouteForwarding grpcRouteForwarding) {
CompletableFuture<GrpcRouteForwardingResponse> future = new CompletableFuture<>();
responseObserver.registerOnReadyHandler(() -> {
requestObserver.onNext(GrpcRouteForwardingRequest.newBuilder().setRouteForwarding(grpcRouteForwarding).build());
future.complete(GrpcRouteForwardingResponse.newBuilder().build());
});
return future;
}
CompletableFuture<GrpcRouteForwardingResponse> sendAttachments(List<IncomingFileGroup> attachments) {
return attachments.stream()
.flatMap(attachment -> {
var groupName = attachment.getName();
return attachment.getFiles().stream().map(file -> getSendAttachmentFileFunction(groupName, file));
})
.reduce(
CompletableFuture.completedFuture(GrpcRouteForwardingResponse.newBuilder().build()),
CompletableFuture::thenCompose,
(f1, f2) -> f1.thenCompose(ignored -> f2)
);
}
private Function<GrpcRouteForwardingResponse, CompletableFuture<GrpcRouteForwardingResponse>> getSendAttachmentFileFunction(String groupName,
IncomingFile file) {
return ignored -> sendAttachmentFile(groupName, file);
}
private CompletableFuture<GrpcRouteForwardingResponse> sendAttachmentFile(String groupName, IncomingFile file) {
var fileContentStream = fileService.getUploadedFileStream(file.getId());
var sender = createAttachmentFileSender(groupName, file, fileContentStream).send(responseObserver::registerOnReadyHandler);
return sender.getResultFuture();
}
GrpcFileUploadUtils.FileSender<GrpcRouteForwardingRequest, GrpcRouteForwardingResponse> createAttachmentFileSender(String groupName,
IncomingFile file,
InputStream fileContentStream) {
return createSenderWithoutMetadata(this::buildAttachmentChunk, fileContentStream)
.withMetaData(buildGrpcAttachmentFile(groupName, file));
}
GrpcRouteForwardingRequest buildAttachmentChunk(byte[] chunk, int length) {
return GrpcRouteForwardingRequest.newBuilder()
.setAttachment(GrpcAttachment.newBuilder()
.setContent(buildGrpcFileContent(chunk, length))
.build())
.build();
}
GrpcRouteForwardingRequest buildGrpcAttachmentFile(String name, IncomingFile file) {
return GrpcRouteForwardingRequest.newBuilder()
.setAttachment(GrpcAttachment.newBuilder()
.setFile(incomingFileMapper.toAttachmentFile(name, file))
.build())
.build();
}
CompletableFuture<GrpcRouteForwardingResponse> sendRepresentations(List<IncomingFile> representations) {
return representations.stream()
.map(this::getSendRepresentationFileFunction)
.reduce(
CompletableFuture.completedFuture(GrpcRouteForwardingResponse.newBuilder().build()),
CompletableFuture::thenCompose,
(f1, f2) -> f1.thenCompose(ignored -> f2)
);
}
private Function<GrpcRouteForwardingResponse, CompletableFuture<GrpcRouteForwardingResponse>> getSendRepresentationFileFunction(IncomingFile file) {
return ignored -> sendRepresentationFile(file);
}
private CompletableFuture<GrpcRouteForwardingResponse> sendRepresentationFile(IncomingFile file) {
var fileContentStream = fileService.getUploadedFileStream(file.getId());
var sender = createRepresentationFileSender(file, fileContentStream).send(responseObserver::registerOnReadyHandler);
return sender.getResultFuture();
}
GrpcFileUploadUtils.FileSender<GrpcRouteForwardingRequest, GrpcRouteForwardingResponse> createRepresentationFileSender(IncomingFile file,
InputStream fileContentStream) {
return createSenderWithoutMetadata(this::buildRepresentationChunk, fileContentStream).withMetaData(buildGrpcRepresentationFile(file));
}
GrpcRouteForwardingRequest buildRepresentationChunk(byte[] chunk, int length) {
return GrpcRouteForwardingRequest.newBuilder()
.setRepresentation(GrpcRepresentation.newBuilder()
.setContent(buildGrpcFileContent(chunk, length))
.build())
.build();
}
GrpcRouteForwardingRequest buildGrpcRepresentationFile(IncomingFile file) {
return GrpcRouteForwardingRequest.newBuilder()
.setRepresentation(GrpcRepresentation.newBuilder()
.setFile(incomingFileMapper.toRepresentationFile(file))
.build())
.build();
}
GrpcFileContent buildGrpcFileContent(byte[] chunk, int length) {
var fileContentBuilder = GrpcFileContent.newBuilder();
if (length <= 0) {
fileContentBuilder.setIsEndOfFile(true);
} else {
fileContentBuilder.setContent(ByteString.copyFrom(chunk));
}
return fileContentBuilder.build();
}
GrpcFileUploadUtils.FileSender<GrpcRouteForwardingRequest, GrpcRouteForwardingResponse> createSenderWithoutMetadata(
BiFunction<byte[], Integer, GrpcRouteForwardingRequest> chunkBuilder, InputStream fileContentStream) {
return GrpcFileUploadUtils.createSender(chunkBuilder, fileContentStream, response -> requestObserver, false);
}
@RequiredArgsConstructor
static class ForwardingResponseObserver implements ClientResponseObserver<GrpcRouteForwardingRequest, GrpcRouteForwardingResponse> {
private final CompletableFuture<GrpcRouteForwardingResponse> future;
private DelegatingOnReadyHandler onReadyHandler;
private GrpcRouteForwardingResponse response;
@Override
public void beforeStart(ClientCallStreamObserver<GrpcRouteForwardingRequest> requestStream) {
onReadyHandler = new DelegatingOnReadyHandler(requestStream);
requestStream.setOnReadyHandler(onReadyHandler);
}
@Override
public void onNext(GrpcRouteForwardingResponse response) {
this.response = response;
}
@Override
public void onError(Throwable t) {
onReadyHandler.stop();
future.completeExceptionally(t);
}
@Override
public void onCompleted() {
onReadyHandler.stop();
future.complete(response);
}
public void registerOnReadyHandler(Runnable onReadyHandler) {
this.onReadyHandler.setDelegate(onReadyHandler);
}
}
@RequiredArgsConstructor
static class DelegatingOnReadyHandler implements Runnable {
private final ClientCallStreamObserver<GrpcRouteForwardingRequest> requestStream;
private final AtomicReference<Runnable> onReadyHandler = new AtomicReference<>();
private final AtomicBoolean done = new AtomicBoolean(false);
public void setDelegate(Runnable onReadyHandler) {
this.onReadyHandler.set(onReadyHandler);
}
public void stop() {
done.set(true);
}
@Override
public void run() {
while (!done.get() && requestStream.isReady()) {
var runnable = onReadyHandler.get();
if (runnable != null) {
runnable.run();
}
}
}
}
}
...@@ -23,39 +23,18 @@ ...@@ -23,39 +23,18 @@
*/ */
package de.ozgcloud.vorgang.vorgang.redirect; package de.ozgcloud.vorgang.vorgang.redirect;
import java.io.InputStream;
import java.util.List;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
import java.util.function.BiFunction;
import java.util.function.Function;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import com.google.protobuf.ByteString;
import de.ozgcloud.common.binaryfile.BinaryFileUploadStreamObserver;
import de.ozgcloud.common.binaryfile.GrpcFileUploadUtils;
import de.ozgcloud.common.binaryfile.GrpcFileUploadUtils.FileSender;
import de.ozgcloud.common.errorhandling.TechnicalException; import de.ozgcloud.common.errorhandling.TechnicalException;
import de.ozgcloud.eingang.forwarder.RouteForwardingServiceGrpc; import de.ozgcloud.eingang.forwarder.RouteForwardingServiceGrpc;
import de.ozgcloud.eingang.forwarding.GrpcAttachment;
import de.ozgcloud.eingang.forwarding.GrpcFileContent;
import de.ozgcloud.eingang.forwarding.GrpcRepresentation;
import de.ozgcloud.eingang.forwarding.GrpcRouteForwardingRequest;
import de.ozgcloud.eingang.forwarding.GrpcRouteForwardingResponse;
import de.ozgcloud.vorgang.callcontext.VorgangManagerClientCallContextAttachingInterceptor;
import de.ozgcloud.vorgang.files.FileService; import de.ozgcloud.vorgang.files.FileService;
import de.ozgcloud.vorgang.vorgang.Eingang;
import de.ozgcloud.vorgang.vorgang.IncomingFile;
import de.ozgcloud.vorgang.vorgang.IncomingFileGroup;
import de.ozgcloud.vorgang.vorgang.IncomingFileMapper; import de.ozgcloud.vorgang.vorgang.IncomingFileMapper;
import de.ozgcloud.vorgang.vorgang.VorgangService; import de.ozgcloud.vorgang.vorgang.VorgangService;
import io.grpc.stub.CallStreamObserver;
import io.grpc.stub.ClientCallStreamObserver;
import io.grpc.stub.StreamObserver;
import lombok.RequiredArgsConstructor; import lombok.RequiredArgsConstructor;
import net.devh.boot.grpc.client.inject.GrpcClient; import net.devh.boot.grpc.client.inject.GrpcClient;
...@@ -63,7 +42,7 @@ import net.devh.boot.grpc.client.inject.GrpcClient; ...@@ -63,7 +42,7 @@ import net.devh.boot.grpc.client.inject.GrpcClient;
@RequiredArgsConstructor @RequiredArgsConstructor
class ForwardingRemoteService { class ForwardingRemoteService {
private static final int TIMEOUT_MINUTES = 2; private static final int TIMEOUT_MINUTES = 10;
private final VorgangService vorgangService; private final VorgangService vorgangService;
private final ForwardingRequestMapper forwardingRequestMapper; private final ForwardingRequestMapper forwardingRequestMapper;
@GrpcClient("forwarder") @GrpcClient("forwarder")
...@@ -72,124 +51,10 @@ class ForwardingRemoteService { ...@@ -72,124 +51,10 @@ class ForwardingRemoteService {
private final IncomingFileMapper incomingFileMapper; private final IncomingFileMapper incomingFileMapper;
public void forward(ForwardingRequest request) { public void forward(ForwardingRequest request) {
CompletableFuture<Void> responseFuture = new CompletableFuture<>();
routeForwarding(request, new ForwardingResponseObserver(responseFuture));
waitForCompletion(responseFuture);
}
void routeForwarding(ForwardingRequest request, ForwardingResponseObserver responseObserver) {
var requestStreamObserver = (ClientCallStreamObserver<GrpcRouteForwardingRequest>) serviceStub.withInterceptors(new VorgangManagerClientCallContextAttachingInterceptor())
.routeForwarding(responseObserver);
try {
sendEingang(request, requestStreamObserver);
requestStreamObserver.onCompleted();
} catch (Exception e) {
requestStreamObserver.onError(e);
throw e;
}
}
void sendEingang(ForwardingRequest request, ClientCallStreamObserver<GrpcRouteForwardingRequest> requestStreamObserver) {
var eingang = vorgangService.getById(request.getVorgangId()).getEingangs().getFirst(); var eingang = vorgangService.getById(request.getVorgangId()).getEingangs().getFirst();
requestStreamObserver.onNext(buildRouteForwardingRequest(request, eingang)); var grpcRouteForwarding = forwardingRequestMapper.toGrpcRouteForwarding(request, eingang);
sendAttachments(eingang.getAttachments(), requestStreamObserver); var responseFuture = new EingangForwarder(serviceStub, fileService, incomingFileMapper).forward(grpcRouteForwarding, eingang.getAttachments(), eingang.getRepresentations());
sendRepresentations(eingang.getRepresentations(), requestStreamObserver); waitForCompletion(responseFuture);
}
GrpcRouteForwardingRequest buildRouteForwardingRequest(ForwardingRequest request, Eingang eingang) {
var routeForwarding = forwardingRequestMapper.toGrpcRouteForwarding(request, eingang);
return GrpcRouteForwardingRequest.newBuilder().setRouteForwarding(routeForwarding).build();
}
void sendAttachments(List<IncomingFileGroup> attachments, ClientCallStreamObserver<GrpcRouteForwardingRequest> requestStreamObserver) {
for (var attachment : attachments) {
var groupName = attachment.getName();
attachment.getFiles().forEach(file -> sendAttachmentFile(requestStreamObserver, groupName, file));
}
}
private void sendAttachmentFile(ClientCallStreamObserver<GrpcRouteForwardingRequest> requestStreamObserver, String groupName, IncomingFile file) {
var fileContentStream = fileService.getUploadedFileStream(file.getId());
var sender = createAttachmentFileSender(requestStreamObserver, groupName, file, fileContentStream).send();
waitForCompletion(sender.getResultFuture());
}
FileSender<GrpcRouteForwardingRequest, GrpcRouteForwardingResponse> createAttachmentFileSender(
ClientCallStreamObserver<GrpcRouteForwardingRequest> requestStreamObserver, String groupName, IncomingFile file, InputStream fileContentStream) {
return createSenderWithoutMetadata(this::buildAttachmentChunk, requestStreamObserver, fileContentStream)
.withMetaData(buildGrpcAttachmentFile(groupName, file));
}
GrpcRouteForwardingRequest buildAttachmentChunk(byte[] chunk, int length) {
return GrpcRouteForwardingRequest.newBuilder()
.setAttachment(GrpcAttachment.newBuilder()
.setContent(buildGrpcFileContent(chunk, length))
.build())
.build();
}
GrpcRouteForwardingRequest buildGrpcAttachmentFile(String name, IncomingFile file) {
return GrpcRouteForwardingRequest.newBuilder()
.setAttachment(GrpcAttachment.newBuilder()
.setFile(incomingFileMapper.toAttachmentFile(name, file))
.build())
.build();
}
void sendRepresentations(List<IncomingFile> representations, ClientCallStreamObserver<GrpcRouteForwardingRequest> requestObserver) {
representations.forEach(representation -> {
var fileContentStream = fileService.getUploadedFileStream(representation.getId());
var sender = createRepresentationFileSender(requestObserver, representation, fileContentStream).send();
waitForCompletion(sender.getResultFuture());
});
}
FileSender<GrpcRouteForwardingRequest, GrpcRouteForwardingResponse> createRepresentationFileSender(
ClientCallStreamObserver<GrpcRouteForwardingRequest> requestStreamObserver, IncomingFile file, InputStream fileContentStream) {
return createSenderWithoutMetadata(this::buildRepresentationChunk, requestStreamObserver, fileContentStream)
.withMetaData(buildGrpcRepresentationFile(file));
}
FileSender<GrpcRouteForwardingRequest, GrpcRouteForwardingResponse> createSenderWithoutMetadata(
BiFunction<byte[], Integer, GrpcRouteForwardingRequest> chunkBuilder,
ClientCallStreamObserver<GrpcRouteForwardingRequest> requestStreamObserver, InputStream fileContentStream) {
return GrpcFileUploadUtils
.createSender(chunkBuilder, fileContentStream, requestCallStreamObserverProvider(requestStreamObserver), false);
}
private Function<StreamObserver<GrpcRouteForwardingResponse>, CallStreamObserver<GrpcRouteForwardingRequest>> requestCallStreamObserverProvider(
ClientCallStreamObserver<GrpcRouteForwardingRequest> requestStreamObserver) {
// responseObserver should be passed to GrpcService used to transfer files, otherwise onNext()-method won't be called
return response -> {
((BinaryFileUploadStreamObserver<GrpcRouteForwardingRequest, GrpcRouteForwardingResponse>) response).beforeStart(requestStreamObserver);
return (CallStreamObserver<GrpcRouteForwardingRequest>) requestStreamObserver;
};
}
GrpcRouteForwardingRequest buildRepresentationChunk(byte[] chunk, int length) {
return GrpcRouteForwardingRequest.newBuilder()
.setRepresentation(GrpcRepresentation.newBuilder()
.setContent(buildGrpcFileContent(chunk, length))
.build())
.build();
}
GrpcFileContent buildGrpcFileContent(byte[] chunk, int length) {
var fileContentBuilder = GrpcFileContent.newBuilder();
if (length <= 0) {
fileContentBuilder.setIsEndOfFile(true);
} else {
fileContentBuilder.setContent(ByteString.copyFrom(chunk));
}
return fileContentBuilder.build();
}
GrpcRouteForwardingRequest buildGrpcRepresentationFile(IncomingFile file) {
return GrpcRouteForwardingRequest.newBuilder()
.setRepresentation(GrpcRepresentation.newBuilder()
.setFile(incomingFileMapper.toRepresentationFile(file))
.build())
.build();
} }
<T> void waitForCompletion(CompletableFuture<T> responseFuture) { <T> void waitForCompletion(CompletableFuture<T> responseFuture) {
...@@ -204,24 +69,4 @@ class ForwardingRemoteService { ...@@ -204,24 +69,4 @@ class ForwardingRemoteService {
throw new TechnicalException("Timeout on uploading file content.", e); throw new TechnicalException("Timeout on uploading file content.", e);
} }
} }
@RequiredArgsConstructor
static class ForwardingResponseObserver implements StreamObserver<GrpcRouteForwardingResponse> {
private final CompletableFuture<Void> future;
@Override
public void onNext(GrpcRouteForwardingResponse value) {
// noop
}
@Override
public void onError(Throwable t) {
future.completeExceptionally(t);
}
@Override
public void onCompleted() {
future.complete(null);
}
}
} }
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment