Skip to content
Snippets Groups Projects
Commit f643de35 authored by Krzysztof Witukiewicz's avatar Krzysztof Witukiewicz
Browse files

OZG-7573 OZG-7991 Try to register onReadyHandler (not possible)

parent 0c57eb4e
Branches
Tags
1 merge request!27OZG-7573 Dateien Weiterleiten
......@@ -36,6 +36,7 @@ import org.springframework.stereotype.Service;
import com.google.protobuf.ByteString;
import de.ozgcloud.common.binaryfile.BinaryFileUploadStreamObserver;
import de.ozgcloud.common.binaryfile.GrpcFileUploadUtils;
import de.ozgcloud.common.binaryfile.GrpcFileUploadUtils.FileSender;
import de.ozgcloud.common.errorhandling.TechnicalException;
......@@ -53,6 +54,7 @@ import de.ozgcloud.vorgang.vorgang.IncomingFileGroup;
import de.ozgcloud.vorgang.vorgang.IncomingFileMapper;
import de.ozgcloud.vorgang.vorgang.VorgangService;
import io.grpc.stub.CallStreamObserver;
import io.grpc.stub.ClientCallStreamObserver;
import io.grpc.stub.StreamObserver;
import lombok.RequiredArgsConstructor;
import net.devh.boot.grpc.client.inject.GrpcClient;
......@@ -76,7 +78,7 @@ class ForwardingRemoteService {
}
void routeForwarding(ForwardingRequest request, ForwardingResponseObserver responseObserver) {
var requestStreamObserver = serviceStub.withInterceptors(new VorgangManagerClientCallContextAttachingInterceptor())
var requestStreamObserver = (ClientCallStreamObserver<GrpcRouteForwardingRequest>) serviceStub.withInterceptors(new VorgangManagerClientCallContextAttachingInterceptor())
.routeForwarding(responseObserver);
try {
sendEingang(request, requestStreamObserver);
......@@ -87,7 +89,7 @@ class ForwardingRemoteService {
}
}
void sendEingang(ForwardingRequest request, StreamObserver<GrpcRouteForwardingRequest> requestStreamObserver) {
void sendEingang(ForwardingRequest request, ClientCallStreamObserver<GrpcRouteForwardingRequest> requestStreamObserver) {
var eingang = vorgangService.getById(request.getVorgangId()).getEingangs().getFirst();
requestStreamObserver.onNext(buildRouteForwardingRequest(request, eingang));
sendAttachments(eingang.getAttachments(), requestStreamObserver);
......@@ -99,20 +101,21 @@ class ForwardingRemoteService {
return GrpcRouteForwardingRequest.newBuilder().setRouteForwarding(routeForwarding).build();
}
void sendAttachments(List<IncomingFileGroup> attachments, StreamObserver<GrpcRouteForwardingRequest> requestStreamObserver) {
void sendAttachments(List<IncomingFileGroup> attachments, ClientCallStreamObserver<GrpcRouteForwardingRequest> requestStreamObserver) {
for (var attachment : attachments) {
var groupName = attachment.getName();
attachment.getFiles().forEach(file -> sendAttachmentFile(requestStreamObserver, groupName, file));
}
}
private void sendAttachmentFile(StreamObserver<GrpcRouteForwardingRequest> requestStreamObserver, String groupName, IncomingFile file) {
private void sendAttachmentFile(ClientCallStreamObserver<GrpcRouteForwardingRequest> requestStreamObserver, String groupName, IncomingFile file) {
var fileContentStream = fileService.getUploadedFileStream(file.getId());
createAttachmentFileSender(requestStreamObserver, groupName, file, fileContentStream).send();
var sender = createAttachmentFileSender(requestStreamObserver, groupName, file, fileContentStream).send();
waitForCompletion(sender.getResultFuture());
}
FileSender<GrpcRouteForwardingRequest, GrpcRouteForwardingResponse> createAttachmentFileSender(
StreamObserver<GrpcRouteForwardingRequest> requestStreamObserver, String groupName, IncomingFile file, InputStream fileContentStream) {
ClientCallStreamObserver<GrpcRouteForwardingRequest> requestStreamObserver, String groupName, IncomingFile file, InputStream fileContentStream) {
return createSenderWithoutMetadata(this::buildAttachmentChunk, requestStreamObserver, fileContentStream)
.withMetaData(buildGrpcAttachmentFile(groupName, file));
}
......@@ -133,29 +136,34 @@ class ForwardingRemoteService {
.build();
}
void sendRepresentations(List<IncomingFile> representations, StreamObserver<GrpcRouteForwardingRequest> requestObserver) {
void sendRepresentations(List<IncomingFile> representations, ClientCallStreamObserver<GrpcRouteForwardingRequest> requestObserver) {
representations.forEach(representation -> {
var fileContentStream = fileService.getUploadedFileStream(representation.getId());
createRepresentationFileSender(requestObserver, representation, fileContentStream).send();
var sender = createRepresentationFileSender(requestObserver, representation, fileContentStream).send();
waitForCompletion(sender.getResultFuture());
});
}
FileSender<GrpcRouteForwardingRequest, GrpcRouteForwardingResponse> createRepresentationFileSender(
StreamObserver<GrpcRouteForwardingRequest> requestStreamObserver, IncomingFile file, InputStream fileContentStream) {
ClientCallStreamObserver<GrpcRouteForwardingRequest> requestStreamObserver, IncomingFile file, InputStream fileContentStream) {
return createSenderWithoutMetadata(this::buildRepresentationChunk, requestStreamObserver, fileContentStream)
.withMetaData(buildGrpcRepresentationFile(file));
}
FileSender<GrpcRouteForwardingRequest, GrpcRouteForwardingResponse> createSenderWithoutMetadata(
BiFunction<byte[], Integer, GrpcRouteForwardingRequest> chunkBuilder,
StreamObserver<GrpcRouteForwardingRequest> requestStreamObserver, InputStream fileContentStream) {
ClientCallStreamObserver<GrpcRouteForwardingRequest> requestStreamObserver, InputStream fileContentStream) {
return GrpcFileUploadUtils
.createSender(chunkBuilder, fileContentStream, requestCallStreamObserverProvider(requestStreamObserver), false);
}
private Function<StreamObserver<GrpcRouteForwardingResponse>, CallStreamObserver<GrpcRouteForwardingRequest>> requestCallStreamObserverProvider(
StreamObserver<GrpcRouteForwardingRequest> requestStreamObserver) {
return response -> (CallStreamObserver<GrpcRouteForwardingRequest>) requestStreamObserver;
ClientCallStreamObserver<GrpcRouteForwardingRequest> requestStreamObserver) {
// responseObserver should be passed to GrpcService used to transfer files, otherwise onNext()-method won't be called
return response -> {
((BinaryFileUploadStreamObserver<GrpcRouteForwardingRequest, GrpcRouteForwardingResponse>) response).beforeStart(requestStreamObserver);
return (CallStreamObserver<GrpcRouteForwardingRequest>) requestStreamObserver;
};
}
GrpcRouteForwardingRequest buildRepresentationChunk(byte[] chunk, int length) {
......@@ -184,7 +192,7 @@ class ForwardingRemoteService {
.build();
}
void waitForCompletion(CompletableFuture<Void> responseFuture) {
<T> void waitForCompletion(CompletableFuture<T> responseFuture) {
try {
responseFuture.get(TIMEOUT_MINUTES, TimeUnit.MINUTES);
} catch (InterruptedException e) {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment