Skip to content
Snippets Groups Projects

OZG-7573 Dateien Weiterleiten

Merged Krzysztof Witukiewicz requested to merge OZG-7573-files-weiterleitung-bug into main
Files
6
/*
* Copyright (C) 2025 Das Land Schleswig-Holstein vertreten durch den
* Ministerpräsidenten des Landes Schleswig-Holstein
* Staatskanzlei
* Abteilung Digitalisierung und zentrales IT-Management der Landesregierung
*
* Lizenziert unter der EUPL, Version 1.2 oder - sobald
* diese von der Europäischen Kommission genehmigt wurden -
* Folgeversionen der EUPL ("Lizenz");
* Sie dürfen dieses Werk ausschließlich gemäß
* dieser Lizenz nutzen.
* Eine Kopie der Lizenz finden Sie hier:
*
* https://joinup.ec.europa.eu/collection/eupl/eupl-text-eupl-12
*
* Sofern nicht durch anwendbare Rechtsvorschriften
* gefordert oder in schriftlicher Form vereinbart, wird
* die unter der Lizenz verbreitete Software "so wie sie
* ist", OHNE JEGLICHE GEWÄHRLEISTUNG ODER BEDINGUNGEN -
* ausdrücklich oder stillschweigend - verbreitet.
* Die sprachspezifischen Genehmigungen und Beschränkungen
* unter der Lizenz sind dem Lizenztext zu entnehmen.
*/
package de.ozgcloud.vorgang.vorgang.redirect;
import java.io.InputStream;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;
import org.apache.commons.io.IOUtils;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;
import com.google.protobuf.ByteString;
import de.ozgcloud.common.binaryfile.GrpcFileUploadUtils;
import de.ozgcloud.common.binaryfile.StreamingFileSender;
import de.ozgcloud.common.errorhandling.TechnicalException;
import de.ozgcloud.eingang.forwarder.RouteForwardingServiceGrpc;
import de.ozgcloud.eingang.forwarding.GrpcAttachment;
import de.ozgcloud.eingang.forwarding.GrpcFileContent;
import de.ozgcloud.eingang.forwarding.GrpcRepresentation;
import de.ozgcloud.eingang.forwarding.GrpcRouteForwarding;
import de.ozgcloud.eingang.forwarding.GrpcRouteForwardingRequest;
import de.ozgcloud.eingang.forwarding.GrpcRouteForwardingResponse;
import de.ozgcloud.vorgang.callcontext.VorgangManagerClientCallContextAttachingInterceptor;
import de.ozgcloud.vorgang.files.FileService;
import de.ozgcloud.vorgang.vorgang.IncomingFile;
import de.ozgcloud.vorgang.vorgang.IncomingFileGroup;
import de.ozgcloud.vorgang.vorgang.IncomingFileMapper;
import io.grpc.stub.ClientCallStreamObserver;
import io.grpc.stub.ClientResponseObserver;
import lombok.RequiredArgsConstructor;
import lombok.extern.log4j.Log4j2;
import net.devh.boot.grpc.client.inject.GrpcClient;
@Component
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
@RequiredArgsConstructor
@Log4j2
class EingangForwarder {
static final int TIMEOUT_MINUTES = 2;
@GrpcClient("forwarder")
private final RouteForwardingServiceGrpc.RouteForwardingServiceStub serviceStub;
private final FileService fileService;
private final IncomingFileMapper incomingFileMapper;
private ForwardingResponseObserver responseObserver;
private ClientCallStreamObserver<GrpcRouteForwardingRequest> requestObserver;
public void forward(GrpcRouteForwarding grpcRouteForwarding, List<IncomingFileGroup> attachments, List<IncomingFile> representations) {
var future = performGrpcCall();
sendEingang(grpcRouteForwarding, attachments, representations);
requestObserver.onCompleted();
waitForCompletion(future);
}
private void sendEingang(GrpcRouteForwarding grpcRouteForwarding, List<IncomingFileGroup> attachments, List<IncomingFile> representations) {
sendRouteForwarding(grpcRouteForwarding);
sendAttachments(attachments);
sendRepresentations(representations);
}
Future<GrpcRouteForwardingResponse> performGrpcCall() {
var responseFuture = new CompletableFuture<GrpcRouteForwardingResponse>();
responseObserver = new ForwardingResponseObserver(responseFuture);
requestObserver = (ClientCallStreamObserver<GrpcRouteForwardingRequest>) serviceStub.withInterceptors(
new VorgangManagerClientCallContextAttachingInterceptor())
.routeForwarding(responseObserver);
return responseFuture;
}
void sendRouteForwarding(GrpcRouteForwarding grpcRouteForwarding) {
var future = new CompletableFuture<Void>();
responseObserver.registerOnReadyHandler(getSendRouteForwardingRunnable(grpcRouteForwarding, future));
waitForCompletion(future);
}
Runnable getSendRouteForwardingRunnable(GrpcRouteForwarding grpcRouteForwarding, CompletableFuture<Void> future) {
var executed = new AtomicBoolean();
return () -> {
if (!executed.compareAndExchange(false, true)) {
requestObserver.onNext(GrpcRouteForwardingRequest.newBuilder().setRouteForwarding(grpcRouteForwarding).build());
future.complete(null);
}
};
}
void sendAttachments(List<IncomingFileGroup> attachments) {
attachments.stream()
.flatMap(attachment -> {
var groupName = attachment.getName();
return attachment.getFiles().stream().map(file -> new FileInGroup(groupName, file));
})
.forEach(this::sendAttachmentFile);
}
void sendAttachmentFile(FileInGroup fileInGroup) {
var fileContentStream = fileService.getUploadedFileStream(fileInGroup.file.getId());
var fileSender = createAttachmentFileSender(fileInGroup.groupName, fileInGroup.file, fileContentStream).send();
waitForCompletion(fileSender, fileContentStream);
}
record FileInGroup(String groupName, IncomingFile file) {
}
StreamingFileSender<GrpcRouteForwardingRequest, GrpcRouteForwardingResponse> createAttachmentFileSender(String groupName,
IncomingFile file,
InputStream fileContentStream) {
return createSenderWithoutMetadata(this::buildAttachmentChunk, fileContentStream).withMetaData(buildGrpcAttachmentFile(groupName, file));
}
GrpcRouteForwardingRequest buildAttachmentChunk(byte[] chunk, int length) {
return GrpcRouteForwardingRequest.newBuilder()
.setAttachment(GrpcAttachment.newBuilder()
.setContent(buildGrpcFileContent(chunk, length))
.build())
.build();
}
GrpcRouteForwardingRequest buildGrpcAttachmentFile(String name, IncomingFile file) {
return GrpcRouteForwardingRequest.newBuilder()
.setAttachment(GrpcAttachment.newBuilder()
.setFile(incomingFileMapper.toAttachmentFile(name, file))
.build())
.build();
}
void sendRepresentations(List<IncomingFile> representations) {
representations.forEach(this::sendRepresentationFile);
}
void sendRepresentationFile(IncomingFile file) {
var fileContentStream = fileService.getUploadedFileStream(file.getId());
var fileSender = createRepresentationFileSender(file, fileContentStream).send();
waitForCompletion(fileSender, fileContentStream);
}
StreamingFileSender<GrpcRouteForwardingRequest, GrpcRouteForwardingResponse> createRepresentationFileSender(IncomingFile file,
InputStream fileContentStream) {
return createSenderWithoutMetadata(this::buildRepresentationChunk, fileContentStream).withMetaData(buildGrpcRepresentationFile(file));
}
GrpcRouteForwardingRequest buildRepresentationChunk(byte[] chunk, int length) {
return GrpcRouteForwardingRequest.newBuilder()
.setRepresentation(GrpcRepresentation.newBuilder()
.setContent(buildGrpcFileContent(chunk, length))
.build())
.build();
}
GrpcRouteForwardingRequest buildGrpcRepresentationFile(IncomingFile file) {
return GrpcRouteForwardingRequest.newBuilder()
.setRepresentation(GrpcRepresentation.newBuilder()
.setFile(incomingFileMapper.toRepresentationFile(file))
.build())
.build();
}
GrpcFileContent buildGrpcFileContent(byte[] chunk, int length) {
var fileContentBuilder = GrpcFileContent.newBuilder();
if (length <= 0) {
fileContentBuilder.setIsEndOfFile(true);
} else {
fileContentBuilder.setContent(ByteString.copyFrom(chunk));
}
return fileContentBuilder.build();
}
StreamingFileSender<GrpcRouteForwardingRequest, GrpcRouteForwardingResponse> createSenderWithoutMetadata(
BiFunction<byte[], Integer, GrpcRouteForwardingRequest> chunkBuilder, InputStream fileContentStream) {
return GrpcFileUploadUtils.createStreamSharingSender(chunkBuilder, fileContentStream, requestObserver,
responseObserver::registerOnReadyHandler);
}
<T> void waitForCompletion(Future<T> responseFuture) {
try {
responseFuture.get(TIMEOUT_MINUTES, TimeUnit.MINUTES);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new TechnicalException("Waiting for finishing file upload was interrupted.", e);
} catch (ExecutionException e) {
throw new TechnicalException("Error on uploading file content.", e);
} catch (TimeoutException e) {
throw new TechnicalException("Timeout on uploading file content.", e);
}
}
void waitForCompletion(StreamingFileSender<GrpcRouteForwardingRequest, GrpcRouteForwardingResponse> fileSender, InputStream fileContentStream) {
try {
fileSender.getResultFuture().get(TIMEOUT_MINUTES, TimeUnit.MINUTES);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
fileSender.cancelOnError(e);
throw new TechnicalException("Waiting for finishing upload was interrupted.", e);
} catch (ExecutionException e) {
fileSender.cancelOnError(e);
throw new TechnicalException("Error on uploading file content.", e);
} catch (TimeoutException e) {
fileSender.cancelOnTimeout();
throw new TechnicalException("Timeout on uploading data.", e);
} finally {
IOUtils.closeQuietly(fileContentStream);
}
}
@RequiredArgsConstructor
static class ForwardingResponseObserver implements ClientResponseObserver<GrpcRouteForwardingRequest, GrpcRouteForwardingResponse> {
private final CompletableFuture<GrpcRouteForwardingResponse> future;
private DelegatingOnReadyHandler onReadyHandler;
private GrpcRouteForwardingResponse response;
@Override
public void beforeStart(ClientCallStreamObserver<GrpcRouteForwardingRequest> requestStream) {
onReadyHandler = new DelegatingOnReadyHandler(requestStream);
requestStream.setOnReadyHandler(onReadyHandler);
}
@Override
public void onNext(GrpcRouteForwardingResponse response) {
this.response = response;
}
@Override
public void onError(Throwable t) {
onReadyHandler.stop();
future.completeExceptionally(t);
}
@Override
public void onCompleted() {
onReadyHandler.stop();
future.complete(response);
}
public void registerOnReadyHandler(Runnable onReadyHandler) {
this.onReadyHandler.setDelegate(onReadyHandler);
}
}
@RequiredArgsConstructor
static class DelegatingOnReadyHandler implements Runnable {
private final ClientCallStreamObserver<GrpcRouteForwardingRequest> requestStream;
private final AtomicReference<Runnable> delegateRef = new AtomicReference<>();
private final AtomicBoolean done = new AtomicBoolean(false);
public void setDelegate(Runnable onReadyHandler) {
this.delegateRef.set(onReadyHandler);
}
public void stop() {
done.set(true);
}
@Override
public void run() {
while (!done.get() && requestStream.isReady()) {
if (Thread.currentThread().isInterrupted()) {
break;
}
var delegate = delegateRef.get();
if (delegate != null) {
delegate.run();
}
}
}
}
}
Loading