Skip to content
Snippets Groups Projects
Commit ea12a026 authored by Krzysztof Witukiewicz's avatar Krzysztof Witukiewicz
Browse files

Merge branch 'OZG-7573-files-weiterleitung-bug' into 'main'

OZG-7573 Dateien Weiterleiten

See merge request !27
parents 4de616e9 4c263c57
No related branches found
No related tags found
1 merge request!27OZG-7573 Dateien Weiterleiten
...@@ -51,7 +51,7 @@ ...@@ -51,7 +51,7 @@
<spring-boot.build-image.imageName>docker.ozg-sh.de/vorgang-manager:build-latest</spring-boot.build-image.imageName> <spring-boot.build-image.imageName>docker.ozg-sh.de/vorgang-manager:build-latest</spring-boot.build-image.imageName>
<zufi-manager-interface.version>1.6.0</zufi-manager-interface.version> <zufi-manager-interface.version>1.6.0</zufi-manager-interface.version>
<common-lib.version>4.12.0</common-lib.version> <common-lib.version>4.13.0-SNAPSHOT</common-lib.version>
<user-manager-interface.version>2.12.0</user-manager-interface.version> <user-manager-interface.version>2.12.0</user-manager-interface.version>
<processor-manager.version>0.5.0</processor-manager.version> <processor-manager.version>0.5.0</processor-manager.version>
<nachrichten-manager.version>2.19.0</nachrichten-manager.version> <nachrichten-manager.version>2.19.0</nachrichten-manager.version>
......
/*
* Copyright (C) 2025 Das Land Schleswig-Holstein vertreten durch den
* Ministerpräsidenten des Landes Schleswig-Holstein
* Staatskanzlei
* Abteilung Digitalisierung und zentrales IT-Management der Landesregierung
*
* Lizenziert unter der EUPL, Version 1.2 oder - sobald
* diese von der Europäischen Kommission genehmigt wurden -
* Folgeversionen der EUPL ("Lizenz");
* Sie dürfen dieses Werk ausschließlich gemäß
* dieser Lizenz nutzen.
* Eine Kopie der Lizenz finden Sie hier:
*
* https://joinup.ec.europa.eu/collection/eupl/eupl-text-eupl-12
*
* Sofern nicht durch anwendbare Rechtsvorschriften
* gefordert oder in schriftlicher Form vereinbart, wird
* die unter der Lizenz verbreitete Software "so wie sie
* ist", OHNE JEGLICHE GEWÄHRLEISTUNG ODER BEDINGUNGEN -
* ausdrücklich oder stillschweigend - verbreitet.
* Die sprachspezifischen Genehmigungen und Beschränkungen
* unter der Lizenz sind dem Lizenztext zu entnehmen.
*/
package de.ozgcloud.vorgang.vorgang.redirect;
import java.io.InputStream;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;
import org.apache.commons.io.IOUtils;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;
import com.google.protobuf.ByteString;
import de.ozgcloud.common.binaryfile.GrpcFileUploadUtils;
import de.ozgcloud.common.binaryfile.StreamingFileSender;
import de.ozgcloud.common.errorhandling.TechnicalException;
import de.ozgcloud.eingang.forwarder.RouteForwardingServiceGrpc;
import de.ozgcloud.eingang.forwarding.GrpcAttachment;
import de.ozgcloud.eingang.forwarding.GrpcFileContent;
import de.ozgcloud.eingang.forwarding.GrpcRepresentation;
import de.ozgcloud.eingang.forwarding.GrpcRouteForwarding;
import de.ozgcloud.eingang.forwarding.GrpcRouteForwardingRequest;
import de.ozgcloud.eingang.forwarding.GrpcRouteForwardingResponse;
import de.ozgcloud.vorgang.callcontext.VorgangManagerClientCallContextAttachingInterceptor;
import de.ozgcloud.vorgang.files.FileService;
import de.ozgcloud.vorgang.vorgang.IncomingFile;
import de.ozgcloud.vorgang.vorgang.IncomingFileGroup;
import de.ozgcloud.vorgang.vorgang.IncomingFileMapper;
import io.grpc.stub.ClientCallStreamObserver;
import io.grpc.stub.ClientResponseObserver;
import lombok.RequiredArgsConstructor;
import lombok.extern.log4j.Log4j2;
import net.devh.boot.grpc.client.inject.GrpcClient;
@Component
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
@RequiredArgsConstructor
@Log4j2
class EingangForwarder {
static final int TIMEOUT_MINUTES = 2;
@GrpcClient("forwarder")
private final RouteForwardingServiceGrpc.RouteForwardingServiceStub serviceStub;
private final FileService fileService;
private final IncomingFileMapper incomingFileMapper;
private ForwardingResponseObserver responseObserver;
private ClientCallStreamObserver<GrpcRouteForwardingRequest> requestObserver;
public void forward(GrpcRouteForwarding grpcRouteForwarding, List<IncomingFileGroup> attachments, List<IncomingFile> representations) {
var future = performGrpcCall();
sendEingang(grpcRouteForwarding, attachments, representations);
requestObserver.onCompleted();
waitForCompletion(future);
}
private void sendEingang(GrpcRouteForwarding grpcRouteForwarding, List<IncomingFileGroup> attachments, List<IncomingFile> representations) {
sendRouteForwarding(grpcRouteForwarding);
sendAttachments(attachments);
sendRepresentations(representations);
}
Future<GrpcRouteForwardingResponse> performGrpcCall() {
var responseFuture = new CompletableFuture<GrpcRouteForwardingResponse>();
responseObserver = new ForwardingResponseObserver(responseFuture);
requestObserver = (ClientCallStreamObserver<GrpcRouteForwardingRequest>) serviceStub.withInterceptors(
new VorgangManagerClientCallContextAttachingInterceptor())
.routeForwarding(responseObserver);
return responseFuture;
}
void sendRouteForwarding(GrpcRouteForwarding grpcRouteForwarding) {
var future = new CompletableFuture<Void>();
responseObserver.registerOnReadyHandler(getSendRouteForwardingRunnable(grpcRouteForwarding, future));
waitForCompletion(future);
}
Runnable getSendRouteForwardingRunnable(GrpcRouteForwarding grpcRouteForwarding, CompletableFuture<Void> future) {
var executed = new AtomicBoolean();
return () -> {
if (!executed.compareAndExchange(false, true)) {
requestObserver.onNext(GrpcRouteForwardingRequest.newBuilder().setRouteForwarding(grpcRouteForwarding).build());
future.complete(null);
}
};
}
void sendAttachments(List<IncomingFileGroup> attachments) {
attachments.stream()
.flatMap(attachment -> {
var groupName = attachment.getName();
return attachment.getFiles().stream().map(file -> new FileInGroup(groupName, file));
})
.forEach(this::sendAttachmentFile);
}
void sendAttachmentFile(FileInGroup fileInGroup) {
var fileContentStream = fileService.getUploadedFileStream(fileInGroup.file.getId());
var fileSender = createAttachmentFileSender(fileInGroup.groupName, fileInGroup.file, fileContentStream).send();
waitForCompletion(fileSender, fileContentStream);
}
record FileInGroup(String groupName, IncomingFile file) {
}
StreamingFileSender<GrpcRouteForwardingRequest, GrpcRouteForwardingResponse> createAttachmentFileSender(String groupName,
IncomingFile file,
InputStream fileContentStream) {
return createSenderWithoutMetadata(this::buildAttachmentChunk, fileContentStream).withMetaData(buildGrpcAttachmentFile(groupName, file));
}
GrpcRouteForwardingRequest buildAttachmentChunk(byte[] chunk, int length) {
return GrpcRouteForwardingRequest.newBuilder()
.setAttachment(GrpcAttachment.newBuilder()
.setContent(buildGrpcFileContent(chunk, length))
.build())
.build();
}
GrpcRouteForwardingRequest buildGrpcAttachmentFile(String name, IncomingFile file) {
return GrpcRouteForwardingRequest.newBuilder()
.setAttachment(GrpcAttachment.newBuilder()
.setFile(incomingFileMapper.toAttachmentFile(name, file))
.build())
.build();
}
void sendRepresentations(List<IncomingFile> representations) {
representations.forEach(this::sendRepresentationFile);
}
void sendRepresentationFile(IncomingFile file) {
var fileContentStream = fileService.getUploadedFileStream(file.getId());
var fileSender = createRepresentationFileSender(file, fileContentStream).send();
waitForCompletion(fileSender, fileContentStream);
}
StreamingFileSender<GrpcRouteForwardingRequest, GrpcRouteForwardingResponse> createRepresentationFileSender(IncomingFile file,
InputStream fileContentStream) {
return createSenderWithoutMetadata(this::buildRepresentationChunk, fileContentStream).withMetaData(buildGrpcRepresentationFile(file));
}
GrpcRouteForwardingRequest buildRepresentationChunk(byte[] chunk, int length) {
return GrpcRouteForwardingRequest.newBuilder()
.setRepresentation(GrpcRepresentation.newBuilder()
.setContent(buildGrpcFileContent(chunk, length))
.build())
.build();
}
GrpcRouteForwardingRequest buildGrpcRepresentationFile(IncomingFile file) {
return GrpcRouteForwardingRequest.newBuilder()
.setRepresentation(GrpcRepresentation.newBuilder()
.setFile(incomingFileMapper.toRepresentationFile(file))
.build())
.build();
}
GrpcFileContent buildGrpcFileContent(byte[] chunk, int length) {
var fileContentBuilder = GrpcFileContent.newBuilder();
if (length <= 0) {
fileContentBuilder.setIsEndOfFile(true);
} else {
fileContentBuilder.setContent(ByteString.copyFrom(chunk));
}
return fileContentBuilder.build();
}
StreamingFileSender<GrpcRouteForwardingRequest, GrpcRouteForwardingResponse> createSenderWithoutMetadata(
BiFunction<byte[], Integer, GrpcRouteForwardingRequest> chunkBuilder, InputStream fileContentStream) {
return GrpcFileUploadUtils.createStreamSharingSender(chunkBuilder, fileContentStream, requestObserver,
responseObserver::registerOnReadyHandler);
}
<T> void waitForCompletion(Future<T> responseFuture) {
try {
responseFuture.get(TIMEOUT_MINUTES, TimeUnit.MINUTES);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new TechnicalException("Waiting for finishing file upload was interrupted.", e);
} catch (ExecutionException e) {
throw new TechnicalException("Error on uploading file content.", e);
} catch (TimeoutException e) {
throw new TechnicalException("Timeout on uploading file content.", e);
}
}
void waitForCompletion(StreamingFileSender<GrpcRouteForwardingRequest, GrpcRouteForwardingResponse> fileSender, InputStream fileContentStream) {
try {
fileSender.getResultFuture().get(TIMEOUT_MINUTES, TimeUnit.MINUTES);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
fileSender.cancelOnError(e);
throw new TechnicalException("Waiting for finishing upload was interrupted.", e);
} catch (ExecutionException e) {
fileSender.cancelOnError(e);
throw new TechnicalException("Error on uploading file content.", e);
} catch (TimeoutException e) {
fileSender.cancelOnTimeout();
throw new TechnicalException("Timeout on uploading data.", e);
} finally {
IOUtils.closeQuietly(fileContentStream);
}
}
@RequiredArgsConstructor
static class ForwardingResponseObserver implements ClientResponseObserver<GrpcRouteForwardingRequest, GrpcRouteForwardingResponse> {
private final CompletableFuture<GrpcRouteForwardingResponse> future;
private DelegatingOnReadyHandler onReadyHandler;
private GrpcRouteForwardingResponse response;
@Override
public void beforeStart(ClientCallStreamObserver<GrpcRouteForwardingRequest> requestStream) {
onReadyHandler = new DelegatingOnReadyHandler(requestStream);
requestStream.setOnReadyHandler(onReadyHandler);
}
@Override
public void onNext(GrpcRouteForwardingResponse response) {
this.response = response;
}
@Override
public void onError(Throwable t) {
onReadyHandler.stop();
future.completeExceptionally(t);
}
@Override
public void onCompleted() {
onReadyHandler.stop();
future.complete(response);
}
public void registerOnReadyHandler(Runnable onReadyHandler) {
this.onReadyHandler.setDelegate(onReadyHandler);
}
}
@RequiredArgsConstructor
static class DelegatingOnReadyHandler implements Runnable {
private final ClientCallStreamObserver<GrpcRouteForwardingRequest> requestStream;
private final AtomicReference<Runnable> delegateRef = new AtomicReference<>();
private final AtomicBoolean done = new AtomicBoolean(false);
public void setDelegate(Runnable onReadyHandler) {
this.delegateRef.set(onReadyHandler);
}
public void stop() {
done.set(true);
}
@Override
public void run() {
while (!done.get() && requestStream.isReady()) {
if (Thread.currentThread().isInterrupted()) {
break;
}
var delegate = delegateRef.get();
if (delegate != null) {
delegate.run();
}
}
}
}
}
...@@ -23,197 +23,27 @@ ...@@ -23,197 +23,27 @@
*/ */
package de.ozgcloud.vorgang.vorgang.redirect; package de.ozgcloud.vorgang.vorgang.redirect;
import java.io.InputStream; import org.springframework.beans.factory.annotation.Lookup;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.BiFunction;
import java.util.function.Function;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import com.google.protobuf.ByteString;
import de.ozgcloud.common.binaryfile.GrpcFileUploadUtils;
import de.ozgcloud.common.binaryfile.GrpcFileUploadUtils.FileSender;
import de.ozgcloud.common.errorhandling.TechnicalException;
import de.ozgcloud.eingang.forwarder.RouteForwardingServiceGrpc;
import de.ozgcloud.eingang.forwarding.GrpcAttachment;
import de.ozgcloud.eingang.forwarding.GrpcFileContent;
import de.ozgcloud.eingang.forwarding.GrpcRepresentation;
import de.ozgcloud.eingang.forwarding.GrpcRouteForwardingRequest;
import de.ozgcloud.eingang.forwarding.GrpcRouteForwardingResponse;
import de.ozgcloud.vorgang.callcontext.VorgangManagerClientCallContextAttachingInterceptor;
import de.ozgcloud.vorgang.files.FileService;
import de.ozgcloud.vorgang.vorgang.Eingang;
import de.ozgcloud.vorgang.vorgang.IncomingFile;
import de.ozgcloud.vorgang.vorgang.IncomingFileGroup;
import de.ozgcloud.vorgang.vorgang.IncomingFileMapper;
import de.ozgcloud.vorgang.vorgang.VorgangService; import de.ozgcloud.vorgang.vorgang.VorgangService;
import io.grpc.stub.CallStreamObserver;
import io.grpc.stub.StreamObserver;
import lombok.RequiredArgsConstructor; import lombok.RequiredArgsConstructor;
import net.devh.boot.grpc.client.inject.GrpcClient;
@Service @Service
@RequiredArgsConstructor @RequiredArgsConstructor
class ForwardingRemoteService { class ForwardingRemoteService {
private static final int TIMEOUT_MINUTES = 2;
private final VorgangService vorgangService; private final VorgangService vorgangService;
private final ForwardingRequestMapper forwardingRequestMapper; private final ForwardingRequestMapper forwardingRequestMapper;
@GrpcClient("forwarder")
private final RouteForwardingServiceGrpc.RouteForwardingServiceStub serviceStub;
private final FileService fileService;
private final IncomingFileMapper incomingFileMapper;
public void forward(ForwardingRequest request) { public void forward(ForwardingRequest request) {
CompletableFuture<Void> responseFuture = new CompletableFuture<>();
routeForwarding(request, new ForwardingResponseObserver(responseFuture));
waitForCompletion(responseFuture);
}
void routeForwarding(ForwardingRequest request, ForwardingResponseObserver responseObserver) {
var requestStreamObserver = serviceStub.withInterceptors(new VorgangManagerClientCallContextAttachingInterceptor())
.routeForwarding(responseObserver);
try {
sendEingang(request, requestStreamObserver);
requestStreamObserver.onCompleted();
} catch (Exception e) {
requestStreamObserver.onError(e);
throw e;
}
}
void sendEingang(ForwardingRequest request, StreamObserver<GrpcRouteForwardingRequest> requestStreamObserver) {
var eingang = vorgangService.getById(request.getVorgangId()).getEingangs().getFirst(); var eingang = vorgangService.getById(request.getVorgangId()).getEingangs().getFirst();
requestStreamObserver.onNext(buildRouteForwardingRequest(request, eingang)); var grpcRouteForwarding = forwardingRequestMapper.toGrpcRouteForwarding(request, eingang);
sendAttachments(eingang.getAttachments(), requestStreamObserver); getEingangForwarder().forward(grpcRouteForwarding, eingang.getAttachments(), eingang.getRepresentations());
sendRepresentations(eingang.getRepresentations(), requestStreamObserver);
}
GrpcRouteForwardingRequest buildRouteForwardingRequest(ForwardingRequest request, Eingang eingang) {
var routeForwarding = forwardingRequestMapper.toGrpcRouteForwarding(request, eingang);
return GrpcRouteForwardingRequest.newBuilder().setRouteForwarding(routeForwarding).build();
}
void sendAttachments(List<IncomingFileGroup> attachments, StreamObserver<GrpcRouteForwardingRequest> requestStreamObserver) {
for (var attachment : attachments) {
var groupName = attachment.getName();
attachment.getFiles().forEach(file -> sendAttachmentFile(requestStreamObserver, groupName, file));
}
} }
private void sendAttachmentFile(StreamObserver<GrpcRouteForwardingRequest> requestStreamObserver, String groupName, IncomingFile file) { @Lookup
var fileContentStream = fileService.getUploadedFileStream(file.getId()); EingangForwarder getEingangForwarder() {
createAttachmentFileSender(requestStreamObserver, groupName, file, fileContentStream).send(); return null; // provided by Spring
}
FileSender<GrpcRouteForwardingRequest, GrpcRouteForwardingResponse> createAttachmentFileSender(
StreamObserver<GrpcRouteForwardingRequest> requestStreamObserver, String groupName, IncomingFile file, InputStream fileContentStream) {
return createSenderWithoutMetadata(this::buildAttachmentChunk, requestStreamObserver, fileContentStream)
.withMetaData(buildGrpcAttachmentFile(groupName, file));
}
GrpcRouteForwardingRequest buildAttachmentChunk(byte[] chunk, int length) {
return GrpcRouteForwardingRequest.newBuilder()
.setAttachment(GrpcAttachment.newBuilder()
.setContent(buildGrpcFileContent(chunk, length))
.build())
.build();
}
GrpcRouteForwardingRequest buildGrpcAttachmentFile(String name, IncomingFile file) {
return GrpcRouteForwardingRequest.newBuilder()
.setAttachment(GrpcAttachment.newBuilder()
.setFile(incomingFileMapper.toAttachmentFile(name, file))
.build())
.build();
}
void sendRepresentations(List<IncomingFile> representations, StreamObserver<GrpcRouteForwardingRequest> requestObserver) {
representations.forEach(representation -> {
var fileContentStream = fileService.getUploadedFileStream(representation.getId());
createRepresentationFileSender(requestObserver, representation, fileContentStream).send();
});
}
FileSender<GrpcRouteForwardingRequest, GrpcRouteForwardingResponse> createRepresentationFileSender(
StreamObserver<GrpcRouteForwardingRequest> requestStreamObserver, IncomingFile file, InputStream fileContentStream) {
return createSenderWithoutMetadata(this::buildRepresentationChunk, requestStreamObserver, fileContentStream)
.withMetaData(buildGrpcRepresentationFile(file));
}
FileSender<GrpcRouteForwardingRequest, GrpcRouteForwardingResponse> createSenderWithoutMetadata(
BiFunction<byte[], Integer, GrpcRouteForwardingRequest> chunkBuilder,
StreamObserver<GrpcRouteForwardingRequest> requestStreamObserver, InputStream fileContentStream) {
return GrpcFileUploadUtils
.createSender(chunkBuilder, fileContentStream, requestCallStreamObserverProvider(requestStreamObserver), false);
}
private Function<StreamObserver<GrpcRouteForwardingResponse>, CallStreamObserver<GrpcRouteForwardingRequest>> requestCallStreamObserverProvider(
StreamObserver<GrpcRouteForwardingRequest> requestStreamObserver) {
return response -> (CallStreamObserver<GrpcRouteForwardingRequest>) requestStreamObserver;
}
GrpcRouteForwardingRequest buildRepresentationChunk(byte[] chunk, int length) {
return GrpcRouteForwardingRequest.newBuilder()
.setRepresentation(GrpcRepresentation.newBuilder()
.setContent(buildGrpcFileContent(chunk, length))
.build())
.build();
}
GrpcFileContent buildGrpcFileContent(byte[] chunk, int length) {
var fileContentBuilder = GrpcFileContent.newBuilder();
if (length <= 0) {
fileContentBuilder.setIsEndOfFile(true);
} else {
fileContentBuilder.setContent(ByteString.copyFrom(chunk));
}
return fileContentBuilder.build();
}
GrpcRouteForwardingRequest buildGrpcRepresentationFile(IncomingFile file) {
return GrpcRouteForwardingRequest.newBuilder()
.setRepresentation(GrpcRepresentation.newBuilder()
.setFile(incomingFileMapper.toRepresentationFile(file))
.build())
.build();
}
void waitForCompletion(CompletableFuture<Void> responseFuture) {
try {
responseFuture.get(TIMEOUT_MINUTES, TimeUnit.MINUTES);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new TechnicalException("Waiting for finishing file upload was interrupted.", e);
} catch (ExecutionException e) {
throw new TechnicalException("Error on uploading file content.", e);
} catch (TimeoutException e) {
throw new TechnicalException("Timeout on uploading file content.", e);
}
}
@RequiredArgsConstructor
static class ForwardingResponseObserver implements StreamObserver<GrpcRouteForwardingResponse> {
private final CompletableFuture<Void> future;
@Override
public void onNext(GrpcRouteForwardingResponse value) {
// noop
}
@Override
public void onError(Throwable t) {
future.completeExceptionally(t);
}
@Override
public void onCompleted() {
future.complete(null);
}
} }
} }
...@@ -23,15 +23,23 @@ ...@@ -23,15 +23,23 @@
*/ */
package de.ozgcloud.vorgang.vorgang; package de.ozgcloud.vorgang.vorgang;
import de.ozgcloud.vorgang.files.FileId;
public class IncomingFileGroupTestFactory { public class IncomingFileGroupTestFactory {
public static final String NAME = GrpcIncomingFileGroupTestFactory.NAME; public static final String NAME = GrpcIncomingFileGroupTestFactory.NAME;
public static final IncomingFile FILE = IncomingFileTestFactory.create(); public static final IncomingFile FILE = IncomingFileTestFactory.create();
public static final IncomingFile FILE2 = IncomingFileTestFactory.createBuilder()
.id(FileId.createNew()).build();
public static IncomingFileGroup create() { public static IncomingFileGroup create() {
return createBuilder().build(); return createBuilder().build();
} }
public static IncomingFileGroup createWithTwoFiles() {
return createBuilder().file(FILE2).build();
}
public static IncomingFileGroup.IncomingFileGroupBuilder createBuilder() { public static IncomingFileGroup.IncomingFileGroupBuilder createBuilder() {
return IncomingFileGroup.builder() return IncomingFileGroup.builder()
.name(NAME) .name(NAME)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment