diff --git a/vorgang-manager-server/src/main/java/de/ozgcloud/vorgang/vorgang/redirect/EingangForwarder.java b/vorgang-manager-server/src/main/java/de/ozgcloud/vorgang/vorgang/redirect/EingangForwarder.java index 6c71321b1b0c19b40ae64194cbe0bde6c552ccc4..1440223302a0b2b50a2027e8e957e17ed051401d 100644 --- a/vorgang-manager-server/src/main/java/de/ozgcloud/vorgang/vorgang/redirect/EingangForwarder.java +++ b/vorgang-manager-server/src/main/java/de/ozgcloud/vorgang/vorgang/redirect/EingangForwarder.java @@ -26,11 +26,15 @@ package de.ozgcloud.vorgang.vorgang.redirect; import java.io.InputStream; import java.util.List; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiFunction; -import java.util.function.Function; +import org.apache.commons.io.IOUtils; import org.springframework.beans.factory.config.ConfigurableBeanFactory; import org.springframework.context.annotation.Scope; import org.springframework.stereotype.Component; @@ -39,6 +43,7 @@ import com.google.protobuf.ByteString; import de.ozgcloud.common.binaryfile.GrpcFileUploadUtils; import de.ozgcloud.common.binaryfile.StreamingFileSender; +import de.ozgcloud.common.errorhandling.TechnicalException; import de.ozgcloud.eingang.forwarder.RouteForwardingServiceGrpc; import de.ozgcloud.eingang.forwarding.GrpcAttachment; import de.ozgcloud.eingang.forwarding.GrpcFileContent; @@ -53,7 +58,6 @@ import de.ozgcloud.vorgang.vorgang.IncomingFileGroup; import de.ozgcloud.vorgang.vorgang.IncomingFileMapper; import io.grpc.stub.ClientCallStreamObserver; import io.grpc.stub.ClientResponseObserver; -import lombok.Getter; import lombok.RequiredArgsConstructor; import lombok.extern.log4j.Log4j2; import net.devh.boot.grpc.client.inject.GrpcClient; @@ -64,6 +68,8 @@ import net.devh.boot.grpc.client.inject.GrpcClient; @Log4j2 class EingangForwarder { + static final int TIMEOUT_MINUTES = 10; + @GrpcClient("forwarder") private final RouteForwardingServiceGrpc.RouteForwardingServiceStub serviceStub; private final FileService fileService; @@ -72,30 +78,16 @@ class EingangForwarder { private ForwardingResponseObserver responseObserver; private ClientCallStreamObserver<GrpcRouteForwardingRequest> requestObserver; - @Getter - private CompletableFuture<Void> forwardFuture; - - public EingangForwarder forward(GrpcRouteForwarding grpcRouteForwarding, List<IncomingFileGroup> attachments, - List<IncomingFile> representations) { - - forwardFuture = CompletableFuture.allOf( - callService(), - sendRouteForwarding(grpcRouteForwarding) - .thenCompose(ignored -> sendAttachments(attachments)) - .thenCompose(ignored -> sendRepresentations(representations)) - .whenComplete((result, ex) -> { - if (ex != null) { - requestObserver.onError(ex); - } else { - requestObserver.onCompleted(); - } - }) - ); - return this; + public void forward(GrpcRouteForwarding grpcRouteForwarding, List<IncomingFileGroup> attachments, List<IncomingFile> representations) { + var future = performGrpcCall(); + sendRouteForwarding(grpcRouteForwarding); + sendAttachments(attachments); + sendRepresentations(representations); + waitForCompletion(future); } - CompletableFuture<GrpcRouteForwardingResponse> callService() { - CompletableFuture<GrpcRouteForwardingResponse> responseFuture = new CompletableFuture<>(); + Future<GrpcRouteForwardingResponse> performGrpcCall() { + var responseFuture = new CompletableFuture<GrpcRouteForwardingResponse>(); responseObserver = new ForwardingResponseObserver(responseFuture); requestObserver = (ClientCallStreamObserver<GrpcRouteForwardingRequest>) serviceStub.withInterceptors( new VorgangManagerClientCallContextAttachingInterceptor()) @@ -103,41 +95,35 @@ class EingangForwarder { return responseFuture; } - CompletableFuture<GrpcRouteForwardingResponse> sendRouteForwarding(GrpcRouteForwarding grpcRouteForwarding) { - CompletableFuture<GrpcRouteForwardingResponse> future = new CompletableFuture<>(); + void sendRouteForwarding(GrpcRouteForwarding grpcRouteForwarding) { + var future = new CompletableFuture<Void>(); responseObserver.registerOnReadyHandler(getSendRouteForwardingRunnable(grpcRouteForwarding, future)); - return future; + waitForCompletion(future); } - Runnable getSendRouteForwardingRunnable(GrpcRouteForwarding grpcRouteForwarding, CompletableFuture<GrpcRouteForwardingResponse> future) { + Runnable getSendRouteForwardingRunnable(GrpcRouteForwarding grpcRouteForwarding, CompletableFuture<Void> future) { return () -> { requestObserver.onNext(GrpcRouteForwardingRequest.newBuilder().setRouteForwarding(grpcRouteForwarding).build()); - future.complete(GrpcRouteForwardingResponse.newBuilder().build()); + future.complete(null); }; } - CompletableFuture<GrpcRouteForwardingResponse> sendAttachments(List<IncomingFileGroup> attachments) { - return attachments.stream() + void sendAttachments(List<IncomingFileGroup> attachments) { + attachments.stream() .flatMap(attachment -> { var groupName = attachment.getName(); - return attachment.getFiles().stream().map(file -> getSendAttachmentFileFunction(groupName, file)); + return attachment.getFiles().stream().map(file -> new FileInGroup(groupName, file)); }) - .reduce( - CompletableFuture.completedFuture(GrpcRouteForwardingResponse.newBuilder().build()), - CompletableFuture::thenCompose, - (f1, f2) -> f1.thenCompose(ignored -> f2) - ); + .forEach(this::sendAttachmentFile); } - private Function<GrpcRouteForwardingResponse, CompletableFuture<GrpcRouteForwardingResponse>> getSendAttachmentFileFunction(String groupName, - IncomingFile file) { - return ignored -> sendAttachmentFile(groupName, file); + void sendAttachmentFile(FileInGroup fileInGroup) { + var fileContentStream = fileService.getUploadedFileStream(fileInGroup.file.getId()); + var fileSender = createAttachmentFileSender(fileInGroup.groupName, fileInGroup.file, fileContentStream).send(); + waitForCompletion(fileSender, fileContentStream); } - CompletableFuture<GrpcRouteForwardingResponse> sendAttachmentFile(String groupName, IncomingFile file) { - var fileContentStream = fileService.getUploadedFileStream(file.getId()); - var future = createAttachmentFileSender(groupName, file, fileContentStream).send().getResultFuture(); - return configureToCancelIfForwardFutureCompleted(future); + record FileInGroup(String groupName, IncomingFile file) { } StreamingFileSender<GrpcRouteForwardingRequest, GrpcRouteForwardingResponse> createAttachmentFileSender(String groupName, @@ -162,25 +148,14 @@ class EingangForwarder { .build(); } - CompletableFuture<GrpcRouteForwardingResponse> sendRepresentations(List<IncomingFile> representations) { - return representations.stream() - .map(this::getSendRepresentationFileFunction) - .reduce( - CompletableFuture.completedFuture(GrpcRouteForwardingResponse.newBuilder().build()), - CompletableFuture::thenCompose, - (f1, f2) -> f1.thenCompose(ignored -> f2) - ); - } - - private Function<GrpcRouteForwardingResponse, CompletableFuture<GrpcRouteForwardingResponse>> getSendRepresentationFileFunction( - IncomingFile file) { - return ignored -> sendRepresentationFile(file); + void sendRepresentations(List<IncomingFile> representations) { + representations.forEach(this::sendRepresentationFile); } - CompletableFuture<GrpcRouteForwardingResponse> sendRepresentationFile(IncomingFile file) { + void sendRepresentationFile(IncomingFile file) { var fileContentStream = fileService.getUploadedFileStream(file.getId()); - var future = createRepresentationFileSender(file, fileContentStream).send().getResultFuture(); - return configureToCancelIfForwardFutureCompleted(future); + var fileSender = createRepresentationFileSender(file, fileContentStream).send(); + waitForCompletion(fileSender, fileContentStream); } StreamingFileSender<GrpcRouteForwardingRequest, GrpcRouteForwardingResponse> createRepresentationFileSender(IncomingFile file, @@ -204,15 +179,6 @@ class EingangForwarder { .build(); } - CompletableFuture<GrpcRouteForwardingResponse> configureToCancelIfForwardFutureCompleted(CompletableFuture<GrpcRouteForwardingResponse> future) { - forwardFuture.whenComplete((result, ex) -> { - if (forwardFuture.isDone() && !future.isDone()) { - future.cancel(true); - } - }); - return future; - } - GrpcFileContent buildGrpcFileContent(byte[] chunk, int length) { var fileContentBuilder = GrpcFileContent.newBuilder(); if (length <= 0) { @@ -229,6 +195,37 @@ class EingangForwarder { responseObserver::registerOnReadyHandler); } + <T> void waitForCompletion(Future<T> responseFuture) { + try { + responseFuture.get(TIMEOUT_MINUTES, TimeUnit.MINUTES); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new TechnicalException("Waiting for finishing file upload was interrupted.", e); + } catch (ExecutionException e) { + throw new TechnicalException("Error on uploading file content.", e); + } catch (TimeoutException e) { + throw new TechnicalException("Timeout on uploading file content.", e); + } + } + + void waitForCompletion(StreamingFileSender<GrpcRouteForwardingRequest, GrpcRouteForwardingResponse> fileSender, InputStream fileContentStream) { + try { + fileSender.getResultFuture().get(TIMEOUT_MINUTES, TimeUnit.MINUTES); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + fileSender.cancelOnError(e); + throw new TechnicalException("Waiting for finishing upload was interrupted.", e); + } catch (ExecutionException e) { + fileSender.cancelOnError(e); + throw new TechnicalException("Error on uploading file content.", e); + } catch (TimeoutException e) { + fileSender.cancelOnTimeout(); + throw new TechnicalException("Timeout on uploading data.", e); + } finally { + IOUtils.closeQuietly(fileContentStream); + } + } + @RequiredArgsConstructor static class ForwardingResponseObserver implements ClientResponseObserver<GrpcRouteForwardingRequest, GrpcRouteForwardingResponse> { private final CompletableFuture<GrpcRouteForwardingResponse> future; diff --git a/vorgang-manager-server/src/main/java/de/ozgcloud/vorgang/vorgang/redirect/ForwardingRemoteService.java b/vorgang-manager-server/src/main/java/de/ozgcloud/vorgang/vorgang/redirect/ForwardingRemoteService.java index 387491c0a94d7ce33e716cfa393deb11e8fa2f3b..aac13d6d1df7de71581638c430c085bc68b8ff6b 100644 --- a/vorgang-manager-server/src/main/java/de/ozgcloud/vorgang/vorgang/redirect/ForwardingRemoteService.java +++ b/vorgang-manager-server/src/main/java/de/ozgcloud/vorgang/vorgang/redirect/ForwardingRemoteService.java @@ -23,15 +23,9 @@ */ package de.ozgcloud.vorgang.vorgang.redirect; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; - import org.springframework.beans.factory.annotation.Lookup; import org.springframework.stereotype.Service; -import de.ozgcloud.common.errorhandling.TechnicalException; import de.ozgcloud.vorgang.vorgang.VorgangService; import lombok.RequiredArgsConstructor; @@ -39,36 +33,17 @@ import lombok.RequiredArgsConstructor; @RequiredArgsConstructor class ForwardingRemoteService { - static final int TIMEOUT_MINUTES = 10; private final VorgangService vorgangService; private final ForwardingRequestMapper forwardingRequestMapper; public void forward(ForwardingRequest request) { var eingang = vorgangService.getById(request.getVorgangId()).getEingangs().getFirst(); var grpcRouteForwarding = forwardingRequestMapper.toGrpcRouteForwarding(request, eingang); - var responseFuture = getEingangForwarder().forward(grpcRouteForwarding, eingang.getAttachments(), - eingang.getRepresentations()).getForwardFuture(); - waitForCompletion(responseFuture); + getEingangForwarder().forward(grpcRouteForwarding, eingang.getAttachments(), eingang.getRepresentations()); } @Lookup EingangForwarder getEingangForwarder() { return null; // provided by Spring } - - <T> void waitForCompletion(CompletableFuture<T> responseFuture) { - try { - responseFuture.get(TIMEOUT_MINUTES, TimeUnit.MINUTES); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - responseFuture.completeExceptionally(e); - throw new TechnicalException("Waiting for finishing file upload was interrupted.", e); - } catch (ExecutionException e) { - responseFuture.completeExceptionally(e); - throw new TechnicalException("Error on uploading file content.", e); - } catch (TimeoutException e) { - responseFuture.completeExceptionally(e); - throw new TechnicalException("Timeout on uploading file content.", e); - } - } } diff --git a/vorgang-manager-server/src/test/java/de/ozgcloud/vorgang/vorgang/redirect/EingangForwarderTest.java b/vorgang-manager-server/src/test/java/de/ozgcloud/vorgang/vorgang/redirect/EingangForwarderTest.java index bddca9e4e372fc4943b903c06ebfcd917398d299..9681f8c6f81a8bf00528eef53c09192095d8918b 100644 --- a/vorgang-manager-server/src/test/java/de/ozgcloud/vorgang/vorgang/redirect/EingangForwarderTest.java +++ b/vorgang-manager-server/src/test/java/de/ozgcloud/vorgang/vorgang/redirect/EingangForwarderTest.java @@ -25,18 +25,21 @@ package de.ozgcloud.vorgang.vorgang.redirect; import static org.assertj.core.api.Assertions.*; import static org.awaitility.Awaitility.*; +import static org.junit.jupiter.api.Assertions.*; import static org.mockito.ArgumentMatchers.*; import static org.mockito.Mockito.*; -import static org.mockito.Mockito.anyInt; import static org.mockito.Mockito.eq; import java.io.InputStream; import java.time.Duration; import java.util.List; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CompletionException; +import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiFunction; @@ -56,13 +59,13 @@ import org.mockito.Spy; import de.ozgcloud.common.binaryfile.GrpcFileUploadUtils; import de.ozgcloud.common.binaryfile.StreamingFileSender; +import de.ozgcloud.common.errorhandling.TechnicalException; import de.ozgcloud.common.test.ReflectionTestUtils; import de.ozgcloud.eingang.forwarder.RouteForwardingServiceGrpc; import de.ozgcloud.eingang.forwarding.GrpcRouteForwarding; import de.ozgcloud.eingang.forwarding.GrpcRouteForwardingRequest; import de.ozgcloud.eingang.forwarding.GrpcRouteForwardingResponse; import de.ozgcloud.vorgang.callcontext.VorgangManagerClientCallContextAttachingInterceptor; -import de.ozgcloud.vorgang.files.FileId; import de.ozgcloud.vorgang.files.FileService; import de.ozgcloud.vorgang.vorgang.IncomingFile; import de.ozgcloud.vorgang.vorgang.IncomingFileGroup; @@ -72,6 +75,7 @@ import de.ozgcloud.vorgang.vorgang.IncomingFileTestFactory; import de.ozgcloud.vorgang.vorgang.redirect.EingangForwarder.DelegatingOnReadyHandler; import de.ozgcloud.vorgang.vorgang.redirect.EingangForwarder.ForwardingResponseObserver; import io.grpc.stub.ClientCallStreamObserver; +import lombok.SneakyThrows; class EingangForwarderTest { @@ -88,83 +92,60 @@ class EingangForwarderTest { @Nested class TestForward { - @Mock - private ClientCallStreamObserver<GrpcRouteForwardingRequest> requestObserver; @Mock private GrpcRouteForwarding grpcRouteForwarding; private final List<IncomingFileGroup> attachments = List.of(IncomingFileGroupTestFactory.create()); private final List<IncomingFile> representations = List.of(IncomingFileTestFactory.create()); + @Mock + private Future<GrpcRouteForwardingResponse> future; @BeforeEach void init() { - setRequestObserverInForwarder(requestObserver); + doReturn(future).when(forwarder).performGrpcCall(); + doNothing().when(forwarder).sendRouteForwarding(any()); + doNothing().when(forwarder).sendAttachments(any()); + doNothing().when(forwarder).sendRepresentations(any()); + doNothing().when(forwarder).waitForCompletion(any()); } @Test - void shouldCallOnCompletedOnSuccess() { - doReturn(CompletableFuture.completedFuture(null)).when(forwarder).callService(); - doReturn(CompletableFuture.completedFuture(null)).when(forwarder).sendRouteForwarding(grpcRouteForwarding); - doReturn(CompletableFuture.completedFuture(null)).when(forwarder).sendAttachments(attachments); - doReturn(CompletableFuture.completedFuture(null)).when(forwarder).sendRepresentations(representations); + void shouldPerformGrpcCall() { + forwarder.forward(grpcRouteForwarding, attachments, representations); - CompletableFuture<Void> future = forwarder.forward(grpcRouteForwarding, attachments, representations).getForwardFuture(); - - assertOnCompletedCalled(future); + verify(forwarder).performGrpcCall(); } @Test - void shouldCallOnErrorOnFailureInRouteForwarding() { - var error = new RuntimeException("Route forwarding failed"); - doReturn(CompletableFuture.completedFuture(null)).when(forwarder).callService(); - doReturn(CompletableFuture.failedFuture(error)).when(forwarder).sendRouteForwarding(grpcRouteForwarding); - - var future = forwarder.forward(grpcRouteForwarding, attachments, representations).getForwardFuture(); + void shouldSendRouteForwarding() { + forwarder.forward(grpcRouteForwarding, attachments, representations); - assertOnErrorCalled(future, error); + verify(forwarder).sendRouteForwarding(grpcRouteForwarding); } @Test - void shouldCallOnErrorOnFailureInSendAttachments() { - var error = new RuntimeException("Send attachments failed"); - doReturn(CompletableFuture.completedFuture(null)).when(forwarder).callService(); - doReturn(CompletableFuture.completedFuture(null)).when(forwarder).sendRouteForwarding(grpcRouteForwarding); - doReturn(CompletableFuture.failedFuture(error)).when(forwarder).sendAttachments(attachments); + void shouldSendAttachments() { + forwarder.forward(grpcRouteForwarding, attachments, representations); - var future = forwarder.forward(grpcRouteForwarding, attachments, representations).getForwardFuture(); - - assertOnErrorCalled(future, error); + verify(forwarder).sendAttachments(attachments); } @Test - void shouldCallOnErrorOnFailureInSendRepresentations() { - var error = new RuntimeException("Send representations failed"); - doReturn(CompletableFuture.completedFuture(null)).when(forwarder).callService(); - doReturn(CompletableFuture.completedFuture(null)).when(forwarder).sendRouteForwarding(grpcRouteForwarding); - doReturn(CompletableFuture.completedFuture(null)).when(forwarder).sendAttachments(attachments); - doReturn(CompletableFuture.failedFuture(error)).when(forwarder).sendRepresentations(representations); - - var future = forwarder.forward(grpcRouteForwarding, attachments, representations).getForwardFuture(); + void shouldSendRepresentations() { + forwarder.forward(grpcRouteForwarding, attachments, representations); - assertOnErrorCalled(future, error); + verify(forwarder).sendRepresentations(representations); } - private void assertOnCompletedCalled(CompletableFuture<Void> future) { - future.join(); - verify(requestObserver).onCompleted(); - verify(requestObserver, never()).onError(any()); - } + @Test + void shouldWaitForCompletion() { + forwarder.forward(grpcRouteForwarding, attachments, representations); - private void assertOnErrorCalled(CompletableFuture<Void> future, Throwable error) { - future.handle((result, ex) -> { - verify(requestObserver).onError(argThat(e -> e instanceof CompletionException && e.getCause() == error)); - verify(requestObserver, never()).onCompleted(); - return null; - }).join(); + verify(forwarder).waitForCompletion(future); } } @Nested - class TestCallService { + class TestPerformGrpcCall { @BeforeEach void init() { @@ -173,21 +154,21 @@ class EingangForwarderTest { @Test void shouldAttachClientCallContextToServiceStub() { - forwarder.callService(); + forwarder.performGrpcCall(); verify(serviceStub).withInterceptors(any(VorgangManagerClientCallContextAttachingInterceptor.class)); } @Test void shouldCreateResponseObserver() { - forwarder.callService(); + forwarder.performGrpcCall(); assertThat(getResponseObserverFromForwarder()).isNotNull(); } @Test void shouldMakeGrpcCallToRouteForwarding() { - forwarder.callService(); + forwarder.performGrpcCall(); verify(serviceStub).routeForwarding(getResponseObserverFromForwarder()); } @@ -203,18 +184,22 @@ class EingangForwarderTest { private Runnable onReadyHandler; @Captor private ArgumentCaptor<Runnable> onReadyHandlerCaptor; + @Captor + private ArgumentCaptor<CompletableFuture<Void>> futureCaptor; @BeforeEach void init() { setResponseObserverInForwarder(responseObserver); doReturn(onReadyHandler).when(forwarder).getSendRouteForwardingRunnable(any(), any()); + doNothing().when(forwarder).waitForCompletion(any()); } + @SuppressWarnings("unchecked") @Test void shouldGetSendRouteForwardingRunnable() { - var future = forwarder.sendRouteForwarding(grpcRouteForwarding); + forwarder.sendRouteForwarding(grpcRouteForwarding); - verify(forwarder).getSendRouteForwardingRunnable(grpcRouteForwarding, future); + verify(forwarder).getSendRouteForwardingRunnable(eq(grpcRouteForwarding), any(CompletableFuture.class)); } @Test @@ -222,10 +207,27 @@ class EingangForwarderTest { forwarder.sendRouteForwarding(grpcRouteForwarding); verify(responseObserver).registerOnReadyHandler(onReadyHandlerCaptor.capture()); - assertThatIsResultOfGetSendRouteForwardingRunnable(onReadyHandlerCaptor.getValue()); + assertIsSendRouteForwardingRunnable(onReadyHandlerCaptor.getValue()); + } + + @SuppressWarnings("unchecked") + @Test + void shouldWaitForCompletion() { + forwarder.sendRouteForwarding(grpcRouteForwarding); + + verify(forwarder).waitForCompletion(any(CompletableFuture.class)); + } + + @Test + void shouldBeTheSameFuture() { + forwarder.sendRouteForwarding(grpcRouteForwarding); + + verify(forwarder).getSendRouteForwardingRunnable(any(), futureCaptor.capture()); + verify(forwarder).waitForCompletion(futureCaptor.getValue()); + assertThat(futureCaptor.getAllValues().getFirst()).isSameAs(futureCaptor.getValue()); } - private void assertThatIsResultOfGetSendRouteForwardingRunnable(Runnable runnable) { + private void assertIsSendRouteForwardingRunnable(Runnable runnable) { runnable.run(); verify(onReadyHandler).run(); } @@ -238,7 +240,7 @@ class EingangForwarderTest { @Mock private ClientCallStreamObserver<GrpcRouteForwardingRequest> requestObserver; @Mock - private CompletableFuture<GrpcRouteForwardingResponse> future; + private CompletableFuture<Void> future; @BeforeEach void init() { @@ -253,10 +255,10 @@ class EingangForwarderTest { } @Test - void shouldCallOnComplete() { + void shouldCompleteFuture() { forwarder.getSendRouteForwardingRunnable(grpcRouteForwarding, future).run(); - verify(future).complete(GrpcRouteForwardingResponse.newBuilder().build()); + verify(future).complete(null); } } @@ -264,63 +266,26 @@ class EingangForwarderTest { class TestSendAttachments { private final List<IncomingFileGroup> attachments = List.of(IncomingFileGroupTestFactory.createWithTwoFiles()); - private final CompletableFuture<GrpcRouteForwardingResponse> future = new CompletableFuture<>(); - private final CompletableFuture<GrpcRouteForwardingResponse> future2 = new CompletableFuture<>(); @BeforeEach void init() { - doReturn(future, future2).when(forwarder).sendAttachmentFile(any(), any()); + doNothing().when(forwarder).sendAttachmentFile(any()); } @Test - void shouldReturnFuture() { - var returned = forwarder.sendAttachments(attachments); - - assertThat(returned).isNotNull(); - } - - @Test - void shouldInitiallySendOnlyFirstFile() { + void shouldSendFirstAttachmentFile() { forwarder.sendAttachments(attachments); - verify(forwarder).sendAttachmentFile(IncomingFileGroupTestFactory.NAME, IncomingFileGroupTestFactory.FILE); - verify(forwarder, times(1)).sendAttachmentFile(anyString(), any()); + verify(forwarder).sendAttachmentFile( + new EingangForwarder.FileInGroup(IncomingFileGroupTestFactory.NAME, IncomingFileGroupTestFactory.FILE)); } @Test - void shouldSendSecondFileAfterFirstFutureCompleted() { + void shouldSendSecondAttachmentFile() { forwarder.sendAttachments(attachments); - future.complete(GrpcRouteForwardingResponse.newBuilder().build()); - - verify(forwarder).sendAttachmentFile(IncomingFileGroupTestFactory.NAME, IncomingFileGroupTestFactory.FILE2); - verify(forwarder, times(2)).sendAttachmentFile(anyString(), any()); - } - - @Test - void shouldReturnedFutureBeInitiallyIncomplete() { - var returned = forwarder.sendAttachments(attachments); - - assertThat(returned.isDone()).isFalse(); - } - - @Test - void shouldReturnedFutureBeIncompleteAfterSendingFirstFile() { - var returned = forwarder.sendAttachments(attachments); - - future.complete(GrpcRouteForwardingResponse.newBuilder().build()); - - assertThat(returned.isDone()).isFalse(); - } - - @Test - void shouldReturnedFutureBeDoneAfterSendingAllFiles() { - var returned = forwarder.sendAttachments(attachments); - - future.complete(GrpcRouteForwardingResponse.newBuilder().build()); - future2.complete(GrpcRouteForwardingResponse.newBuilder().build()); - - assertThat(returned.isDone()).isTrue(); + verify(forwarder).sendAttachmentFile( + new EingangForwarder.FileInGroup(IncomingFileGroupTestFactory.NAME, IncomingFileGroupTestFactory.FILE2)); } } @@ -329,7 +294,6 @@ class EingangForwarderTest { @Mock private StreamingFileSender<GrpcRouteForwardingRequest, GrpcRouteForwardingResponse> fileSender; - private final CompletableFuture<GrpcRouteForwardingResponse> resultFuture = new CompletableFuture<>(); @Mock private InputStream fileContentStream; @@ -338,8 +302,7 @@ class EingangForwarderTest { when(fileService.getUploadedFileStream(any())).thenReturn(fileContentStream); doReturn(fileSender).when(forwarder).createAttachmentFileSender(any(), any(), any()); doReturn(fileSender).when(fileSender).send(); - when(fileSender.getResultFuture()).thenReturn(resultFuture); - doReturn(resultFuture).when(forwarder).configureToCancelIfForwardFutureCompleted(any()); + doNothing().when(forwarder).waitForCompletion(any(), any()); } @Test @@ -364,21 +327,14 @@ class EingangForwarderTest { } @Test - void shouldConfigureFutureToCancelIfForwardFutureCompleted() { + void shouldWaitForCompletion() { sendAttachmentFile(); - verify(forwarder).configureToCancelIfForwardFutureCompleted(resultFuture); + verify(forwarder).waitForCompletion(fileSender, fileContentStream); } - @Test - void shouldReturnResultFuture() { - var returned = sendAttachmentFile(); - - assertThat(returned).isSameAs(resultFuture); - } - - private CompletableFuture<GrpcRouteForwardingResponse> sendAttachmentFile() { - return forwarder.sendAttachmentFile(IncomingFileGroupTestFactory.NAME, IncomingFileGroupTestFactory.FILE); + private void sendAttachmentFile() { + forwarder.sendAttachmentFile(new EingangForwarder.FileInGroup(IncomingFileGroupTestFactory.NAME, IncomingFileGroupTestFactory.FILE)); } } @@ -388,9 +344,9 @@ class EingangForwarderTest { @Mock private InputStream inputStream; @Mock - private GrpcFileUploadUtils.FileSender<GrpcRouteForwardingRequest, GrpcRouteForwardingResponse> fileSender; + private StreamingFileSender<GrpcRouteForwardingRequest, GrpcRouteForwardingResponse> fileSender; @Mock - private GrpcFileUploadUtils.FileSender<GrpcRouteForwardingRequest, GrpcRouteForwardingResponse> fileSenderWithMetadata; + private StreamingFileSender<GrpcRouteForwardingRequest, GrpcRouteForwardingResponse> fileSenderWithMetadata; @Captor private ArgumentCaptor<BiFunction<byte[], Integer, GrpcRouteForwardingRequest>> chunkBuilderCaptor; @@ -405,7 +361,7 @@ class EingangForwarderTest { } @Test - void shouldCallCreateSenderWithoutMetadata() { + void shouldCreateSenderWithoutMetadata() { createAttachmentFileSender(); verify(forwarder).createSenderWithoutMetadata(chunkBuilderCaptor.capture(), eq(inputStream)); @@ -414,7 +370,7 @@ class EingangForwarderTest { } @Test - void shouldCallBuildGrpcAttachmentFile() { + void shouldBuildGrpcAttachmentFile() { createAttachmentFileSender(); verify(forwarder).buildGrpcAttachmentFile(IncomingFileGroupTestFactory.NAME, IncomingFileGroupTestFactory.FILE); @@ -493,65 +449,17 @@ class EingangForwarderTest { class TestSendRepresentations { private static final IncomingFile FILE = IncomingFileTestFactory.create(); - private static final IncomingFile FILE2 = IncomingFileTestFactory.createBuilder().id(FileId.createNew()).build(); - private final List<IncomingFile> representations = List.of(FILE, FILE2); - private final CompletableFuture<GrpcRouteForwardingResponse> future = new CompletableFuture<>(); - private final CompletableFuture<GrpcRouteForwardingResponse> future2 = new CompletableFuture<>(); @BeforeEach void init() { - doReturn(future, future2).when(forwarder).sendRepresentationFile(any()); - } - - @Test - void shouldReturnFuture() { - var returned = forwarder.sendRepresentations(representations); - - assertThat(returned).isNotNull(); + doNothing().when(forwarder).sendRepresentationFile(any()); } @Test - void shouldInitiallySendOnlyFirstFile() { - forwarder.sendRepresentations(representations); + void shouldSendRepresentationFile() { + forwarder.sendRepresentations(List.of(FILE)); verify(forwarder).sendRepresentationFile(FILE); - verify(forwarder, times(1)).sendRepresentationFile(any()); - } - - @Test - void shouldSendSecondFileAfterFirstFutureCompleted() { - forwarder.sendRepresentations(representations); - - future.complete(GrpcRouteForwardingResponse.newBuilder().build()); - - verify(forwarder).sendRepresentationFile(FILE2); - verify(forwarder, times(2)).sendRepresentationFile(any()); - } - - @Test - void shouldReturnedFutureBeInitiallyIncomplete() { - var returned = forwarder.sendRepresentations(representations); - - assertThat(returned.isDone()).isFalse(); - } - - @Test - void shouldReturnedFutureBeIncompleteAfterSendingFirstFile() { - var returned = forwarder.sendRepresentations(representations); - - future.complete(GrpcRouteForwardingResponse.newBuilder().build()); - - assertThat(returned.isDone()).isFalse(); - } - - @Test - void shouldReturnedFutureBeDoneAfterSendingAllFiles() { - var returned = forwarder.sendRepresentations(representations); - - future.complete(GrpcRouteForwardingResponse.newBuilder().build()); - future2.complete(GrpcRouteForwardingResponse.newBuilder().build()); - - assertThat(returned.isDone()).isTrue(); } } @@ -560,7 +468,6 @@ class EingangForwarderTest { @Mock private StreamingFileSender<GrpcRouteForwardingRequest, GrpcRouteForwardingResponse> fileSender; - private final CompletableFuture<GrpcRouteForwardingResponse> resultFuture = new CompletableFuture<>(); @Mock private InputStream fileContentStream; @@ -571,8 +478,7 @@ class EingangForwarderTest { when(fileService.getUploadedFileStream(any())).thenReturn(fileContentStream); doReturn(fileSender).when(forwarder).createRepresentationFileSender(any(), any()); doReturn(fileSender).when(fileSender).send(); - when(fileSender.getResultFuture()).thenReturn(resultFuture); - doReturn(resultFuture).when(forwarder).configureToCancelIfForwardFutureCompleted(any()); + doNothing().when(forwarder).waitForCompletion(any(), any()); } @Test @@ -597,21 +503,14 @@ class EingangForwarderTest { } @Test - void shouldConfigureFutureToCancelIfForwardFutureCompleted() { + void shouldWaitForCompletion() { sendRepresentationFile(); - verify(forwarder).configureToCancelIfForwardFutureCompleted(resultFuture); - } - - @Test - void shouldReturnResultFuture() { - var returned = sendRepresentationFile(); - - assertThat(returned).isSameAs(resultFuture); + verify(forwarder).waitForCompletion(fileSender, fileContentStream); } - private CompletableFuture<GrpcRouteForwardingResponse> sendRepresentationFile() { - return forwarder.sendRepresentationFile(file); + private void sendRepresentationFile() { + forwarder.sendRepresentationFile(file); } } @@ -621,9 +520,9 @@ class EingangForwarderTest { @Mock private InputStream inputStream; @Mock - private GrpcFileUploadUtils.FileSender<GrpcRouteForwardingRequest, GrpcRouteForwardingResponse> fileSender; + private StreamingFileSender<GrpcRouteForwardingRequest, GrpcRouteForwardingResponse> fileSender; @Mock - private GrpcFileUploadUtils.FileSender<GrpcRouteForwardingRequest, GrpcRouteForwardingResponse> fileSenderWithMetadata; + private StreamingFileSender<GrpcRouteForwardingRequest, GrpcRouteForwardingResponse> fileSenderWithMetadata; @Captor private ArgumentCaptor<BiFunction<byte[], Integer, GrpcRouteForwardingRequest>> chunkBuilderCaptor; @@ -723,55 +622,6 @@ class EingangForwarderTest { } } - @Nested - class TestConfigureToCancelIfForwardFutureCompleted { - - private final CompletableFuture<Void> forwardFuture = new CompletableFuture<>(); - private final CompletableFuture<GrpcRouteForwardingResponse> future = new CompletableFuture<>(); - - @BeforeEach - void init() { - setForwardFutureInForwarder(forwardFuture); - } - - @Test - void shouldCancelFutureWhenForwardFutureCompleted() { - forwarder.configureToCancelIfForwardFutureCompleted(future); - - forwardFuture.complete(null); - - assertThat(future.isCancelled()).isTrue(); - } - - @Test - void shouldCancelFutureWhenForwardFutureWasCancelled() { - forwarder.configureToCancelIfForwardFutureCompleted(future); - - forwardFuture.cancel(true); - - assertThat(future.isCancelled()).isTrue(); - } - - @Test - void shouldCancelFutureWhenForwardFutureCompletedExceptionally() { - forwarder.configureToCancelIfForwardFutureCompleted(future); - - forwardFuture.completeExceptionally(new RuntimeException("Forced failure")); - - assertThat(future.isCancelled()).isTrue(); - } - - @Test - void shouldNotCancelFutureIfItIsDone() { - forwarder.configureToCancelIfForwardFutureCompleted(future); - future.complete(GrpcRouteForwardingResponse.getDefaultInstance()); - - forwardFuture.complete(null); - - assertThat(future.isCancelled()).isFalse(); - } - } - @Nested class TestBuildGrpcFileContent { @@ -858,6 +708,246 @@ class EingangForwarderTest { } } + @Nested + class TestWaitForCompletionOfFuture { + + @Mock + private CompletableFuture<Void> future; + + @SneakyThrows + @Test + void shouldGetFromFuture() { + waitForCompletion(); + + verify(future).get(EingangForwarder.TIMEOUT_MINUTES, TimeUnit.MINUTES); + } + + @Nested + class TestOnInterruptedException { + + private final InterruptedException exception = new InterruptedException(); + + @BeforeEach + @SneakyThrows + void mock() { + when(future.get(anyLong(), any())).thenThrow(exception); + } + + @Test + void shouldThrowTechnicalException() { + assertThrows(TechnicalException.class, TestWaitForCompletionOfFuture.this::waitForCompletion); + } + + @Test + void shouldInterruptThread() { + try { + waitForCompletion(); + } catch (TechnicalException e) { + // expected + } + + assertThat(Thread.currentThread().isInterrupted()).isTrue(); + } + } + + @Nested + class TestOnExecutionException { + + private final ExecutionException exception = new ExecutionException(new Exception()); + + @BeforeEach + @SneakyThrows + void mock() { + when(future.get(anyLong(), any())).thenThrow(exception); + } + + @Test + void shouldThrowTechnicalException() { + assertThrows(TechnicalException.class, TestWaitForCompletionOfFuture.this::waitForCompletion); + } + } + + @Nested + class TestOnTimeoutException { + + private final TimeoutException exception = new TimeoutException(); + + @BeforeEach + @SneakyThrows + void mock() { + when(future.get(anyLong(), any())).thenThrow(exception); + } + + @Test + void shouldThrowTechnicalException() { + assertThrows(TechnicalException.class, TestWaitForCompletionOfFuture.this::waitForCompletion); + } + } + + private void waitForCompletion() { + forwarder.waitForCompletion(future); + } + } + + @Nested + class TestWaitForCompletionOfFileSender { + + @Mock + private StreamingFileSender<GrpcRouteForwardingRequest, GrpcRouteForwardingResponse> fileSender; + @Mock + private InputStream fileContentStream; + @Mock + private CompletableFuture<GrpcRouteForwardingResponse> future; + + @BeforeEach + void init() { + when(fileSender.getResultFuture()).thenReturn(future); + } + + @SneakyThrows + @Test + void shouldGetFromFuture() { + waitForCompletion(); + + verify(future).get(EingangForwarder.TIMEOUT_MINUTES, TimeUnit.MINUTES); + } + + @Nested + class TestOnInterruptedException { + + private final InterruptedException exception = new InterruptedException(); + + @BeforeEach + @SneakyThrows + void mock() { + when(future.get(anyLong(), any())).thenThrow(exception); + } + + @Test + void shouldInterruptThread() { + try { + waitForCompletion(); + } catch (TechnicalException e) { + // expected + } + + assertThat(Thread.currentThread().isInterrupted()).isTrue(); + } + + @Test + void shouldCancelOnError() { + try { + waitForCompletion(); + } catch (TechnicalException e) { + // expected + } + + verify(fileSender).cancelOnError(exception); + } + + @Test + void shouldThrowTechnicalException() { + assertThrows(TechnicalException.class, TestWaitForCompletionOfFileSender.this::waitForCompletion); + } + + @SneakyThrows + @Test + void shouldCloseStream() { + try { + waitForCompletion(); + } catch (TechnicalException e) { + // expected + } + + verify(fileContentStream).close(); + } + } + + @Nested + class TestOnExecutionException { + + private final ExecutionException exception = new ExecutionException(new Exception()); + + @BeforeEach + @SneakyThrows + void mock() { + when(future.get(anyLong(), any())).thenThrow(exception); + } + + @Test + void shouldCancelOnError() { + try { + waitForCompletion(); + } catch (TechnicalException e) { + // expected + } + + verify(fileSender).cancelOnError(exception); + } + + @Test + void shouldThrowTechnicalException() { + assertThrows(TechnicalException.class, TestWaitForCompletionOfFileSender.this::waitForCompletion); + } + + @SneakyThrows + @Test + void shouldCloseStream() { + try { + waitForCompletion(); + } catch (TechnicalException e) { + // expected + } + + verify(fileContentStream).close(); + } + } + + @Nested + class TestOnTimeoutException { + + private final TimeoutException exception = new TimeoutException(); + + @BeforeEach + @SneakyThrows + void mock() { + when(future.get(anyLong(), any())).thenThrow(exception); + } + + @Test + void shouldCancelOnTimeout() { + try { + waitForCompletion(); + } catch (TechnicalException e) { + // expected + } + + verify(fileSender).cancelOnTimeout(); + } + + @Test + void shouldThrowTechnicalException() { + assertThrows(TechnicalException.class, TestWaitForCompletionOfFileSender.this::waitForCompletion); + } + + @SneakyThrows + @Test + void shouldCloseStream() { + try { + waitForCompletion(); + } catch (TechnicalException e) { + // expected + } + + verify(fileContentStream).close(); + } + } + + private void waitForCompletion() { + forwarder.waitForCompletion(fileSender, fileContentStream); + } + } + @Nested class TestForwardingResponseObserver { diff --git a/vorgang-manager-server/src/test/java/de/ozgcloud/vorgang/vorgang/redirect/ForwardingRemoteServiceTest.java b/vorgang-manager-server/src/test/java/de/ozgcloud/vorgang/vorgang/redirect/ForwardingRemoteServiceTest.java index 5ddb68d303c33cc5700ccdcf0fa608dbb34b5c63..2a1c5ea86879039b47fcd9c769a6e8b82d4dcf63 100644 --- a/vorgang-manager-server/src/test/java/de/ozgcloud/vorgang/vorgang/redirect/ForwardingRemoteServiceTest.java +++ b/vorgang-manager-server/src/test/java/de/ozgcloud/vorgang/vorgang/redirect/ForwardingRemoteServiceTest.java @@ -23,16 +23,10 @@ */ package de.ozgcloud.vorgang.vorgang.redirect; -import static org.assertj.core.api.Assertions.*; -import static org.junit.jupiter.api.Assertions.*; import static org.mockito.ArgumentMatchers.*; import static org.mockito.Mockito.*; import java.util.List; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Nested; @@ -41,12 +35,10 @@ import org.mockito.InjectMocks; import org.mockito.Mock; import org.mockito.Spy; -import de.ozgcloud.common.errorhandling.TechnicalException; import de.ozgcloud.eingang.forwarding.GrpcRouteForwarding; import de.ozgcloud.vorgang.vorgang.EingangTestFactory; import de.ozgcloud.vorgang.vorgang.VorgangService; import de.ozgcloud.vorgang.vorgang.VorgangTestFactory; -import lombok.SneakyThrows; class ForwardingRemoteServiceTest { @@ -63,7 +55,6 @@ class ForwardingRemoteServiceTest { private final ForwardingRequest request = ForwardingRequestTestFactory.create(); private final GrpcRouteForwarding grpcRouteForwarding = GrpcRouteForwardingTestFactory.create(); - private final CompletableFuture<Void> responseFuture = new CompletableFuture<>(); @Mock private EingangForwarder eingangForwarder; @@ -71,10 +62,7 @@ class ForwardingRemoteServiceTest { void init() { doReturn(eingangForwarder).when(service).getEingangForwarder(); when(vorgangService.getById(any())).thenReturn(VorgangTestFactory.create()); - when(eingangForwarder.forward(any(), any(), any())).thenReturn(eingangForwarder); - when(eingangForwarder.getForwardFuture()).thenReturn(responseFuture); when(forwardingRequestMapper.toGrpcRouteForwarding(any(), any())).thenReturn(grpcRouteForwarding); - doNothing().when(service).waitForCompletion(any()); } @Test @@ -97,126 +85,5 @@ class ForwardingRemoteServiceTest { verify(eingangForwarder).forward(grpcRouteForwarding, List.of(EingangTestFactory.ATTACHMENT), List.of(EingangTestFactory.REPRESENTATION)); } - - @Test - void shouldWaitForCompletion() { - service.forward(request); - - verify(service).waitForCompletion(responseFuture); - } - } - - @Nested - class TestWaitForCompletion { - - @Mock - private CompletableFuture<Void> future; - - @SneakyThrows - @Test - void shouldGetFromFuture() { - waitForCompletion(); - - verify(future).get(ForwardingRemoteService.TIMEOUT_MINUTES, TimeUnit.MINUTES); - } - - @Nested - class TestOnInterruptedException { - - private final InterruptedException exception = new InterruptedException(); - - @BeforeEach - @SneakyThrows - void mock() { - when(future.get(anyLong(), any())).thenThrow(exception); - } - - @Test - void shouldThrowTechnicalException() { - assertThrows(TechnicalException.class, TestWaitForCompletion.this::waitForCompletion); - } - - @Test - void shouldInterruptThread() { - try { - waitForCompletion(); - } catch (TechnicalException e) { - // expected - } - - assertThat(Thread.currentThread().isInterrupted()).isTrue(); - } - - @Test - void shouldCompleteFutureExceptionally() { - try { - waitForCompletion(); - } catch (TechnicalException e) { - // expected - } - - verify(future).completeExceptionally(exception); - } - } - - @Nested - class TestOnExecutionException { - - private final ExecutionException exception = new ExecutionException(new Exception()); - - @BeforeEach - @SneakyThrows - void mock() { - when(future.get(anyLong(), any())).thenThrow(exception); - } - - @Test - void shouldThrowTechnicalException() { - assertThrows(TechnicalException.class, TestWaitForCompletion.this::waitForCompletion); - } - - @Test - void shouldCompleteFutureExceptionally() { - try { - waitForCompletion(); - } catch (TechnicalException e) { - // expected - } - - verify(future).completeExceptionally(exception); - } - } - - @Nested - class TestOnTimeoutException { - - private final TimeoutException exception = new TimeoutException(); - - @BeforeEach - @SneakyThrows - void mock() { - when(future.get(anyLong(), any())).thenThrow(exception); - } - - @Test - void shouldThrowTechnicalException() { - assertThrows(TechnicalException.class, TestWaitForCompletion.this::waitForCompletion); - } - - @Test - void shouldCompleteFutureExceptionally() { - try { - waitForCompletion(); - } catch (TechnicalException e) { - // expected - } - - verify(future).completeExceptionally(exception); - } - } - - private void waitForCompletion() { - service.waitForCompletion(future); - } } }