diff --git a/vorgang-manager-server/src/main/java/de/ozgcloud/vorgang/vorgang/redirect/EingangForwarder.java b/vorgang-manager-server/src/main/java/de/ozgcloud/vorgang/vorgang/redirect/EingangForwarder.java index 5fa1cc80c6f40e94cd5ebfcbe29cace8d6962564..f34f7e0bad6e748a03bfaef071e3bdb5f3d8ed52 100644 --- a/vorgang-manager-server/src/main/java/de/ozgcloud/vorgang/vorgang/redirect/EingangForwarder.java +++ b/vorgang-manager-server/src/main/java/de/ozgcloud/vorgang/vorgang/redirect/EingangForwarder.java @@ -26,10 +26,11 @@ import de.ozgcloud.vorgang.vorgang.IncomingFileGroup; import de.ozgcloud.vorgang.vorgang.IncomingFileMapper; import io.grpc.stub.ClientCallStreamObserver; import io.grpc.stub.ClientResponseObserver; +import lombok.AccessLevel; import lombok.Getter; import lombok.RequiredArgsConstructor; -@RequiredArgsConstructor +@RequiredArgsConstructor(access = AccessLevel.PRIVATE) class EingangForwarder { private final RouteForwardingServiceGrpc.RouteForwardingServiceStub serviceStub; @@ -43,6 +44,11 @@ class EingangForwarder { @Getter private CompletableFuture<Void> forwardFuture; + public static EingangForwarder create(RouteForwardingServiceGrpc.RouteForwardingServiceStub serviceStub, FileService fileService, + IncomingFileMapper incomingFileMapper) { + return new EingangForwarder(serviceStub, fileService, incomingFileMapper); + } + public EingangForwarder forward(GrpcRouteForwarding grpcRouteForwarding, List<IncomingFileGroup> attachments, List<IncomingFile> representations) { @@ -104,10 +110,8 @@ class EingangForwarder { CompletableFuture<GrpcRouteForwardingResponse> sendAttachmentFile(String groupName, IncomingFile file) { var fileContentStream = fileService.getUploadedFileStream(file.getId()); - var sender = createAttachmentFileSender(groupName, file, fileContentStream).send(); - var future = sender.getResultFuture(); - configureToCancelIfForwardFutureCompleted(future); - return future; + var future = createAttachmentFileSender(groupName, file, fileContentStream).send().getResultFuture(); + return configureToCancelIfForwardFutureCompleted(future); } StreamingFileSender<GrpcRouteForwardingRequest, GrpcRouteForwardingResponse> createAttachmentFileSender(String groupName, @@ -149,10 +153,8 @@ class EingangForwarder { CompletableFuture<GrpcRouteForwardingResponse> sendRepresentationFile(IncomingFile file) { var fileContentStream = fileService.getUploadedFileStream(file.getId()); - var sender = createRepresentationFileSender(file, fileContentStream).send(); - var future = sender.getResultFuture(); - configureToCancelIfForwardFutureCompleted(future); - return future; + var future = createRepresentationFileSender(file, fileContentStream).send().getResultFuture(); + return configureToCancelIfForwardFutureCompleted(future); } StreamingFileSender<GrpcRouteForwardingRequest, GrpcRouteForwardingResponse> createRepresentationFileSender(IncomingFile file, @@ -176,12 +178,13 @@ class EingangForwarder { .build(); } - void configureToCancelIfForwardFutureCompleted(CompletableFuture<GrpcRouteForwardingResponse> future) { + CompletableFuture<GrpcRouteForwardingResponse> configureToCancelIfForwardFutureCompleted(CompletableFuture<GrpcRouteForwardingResponse> future) { forwardFuture.whenComplete((result, ex) -> { if (forwardFuture.isDone() && !future.isDone()) { future.cancel(true); } }); + return future; } GrpcFileContent buildGrpcFileContent(byte[] chunk, int length) { @@ -251,11 +254,9 @@ class EingangForwarder { @Override public void run() { - while (!done.get() && requestStream.isReady()) { - var runnable = onReadyHandler.get(); - if (runnable != null) { - runnable.run(); - } + var delegate = onReadyHandler.get(); + if (delegate != null && !done.get() && requestStream.isReady()) { + delegate.run(); } } } diff --git a/vorgang-manager-server/src/main/java/de/ozgcloud/vorgang/vorgang/redirect/ForwardingRemoteService.java b/vorgang-manager-server/src/main/java/de/ozgcloud/vorgang/vorgang/redirect/ForwardingRemoteService.java index b76dde08cd2cf42fd85b5d7dbd830c8f9aec35b0..eef4e2ca0fddea30b695305ca86c6f07ff2107bb 100644 --- a/vorgang-manager-server/src/main/java/de/ozgcloud/vorgang/vorgang/redirect/ForwardingRemoteService.java +++ b/vorgang-manager-server/src/main/java/de/ozgcloud/vorgang/vorgang/redirect/ForwardingRemoteService.java @@ -53,7 +53,7 @@ class ForwardingRemoteService { public void forward(ForwardingRequest request) { var eingang = vorgangService.getById(request.getVorgangId()).getEingangs().getFirst(); var grpcRouteForwarding = forwardingRequestMapper.toGrpcRouteForwarding(request, eingang); - var responseFuture = new EingangForwarder(serviceStub, fileService, incomingFileMapper).forward(grpcRouteForwarding, eingang.getAttachments(), + var responseFuture = EingangForwarder.create(serviceStub, fileService, incomingFileMapper).forward(grpcRouteForwarding, eingang.getAttachments(), eingang.getRepresentations()).getForwardFuture(); waitForCompletion(responseFuture); } @@ -63,10 +63,13 @@ class ForwardingRemoteService { responseFuture.get(TIMEOUT_MINUTES, TimeUnit.MINUTES); } catch (InterruptedException e) { Thread.currentThread().interrupt(); + responseFuture.completeExceptionally(e); throw new TechnicalException("Waiting for finishing file upload was interrupted.", e); } catch (ExecutionException e) { + responseFuture.completeExceptionally(e); throw new TechnicalException("Error on uploading file content.", e); } catch (TimeoutException e) { + responseFuture.completeExceptionally(e); throw new TechnicalException("Timeout on uploading file content.", e); } } diff --git a/vorgang-manager-server/src/test/java/de/ozgcloud/vorgang/vorgang/redirect/EingangForwarderTest.java b/vorgang-manager-server/src/test/java/de/ozgcloud/vorgang/vorgang/redirect/EingangForwarderTest.java index 78f1837c3b657bd7221ba2e2c1a6d0a493d3fb2a..0361376b08134a9f86d13a081ba233c130c477dc 100644 --- a/vorgang-manager-server/src/test/java/de/ozgcloud/vorgang/vorgang/redirect/EingangForwarderTest.java +++ b/vorgang-manager-server/src/test/java/de/ozgcloud/vorgang/vorgang/redirect/EingangForwarderTest.java @@ -10,6 +10,7 @@ import java.io.InputStream; import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.BiFunction; import java.util.function.Consumer; @@ -40,6 +41,7 @@ import de.ozgcloud.vorgang.vorgang.IncomingFileGroup; import de.ozgcloud.vorgang.vorgang.IncomingFileGroupTestFactory; import de.ozgcloud.vorgang.vorgang.IncomingFileMapper; import de.ozgcloud.vorgang.vorgang.IncomingFileTestFactory; +import de.ozgcloud.vorgang.vorgang.redirect.EingangForwarder.DelegatingOnReadyHandler; import de.ozgcloud.vorgang.vorgang.redirect.EingangForwarder.ForwardingResponseObserver; import io.grpc.stub.ClientCallStreamObserver; @@ -309,7 +311,7 @@ class EingangForwarderTest { doReturn(fileSender).when(forwarder).createAttachmentFileSender(any(), any(), any()); doReturn(fileSender).when(fileSender).send(); when(fileSender.getResultFuture()).thenReturn(resultFuture); - doNothing().when(forwarder).configureToCancelIfForwardFutureCompleted(any()); + doReturn(resultFuture).when(forwarder).configureToCancelIfForwardFutureCompleted(any()); } @Test @@ -479,6 +481,110 @@ class EingangForwarderTest { assertThat(returned).isNotNull(); } + + @Test + void shouldInitiallySendOnlyFirstFile() { + forwarder.sendRepresentations(representations); + + verify(forwarder).sendRepresentationFile(FILE); + verify(forwarder, times(1)).sendRepresentationFile(any()); + } + + @Test + void shouldSendSecondFileAfterFirstFutureCompleted() { + forwarder.sendRepresentations(representations); + + future.complete(GrpcRouteForwardingResponse.newBuilder().build()); + + verify(forwarder).sendRepresentationFile(FILE2); + verify(forwarder, times(2)).sendRepresentationFile(any()); + } + + @Test + void shouldReturnedFutureBeInitiallyIncomplete() { + var returned = forwarder.sendRepresentations(representations); + + assertThat(returned.isDone()).isFalse(); + } + + @Test + void shouldReturnedFutureBeIncompleteAfterSendingFirstFile() { + var returned = forwarder.sendRepresentations(representations); + + future.complete(GrpcRouteForwardingResponse.newBuilder().build()); + + assertThat(returned.isDone()).isFalse(); + } + + @Test + void shouldReturnedFutureBeDoneAfterSendingAllFiles() { + var returned = forwarder.sendRepresentations(representations); + + future.complete(GrpcRouteForwardingResponse.newBuilder().build()); + future2.complete(GrpcRouteForwardingResponse.newBuilder().build()); + + assertThat(returned.isDone()).isTrue(); + } + } + + @Nested + class TestSendRepresentationFile { + + @Mock + private StreamingFileSender<GrpcRouteForwardingRequest, GrpcRouteForwardingResponse> fileSender; + private final CompletableFuture<GrpcRouteForwardingResponse> resultFuture = new CompletableFuture<>(); + @Mock + private InputStream fileContentStream; + + private final IncomingFile file = IncomingFileTestFactory.create(); + + @BeforeEach + void init() { + when(fileService.getUploadedFileStream(any())).thenReturn(fileContentStream); + doReturn(fileSender).when(forwarder).createRepresentationFileSender(any(), any()); + doReturn(fileSender).when(fileSender).send(); + when(fileSender.getResultFuture()).thenReturn(resultFuture); + doReturn(resultFuture).when(forwarder).configureToCancelIfForwardFutureCompleted(any()); + } + + @Test + void shouldGetUploadFileStream() { + sendRepresentationFile(); + + verify(fileService).getUploadedFileStream(IncomingFileTestFactory.ID); + } + + @Test + void shouldCreateRepresentationFileSender() { + sendRepresentationFile(); + + verify(forwarder).createRepresentationFileSender(file, fileContentStream); + } + + @Test + void shouldSend() { + sendRepresentationFile(); + + verify(fileSender).send(); + } + + @Test + void shouldConfigureFutureToCancelIfForwardFutureCompleted() { + sendRepresentationFile(); + + verify(forwarder).configureToCancelIfForwardFutureCompleted(resultFuture); + } + + @Test + void shouldReturnResultFuture() { + var returned = sendRepresentationFile(); + + assertThat(returned).isSameAs(resultFuture); + } + + private CompletableFuture<GrpcRouteForwardingResponse> sendRepresentationFile() { + return forwarder.sendRepresentationFile(file); + } } @Nested @@ -589,6 +695,55 @@ class EingangForwarderTest { } } + @Nested + class TestConfigureToCancelIfForwardFutureCompleted { + + private final CompletableFuture<Void> forwardFuture = new CompletableFuture<>(); + private final CompletableFuture<GrpcRouteForwardingResponse> future = new CompletableFuture<>(); + + @BeforeEach + void init() { + setForwardFutureInForwarder(forwardFuture); + } + + @Test + void shouldCancelFutureWhenForwardFutureCompleted() { + forwarder.configureToCancelIfForwardFutureCompleted(future); + + forwardFuture.complete(null); + + assertThat(future.isCancelled()).isTrue(); + } + + @Test + void shouldCancelFutureWhenForwardFutureWasCancelled() { + forwarder.configureToCancelIfForwardFutureCompleted(future); + + forwardFuture.cancel(true); + + assertThat(future.isCancelled()).isTrue(); + } + + @Test + void shouldCancelFutureWhenForwardFutureCompletedExceptionally() { + forwarder.configureToCancelIfForwardFutureCompleted(future); + + forwardFuture.completeExceptionally(new RuntimeException("Forced failure")); + + assertThat(future.isCancelled()).isTrue(); + } + + @Test + void shouldNotCancelFutureIfItIsDone() { + forwarder.configureToCancelIfForwardFutureCompleted(future); + future.complete(GrpcRouteForwardingResponse.getDefaultInstance()); + + forwardFuture.complete(null); + + assertThat(future.isCancelled()).isFalse(); + } + } + @Nested class TestBuildGrpcFileContent { @@ -675,6 +830,199 @@ class EingangForwarderTest { } } + @Nested + class TestForwardingResponseObserver { + + @Mock + private CompletableFuture<GrpcRouteForwardingResponse> future; + @Mock + private DelegatingOnReadyHandler onReadyHandler; + @Mock + private ClientCallStreamObserver<GrpcRouteForwardingRequest> requestStream; + private final GrpcRouteForwardingResponse response = GrpcRouteForwardingResponse.getDefaultInstance(); + @InjectMocks + private ForwardingResponseObserver observer; + + @Nested + class TestBeforeStart { + + @Test + void shouldCreateOnReadyHandler() { + observer.beforeStart(requestStream); + + assertThat(getOnReadyHandlerFromObserver()).isNotNull(); + } + + @Test + void shouldSetOnReadyHandler() { + observer.beforeStart(requestStream); + + verify(requestStream).setOnReadyHandler(getOnReadyHandlerFromObserver()); + } + } + + @Nested + class TestOnNext { + + @Test + void shouldSetResponse() { + observer.onNext(response); + + assertThat(getResponseFromObserver()).isSameAs(response); + } + } + + @Nested + class TestOnError { + + private final Throwable error = new RuntimeException("Error when forwarding"); + + @BeforeEach + void init() { + setOnReadyHandlerInObserver(); + } + + @Test + void shouldStopOnReadyHandler() { + observer.onError(error); + + verify(onReadyHandler).stop(); + } + + @Test + void shouldCompleteFutureExceptionally() { + observer.onError(error); + + verify(future).completeExceptionally(error); + } + } + + @Nested + class TestOnCompleted { + + @BeforeEach + void init() { + setOnReadyHandlerInObserver(); + } + + @Test + void shouldStopOnReadyHandler() { + observer.onCompleted(); + + verify(onReadyHandler).stop(); + } + + @Test + void shouldCompleteFutureWithResponse() { + observer.onNext(response); + + observer.onCompleted(); + + verify(future).complete(response); + } + } + + @Nested + class TestRegisterOnReadyHandler { + + @Mock + private Runnable delegate; + + @BeforeEach + void init() { + setOnReadyHandlerInObserver(); + } + + @Test + void shouldSetDelegateInOnReadyHandler() { + observer.registerOnReadyHandler(delegate); + + verify(onReadyHandler).setDelegate(delegate); + } + } + + private DelegatingOnReadyHandler getOnReadyHandlerFromObserver() { + return ReflectionTestUtils.getField(observer, "onReadyHandler", DelegatingOnReadyHandler.class); + } + + private void setOnReadyHandlerInObserver() { + ReflectionTestUtils.setField(observer, "onReadyHandler", onReadyHandler); + } + + private GrpcRouteForwardingResponse getResponseFromObserver() { + return ReflectionTestUtils.getField(observer, "response", GrpcRouteForwardingResponse.class); + } + } + + @Nested + class TestDelegatingOnReadyHandler { + + @Mock + private ClientCallStreamObserver<GrpcRouteForwardingRequest> requestStream; + @InjectMocks + private DelegatingOnReadyHandler onReadyHandler; + + @Test + void shouldDoneBeInitiallyFalse() { + assertThat(getDoneFromOnReadyHandler()).isFalse(); + } + + @Nested + class TestStop { + + @Test + void shouldSetDoneToTrue() { + onReadyHandler.stop(); + + assertThat(getDoneFromOnReadyHandler()).isTrue(); + } + } + + @Nested + class TestRun { + + @Mock + private Runnable delegate; + + @BeforeEach + void init() { + onReadyHandler.setDelegate(delegate); + } + + @Test + void shouldNotRunDelegateIfDone() { + onReadyHandler.stop(); + lenient().when(requestStream.isReady()).thenReturn(true); + + onReadyHandler.run(); + + verify(delegate, never()).run(); + } + + @Test + void shouldNotRunDelegateIfNotReady() { + when(requestStream.isReady()).thenReturn(false); + + onReadyHandler.run(); + + verify(delegate, never()).run(); + } + + @Test + void shouldRunDelegateIfNotDoneAndReady() { + when(requestStream.isReady()).thenReturn(true); + + onReadyHandler.run(); + + verify(delegate).run(); + } + } + + private boolean getDoneFromOnReadyHandler() { + return ReflectionTestUtils.getField(onReadyHandler, "done", AtomicBoolean.class).get(); + } + } + private void setResponseObserverInForwarder(ForwardingResponseObserver responseObserver) { ReflectionTestUtils.setField(forwarder, "responseObserver", responseObserver); } @@ -686,4 +1034,8 @@ class EingangForwarderTest { private void setRequestObserverInForwarder(ClientCallStreamObserver<GrpcRouteForwardingRequest> requestObserver) { ReflectionTestUtils.setField(forwarder, "requestObserver", requestObserver); } + + private void setForwardFutureInForwarder(CompletableFuture<Void> future) { + ReflectionTestUtils.setField(forwarder, "forwardFuture", future); + } } diff --git a/vorgang-manager-server/src/test/java/de/ozgcloud/vorgang/vorgang/redirect/ForwardingRemoteServiceTest.java b/vorgang-manager-server/src/test/java/de/ozgcloud/vorgang/vorgang/redirect/ForwardingRemoteServiceTest.java index 3df3152b6a388d150b384d6cd918639d79f24f7d..e883ab78208d5158642011a6c7207a169409d5c5 100644 --- a/vorgang-manager-server/src/test/java/de/ozgcloud/vorgang/vorgang/redirect/ForwardingRemoteServiceTest.java +++ b/vorgang-manager-server/src/test/java/de/ozgcloud/vorgang/vorgang/redirect/ForwardingRemoteServiceTest.java @@ -5,23 +5,29 @@ import static org.junit.jupiter.api.Assertions.*; import static org.mockito.ArgumentMatchers.*; import static org.mockito.Mockito.*; +import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Nested; import org.junit.jupiter.api.Test; import org.mockito.InjectMocks; import org.mockito.Mock; +import org.mockito.MockedStatic; import org.mockito.Spy; import de.ozgcloud.common.errorhandling.TechnicalException; import de.ozgcloud.eingang.forwarder.RouteForwardingServiceGrpc; +import de.ozgcloud.eingang.forwarding.GrpcRouteForwarding; import de.ozgcloud.vorgang.files.FileService; +import de.ozgcloud.vorgang.vorgang.EingangTestFactory; import de.ozgcloud.vorgang.vorgang.IncomingFileMapper; import de.ozgcloud.vorgang.vorgang.VorgangService; +import de.ozgcloud.vorgang.vorgang.VorgangTestFactory; import lombok.SneakyThrows; class ForwardingRemoteServiceTest { @@ -40,6 +46,61 @@ class ForwardingRemoteServiceTest { @Spy private ForwardingRemoteService service; + @Nested + class TestForward { + + private final ForwardingRequest request = ForwardingRequestTestFactory.create(); + private final GrpcRouteForwarding grpcRouteForwarding = GrpcRouteForwardingTestFactory.create(); + private final CompletableFuture<Void> responseFuture = new CompletableFuture<>(); + private MockedStatic<EingangForwarder> eingangForwarderMockedStatic; + @Mock + private EingangForwarder eingangForwarder; + + @BeforeEach + void init() { + when(vorgangService.getById(any())).thenReturn(VorgangTestFactory.create()); + eingangForwarderMockedStatic = mockStatic(EingangForwarder.class); + eingangForwarderMockedStatic.when(() -> EingangForwarder.create(any(), any(), any())).thenReturn(eingangForwarder); + when(eingangForwarder.forward(any(), any(), any())).thenReturn(eingangForwarder); + when(eingangForwarder.getForwardFuture()).thenReturn(responseFuture); + when(forwardingRequestMapper.toGrpcRouteForwarding(any(), any())).thenReturn(grpcRouteForwarding); + doNothing().when(service).waitForCompletion(any()); + } + + @AfterEach + void teardown() { + eingangForwarderMockedStatic.close(); + } + + @Test + void shouldGetVorgang() { + service.forward(request); + + verify(vorgangService).getById(VorgangTestFactory.ID); + } + + @Test + void shouldMapToGrpcRouteForwarding() { + service.forward(request); + + verify(forwardingRequestMapper).toGrpcRouteForwarding(request, VorgangTestFactory.EINGANG); + } + + @Test + void shouldForward() { + service.forward(request); + + verify(eingangForwarder).forward(grpcRouteForwarding, List.of(EingangTestFactory.ATTACHMENT), List.of(EingangTestFactory.REPRESENTATION)); + } + + @Test + void shouldWaitForCompletion() { + service.forward(request); + + verify(service).waitForCompletion(responseFuture); + } + } + @Nested class TestWaitForCompletion { @@ -67,7 +128,7 @@ class ForwardingRemoteServiceTest { @Test void shouldThrowTechnicalException() { - assertThrows(TechnicalException.class, de.ozgcloud.vorgang.vorgang.redirect.ForwardingRemoteServiceTest.TestWaitForCompletion.this::waitForCompletion); + assertThrows(TechnicalException.class, TestWaitForCompletion.this::waitForCompletion); } @Test @@ -80,6 +141,17 @@ class ForwardingRemoteServiceTest { assertThat(Thread.currentThread().isInterrupted()).isTrue(); } + + @Test + void shouldCompleteFutureExceptionally() { + try { + waitForCompletion(); + } catch (TechnicalException e) { + // expected + } + + verify(future).completeExceptionally(exception); + } } @Nested @@ -95,7 +167,18 @@ class ForwardingRemoteServiceTest { @Test void shouldThrowTechnicalException() { - assertThrows(TechnicalException.class, de.ozgcloud.vorgang.vorgang.redirect.ForwardingRemoteServiceTest.TestWaitForCompletion.this::waitForCompletion); + assertThrows(TechnicalException.class, TestWaitForCompletion.this::waitForCompletion); + } + + @Test + void shouldCompleteFutureExceptionally() { + try { + waitForCompletion(); + } catch (TechnicalException e) { + // expected + } + + verify(future).completeExceptionally(exception); } } @@ -112,7 +195,18 @@ class ForwardingRemoteServiceTest { @Test void shouldThrowTechnicalException() { - assertThrows(TechnicalException.class, de.ozgcloud.vorgang.vorgang.redirect.ForwardingRemoteServiceTest.TestWaitForCompletion.this::waitForCompletion); + assertThrows(TechnicalException.class, TestWaitForCompletion.this::waitForCompletion); + } + + @Test + void shouldCompleteFutureExceptionally() { + try { + waitForCompletion(); + } catch (TechnicalException e) { + // expected + } + + verify(future).completeExceptionally(exception); } }