diff --git a/vorgang-manager-server/pom.xml b/vorgang-manager-server/pom.xml index 8399fdf8c9e72d483b56a615dabb6203cd8654ac..0946d4873e23f7d07e9da0f877e3629787100ca8 100644 --- a/vorgang-manager-server/pom.xml +++ b/vorgang-manager-server/pom.xml @@ -51,7 +51,7 @@ <spring-boot.build-image.imageName>docker.ozg-sh.de/vorgang-manager:build-latest</spring-boot.build-image.imageName> <zufi-manager-interface.version>1.6.0</zufi-manager-interface.version> - <common-lib.version>4.12.0</common-lib.version> + <common-lib.version>4.13.0-SNAPSHOT</common-lib.version> <user-manager-interface.version>2.12.0</user-manager-interface.version> <processor-manager.version>0.5.0</processor-manager.version> <nachrichten-manager.version>2.19.0</nachrichten-manager.version> diff --git a/vorgang-manager-server/src/main/java/de/ozgcloud/vorgang/vorgang/redirect/EingangForwarder.java b/vorgang-manager-server/src/main/java/de/ozgcloud/vorgang/vorgang/redirect/EingangForwarder.java new file mode 100644 index 0000000000000000000000000000000000000000..20cbfb80bb55043fc45d650886d0d40a771d6176 --- /dev/null +++ b/vorgang-manager-server/src/main/java/de/ozgcloud/vorgang/vorgang/redirect/EingangForwarder.java @@ -0,0 +1,299 @@ +/* + * Copyright (C) 2025 Das Land Schleswig-Holstein vertreten durch den + * Ministerpräsidenten des Landes Schleswig-Holstein + * Staatskanzlei + * Abteilung Digitalisierung und zentrales IT-Management der Landesregierung + * + * Lizenziert unter der EUPL, Version 1.2 oder - sobald + * diese von der Europäischen Kommission genehmigt wurden - + * Folgeversionen der EUPL ("Lizenz"); + * Sie dürfen dieses Werk ausschließlich gemäß + * dieser Lizenz nutzen. + * Eine Kopie der Lizenz finden Sie hier: + * + * https://joinup.ec.europa.eu/collection/eupl/eupl-text-eupl-12 + * + * Sofern nicht durch anwendbare Rechtsvorschriften + * gefordert oder in schriftlicher Form vereinbart, wird + * die unter der Lizenz verbreitete Software "so wie sie + * ist", OHNE JEGLICHE GEWÄHRLEISTUNG ODER BEDINGUNGEN - + * ausdrücklich oder stillschweigend - verbreitet. + * Die sprachspezifischen Genehmigungen und Beschränkungen + * unter der Lizenz sind dem Lizenztext zu entnehmen. + */ +package de.ozgcloud.vorgang.vorgang.redirect; + +import java.io.InputStream; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.BiFunction; + +import org.apache.commons.io.IOUtils; +import org.springframework.beans.factory.config.ConfigurableBeanFactory; +import org.springframework.context.annotation.Scope; +import org.springframework.stereotype.Component; + +import com.google.protobuf.ByteString; + +import de.ozgcloud.common.binaryfile.GrpcFileUploadUtils; +import de.ozgcloud.common.binaryfile.StreamingFileSender; +import de.ozgcloud.common.errorhandling.TechnicalException; +import de.ozgcloud.eingang.forwarder.RouteForwardingServiceGrpc; +import de.ozgcloud.eingang.forwarding.GrpcAttachment; +import de.ozgcloud.eingang.forwarding.GrpcFileContent; +import de.ozgcloud.eingang.forwarding.GrpcRepresentation; +import de.ozgcloud.eingang.forwarding.GrpcRouteForwarding; +import de.ozgcloud.eingang.forwarding.GrpcRouteForwardingRequest; +import de.ozgcloud.eingang.forwarding.GrpcRouteForwardingResponse; +import de.ozgcloud.vorgang.callcontext.VorgangManagerClientCallContextAttachingInterceptor; +import de.ozgcloud.vorgang.files.FileService; +import de.ozgcloud.vorgang.vorgang.IncomingFile; +import de.ozgcloud.vorgang.vorgang.IncomingFileGroup; +import de.ozgcloud.vorgang.vorgang.IncomingFileMapper; +import io.grpc.stub.ClientCallStreamObserver; +import io.grpc.stub.ClientResponseObserver; +import lombok.RequiredArgsConstructor; +import lombok.extern.log4j.Log4j2; +import net.devh.boot.grpc.client.inject.GrpcClient; + +@Component +@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE) +@RequiredArgsConstructor +@Log4j2 +class EingangForwarder { + + static final int TIMEOUT_MINUTES = 2; + + @GrpcClient("forwarder") + private final RouteForwardingServiceGrpc.RouteForwardingServiceStub serviceStub; + private final FileService fileService; + private final IncomingFileMapper incomingFileMapper; + + private ForwardingResponseObserver responseObserver; + private ClientCallStreamObserver<GrpcRouteForwardingRequest> requestObserver; + + public void forward(GrpcRouteForwarding grpcRouteForwarding, List<IncomingFileGroup> attachments, List<IncomingFile> representations) { + var future = performGrpcCall(); + sendEingang(grpcRouteForwarding, attachments, representations); + requestObserver.onCompleted(); + waitForCompletion(future); + } + + private void sendEingang(GrpcRouteForwarding grpcRouteForwarding, List<IncomingFileGroup> attachments, List<IncomingFile> representations) { + sendRouteForwarding(grpcRouteForwarding); + sendAttachments(attachments); + sendRepresentations(representations); + } + + Future<GrpcRouteForwardingResponse> performGrpcCall() { + var responseFuture = new CompletableFuture<GrpcRouteForwardingResponse>(); + responseObserver = new ForwardingResponseObserver(responseFuture); + requestObserver = (ClientCallStreamObserver<GrpcRouteForwardingRequest>) serviceStub.withInterceptors( + new VorgangManagerClientCallContextAttachingInterceptor()) + .routeForwarding(responseObserver); + return responseFuture; + } + + void sendRouteForwarding(GrpcRouteForwarding grpcRouteForwarding) { + var future = new CompletableFuture<Void>(); + responseObserver.registerOnReadyHandler(getSendRouteForwardingRunnable(grpcRouteForwarding, future)); + waitForCompletion(future); + } + + Runnable getSendRouteForwardingRunnable(GrpcRouteForwarding grpcRouteForwarding, CompletableFuture<Void> future) { + var executed = new AtomicBoolean(); + return () -> { + if (!executed.compareAndExchange(false, true)) { + requestObserver.onNext(GrpcRouteForwardingRequest.newBuilder().setRouteForwarding(grpcRouteForwarding).build()); + future.complete(null); + } + }; + } + + void sendAttachments(List<IncomingFileGroup> attachments) { + attachments.stream() + .flatMap(attachment -> { + var groupName = attachment.getName(); + return attachment.getFiles().stream().map(file -> new FileInGroup(groupName, file)); + }) + .forEach(this::sendAttachmentFile); + } + + void sendAttachmentFile(FileInGroup fileInGroup) { + var fileContentStream = fileService.getUploadedFileStream(fileInGroup.file.getId()); + var fileSender = createAttachmentFileSender(fileInGroup.groupName, fileInGroup.file, fileContentStream).send(); + waitForCompletion(fileSender, fileContentStream); + } + + record FileInGroup(String groupName, IncomingFile file) { + } + + StreamingFileSender<GrpcRouteForwardingRequest, GrpcRouteForwardingResponse> createAttachmentFileSender(String groupName, + IncomingFile file, + InputStream fileContentStream) { + return createSenderWithoutMetadata(this::buildAttachmentChunk, fileContentStream).withMetaData(buildGrpcAttachmentFile(groupName, file)); + } + + GrpcRouteForwardingRequest buildAttachmentChunk(byte[] chunk, int length) { + return GrpcRouteForwardingRequest.newBuilder() + .setAttachment(GrpcAttachment.newBuilder() + .setContent(buildGrpcFileContent(chunk, length)) + .build()) + .build(); + } + + GrpcRouteForwardingRequest buildGrpcAttachmentFile(String name, IncomingFile file) { + return GrpcRouteForwardingRequest.newBuilder() + .setAttachment(GrpcAttachment.newBuilder() + .setFile(incomingFileMapper.toAttachmentFile(name, file)) + .build()) + .build(); + } + + void sendRepresentations(List<IncomingFile> representations) { + representations.forEach(this::sendRepresentationFile); + } + + void sendRepresentationFile(IncomingFile file) { + var fileContentStream = fileService.getUploadedFileStream(file.getId()); + var fileSender = createRepresentationFileSender(file, fileContentStream).send(); + waitForCompletion(fileSender, fileContentStream); + } + + StreamingFileSender<GrpcRouteForwardingRequest, GrpcRouteForwardingResponse> createRepresentationFileSender(IncomingFile file, + InputStream fileContentStream) { + return createSenderWithoutMetadata(this::buildRepresentationChunk, fileContentStream).withMetaData(buildGrpcRepresentationFile(file)); + } + + GrpcRouteForwardingRequest buildRepresentationChunk(byte[] chunk, int length) { + return GrpcRouteForwardingRequest.newBuilder() + .setRepresentation(GrpcRepresentation.newBuilder() + .setContent(buildGrpcFileContent(chunk, length)) + .build()) + .build(); + } + + GrpcRouteForwardingRequest buildGrpcRepresentationFile(IncomingFile file) { + return GrpcRouteForwardingRequest.newBuilder() + .setRepresentation(GrpcRepresentation.newBuilder() + .setFile(incomingFileMapper.toRepresentationFile(file)) + .build()) + .build(); + } + + GrpcFileContent buildGrpcFileContent(byte[] chunk, int length) { + var fileContentBuilder = GrpcFileContent.newBuilder(); + if (length <= 0) { + fileContentBuilder.setIsEndOfFile(true); + } else { + fileContentBuilder.setContent(ByteString.copyFrom(chunk)); + } + return fileContentBuilder.build(); + } + + StreamingFileSender<GrpcRouteForwardingRequest, GrpcRouteForwardingResponse> createSenderWithoutMetadata( + BiFunction<byte[], Integer, GrpcRouteForwardingRequest> chunkBuilder, InputStream fileContentStream) { + return GrpcFileUploadUtils.createStreamSharingSender(chunkBuilder, fileContentStream, requestObserver, + responseObserver::registerOnReadyHandler); + } + + <T> void waitForCompletion(Future<T> responseFuture) { + try { + responseFuture.get(TIMEOUT_MINUTES, TimeUnit.MINUTES); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new TechnicalException("Waiting for finishing file upload was interrupted.", e); + } catch (ExecutionException e) { + throw new TechnicalException("Error on uploading file content.", e); + } catch (TimeoutException e) { + throw new TechnicalException("Timeout on uploading file content.", e); + } + } + + void waitForCompletion(StreamingFileSender<GrpcRouteForwardingRequest, GrpcRouteForwardingResponse> fileSender, InputStream fileContentStream) { + try { + fileSender.getResultFuture().get(TIMEOUT_MINUTES, TimeUnit.MINUTES); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + fileSender.cancelOnError(e); + throw new TechnicalException("Waiting for finishing upload was interrupted.", e); + } catch (ExecutionException e) { + fileSender.cancelOnError(e); + throw new TechnicalException("Error on uploading file content.", e); + } catch (TimeoutException e) { + fileSender.cancelOnTimeout(); + throw new TechnicalException("Timeout on uploading data.", e); + } finally { + IOUtils.closeQuietly(fileContentStream); + } + } + + @RequiredArgsConstructor + static class ForwardingResponseObserver implements ClientResponseObserver<GrpcRouteForwardingRequest, GrpcRouteForwardingResponse> { + private final CompletableFuture<GrpcRouteForwardingResponse> future; + private DelegatingOnReadyHandler onReadyHandler; + private GrpcRouteForwardingResponse response; + + @Override + public void beforeStart(ClientCallStreamObserver<GrpcRouteForwardingRequest> requestStream) { + onReadyHandler = new DelegatingOnReadyHandler(requestStream); + requestStream.setOnReadyHandler(onReadyHandler); + } + + @Override + public void onNext(GrpcRouteForwardingResponse response) { + this.response = response; + } + + @Override + public void onError(Throwable t) { + onReadyHandler.stop(); + future.completeExceptionally(t); + } + + @Override + public void onCompleted() { + onReadyHandler.stop(); + future.complete(response); + } + + public void registerOnReadyHandler(Runnable onReadyHandler) { + this.onReadyHandler.setDelegate(onReadyHandler); + } + } + + @RequiredArgsConstructor + static class DelegatingOnReadyHandler implements Runnable { + + private final ClientCallStreamObserver<GrpcRouteForwardingRequest> requestStream; + private final AtomicReference<Runnable> delegateRef = new AtomicReference<>(); + private final AtomicBoolean done = new AtomicBoolean(false); + + public void setDelegate(Runnable onReadyHandler) { + this.delegateRef.set(onReadyHandler); + } + + public void stop() { + done.set(true); + } + + @Override + public void run() { + while (!done.get() && requestStream.isReady()) { + if (Thread.currentThread().isInterrupted()) { + break; + } + var delegate = delegateRef.get(); + if (delegate != null) { + delegate.run(); + } + } + } + } +} diff --git a/vorgang-manager-server/src/main/java/de/ozgcloud/vorgang/vorgang/redirect/ForwardingRemoteService.java b/vorgang-manager-server/src/main/java/de/ozgcloud/vorgang/vorgang/redirect/ForwardingRemoteService.java index 63323fb8ecb106b710574deed664efbc70c3be1f..aac13d6d1df7de71581638c430c085bc68b8ff6b 100644 --- a/vorgang-manager-server/src/main/java/de/ozgcloud/vorgang/vorgang/redirect/ForwardingRemoteService.java +++ b/vorgang-manager-server/src/main/java/de/ozgcloud/vorgang/vorgang/redirect/ForwardingRemoteService.java @@ -23,197 +23,27 @@ */ package de.ozgcloud.vorgang.vorgang.redirect; -import java.io.InputStream; -import java.util.List; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; -import java.util.function.BiFunction; -import java.util.function.Function; - +import org.springframework.beans.factory.annotation.Lookup; import org.springframework.stereotype.Service; -import com.google.protobuf.ByteString; - -import de.ozgcloud.common.binaryfile.GrpcFileUploadUtils; -import de.ozgcloud.common.binaryfile.GrpcFileUploadUtils.FileSender; -import de.ozgcloud.common.errorhandling.TechnicalException; -import de.ozgcloud.eingang.forwarder.RouteForwardingServiceGrpc; -import de.ozgcloud.eingang.forwarding.GrpcAttachment; -import de.ozgcloud.eingang.forwarding.GrpcFileContent; -import de.ozgcloud.eingang.forwarding.GrpcRepresentation; -import de.ozgcloud.eingang.forwarding.GrpcRouteForwardingRequest; -import de.ozgcloud.eingang.forwarding.GrpcRouteForwardingResponse; -import de.ozgcloud.vorgang.callcontext.VorgangManagerClientCallContextAttachingInterceptor; -import de.ozgcloud.vorgang.files.FileService; -import de.ozgcloud.vorgang.vorgang.Eingang; -import de.ozgcloud.vorgang.vorgang.IncomingFile; -import de.ozgcloud.vorgang.vorgang.IncomingFileGroup; -import de.ozgcloud.vorgang.vorgang.IncomingFileMapper; import de.ozgcloud.vorgang.vorgang.VorgangService; -import io.grpc.stub.CallStreamObserver; -import io.grpc.stub.StreamObserver; import lombok.RequiredArgsConstructor; -import net.devh.boot.grpc.client.inject.GrpcClient; @Service @RequiredArgsConstructor class ForwardingRemoteService { - private static final int TIMEOUT_MINUTES = 2; private final VorgangService vorgangService; private final ForwardingRequestMapper forwardingRequestMapper; - @GrpcClient("forwarder") - private final RouteForwardingServiceGrpc.RouteForwardingServiceStub serviceStub; - private final FileService fileService; - private final IncomingFileMapper incomingFileMapper; public void forward(ForwardingRequest request) { - CompletableFuture<Void> responseFuture = new CompletableFuture<>(); - routeForwarding(request, new ForwardingResponseObserver(responseFuture)); - waitForCompletion(responseFuture); - } - - void routeForwarding(ForwardingRequest request, ForwardingResponseObserver responseObserver) { - var requestStreamObserver = serviceStub.withInterceptors(new VorgangManagerClientCallContextAttachingInterceptor()) - .routeForwarding(responseObserver); - try { - sendEingang(request, requestStreamObserver); - requestStreamObserver.onCompleted(); - } catch (Exception e) { - requestStreamObserver.onError(e); - throw e; - } - } - - void sendEingang(ForwardingRequest request, StreamObserver<GrpcRouteForwardingRequest> requestStreamObserver) { var eingang = vorgangService.getById(request.getVorgangId()).getEingangs().getFirst(); - requestStreamObserver.onNext(buildRouteForwardingRequest(request, eingang)); - sendAttachments(eingang.getAttachments(), requestStreamObserver); - sendRepresentations(eingang.getRepresentations(), requestStreamObserver); - } - - GrpcRouteForwardingRequest buildRouteForwardingRequest(ForwardingRequest request, Eingang eingang) { - var routeForwarding = forwardingRequestMapper.toGrpcRouteForwarding(request, eingang); - return GrpcRouteForwardingRequest.newBuilder().setRouteForwarding(routeForwarding).build(); - } - - void sendAttachments(List<IncomingFileGroup> attachments, StreamObserver<GrpcRouteForwardingRequest> requestStreamObserver) { - for (var attachment : attachments) { - var groupName = attachment.getName(); - attachment.getFiles().forEach(file -> sendAttachmentFile(requestStreamObserver, groupName, file)); - } + var grpcRouteForwarding = forwardingRequestMapper.toGrpcRouteForwarding(request, eingang); + getEingangForwarder().forward(grpcRouteForwarding, eingang.getAttachments(), eingang.getRepresentations()); } - private void sendAttachmentFile(StreamObserver<GrpcRouteForwardingRequest> requestStreamObserver, String groupName, IncomingFile file) { - var fileContentStream = fileService.getUploadedFileStream(file.getId()); - createAttachmentFileSender(requestStreamObserver, groupName, file, fileContentStream).send(); - } - - FileSender<GrpcRouteForwardingRequest, GrpcRouteForwardingResponse> createAttachmentFileSender( - StreamObserver<GrpcRouteForwardingRequest> requestStreamObserver, String groupName, IncomingFile file, InputStream fileContentStream) { - return createSenderWithoutMetadata(this::buildAttachmentChunk, requestStreamObserver, fileContentStream) - .withMetaData(buildGrpcAttachmentFile(groupName, file)); - } - - GrpcRouteForwardingRequest buildAttachmentChunk(byte[] chunk, int length) { - return GrpcRouteForwardingRequest.newBuilder() - .setAttachment(GrpcAttachment.newBuilder() - .setContent(buildGrpcFileContent(chunk, length)) - .build()) - .build(); - } - - GrpcRouteForwardingRequest buildGrpcAttachmentFile(String name, IncomingFile file) { - return GrpcRouteForwardingRequest.newBuilder() - .setAttachment(GrpcAttachment.newBuilder() - .setFile(incomingFileMapper.toAttachmentFile(name, file)) - .build()) - .build(); - } - - void sendRepresentations(List<IncomingFile> representations, StreamObserver<GrpcRouteForwardingRequest> requestObserver) { - representations.forEach(representation -> { - var fileContentStream = fileService.getUploadedFileStream(representation.getId()); - createRepresentationFileSender(requestObserver, representation, fileContentStream).send(); - }); - } - - FileSender<GrpcRouteForwardingRequest, GrpcRouteForwardingResponse> createRepresentationFileSender( - StreamObserver<GrpcRouteForwardingRequest> requestStreamObserver, IncomingFile file, InputStream fileContentStream) { - return createSenderWithoutMetadata(this::buildRepresentationChunk, requestStreamObserver, fileContentStream) - .withMetaData(buildGrpcRepresentationFile(file)); - } - - FileSender<GrpcRouteForwardingRequest, GrpcRouteForwardingResponse> createSenderWithoutMetadata( - BiFunction<byte[], Integer, GrpcRouteForwardingRequest> chunkBuilder, - StreamObserver<GrpcRouteForwardingRequest> requestStreamObserver, InputStream fileContentStream) { - return GrpcFileUploadUtils - .createSender(chunkBuilder, fileContentStream, requestCallStreamObserverProvider(requestStreamObserver), false); - } - - private Function<StreamObserver<GrpcRouteForwardingResponse>, CallStreamObserver<GrpcRouteForwardingRequest>> requestCallStreamObserverProvider( - StreamObserver<GrpcRouteForwardingRequest> requestStreamObserver) { - return response -> (CallStreamObserver<GrpcRouteForwardingRequest>) requestStreamObserver; - } - - GrpcRouteForwardingRequest buildRepresentationChunk(byte[] chunk, int length) { - return GrpcRouteForwardingRequest.newBuilder() - .setRepresentation(GrpcRepresentation.newBuilder() - .setContent(buildGrpcFileContent(chunk, length)) - .build()) - .build(); - } - - GrpcFileContent buildGrpcFileContent(byte[] chunk, int length) { - var fileContentBuilder = GrpcFileContent.newBuilder(); - if (length <= 0) { - fileContentBuilder.setIsEndOfFile(true); - } else { - fileContentBuilder.setContent(ByteString.copyFrom(chunk)); - } - return fileContentBuilder.build(); - } - - GrpcRouteForwardingRequest buildGrpcRepresentationFile(IncomingFile file) { - return GrpcRouteForwardingRequest.newBuilder() - .setRepresentation(GrpcRepresentation.newBuilder() - .setFile(incomingFileMapper.toRepresentationFile(file)) - .build()) - .build(); - } - - void waitForCompletion(CompletableFuture<Void> responseFuture) { - try { - responseFuture.get(TIMEOUT_MINUTES, TimeUnit.MINUTES); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new TechnicalException("Waiting for finishing file upload was interrupted.", e); - } catch (ExecutionException e) { - throw new TechnicalException("Error on uploading file content.", e); - } catch (TimeoutException e) { - throw new TechnicalException("Timeout on uploading file content.", e); - } - } - - @RequiredArgsConstructor - static class ForwardingResponseObserver implements StreamObserver<GrpcRouteForwardingResponse> { - private final CompletableFuture<Void> future; - - @Override - public void onNext(GrpcRouteForwardingResponse value) { - // noop - } - - @Override - public void onError(Throwable t) { - future.completeExceptionally(t); - } - - @Override - public void onCompleted() { - future.complete(null); - } + @Lookup + EingangForwarder getEingangForwarder() { + return null; // provided by Spring } } diff --git a/vorgang-manager-server/src/test/java/de/ozgcloud/vorgang/vorgang/IncomingFileGroupTestFactory.java b/vorgang-manager-server/src/test/java/de/ozgcloud/vorgang/vorgang/IncomingFileGroupTestFactory.java index f95a612361b800020d17bc95b39bcb2e3dae6a57..4075d2c3bd063b6af34b47df2f3ce43f5ee723db 100644 --- a/vorgang-manager-server/src/test/java/de/ozgcloud/vorgang/vorgang/IncomingFileGroupTestFactory.java +++ b/vorgang-manager-server/src/test/java/de/ozgcloud/vorgang/vorgang/IncomingFileGroupTestFactory.java @@ -23,15 +23,23 @@ */ package de.ozgcloud.vorgang.vorgang; +import de.ozgcloud.vorgang.files.FileId; + public class IncomingFileGroupTestFactory { public static final String NAME = GrpcIncomingFileGroupTestFactory.NAME; public static final IncomingFile FILE = IncomingFileTestFactory.create(); + public static final IncomingFile FILE2 = IncomingFileTestFactory.createBuilder() + .id(FileId.createNew()).build(); public static IncomingFileGroup create() { return createBuilder().build(); } + public static IncomingFileGroup createWithTwoFiles() { + return createBuilder().file(FILE2).build(); + } + public static IncomingFileGroup.IncomingFileGroupBuilder createBuilder() { return IncomingFileGroup.builder() .name(NAME) diff --git a/vorgang-manager-server/src/test/java/de/ozgcloud/vorgang/vorgang/redirect/EingangForwarderTest.java b/vorgang-manager-server/src/test/java/de/ozgcloud/vorgang/vorgang/redirect/EingangForwarderTest.java new file mode 100644 index 0000000000000000000000000000000000000000..aacedd3854bd7071ae24829070cf1ffeca71b4de --- /dev/null +++ b/vorgang-manager-server/src/test/java/de/ozgcloud/vorgang/vorgang/redirect/EingangForwarderTest.java @@ -0,0 +1,1220 @@ +/* + * Copyright (C) 2025 Das Land Schleswig-Holstein vertreten durch den + * Ministerpräsidenten des Landes Schleswig-Holstein + * Staatskanzlei + * Abteilung Digitalisierung und zentrales IT-Management der Landesregierung + * + * Lizenziert unter der EUPL, Version 1.2 oder - sobald + * diese von der Europäischen Kommission genehmigt wurden - + * Folgeversionen der EUPL ("Lizenz"); + * Sie dürfen dieses Werk ausschließlich gemäß + * dieser Lizenz nutzen. + * Eine Kopie der Lizenz finden Sie hier: + * + * https://joinup.ec.europa.eu/collection/eupl/eupl-text-eupl-12 + * + * Sofern nicht durch anwendbare Rechtsvorschriften + * gefordert oder in schriftlicher Form vereinbart, wird + * die unter der Lizenz verbreitete Software "so wie sie + * ist", OHNE JEGLICHE GEWÄHRLEISTUNG ODER BEDINGUNGEN - + * ausdrücklich oder stillschweigend - verbreitet. + * Die sprachspezifischen Genehmigungen und Beschränkungen + * unter der Lizenz sind dem Lizenztext zu entnehmen. + */ +package de.ozgcloud.vorgang.vorgang.redirect; + +import static org.assertj.core.api.Assertions.*; +import static org.awaitility.Awaitility.*; +import static org.junit.jupiter.api.Assertions.*; +import static org.mockito.ArgumentMatchers.*; +import static org.mockito.Mockito.*; +import static org.mockito.Mockito.eq; + +import java.io.InputStream; +import java.time.Duration; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.BiFunction; +import java.util.function.Consumer; +import java.util.stream.IntStream; + +import org.apache.commons.lang3.RandomUtils; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Nested; +import org.junit.jupiter.api.Test; +import org.mockito.ArgumentCaptor; +import org.mockito.Captor; +import org.mockito.InjectMocks; +import org.mockito.Mock; +import org.mockito.MockedStatic; +import org.mockito.Spy; + +import de.ozgcloud.common.binaryfile.GrpcFileUploadUtils; +import de.ozgcloud.common.binaryfile.StreamingFileSender; +import de.ozgcloud.common.errorhandling.TechnicalException; +import de.ozgcloud.common.test.ReflectionTestUtils; +import de.ozgcloud.eingang.forwarder.RouteForwardingServiceGrpc; +import de.ozgcloud.eingang.forwarding.GrpcRouteForwarding; +import de.ozgcloud.eingang.forwarding.GrpcRouteForwardingRequest; +import de.ozgcloud.eingang.forwarding.GrpcRouteForwardingResponse; +import de.ozgcloud.vorgang.callcontext.VorgangManagerClientCallContextAttachingInterceptor; +import de.ozgcloud.vorgang.files.FileService; +import de.ozgcloud.vorgang.vorgang.IncomingFile; +import de.ozgcloud.vorgang.vorgang.IncomingFileGroup; +import de.ozgcloud.vorgang.vorgang.IncomingFileGroupTestFactory; +import de.ozgcloud.vorgang.vorgang.IncomingFileMapper; +import de.ozgcloud.vorgang.vorgang.IncomingFileTestFactory; +import de.ozgcloud.vorgang.vorgang.redirect.EingangForwarder.DelegatingOnReadyHandler; +import de.ozgcloud.vorgang.vorgang.redirect.EingangForwarder.ForwardingResponseObserver; +import io.grpc.stub.ClientCallStreamObserver; +import lombok.SneakyThrows; + +class EingangForwarderTest { + + @Mock + private RouteForwardingServiceGrpc.RouteForwardingServiceStub serviceStub; + @Mock + private FileService fileService; + @Mock + private IncomingFileMapper incomingFileMapper; + @InjectMocks + @Spy + private EingangForwarder forwarder; + + @Nested + class TestForward { + + @Mock + private GrpcRouteForwarding grpcRouteForwarding; + @Mock + private ClientCallStreamObserver<GrpcRouteForwardingRequest> requestObserver; + private final List<IncomingFileGroup> attachments = List.of(IncomingFileGroupTestFactory.create()); + private final List<IncomingFile> representations = List.of(IncomingFileTestFactory.create()); + @Mock + private Future<GrpcRouteForwardingResponse> future; + + @BeforeEach + void init() { + doReturn(future).when(forwarder).performGrpcCall(); + doNothing().when(forwarder).sendRouteForwarding(any()); + doNothing().when(forwarder).sendAttachments(any()); + doNothing().when(forwarder).sendRepresentations(any()); + doNothing().when(forwarder).waitForCompletion(any()); + setRequestObserverInForwarder(requestObserver); + } + + @Test + void shouldPerformGrpcCall() { + forwarder.forward(grpcRouteForwarding, attachments, representations); + + verify(forwarder).performGrpcCall(); + } + + @Test + void shouldSendRouteForwarding() { + forwarder.forward(grpcRouteForwarding, attachments, representations); + + verify(forwarder).sendRouteForwarding(grpcRouteForwarding); + } + + @Test + void shouldSendAttachments() { + forwarder.forward(grpcRouteForwarding, attachments, representations); + + verify(forwarder).sendAttachments(attachments); + } + + @Test + void shouldSendRepresentations() { + forwarder.forward(grpcRouteForwarding, attachments, representations); + + verify(forwarder).sendRepresentations(representations); + } + + @Test + void shouldCompleteRequest() { + forwarder.forward(grpcRouteForwarding, attachments, representations); + + verify(requestObserver).onCompleted(); + } + + @Test + void shouldWaitForCompletion() { + forwarder.forward(grpcRouteForwarding, attachments, representations); + + verify(forwarder).waitForCompletion(future); + } + } + + @Nested + class TestPerformGrpcCall { + + @BeforeEach + void init() { + when(serviceStub.withInterceptors(any())).thenReturn(serviceStub); + } + + @Test + void shouldAttachClientCallContextToServiceStub() { + forwarder.performGrpcCall(); + + verify(serviceStub).withInterceptors(any(VorgangManagerClientCallContextAttachingInterceptor.class)); + } + + @Test + void shouldCreateResponseObserver() { + forwarder.performGrpcCall(); + + assertThat(getResponseObserverFromForwarder()).isNotNull(); + } + + @Test + void shouldMakeGrpcCallToRouteForwarding() { + forwarder.performGrpcCall(); + + verify(serviceStub).routeForwarding(getResponseObserverFromForwarder()); + } + + @Test + void shouldReturnFutureOfResponseObserver() { + var result = forwarder.performGrpcCall(); + + var expectedFuture = ReflectionTestUtils.getField(getResponseObserverFromForwarder(), "future", CompletableFuture.class); + assertThat(result).isSameAs(expectedFuture); + } + } + + @Nested + class TestSendRouteForwarding { + + private final GrpcRouteForwarding grpcRouteForwarding = GrpcRouteForwarding.newBuilder().build(); + @Mock + private ForwardingResponseObserver responseObserver; + @Mock + private Runnable onReadyHandler; + @Captor + private ArgumentCaptor<Runnable> onReadyHandlerCaptor; + @Captor + private ArgumentCaptor<CompletableFuture<Void>> futureCaptor; + + @BeforeEach + void init() { + setResponseObserverInForwarder(responseObserver); + doReturn(onReadyHandler).when(forwarder).getSendRouteForwardingRunnable(any(), any()); + doNothing().when(forwarder).waitForCompletion(any()); + } + + @SuppressWarnings("unchecked") + @Test + void shouldGetSendRouteForwardingRunnable() { + forwarder.sendRouteForwarding(grpcRouteForwarding); + + verify(forwarder).getSendRouteForwardingRunnable(eq(grpcRouteForwarding), any(CompletableFuture.class)); + } + + @Test + void shouldRegisterOnReadyHandler() { + forwarder.sendRouteForwarding(grpcRouteForwarding); + + verify(responseObserver).registerOnReadyHandler(onReadyHandlerCaptor.capture()); + assertIsSendRouteForwardingRunnable(onReadyHandlerCaptor.getValue()); + } + + @SuppressWarnings("unchecked") + @Test + void shouldWaitForCompletion() { + forwarder.sendRouteForwarding(grpcRouteForwarding); + + verify(forwarder).waitForCompletion(any(CompletableFuture.class)); + } + + @Test + void shouldBeTheSameFuture() { + forwarder.sendRouteForwarding(grpcRouteForwarding); + + verify(forwarder).getSendRouteForwardingRunnable(any(), futureCaptor.capture()); + verify(forwarder).waitForCompletion(futureCaptor.getValue()); + assertThat(futureCaptor.getAllValues().getFirst()).isSameAs(futureCaptor.getValue()); + } + + private void assertIsSendRouteForwardingRunnable(Runnable runnable) { + runnable.run(); + verify(onReadyHandler).run(); + } + } + + @Nested + class TestGetSendRouteForwardingRunnable { + + private final GrpcRouteForwarding grpcRouteForwarding = GrpcRouteForwardingTestFactory.create(); + @Mock + private ClientCallStreamObserver<GrpcRouteForwardingRequest> requestObserver; + @Mock + private CompletableFuture<Void> future; + + @BeforeEach + void init() { + setRequestObserverInForwarder(requestObserver); + } + + @Test + void shouldCallOnNext() { + forwarder.getSendRouteForwardingRunnable(grpcRouteForwarding, future).run(); + + verify(requestObserver).onNext(GrpcRouteForwardingRequestTestFactory.create()); + } + + @Test + void shouldCompleteFuture() { + forwarder.getSendRouteForwardingRunnable(grpcRouteForwarding, future).run(); + + verify(future).complete(null); + } + + @Test + void shouldRunOnlyOnce() { + var runnable = forwarder.getSendRouteForwardingRunnable(grpcRouteForwarding, future); + + IntStream.range(0, 3).forEach(i -> runnable.run()); + + verify(requestObserver, times(1)).onNext(any()); + } + } + + @Nested + class TestSendAttachments { + + private final List<IncomingFileGroup> attachments = List.of(IncomingFileGroupTestFactory.createWithTwoFiles()); + + @BeforeEach + void init() { + doNothing().when(forwarder).sendAttachmentFile(any()); + } + + @Test + void shouldSendFirstAttachmentFile() { + forwarder.sendAttachments(attachments); + + verify(forwarder).sendAttachmentFile( + new EingangForwarder.FileInGroup(IncomingFileGroupTestFactory.NAME, IncomingFileGroupTestFactory.FILE)); + } + + @Test + void shouldSendSecondAttachmentFile() { + forwarder.sendAttachments(attachments); + + verify(forwarder).sendAttachmentFile( + new EingangForwarder.FileInGroup(IncomingFileGroupTestFactory.NAME, IncomingFileGroupTestFactory.FILE2)); + } + } + + @Nested + class TestSendAttachmentFile { + + @Mock + private StreamingFileSender<GrpcRouteForwardingRequest, GrpcRouteForwardingResponse> fileSender; + @Mock + private InputStream fileContentStream; + + @BeforeEach + void init() { + when(fileService.getUploadedFileStream(any())).thenReturn(fileContentStream); + doReturn(fileSender).when(forwarder).createAttachmentFileSender(any(), any(), any()); + doReturn(fileSender).when(fileSender).send(); + doNothing().when(forwarder).waitForCompletion(any(), any()); + } + + @Test + void shouldGetUploadFileStream() { + sendAttachmentFile(); + + verify(fileService).getUploadedFileStream(IncomingFileGroupTestFactory.FILE.getId()); + } + + @Test + void shouldCreateAttachmentFileSender() { + sendAttachmentFile(); + + verify(forwarder).createAttachmentFileSender(IncomingFileGroupTestFactory.NAME, IncomingFileGroupTestFactory.FILE, fileContentStream); + } + + @Test + void shouldSend() { + sendAttachmentFile(); + + verify(fileSender).send(); + } + + @Test + void shouldWaitForCompletion() { + sendAttachmentFile(); + + verify(forwarder).waitForCompletion(fileSender, fileContentStream); + } + + private void sendAttachmentFile() { + forwarder.sendAttachmentFile(new EingangForwarder.FileInGroup(IncomingFileGroupTestFactory.NAME, IncomingFileGroupTestFactory.FILE)); + } + } + + @Nested + class TestCreateAttachmentFileSender { + + @Mock + private InputStream inputStream; + @Mock + private StreamingFileSender<GrpcRouteForwardingRequest, GrpcRouteForwardingResponse> fileSender; + @Mock + private StreamingFileSender<GrpcRouteForwardingRequest, GrpcRouteForwardingResponse> fileSenderWithMetadata; + @Captor + private ArgumentCaptor<BiFunction<byte[], Integer, GrpcRouteForwardingRequest>> chunkBuilderCaptor; + + private final byte[] chunk = RandomUtils.insecure().randomBytes(5); + private final GrpcRouteForwardingRequest metadataRequest = GrpcRouteForwardingRequestTestFactory.create(); + + @BeforeEach + void init() { + doReturn(fileSender).when(forwarder).createSenderWithoutMetadata(any(), any()); + doReturn(metadataRequest).when(forwarder).buildGrpcAttachmentFile(any(), any()); + when(fileSender.withMetaData(any())).thenReturn(fileSenderWithMetadata); + } + + @Test + void shouldCreateSenderWithoutMetadata() { + createAttachmentFileSender(); + + verify(forwarder).createSenderWithoutMetadata(chunkBuilderCaptor.capture(), eq(inputStream)); + chunkBuilderCaptor.getValue().apply(chunk, chunk.length); + verify(forwarder).buildAttachmentChunk(chunk, chunk.length); + } + + @Test + void shouldBuildGrpcAttachmentFile() { + createAttachmentFileSender(); + + verify(forwarder).buildGrpcAttachmentFile(IncomingFileGroupTestFactory.NAME, IncomingFileGroupTestFactory.FILE); + } + + @Test + void shouldSetMetaData() { + createAttachmentFileSender(); + + verify(fileSender).withMetaData(metadataRequest); + } + + @Test + void shouldReturnBuiltFileSender() { + var returnedFileSender = createAttachmentFileSender(); + + assertThat(returnedFileSender).isSameAs(fileSenderWithMetadata); + } + + private StreamingFileSender<GrpcRouteForwardingRequest, GrpcRouteForwardingResponse> createAttachmentFileSender() { + return forwarder.createAttachmentFileSender(IncomingFileGroupTestFactory.NAME, IncomingFileGroupTestFactory.FILE, inputStream); + } + } + + @Nested + class TestBuildAttachmentChunk { + + private final byte[] chunk = RandomUtils.insecure().randomBytes(5); + + @BeforeEach + void mock() { + doReturn(GrpcAttachmentTestFactory.CONTENT).when(forwarder).buildGrpcFileContent(any(), anyInt()); + } + + @Test + void shouldCallBuildGrpcFileContent() { + forwarder.buildAttachmentChunk(chunk, chunk.length); + + verify(forwarder).buildGrpcFileContent(chunk, chunk.length); + } + + @Test + void shouldReturnGrpcRouteForwardingRequest() { + var result = forwarder.buildAttachmentChunk(chunk, chunk.length); + + assertThat(result).isEqualTo(GrpcRouteForwardingRequestTestFactory.createWithAttachmentContent()); + } + } + + @Nested + class TestBuildGrpcAttachmentFile { + + private final IncomingFile file = IncomingFileTestFactory.create(); + + @BeforeEach + void mock() { + when(incomingFileMapper.toAttachmentFile(any(), any())).thenReturn(GrpcAttachmentFileTestFactory.create()); + } + + @Test + void shouldCallIncomingFileMapper() { + forwarder.buildGrpcAttachmentFile(IncomingFileGroupTestFactory.NAME, file); + + verify(incomingFileMapper).toAttachmentFile(IncomingFileGroupTestFactory.NAME, file); + } + + @Test + void shouldReturnAttachmentMetadataRequest() { + var result = forwarder.buildGrpcAttachmentFile(IncomingFileGroupTestFactory.NAME, file); + + assertThat(result).isEqualTo(GrpcRouteForwardingRequestTestFactory.createWithAttachmentMetadata()); + } + } + + @Nested + class TestSendRepresentations { + + private static final IncomingFile FILE = IncomingFileTestFactory.create(); + + @BeforeEach + void init() { + doNothing().when(forwarder).sendRepresentationFile(any()); + } + + @Test + void shouldSendRepresentationFile() { + forwarder.sendRepresentations(List.of(FILE)); + + verify(forwarder).sendRepresentationFile(FILE); + } + } + + @Nested + class TestSendRepresentationFile { + + @Mock + private StreamingFileSender<GrpcRouteForwardingRequest, GrpcRouteForwardingResponse> fileSender; + @Mock + private InputStream fileContentStream; + + private final IncomingFile file = IncomingFileTestFactory.create(); + + @BeforeEach + void init() { + when(fileService.getUploadedFileStream(any())).thenReturn(fileContentStream); + doReturn(fileSender).when(forwarder).createRepresentationFileSender(any(), any()); + doReturn(fileSender).when(fileSender).send(); + doNothing().when(forwarder).waitForCompletion(any(), any()); + } + + @Test + void shouldGetUploadFileStream() { + sendRepresentationFile(); + + verify(fileService).getUploadedFileStream(IncomingFileTestFactory.ID); + } + + @Test + void shouldCreateRepresentationFileSender() { + sendRepresentationFile(); + + verify(forwarder).createRepresentationFileSender(file, fileContentStream); + } + + @Test + void shouldSend() { + sendRepresentationFile(); + + verify(fileSender).send(); + } + + @Test + void shouldWaitForCompletion() { + sendRepresentationFile(); + + verify(forwarder).waitForCompletion(fileSender, fileContentStream); + } + + private void sendRepresentationFile() { + forwarder.sendRepresentationFile(file); + } + } + + @Nested + class TestCreateRepresentationFileSender { + + @Mock + private InputStream inputStream; + @Mock + private StreamingFileSender<GrpcRouteForwardingRequest, GrpcRouteForwardingResponse> fileSender; + @Mock + private StreamingFileSender<GrpcRouteForwardingRequest, GrpcRouteForwardingResponse> fileSenderWithMetadata; + @Captor + private ArgumentCaptor<BiFunction<byte[], Integer, GrpcRouteForwardingRequest>> chunkBuilderCaptor; + + private final byte[] chunk = RandomUtils.insecure().randomBytes(5); + private final GrpcRouteForwardingRequest metadataRequest = GrpcRouteForwardingRequestTestFactory.create(); + private final IncomingFile incomingFile = IncomingFileTestFactory.create(); + + @BeforeEach + void init() { + doReturn(fileSender).when(forwarder).createSenderWithoutMetadata(any(), any()); + doReturn(metadataRequest).when(forwarder).buildGrpcRepresentationFile(any()); + when(fileSender.withMetaData(any())).thenReturn(fileSenderWithMetadata); + } + + @Test + void shouldCallCreateSenderWithoutMetadata() { + createRepresentationFileSender(); + + verify(forwarder).createSenderWithoutMetadata(chunkBuilderCaptor.capture(), eq(inputStream)); + chunkBuilderCaptor.getValue().apply(chunk, chunk.length); + verify(forwarder).buildRepresentationChunk(chunk, chunk.length); + } + + @Test + void shouldCallBuildGrpcRepresentationFile() { + createRepresentationFileSender(); + + verify(forwarder).buildGrpcRepresentationFile(incomingFile); + } + + @Test + void shouldSetMetaData() { + createRepresentationFileSender(); + + verify(fileSender).withMetaData(metadataRequest); + } + + @Test + void shouldReturnBuiltFileSender() { + var returnedFileSender = createRepresentationFileSender(); + + assertThat(returnedFileSender).isSameAs(fileSenderWithMetadata); + } + + private StreamingFileSender<GrpcRouteForwardingRequest, GrpcRouteForwardingResponse> createRepresentationFileSender() { + return forwarder.createRepresentationFileSender(incomingFile, inputStream); + } + } + + @Nested + class TestBuildRepresentationChunk { + + private final byte[] chunk = RandomUtils.insecure().randomBytes(5); + + @BeforeEach + void mock() { + doReturn(GrpcRepresentationTestFactory.CONTENT).when(forwarder).buildGrpcFileContent(any(), anyInt()); + } + + @Test + void shouldCallBuildGrpcFileContent() { + forwarder.buildRepresentationChunk(chunk, chunk.length); + + verify(forwarder).buildGrpcFileContent(chunk, chunk.length); + } + + @Test + void shouldReturnGrpcRouteForwardingRequest() { + var result = forwarder.buildRepresentationChunk(chunk, chunk.length); + + assertThat(result).isEqualTo(GrpcRouteForwardingRequestTestFactory.createWithRepresentationContent()); + } + } + + @Nested + class TestBuildGrpcRepresentationFile { + + private final IncomingFile file = IncomingFileTestFactory.create(); + + @BeforeEach + void mock() { + when(incomingFileMapper.toRepresentationFile(any())).thenReturn(GrpcRepresentationFileTestFactory.create()); + } + + @Test + void shouldCallIncomingFileMapper() { + forwarder.buildGrpcRepresentationFile(file); + + verify(incomingFileMapper).toRepresentationFile(file); + } + + @Test + void shouldReturnRepresentationMetadataRequest() { + var result = forwarder.buildGrpcRepresentationFile(file); + + assertThat(result).isEqualTo(GrpcRouteForwardingRequestTestFactory.createWithRepresentationMetadata()); + } + } + + @Nested + class TestBuildGrpcFileContent { + + @Nested + class TestOnEndOfFile { + + @Test + void shouldBuildEndOfFileChunk() { + var fileContent = forwarder.buildGrpcFileContent(new byte[0], -1); + + assertThat(fileContent).isEqualTo(GrpcFileContentTestFactory.createEndOfFile()); + } + } + + @Nested + class TestOnContentProvided { + + @Test + void shouldBuildEndOfFileChunk() { + var fileContent = forwarder.buildGrpcFileContent(GrpcFileContentTestFactory.CONTENT, GrpcFileContentTestFactory.CONTENT.length); + + assertThat(fileContent).isEqualTo(GrpcFileContentTestFactory.create()); + } + } + } + + @Nested + class TestCreateSenderWithoutMetadata { + + private MockedStatic<GrpcFileUploadUtils> grpcFileUploadUtilsMock; + @Mock + private BiFunction<byte[], Integer, GrpcRouteForwardingRequest> chunkBuilder; + @Mock + private ClientCallStreamObserver<GrpcRouteForwardingRequest> requestObserver; + @Mock + private ForwardingResponseObserver responseObserver; + @Mock + private InputStream inputStream; + @Mock + private StreamingFileSender<GrpcRouteForwardingRequest, GrpcRouteForwardingResponse> fileSender; + @Mock + private Runnable onReadyHandler; + @Captor + private ArgumentCaptor<Consumer<Runnable>> onReadyHandlerCaptor; + + @BeforeEach + void init() { + grpcFileUploadUtilsMock = mockStatic(GrpcFileUploadUtils.class); + grpcFileUploadUtilsMock.when(() -> GrpcFileUploadUtils.createStreamSharingSender(any(), any(), any(), any())).thenReturn(fileSender); + ReflectionTestUtils.setField(forwarder, "responseObserver", responseObserver); + ReflectionTestUtils.setField(forwarder, "requestObserver", requestObserver); + } + + @AfterEach + void tearDown() { + grpcFileUploadUtilsMock.close(); + } + + @Test + void shouldCreateFileSender() { + + createSenderWithoutMetadata(); + + grpcFileUploadUtilsMock + .verify(() -> GrpcFileUploadUtils.<GrpcRouteForwardingRequest, GrpcRouteForwardingResponse>createStreamSharingSender( + eq(chunkBuilder), eq(inputStream), eq(requestObserver), onReadyHandlerCaptor.capture())); + assertIsRegisterOnReadyHandler(onReadyHandlerCaptor); + } + + @Test + void shouldReturnCreatedFileSender() { + var returnedFileSender = createSenderWithoutMetadata(); + + assertThat(returnedFileSender).isSameAs(fileSender); + } + + private void assertIsRegisterOnReadyHandler(ArgumentCaptor<Consumer<Runnable>> captor) { + captor.getValue().accept(onReadyHandler); + verify(responseObserver).registerOnReadyHandler(onReadyHandler); + } + + private StreamingFileSender<GrpcRouteForwardingRequest, GrpcRouteForwardingResponse> createSenderWithoutMetadata() { + return forwarder.createSenderWithoutMetadata(chunkBuilder, inputStream); + } + } + + @Nested + class TestWaitForCompletionOfFuture { + + @Mock + private CompletableFuture<Void> future; + + @SneakyThrows + @Test + void shouldGetFromFuture() { + waitForCompletion(); + + verify(future).get(EingangForwarder.TIMEOUT_MINUTES, TimeUnit.MINUTES); + } + + @Nested + class TestOnInterruptedException { + + private final InterruptedException exception = new InterruptedException(); + + @BeforeEach + @SneakyThrows + void mock() { + when(future.get(anyLong(), any())).thenThrow(exception); + } + + @Test + void shouldThrowTechnicalException() { + assertThrows(TechnicalException.class, TestWaitForCompletionOfFuture.this::waitForCompletion); + } + + @Test + void shouldInterruptThread() { + try { + waitForCompletion(); + } catch (TechnicalException e) { + // expected + } + + assertThat(Thread.currentThread().isInterrupted()).isTrue(); + } + } + + @Nested + class TestOnExecutionException { + + private final ExecutionException exception = new ExecutionException(new Exception()); + + @BeforeEach + @SneakyThrows + void mock() { + when(future.get(anyLong(), any())).thenThrow(exception); + } + + @Test + void shouldThrowTechnicalException() { + assertThrows(TechnicalException.class, TestWaitForCompletionOfFuture.this::waitForCompletion); + } + } + + @Nested + class TestOnTimeoutException { + + private final TimeoutException exception = new TimeoutException(); + + @BeforeEach + @SneakyThrows + void mock() { + when(future.get(anyLong(), any())).thenThrow(exception); + } + + @Test + void shouldThrowTechnicalException() { + assertThrows(TechnicalException.class, TestWaitForCompletionOfFuture.this::waitForCompletion); + } + } + + private void waitForCompletion() { + forwarder.waitForCompletion(future); + } + } + + @Nested + class TestWaitForCompletionOfFileSender { + + @Mock + private StreamingFileSender<GrpcRouteForwardingRequest, GrpcRouteForwardingResponse> fileSender; + @Mock + private InputStream fileContentStream; + @Mock + private CompletableFuture<GrpcRouteForwardingResponse> future; + + @BeforeEach + void init() { + when(fileSender.getResultFuture()).thenReturn(future); + } + + @SneakyThrows + @Test + void shouldGetFromFuture() { + waitForCompletion(); + + verify(future).get(EingangForwarder.TIMEOUT_MINUTES, TimeUnit.MINUTES); + } + + @Nested + class TestOnInterruptedException { + + private final InterruptedException exception = new InterruptedException(); + + @BeforeEach + @SneakyThrows + void mock() { + when(future.get(anyLong(), any())).thenThrow(exception); + } + + @Test + void shouldInterruptThread() { + try { + waitForCompletion(); + } catch (TechnicalException e) { + // expected + } + + assertThat(Thread.currentThread().isInterrupted()).isTrue(); + } + + @Test + void shouldCancelOnError() { + try { + waitForCompletion(); + } catch (TechnicalException e) { + // expected + } + + verify(fileSender).cancelOnError(exception); + } + + @Test + void shouldThrowTechnicalException() { + assertThrows(TechnicalException.class, TestWaitForCompletionOfFileSender.this::waitForCompletion); + } + + @SneakyThrows + @Test + void shouldCloseStream() { + try { + waitForCompletion(); + } catch (TechnicalException e) { + // expected + } + + verify(fileContentStream).close(); + } + } + + @Nested + class TestOnExecutionException { + + private final ExecutionException exception = new ExecutionException(new Exception()); + + @BeforeEach + @SneakyThrows + void mock() { + when(future.get(anyLong(), any())).thenThrow(exception); + } + + @Test + void shouldCancelOnError() { + try { + waitForCompletion(); + } catch (TechnicalException e) { + // expected + } + + verify(fileSender).cancelOnError(exception); + } + + @Test + void shouldThrowTechnicalException() { + assertThrows(TechnicalException.class, TestWaitForCompletionOfFileSender.this::waitForCompletion); + } + + @SneakyThrows + @Test + void shouldCloseStream() { + try { + waitForCompletion(); + } catch (TechnicalException e) { + // expected + } + + verify(fileContentStream).close(); + } + } + + @Nested + class TestOnTimeoutException { + + private final TimeoutException exception = new TimeoutException(); + + @BeforeEach + @SneakyThrows + void mock() { + when(future.get(anyLong(), any())).thenThrow(exception); + } + + @Test + void shouldCancelOnTimeout() { + try { + waitForCompletion(); + } catch (TechnicalException e) { + // expected + } + + verify(fileSender).cancelOnTimeout(); + } + + @Test + void shouldThrowTechnicalException() { + assertThrows(TechnicalException.class, TestWaitForCompletionOfFileSender.this::waitForCompletion); + } + + @SneakyThrows + @Test + void shouldCloseStream() { + try { + waitForCompletion(); + } catch (TechnicalException e) { + // expected + } + + verify(fileContentStream).close(); + } + } + + private void waitForCompletion() { + forwarder.waitForCompletion(fileSender, fileContentStream); + } + } + + @Nested + class TestForwardingResponseObserver { + + @Mock + private CompletableFuture<GrpcRouteForwardingResponse> future; + @Mock + private DelegatingOnReadyHandler onReadyHandler; + @Mock + private ClientCallStreamObserver<GrpcRouteForwardingRequest> requestStream; + private final GrpcRouteForwardingResponse response = GrpcRouteForwardingResponse.getDefaultInstance(); + @InjectMocks + private ForwardingResponseObserver observer; + + @Nested + class TestBeforeStart { + + @Test + void shouldCreateOnReadyHandler() { + observer.beforeStart(requestStream); + + assertThat(getOnReadyHandlerFromObserver()).isNotNull(); + } + + @Test + void shouldSetOnReadyHandler() { + observer.beforeStart(requestStream); + + verify(requestStream).setOnReadyHandler(getOnReadyHandlerFromObserver()); + } + } + + @Nested + class TestOnNext { + + @Test + void shouldSetResponse() { + observer.onNext(response); + + assertThat(getResponseFromObserver()).isSameAs(response); + } + } + + @Nested + class TestOnError { + + private final Throwable error = new RuntimeException("Error when forwarding"); + + @BeforeEach + void init() { + setOnReadyHandlerInObserver(); + } + + @Test + void shouldStopOnReadyHandler() { + observer.onError(error); + + verify(onReadyHandler).stop(); + } + + @Test + void shouldCompleteFutureExceptionally() { + observer.onError(error); + + verify(future).completeExceptionally(error); + } + } + + @Nested + class TestOnCompleted { + + @BeforeEach + void init() { + setOnReadyHandlerInObserver(); + } + + @Test + void shouldStopOnReadyHandler() { + observer.onCompleted(); + + verify(onReadyHandler).stop(); + } + + @Test + void shouldCompleteFutureWithResponse() { + observer.onNext(response); + + observer.onCompleted(); + + verify(future).complete(response); + } + } + + @Nested + class TestRegisterOnReadyHandler { + + @Mock + private Runnable delegate; + + @BeforeEach + void init() { + setOnReadyHandlerInObserver(); + } + + @Test + void shouldSetDelegateInOnReadyHandler() { + observer.registerOnReadyHandler(delegate); + + verify(onReadyHandler).setDelegate(delegate); + } + } + + private DelegatingOnReadyHandler getOnReadyHandlerFromObserver() { + return ReflectionTestUtils.getField(observer, "onReadyHandler", DelegatingOnReadyHandler.class); + } + + private void setOnReadyHandlerInObserver() { + ReflectionTestUtils.setField(observer, "onReadyHandler", onReadyHandler); + } + + private GrpcRouteForwardingResponse getResponseFromObserver() { + return ReflectionTestUtils.getField(observer, "response", GrpcRouteForwardingResponse.class); + } + } + + @Nested + class TestDelegatingOnReadyHandler { + + @Mock + private ClientCallStreamObserver<GrpcRouteForwardingRequest> requestStream; + @InjectMocks + private DelegatingOnReadyHandler onReadyHandler; + + @Test + void shouldDoneBeInitiallyFalse() { + assertThat(getDoneFromOnReadyHandler()).isFalse(); + } + + @Nested + class TestSetDelegate { + + @Mock + private Runnable delegate; + + @Test + void shouldSetDelegate() { + onReadyHandler.setDelegate(delegate); + + assertThat(getDelegateFromOnReadyHandler()).isSameAs(delegate); + } + } + + @Nested + class TestStop { + + @Test + void shouldSetDoneToTrue() { + onReadyHandler.stop(); + + assertThat(getDoneFromOnReadyHandler()).isTrue(); + } + } + + @Nested + class TestRun { + + @Mock + private Runnable delegate; + + @BeforeEach + void init() { + onReadyHandler.setDelegate(delegate); + } + + @Test + void shouldNotRunDelegateIfDone() { + onReadyHandler.stop(); + lenient().when(requestStream.isReady()).thenReturn(true); + + onReadyHandler.run(); + + verify(delegate, never()).run(); + } + + @Test + void shouldNotRunDelegateIfNotReady() { + when(requestStream.isReady()).thenReturn(false); + + onReadyHandler.run(); + + verify(delegate, never()).run(); + } + + @Test + void shouldRunDelegateIfNotDoneAndReady() { + when(requestStream.isReady()).thenReturn(true).thenReturn(false); + runWithOnReadyHandlerInAnotherThread(() -> { + await().atMost(Duration.ofMillis(500)).untilAsserted(() -> verify(delegate, atLeastOnce()).run()); + }); + } + + @Test + void shouldContinueAfterDelegateWasReplaced() { + when(requestStream.isReady()).thenReturn(true); + runWithOnReadyHandlerInAnotherThread(() -> { + await().atMost(Duration.ofMillis(500)).untilAsserted(() -> verify(delegate, atLeastOnce()).run()); + var delegate2 = mock(Runnable.class); + onReadyHandler.setDelegate(delegate2); + await().atMost(Duration.ofMillis(500)).untilAsserted(() -> verify(delegate2, atLeastOnce()).run()); + }); + } + + private void runWithOnReadyHandlerInAnotherThread(Runnable runnable) { + try (ExecutorService executor = Executors.newSingleThreadExecutor()) { + var future = executor.submit(onReadyHandler); + runnable.run(); + future.cancel(true); + executor.shutdown(); + } + } + } + + private boolean getDoneFromOnReadyHandler() { + return ReflectionTestUtils.getField(onReadyHandler, "done", AtomicBoolean.class).get(); + } + + private Runnable getDelegateFromOnReadyHandler() { + return (Runnable) ReflectionTestUtils.getField(onReadyHandler, "delegateRef", AtomicReference.class).get(); + } + } + + private void setResponseObserverInForwarder(ForwardingResponseObserver responseObserver) { + ReflectionTestUtils.setField(forwarder, "responseObserver", responseObserver); + } + + private ForwardingResponseObserver getResponseObserverFromForwarder() { + return ReflectionTestUtils.getField(forwarder, "responseObserver", ForwardingResponseObserver.class); + } + + private void setRequestObserverInForwarder(ClientCallStreamObserver<GrpcRouteForwardingRequest> requestObserver) { + ReflectionTestUtils.setField(forwarder, "requestObserver", requestObserver); + } +} diff --git a/vorgang-manager-server/src/test/java/de/ozgcloud/vorgang/vorgang/redirect/ForwardingRemoteServiceTest.java b/vorgang-manager-server/src/test/java/de/ozgcloud/vorgang/vorgang/redirect/ForwardingRemoteServiceTest.java index 5e6e63b58402a779cb7716609f773772e50b2342..2a1c5ea86879039b47fcd9c769a6e8b82d4dcf63 100644 --- a/vorgang-manager-server/src/test/java/de/ozgcloud/vorgang/vorgang/redirect/ForwardingRemoteServiceTest.java +++ b/vorgang-manager-server/src/test/java/de/ozgcloud/vorgang/vorgang/redirect/ForwardingRemoteServiceTest.java @@ -23,774 +23,67 @@ */ package de.ozgcloud.vorgang.vorgang.redirect; -import static org.assertj.core.api.Assertions.*; -import static org.junit.jupiter.api.Assertions.*; import static org.mockito.ArgumentMatchers.*; import static org.mockito.Mockito.*; -import java.io.InputStream; import java.util.List; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; -import java.util.function.BiFunction; -import java.util.function.Function; -import org.apache.commons.lang3.RandomUtils; -import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Nested; import org.junit.jupiter.api.Test; -import org.mockito.ArgumentCaptor; -import org.mockito.Captor; import org.mockito.InjectMocks; import org.mockito.Mock; -import org.mockito.MockedStatic; import org.mockito.Spy; -import de.ozgcloud.common.binaryfile.GrpcFileUploadUtils; -import de.ozgcloud.common.binaryfile.GrpcFileUploadUtils.FileSender; -import de.ozgcloud.common.errorhandling.TechnicalException; -import de.ozgcloud.common.test.ReflectionTestUtils; -import de.ozgcloud.eingang.forwarder.RouteForwardingServiceGrpc; import de.ozgcloud.eingang.forwarding.GrpcRouteForwarding; -import de.ozgcloud.eingang.forwarding.GrpcRouteForwardingRequest; -import de.ozgcloud.eingang.forwarding.GrpcRouteForwardingResponse; -import de.ozgcloud.vorgang.callcontext.VorgangManagerClientCallContextAttachingInterceptor; -import de.ozgcloud.vorgang.files.FileService; -import de.ozgcloud.vorgang.vorgang.Eingang; import de.ozgcloud.vorgang.vorgang.EingangTestFactory; -import de.ozgcloud.vorgang.vorgang.IncomingFile; -import de.ozgcloud.vorgang.vorgang.IncomingFileGroup; -import de.ozgcloud.vorgang.vorgang.IncomingFileGroupTestFactory; -import de.ozgcloud.vorgang.vorgang.IncomingFileMapper; -import de.ozgcloud.vorgang.vorgang.IncomingFileTestFactory; -import de.ozgcloud.vorgang.vorgang.Vorgang; import de.ozgcloud.vorgang.vorgang.VorgangService; import de.ozgcloud.vorgang.vorgang.VorgangTestFactory; -import de.ozgcloud.vorgang.vorgang.redirect.ForwardingRemoteService.ForwardingResponseObserver; -import io.grpc.stub.CallStreamObserver; -import io.grpc.stub.StreamObserver; -import lombok.SneakyThrows; class ForwardingRemoteServiceTest { - @Spy - @InjectMocks - private ForwardingRemoteService service; @Mock private VorgangService vorgangService; @Mock private ForwardingRequestMapper forwardingRequestMapper; - @Mock - private RouteForwardingServiceGrpc.RouteForwardingServiceStub serviceStub; - @Mock - private FileService fileService; - @Mock - private IncomingFileMapper incomingFileMapper; - - @Mock - private StreamObserver<GrpcRouteForwardingRequest> requestObserver; - private final ForwardingRequest request = ForwardingRequestTestFactory.create(); - private final Eingang eingang = EingangTestFactory.create(); - private final Vorgang vorgang = VorgangTestFactory.createBuilder().clearEingangs().eingang(eingang).build(); + @InjectMocks + @Spy + private ForwardingRemoteService service; @Nested class TestForward { - @Captor - private ArgumentCaptor<ForwardingResponseObserver> responseObserverCaptor; - @Captor - private ArgumentCaptor<CompletableFuture<Void>> futureCaptor; - - @BeforeEach - void init() { - doNothing().when(service).routeForwarding(any(), any()); - doNothing().when(service).waitForCompletion(any()); - } - - @Test - void shouldRouteForwarding() { - forward(); - - verify(service).routeForwarding(eq(request), any(ForwardingResponseObserver.class)); - } - - @Test - void shouldWaitForCompletion() { - forward(); - - verify(service).waitForCompletion(futureCaptor.capture()); - verify(service).routeForwarding(any(), responseObserverCaptor.capture()); - assertThat(futureCaptor.getValue()) - .isSameAs(ReflectionTestUtils.getField(responseObserverCaptor.getValue(), "future", CompletableFuture.class)); - } - - private void forward() { - service.forward(request); - } - } - - @Nested - class TestRouteForwarding { - + private final ForwardingRequest request = ForwardingRequestTestFactory.create(); + private final GrpcRouteForwarding grpcRouteForwarding = GrpcRouteForwardingTestFactory.create(); @Mock - private ForwardingResponseObserver responseObserver; - - @BeforeEach - void init() { - when(serviceStub.withInterceptors(any())).thenReturn(serviceStub); - } - - @Test - void shouldAttachClientCallContextToServiceStub() { - givenGrpcCallCompletedSuccessfully(); - doNothing().when(service).sendEingang(any(), any()); - - routeForwarding(); - - verify(serviceStub).withInterceptors(any(VorgangManagerClientCallContextAttachingInterceptor.class)); - } - - @Test - void shouldMakeGrpcCallToRouteForwarding() { - givenGrpcCallCompletedSuccessfully(); - doNothing().when(service).sendEingang(any(), any()); - - routeForwarding(); - - verify(serviceStub).routeForwarding(responseObserver); - } - - @Nested - class OnSuccess { - - @BeforeEach - void init() { - givenGrpcCallCompletedSuccessfully(); - doNothing().when(service).sendEingang(any(), any()); - } - - @Test - void shouldSendEingang() { - routeForwarding(); - - verify(service).sendEingang(request, requestObserver); - } - - @Test - void shouldCallOnCompleted() { - routeForwarding(); - - verify(requestObserver).onCompleted(); - } - } - - @Nested - class OnFailure { - - private final RuntimeException error = new RuntimeException(); - - @BeforeEach - void init() { - givenGrpcCallCompletedSuccessfully(); - doThrow(error).when(service).sendEingang(any(), any()); - } - - @SuppressWarnings("ResultOfMethodCallIgnored") - @Test - void shouldCallOnError() { - catchThrowableOfType(RuntimeException.class, TestRouteForwarding.this::routeForwarding); - - verify(requestObserver).onError(error); - } - - @Test - void shouldThrowError() { - assertThatThrownBy(TestRouteForwarding.this::routeForwarding).isSameAs(error); - } - } - - private void givenGrpcCallCompletedSuccessfully() { - when(serviceStub.routeForwarding(any())).thenAnswer(invocation -> { - ((ForwardingResponseObserver) invocation.getArgument(0)).onCompleted(); - return requestObserver; - }); - } - - private void routeForwarding() { - service.routeForwarding(request, responseObserver); - } - } - - @Nested - class TestSendEingang { - - private final GrpcRouteForwardingRequest routeForwardingRequest = GrpcRouteForwardingRequestTestFactory.create(); + private EingangForwarder eingangForwarder; @BeforeEach void init() { - when(vorgangService.getById(any())).thenReturn(vorgang); - doReturn(routeForwardingRequest).when(service).buildRouteForwardingRequest(any(), any()); - doNothing().when(service).sendAttachments(any(), any()); - doNothing().when(service).sendRepresentations(any(), any()); + doReturn(eingangForwarder).when(service).getEingangForwarder(); + when(vorgangService.getById(any())).thenReturn(VorgangTestFactory.create()); + when(forwardingRequestMapper.toGrpcRouteForwarding(any(), any())).thenReturn(grpcRouteForwarding); } @Test - void shouldGetVorgangById() { - sendEingang(); + void shouldGetVorgang() { + service.forward(request); verify(vorgangService).getById(VorgangTestFactory.ID); } @Test - void shouldBuildRouteForwardingRequest() { - sendEingang(); - - verify(service).buildRouteForwardingRequest(request, eingang); - } - - @Test - void shouldSendForwardingRequest() { - sendEingang(); - - verify(requestObserver).onNext(routeForwardingRequest); - } - - @Test - void shouldCallSendAttachments() { - sendEingang(); - - verify(service).sendAttachments(List.of(EingangTestFactory.ATTACHMENT), requestObserver); - } - - @Test - void shouldCallSendRepresentations() { - sendEingang(); - - verify(service).sendRepresentations(List.of(EingangTestFactory.REPRESENTATION), requestObserver); - } - - private void sendEingang() { - service.sendEingang(request, requestObserver); - } - } - - @Nested - class TestBuildRouteForwardingRequest { - - private final GrpcRouteForwarding routeForwarding = GrpcRouteForwardingTestFactory.create(); - - @BeforeEach - void init() { - when(forwardingRequestMapper.toGrpcRouteForwarding(any(), any())).thenReturn(routeForwarding); - } - - @Test - void shouldMapToRouteForwarding() { - buildRouteForwardingRequest(); - - verify(forwardingRequestMapper).toGrpcRouteForwarding(request, eingang); - } - - @Test - void shouldReturnRouteForwardingRequest() { - var builtRequest = buildRouteForwardingRequest(); - - assertThat(builtRequest).isEqualTo(GrpcRouteForwardingRequestTestFactory.create()); - } - - private GrpcRouteForwardingRequest buildRouteForwardingRequest() { - return service.buildRouteForwardingRequest(request, eingang); - } - } - - @Nested - class TestSendAttachments { - - @Mock - private FileSender<GrpcRouteForwardingRequest, GrpcRouteForwardingResponse> fileSender; - @Mock - private InputStream inputStream; - - private final IncomingFileGroup attachment = IncomingFileGroupTestFactory.create(); - - @BeforeEach - void init() { - when(fileService.getUploadedFileStream(any())).thenReturn(inputStream); - doReturn(fileSender).when(service).createAttachmentFileSender(any(), any(), any(), any()); - when(fileSender.send()).thenReturn(fileSender); - } - - @Test - void shouldGetUploadedFileContent() { - sendAttachments(); - - verify(fileService).getUploadedFileStream(IncomingFileTestFactory.ID); - } - - @Test - void shouldCallCreateAttachmentFileSender() { - sendAttachments(); - - verify(service).createAttachmentFileSender(requestObserver, IncomingFileGroupTestFactory.NAME, IncomingFileGroupTestFactory.FILE, - inputStream); - } - - @Test - void shouldSend() { - sendAttachments(); - - verify(fileSender).send(); - } - - private void sendAttachments() { - service.sendAttachments(List.of(attachment), requestObserver); - } - } - - @Nested - class TestCreateAttachmentFileSender { - - @Mock - private InputStream inputStream; - @Mock - private FileSender<GrpcRouteForwardingRequest, GrpcRouteForwardingResponse> fileSender; - @Mock - private FileSender<GrpcRouteForwardingRequest, GrpcRouteForwardingResponse> fileSenderWithMetadata; - @Captor - private ArgumentCaptor<BiFunction<byte[], Integer, GrpcRouteForwardingRequest>> chunkBuilderCaptor; - - private final byte[] chunk = RandomUtils.insecure().randomBytes(5); - private final GrpcRouteForwardingRequest metadataRequest = GrpcRouteForwardingRequestTestFactory.create(); - - @BeforeEach - void init() { - doReturn(fileSender).when(service).createSenderWithoutMetadata(any(), any(), any()); - doReturn(metadataRequest).when(service).buildGrpcAttachmentFile(any(), any()); - when(fileSender.withMetaData(any())).thenReturn(fileSenderWithMetadata); - } - - @Test - void shouldCallCreateSenderWithoutMetadata() { - createAttachmentFileSender(); - - verify(service).createSenderWithoutMetadata(chunkBuilderCaptor.capture(), eq(requestObserver), eq(inputStream)); - chunkBuilderCaptor.getValue().apply(chunk, chunk.length); - verify(service).buildAttachmentChunk(chunk, chunk.length); - } - - @Test - void shouldCallBuildGrpcAttachmentFile() { - createAttachmentFileSender(); - - verify(service).buildGrpcAttachmentFile(IncomingFileGroupTestFactory.NAME, IncomingFileGroupTestFactory.FILE); - } - - @Test - void shouldSetMetaData() { - createAttachmentFileSender(); - - verify(fileSender).withMetaData(metadataRequest); - } - - @Test - void shouldReturnBuiltFileSender() { - var returnedFileSender = createAttachmentFileSender(); - - assertThat(returnedFileSender).isSameAs(fileSenderWithMetadata); - } - - private FileSender<GrpcRouteForwardingRequest, GrpcRouteForwardingResponse> createAttachmentFileSender() { - return service.createAttachmentFileSender(requestObserver, IncomingFileGroupTestFactory.NAME, IncomingFileGroupTestFactory.FILE, - inputStream); - } - } - - @Nested - class TestBuildAttachmentChunk { - - private final byte[] chunk = RandomUtils.insecure().randomBytes(5); - - @BeforeEach - void mock() { - doReturn(GrpcAttachmentTestFactory.CONTENT).when(service).buildGrpcFileContent(any(), anyInt()); - } - - @Test - void shouldCallBuildGrpcFileContent() { - service.buildAttachmentChunk(chunk, chunk.length); - - verify(service).buildGrpcFileContent(chunk, chunk.length); - } - - @Test - void shouldReturnGrpcRouteForwardingRequest() { - var result = service.buildAttachmentChunk(chunk, chunk.length); - - assertThat(result).isEqualTo(GrpcRouteForwardingRequestTestFactory.createWithAttachmentContent()); - } - } - - @Nested - class TestBuildGrpcAttachmentFile { - - private final IncomingFile file = IncomingFileTestFactory.create(); - - @BeforeEach - void mock() { - when(incomingFileMapper.toAttachmentFile(any(), any())).thenReturn(GrpcAttachmentFileTestFactory.create()); - } - - @Test - void shouldCallIncomingFileMapper() { - service.buildGrpcAttachmentFile(IncomingFileGroupTestFactory.NAME, file); - - verify(incomingFileMapper).toAttachmentFile(IncomingFileGroupTestFactory.NAME, file); - } - - @Test - void shouldReturnAttachmentMetadataRequest() { - var result = service.buildGrpcAttachmentFile(IncomingFileGroupTestFactory.NAME, file); - - assertThat(result).isEqualTo(GrpcRouteForwardingRequestTestFactory.createWithAttachmentMetadata()); - } - } - - @Nested - class TestSendRepresentations { - - @Mock - private FileSender<GrpcRouteForwardingRequest, GrpcRouteForwardingResponse> fileSender; - @Mock - private InputStream inputStream; - - private final IncomingFile representation = IncomingFileTestFactory.create(); - - @BeforeEach - void init() { - when(fileService.getUploadedFileStream(any())).thenReturn(inputStream); - doReturn(fileSender).when(service).createRepresentationFileSender(any(), any(), any()); - when(fileSender.send()).thenReturn(fileSender); - } - - @Test - void shouldGetUploadedFileContent() { - sendRepresentations(); - - verify(fileService).getUploadedFileStream(IncomingFileTestFactory.ID); - } - - @Test - void shouldCallCreateRepresentationFileSender() { - sendRepresentations(); - - verify(service).createRepresentationFileSender(requestObserver, representation, inputStream); - } - - @Test - void shouldSend() { - sendRepresentations(); - - verify(fileSender).send(); - } - - private void sendRepresentations() { - service.sendRepresentations(List.of(representation), requestObserver); - } - } - - @Nested - class TestCreateRepresentationFileSender { - - @Mock - private InputStream inputStream; - @Mock - private FileSender<GrpcRouteForwardingRequest, GrpcRouteForwardingResponse> fileSender; - @Mock - private FileSender<GrpcRouteForwardingRequest, GrpcRouteForwardingResponse> fileSenderWithMetadata; - @Captor - private ArgumentCaptor<BiFunction<byte[], Integer, GrpcRouteForwardingRequest>> chunkBuilderCaptor; - - private final byte[] chunk = RandomUtils.insecure().randomBytes(5); - private final GrpcRouteForwardingRequest metadataRequest = GrpcRouteForwardingRequestTestFactory.create(); - private final IncomingFile incomingFile = IncomingFileTestFactory.create(); - - @BeforeEach - void init() { - doReturn(fileSender).when(service).createSenderWithoutMetadata(any(), any(), any()); - doReturn(metadataRequest).when(service).buildGrpcRepresentationFile(any()); - when(fileSender.withMetaData(any())).thenReturn(fileSenderWithMetadata); - } - - @Test - void shouldCallCreateSenderWithoutMetadata() { - createRepresentationFileSender(); - - verify(service).createSenderWithoutMetadata(chunkBuilderCaptor.capture(), eq(requestObserver), eq(inputStream)); - chunkBuilderCaptor.getValue().apply(chunk, chunk.length); - verify(service).buildRepresentationChunk(chunk, chunk.length); - } - - @Test - void shouldCallBuildGrpcRepresentationFile() { - createRepresentationFileSender(); - - verify(service).buildGrpcRepresentationFile(incomingFile); - } - - @Test - void shouldSetMetaData() { - createRepresentationFileSender(); - - verify(fileSender).withMetaData(metadataRequest); - } - - @Test - void shouldReturnBuiltFileSender() { - var returnedFileSender = createRepresentationFileSender(); - - assertThat(returnedFileSender).isSameAs(fileSenderWithMetadata); - } - - private FileSender<GrpcRouteForwardingRequest, GrpcRouteForwardingResponse> createRepresentationFileSender() { - return service.createRepresentationFileSender(requestObserver, incomingFile, inputStream); - } - } - - @Nested - class TestCreateSenderWithoutMetadata { - - private MockedStatic<GrpcFileUploadUtils> grpcFileUploadUtilsMock; - @Mock - private BiFunction<byte[], Integer, GrpcRouteForwardingRequest> chunkBuilder; - @Mock - private CallStreamObserver<GrpcRouteForwardingRequest> requestCallStreamObserver; - @Mock - private InputStream inputStream; - @Mock - private FileSender<GrpcRouteForwardingRequest, GrpcRouteForwardingResponse> fileSender; - @Mock - private StreamObserver<GrpcRouteForwardingResponse> responseObserver; - @Captor - private ArgumentCaptor<Function<StreamObserver<GrpcRouteForwardingResponse>, CallStreamObserver<GrpcRouteForwardingRequest>>> reqObserverBuilderCaptor; - - @BeforeEach - void init() { - grpcFileUploadUtilsMock = mockStatic(GrpcFileUploadUtils.class); - grpcFileUploadUtilsMock.when(() -> GrpcFileUploadUtils.createSender(any(), any(), any(), anyBoolean())).thenReturn(fileSender); - } - - @AfterEach - void tearDown() { - grpcFileUploadUtilsMock.close(); - } - - @Test - void shouldCreateFileSender() { - createSenderWithoutMetadata(); - - grpcFileUploadUtilsMock - .verify(() -> GrpcFileUploadUtils.createSender(eq(chunkBuilder), eq(inputStream), reqObserverBuilderCaptor.capture(), eq(false))); - assertThat(reqObserverBuilderCaptor.getValue().apply(responseObserver)).isSameAs(requestCallStreamObserver); - } - - @Test - void shouldReturnCreatedFileSender() { - var returnedFileSender = createSenderWithoutMetadata(); - - assertThat(returnedFileSender).isSameAs(fileSender); - } - - private FileSender<GrpcRouteForwardingRequest, GrpcRouteForwardingResponse> createSenderWithoutMetadata() { - return service.createSenderWithoutMetadata(chunkBuilder, requestCallStreamObserver, inputStream); - } - } - - @Nested - class TestBuildRepresentationChunk { - - private final byte[] chunk = RandomUtils.insecure().randomBytes(5); - - @BeforeEach - void mock() { - doReturn(GrpcRepresentationTestFactory.CONTENT).when(service).buildGrpcFileContent(any(), anyInt()); - } - - @Test - void shouldCallBuildGrpcFileContent() { - service.buildRepresentationChunk(chunk, chunk.length); - - verify(service).buildGrpcFileContent(chunk, chunk.length); - } - - @Test - void shouldReturnGrpcRouteForwardingRequest() { - var result = service.buildRepresentationChunk(chunk, chunk.length); - - assertThat(result).isEqualTo(GrpcRouteForwardingRequestTestFactory.createWithRepresentationContent()); - } - } - - @Nested - class TestBuildGrpcFileContent { - - @Nested - class TestOnEndOfFile { - - @Test - void shouldBuildEndOfFileChunk() { - var fileContent = service.buildGrpcFileContent(new byte[0], -1); - - assertThat(fileContent).isEqualTo(GrpcFileContentTestFactory.createEndOfFile()); - } - } - - @Nested - class TestOnContentProvided { - - @Test - void shouldBuildEndOfFileChunk() { - var fileContent = service.buildGrpcFileContent(GrpcFileContentTestFactory.CONTENT, GrpcFileContentTestFactory.CONTENT.length); - - assertThat(fileContent).isEqualTo(GrpcFileContentTestFactory.create()); - } - } - } - - @Nested - class TestBuildGrpcRepresentationFile { - - private final IncomingFile file = IncomingFileTestFactory.create(); - - @BeforeEach - void mock() { - when(incomingFileMapper.toRepresentationFile(any())).thenReturn(GrpcRepresentationFileTestFactory.create()); - } - - @Test - void shouldCallIncomingFileMapper() { - service.buildGrpcRepresentationFile(file); - - verify(incomingFileMapper).toRepresentationFile(file); - } - - @Test - void shouldReturnRepresentationMetadataRequest() { - var result = service.buildGrpcRepresentationFile(file); - - assertThat(result).isEqualTo(GrpcRouteForwardingRequestTestFactory.createWithRepresentationMetadata()); - } - } - - @Nested - class TestWaitForCompletion { - - @Mock - private CompletableFuture<Void> future; - - @SneakyThrows - @Test - void shouldGetFromFuture() { - waitForCompletion(); - - verify(future).get(2, TimeUnit.MINUTES); - } - - @Nested - class TestOnInterruptedException { - - private final InterruptedException exception = new InterruptedException(); - - @BeforeEach - @SneakyThrows - void mock() { - when(future.get(anyLong(), any())).thenThrow(exception); - } - - @Test - void shouldThrowTechnicalException() { - assertThrows(TechnicalException.class, TestWaitForCompletion.this::waitForCompletion); - } - - @Test - void shouldInterruptThread() { - try { - waitForCompletion(); - } catch (TechnicalException e) { - // expected - } - - assertThat(Thread.currentThread().isInterrupted()).isTrue(); - } - } - - @Nested - class TestOnExecutionException { - - private final ExecutionException exception = new ExecutionException(new Exception()); - - @BeforeEach - @SneakyThrows - void mock() { - when(future.get(anyLong(), any())).thenThrow(exception); - } - - @Test - void shouldThrowTechnicalException() { - assertThrows(TechnicalException.class, TestWaitForCompletion.this::waitForCompletion); - } - } - - @Nested - class TestOnTimeoutException { - - private final TimeoutException exception = new TimeoutException(); - - @BeforeEach - @SneakyThrows - void mock() { - when(future.get(anyLong(), any())).thenThrow(exception); - } - - @Test - void shouldThrowTechnicalException() { - assertThrows(TechnicalException.class, TestWaitForCompletion.this::waitForCompletion); - } - } - - private void waitForCompletion() { - service.waitForCompletion(future); - } - } - - @Nested - class ForwardingResponseObserverTest { - - @Mock - private CompletableFuture<Void> future; - private ForwardingResponseObserver responseObserver; - - @BeforeEach - void init() { - responseObserver = new ForwardingResponseObserver(future); - } - - @Test - void shouldCompleteExceptionallyOnError() { - var error = new Throwable(); - - responseObserver.onError(error); + void shouldMapToGrpcRouteForwarding() { + service.forward(request); - verify(future).completeExceptionally(error); + verify(forwardingRequestMapper).toGrpcRouteForwarding(request, VorgangTestFactory.EINGANG); } @Test - void shouldCompleteOnCompleted() { - responseObserver.onCompleted(); + void shouldForward() { + service.forward(request); - verify(future).complete(null); + verify(eingangForwarder).forward(grpcRouteForwarding, List.of(EingangTestFactory.ATTACHMENT), List.of(EingangTestFactory.REPRESENTATION)); } } }