Skip to content
Snippets Groups Projects
Commit 30b20892 authored by Krzysztof Witukiewicz's avatar Krzysztof Witukiewicz
Browse files

OZG-7573 OZG-7991 Complete future exceptionally on exception

parent d6a5e58a
No related branches found
No related tags found
1 merge request!27OZG-7573 Dateien Weiterleiten
......@@ -26,10 +26,11 @@ import de.ozgcloud.vorgang.vorgang.IncomingFileGroup;
import de.ozgcloud.vorgang.vorgang.IncomingFileMapper;
import io.grpc.stub.ClientCallStreamObserver;
import io.grpc.stub.ClientResponseObserver;
import lombok.AccessLevel;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
@RequiredArgsConstructor
@RequiredArgsConstructor(access = AccessLevel.PRIVATE)
class EingangForwarder {
private final RouteForwardingServiceGrpc.RouteForwardingServiceStub serviceStub;
......@@ -43,6 +44,11 @@ class EingangForwarder {
@Getter
private CompletableFuture<Void> forwardFuture;
public static EingangForwarder create(RouteForwardingServiceGrpc.RouteForwardingServiceStub serviceStub, FileService fileService,
IncomingFileMapper incomingFileMapper) {
return new EingangForwarder(serviceStub, fileService, incomingFileMapper);
}
public EingangForwarder forward(GrpcRouteForwarding grpcRouteForwarding, List<IncomingFileGroup> attachments,
List<IncomingFile> representations) {
......@@ -104,10 +110,8 @@ class EingangForwarder {
CompletableFuture<GrpcRouteForwardingResponse> sendAttachmentFile(String groupName, IncomingFile file) {
var fileContentStream = fileService.getUploadedFileStream(file.getId());
var sender = createAttachmentFileSender(groupName, file, fileContentStream).send();
var future = sender.getResultFuture();
configureToCancelIfForwardFutureCompleted(future);
return future;
var future = createAttachmentFileSender(groupName, file, fileContentStream).send().getResultFuture();
return configureToCancelIfForwardFutureCompleted(future);
}
StreamingFileSender<GrpcRouteForwardingRequest, GrpcRouteForwardingResponse> createAttachmentFileSender(String groupName,
......@@ -149,10 +153,8 @@ class EingangForwarder {
CompletableFuture<GrpcRouteForwardingResponse> sendRepresentationFile(IncomingFile file) {
var fileContentStream = fileService.getUploadedFileStream(file.getId());
var sender = createRepresentationFileSender(file, fileContentStream).send();
var future = sender.getResultFuture();
configureToCancelIfForwardFutureCompleted(future);
return future;
var future = createRepresentationFileSender(file, fileContentStream).send().getResultFuture();
return configureToCancelIfForwardFutureCompleted(future);
}
StreamingFileSender<GrpcRouteForwardingRequest, GrpcRouteForwardingResponse> createRepresentationFileSender(IncomingFile file,
......@@ -176,12 +178,13 @@ class EingangForwarder {
.build();
}
void configureToCancelIfForwardFutureCompleted(CompletableFuture<GrpcRouteForwardingResponse> future) {
CompletableFuture<GrpcRouteForwardingResponse> configureToCancelIfForwardFutureCompleted(CompletableFuture<GrpcRouteForwardingResponse> future) {
forwardFuture.whenComplete((result, ex) -> {
if (forwardFuture.isDone() && !future.isDone()) {
future.cancel(true);
}
});
return future;
}
GrpcFileContent buildGrpcFileContent(byte[] chunk, int length) {
......@@ -251,11 +254,9 @@ class EingangForwarder {
@Override
public void run() {
while (!done.get() && requestStream.isReady()) {
var runnable = onReadyHandler.get();
if (runnable != null) {
runnable.run();
}
var delegate = onReadyHandler.get();
if (delegate != null && !done.get() && requestStream.isReady()) {
delegate.run();
}
}
}
......
......@@ -53,7 +53,7 @@ class ForwardingRemoteService {
public void forward(ForwardingRequest request) {
var eingang = vorgangService.getById(request.getVorgangId()).getEingangs().getFirst();
var grpcRouteForwarding = forwardingRequestMapper.toGrpcRouteForwarding(request, eingang);
var responseFuture = new EingangForwarder(serviceStub, fileService, incomingFileMapper).forward(grpcRouteForwarding, eingang.getAttachments(),
var responseFuture = EingangForwarder.create(serviceStub, fileService, incomingFileMapper).forward(grpcRouteForwarding, eingang.getAttachments(),
eingang.getRepresentations()).getForwardFuture();
waitForCompletion(responseFuture);
}
......@@ -63,10 +63,13 @@ class ForwardingRemoteService {
responseFuture.get(TIMEOUT_MINUTES, TimeUnit.MINUTES);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
responseFuture.completeExceptionally(e);
throw new TechnicalException("Waiting for finishing file upload was interrupted.", e);
} catch (ExecutionException e) {
responseFuture.completeExceptionally(e);
throw new TechnicalException("Error on uploading file content.", e);
} catch (TimeoutException e) {
responseFuture.completeExceptionally(e);
throw new TechnicalException("Timeout on uploading file content.", e);
}
}
......
......@@ -10,6 +10,7 @@ import java.io.InputStream;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiFunction;
import java.util.function.Consumer;
......@@ -40,6 +41,7 @@ import de.ozgcloud.vorgang.vorgang.IncomingFileGroup;
import de.ozgcloud.vorgang.vorgang.IncomingFileGroupTestFactory;
import de.ozgcloud.vorgang.vorgang.IncomingFileMapper;
import de.ozgcloud.vorgang.vorgang.IncomingFileTestFactory;
import de.ozgcloud.vorgang.vorgang.redirect.EingangForwarder.DelegatingOnReadyHandler;
import de.ozgcloud.vorgang.vorgang.redirect.EingangForwarder.ForwardingResponseObserver;
import io.grpc.stub.ClientCallStreamObserver;
......@@ -309,7 +311,7 @@ class EingangForwarderTest {
doReturn(fileSender).when(forwarder).createAttachmentFileSender(any(), any(), any());
doReturn(fileSender).when(fileSender).send();
when(fileSender.getResultFuture()).thenReturn(resultFuture);
doNothing().when(forwarder).configureToCancelIfForwardFutureCompleted(any());
doReturn(resultFuture).when(forwarder).configureToCancelIfForwardFutureCompleted(any());
}
@Test
......@@ -479,6 +481,110 @@ class EingangForwarderTest {
assertThat(returned).isNotNull();
}
@Test
void shouldInitiallySendOnlyFirstFile() {
forwarder.sendRepresentations(representations);
verify(forwarder).sendRepresentationFile(FILE);
verify(forwarder, times(1)).sendRepresentationFile(any());
}
@Test
void shouldSendSecondFileAfterFirstFutureCompleted() {
forwarder.sendRepresentations(representations);
future.complete(GrpcRouteForwardingResponse.newBuilder().build());
verify(forwarder).sendRepresentationFile(FILE2);
verify(forwarder, times(2)).sendRepresentationFile(any());
}
@Test
void shouldReturnedFutureBeInitiallyIncomplete() {
var returned = forwarder.sendRepresentations(representations);
assertThat(returned.isDone()).isFalse();
}
@Test
void shouldReturnedFutureBeIncompleteAfterSendingFirstFile() {
var returned = forwarder.sendRepresentations(representations);
future.complete(GrpcRouteForwardingResponse.newBuilder().build());
assertThat(returned.isDone()).isFalse();
}
@Test
void shouldReturnedFutureBeDoneAfterSendingAllFiles() {
var returned = forwarder.sendRepresentations(representations);
future.complete(GrpcRouteForwardingResponse.newBuilder().build());
future2.complete(GrpcRouteForwardingResponse.newBuilder().build());
assertThat(returned.isDone()).isTrue();
}
}
@Nested
class TestSendRepresentationFile {
@Mock
private StreamingFileSender<GrpcRouteForwardingRequest, GrpcRouteForwardingResponse> fileSender;
private final CompletableFuture<GrpcRouteForwardingResponse> resultFuture = new CompletableFuture<>();
@Mock
private InputStream fileContentStream;
private final IncomingFile file = IncomingFileTestFactory.create();
@BeforeEach
void init() {
when(fileService.getUploadedFileStream(any())).thenReturn(fileContentStream);
doReturn(fileSender).when(forwarder).createRepresentationFileSender(any(), any());
doReturn(fileSender).when(fileSender).send();
when(fileSender.getResultFuture()).thenReturn(resultFuture);
doReturn(resultFuture).when(forwarder).configureToCancelIfForwardFutureCompleted(any());
}
@Test
void shouldGetUploadFileStream() {
sendRepresentationFile();
verify(fileService).getUploadedFileStream(IncomingFileTestFactory.ID);
}
@Test
void shouldCreateRepresentationFileSender() {
sendRepresentationFile();
verify(forwarder).createRepresentationFileSender(file, fileContentStream);
}
@Test
void shouldSend() {
sendRepresentationFile();
verify(fileSender).send();
}
@Test
void shouldConfigureFutureToCancelIfForwardFutureCompleted() {
sendRepresentationFile();
verify(forwarder).configureToCancelIfForwardFutureCompleted(resultFuture);
}
@Test
void shouldReturnResultFuture() {
var returned = sendRepresentationFile();
assertThat(returned).isSameAs(resultFuture);
}
private CompletableFuture<GrpcRouteForwardingResponse> sendRepresentationFile() {
return forwarder.sendRepresentationFile(file);
}
}
@Nested
......@@ -589,6 +695,55 @@ class EingangForwarderTest {
}
}
@Nested
class TestConfigureToCancelIfForwardFutureCompleted {
private final CompletableFuture<Void> forwardFuture = new CompletableFuture<>();
private final CompletableFuture<GrpcRouteForwardingResponse> future = new CompletableFuture<>();
@BeforeEach
void init() {
setForwardFutureInForwarder(forwardFuture);
}
@Test
void shouldCancelFutureWhenForwardFutureCompleted() {
forwarder.configureToCancelIfForwardFutureCompleted(future);
forwardFuture.complete(null);
assertThat(future.isCancelled()).isTrue();
}
@Test
void shouldCancelFutureWhenForwardFutureWasCancelled() {
forwarder.configureToCancelIfForwardFutureCompleted(future);
forwardFuture.cancel(true);
assertThat(future.isCancelled()).isTrue();
}
@Test
void shouldCancelFutureWhenForwardFutureCompletedExceptionally() {
forwarder.configureToCancelIfForwardFutureCompleted(future);
forwardFuture.completeExceptionally(new RuntimeException("Forced failure"));
assertThat(future.isCancelled()).isTrue();
}
@Test
void shouldNotCancelFutureIfItIsDone() {
forwarder.configureToCancelIfForwardFutureCompleted(future);
future.complete(GrpcRouteForwardingResponse.getDefaultInstance());
forwardFuture.complete(null);
assertThat(future.isCancelled()).isFalse();
}
}
@Nested
class TestBuildGrpcFileContent {
......@@ -675,6 +830,199 @@ class EingangForwarderTest {
}
}
@Nested
class TestForwardingResponseObserver {
@Mock
private CompletableFuture<GrpcRouteForwardingResponse> future;
@Mock
private DelegatingOnReadyHandler onReadyHandler;
@Mock
private ClientCallStreamObserver<GrpcRouteForwardingRequest> requestStream;
private final GrpcRouteForwardingResponse response = GrpcRouteForwardingResponse.getDefaultInstance();
@InjectMocks
private ForwardingResponseObserver observer;
@Nested
class TestBeforeStart {
@Test
void shouldCreateOnReadyHandler() {
observer.beforeStart(requestStream);
assertThat(getOnReadyHandlerFromObserver()).isNotNull();
}
@Test
void shouldSetOnReadyHandler() {
observer.beforeStart(requestStream);
verify(requestStream).setOnReadyHandler(getOnReadyHandlerFromObserver());
}
}
@Nested
class TestOnNext {
@Test
void shouldSetResponse() {
observer.onNext(response);
assertThat(getResponseFromObserver()).isSameAs(response);
}
}
@Nested
class TestOnError {
private final Throwable error = new RuntimeException("Error when forwarding");
@BeforeEach
void init() {
setOnReadyHandlerInObserver();
}
@Test
void shouldStopOnReadyHandler() {
observer.onError(error);
verify(onReadyHandler).stop();
}
@Test
void shouldCompleteFutureExceptionally() {
observer.onError(error);
verify(future).completeExceptionally(error);
}
}
@Nested
class TestOnCompleted {
@BeforeEach
void init() {
setOnReadyHandlerInObserver();
}
@Test
void shouldStopOnReadyHandler() {
observer.onCompleted();
verify(onReadyHandler).stop();
}
@Test
void shouldCompleteFutureWithResponse() {
observer.onNext(response);
observer.onCompleted();
verify(future).complete(response);
}
}
@Nested
class TestRegisterOnReadyHandler {
@Mock
private Runnable delegate;
@BeforeEach
void init() {
setOnReadyHandlerInObserver();
}
@Test
void shouldSetDelegateInOnReadyHandler() {
observer.registerOnReadyHandler(delegate);
verify(onReadyHandler).setDelegate(delegate);
}
}
private DelegatingOnReadyHandler getOnReadyHandlerFromObserver() {
return ReflectionTestUtils.getField(observer, "onReadyHandler", DelegatingOnReadyHandler.class);
}
private void setOnReadyHandlerInObserver() {
ReflectionTestUtils.setField(observer, "onReadyHandler", onReadyHandler);
}
private GrpcRouteForwardingResponse getResponseFromObserver() {
return ReflectionTestUtils.getField(observer, "response", GrpcRouteForwardingResponse.class);
}
}
@Nested
class TestDelegatingOnReadyHandler {
@Mock
private ClientCallStreamObserver<GrpcRouteForwardingRequest> requestStream;
@InjectMocks
private DelegatingOnReadyHandler onReadyHandler;
@Test
void shouldDoneBeInitiallyFalse() {
assertThat(getDoneFromOnReadyHandler()).isFalse();
}
@Nested
class TestStop {
@Test
void shouldSetDoneToTrue() {
onReadyHandler.stop();
assertThat(getDoneFromOnReadyHandler()).isTrue();
}
}
@Nested
class TestRun {
@Mock
private Runnable delegate;
@BeforeEach
void init() {
onReadyHandler.setDelegate(delegate);
}
@Test
void shouldNotRunDelegateIfDone() {
onReadyHandler.stop();
lenient().when(requestStream.isReady()).thenReturn(true);
onReadyHandler.run();
verify(delegate, never()).run();
}
@Test
void shouldNotRunDelegateIfNotReady() {
when(requestStream.isReady()).thenReturn(false);
onReadyHandler.run();
verify(delegate, never()).run();
}
@Test
void shouldRunDelegateIfNotDoneAndReady() {
when(requestStream.isReady()).thenReturn(true);
onReadyHandler.run();
verify(delegate).run();
}
}
private boolean getDoneFromOnReadyHandler() {
return ReflectionTestUtils.getField(onReadyHandler, "done", AtomicBoolean.class).get();
}
}
private void setResponseObserverInForwarder(ForwardingResponseObserver responseObserver) {
ReflectionTestUtils.setField(forwarder, "responseObserver", responseObserver);
}
......@@ -686,4 +1034,8 @@ class EingangForwarderTest {
private void setRequestObserverInForwarder(ClientCallStreamObserver<GrpcRouteForwardingRequest> requestObserver) {
ReflectionTestUtils.setField(forwarder, "requestObserver", requestObserver);
}
private void setForwardFutureInForwarder(CompletableFuture<Void> future) {
ReflectionTestUtils.setField(forwarder, "forwardFuture", future);
}
}
......@@ -5,23 +5,29 @@ import static org.junit.jupiter.api.Assertions.*;
import static org.mockito.ArgumentMatchers.*;
import static org.mockito.Mockito.*;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.MockedStatic;
import org.mockito.Spy;
import de.ozgcloud.common.errorhandling.TechnicalException;
import de.ozgcloud.eingang.forwarder.RouteForwardingServiceGrpc;
import de.ozgcloud.eingang.forwarding.GrpcRouteForwarding;
import de.ozgcloud.vorgang.files.FileService;
import de.ozgcloud.vorgang.vorgang.EingangTestFactory;
import de.ozgcloud.vorgang.vorgang.IncomingFileMapper;
import de.ozgcloud.vorgang.vorgang.VorgangService;
import de.ozgcloud.vorgang.vorgang.VorgangTestFactory;
import lombok.SneakyThrows;
class ForwardingRemoteServiceTest {
......@@ -40,6 +46,61 @@ class ForwardingRemoteServiceTest {
@Spy
private ForwardingRemoteService service;
@Nested
class TestForward {
private final ForwardingRequest request = ForwardingRequestTestFactory.create();
private final GrpcRouteForwarding grpcRouteForwarding = GrpcRouteForwardingTestFactory.create();
private final CompletableFuture<Void> responseFuture = new CompletableFuture<>();
private MockedStatic<EingangForwarder> eingangForwarderMockedStatic;
@Mock
private EingangForwarder eingangForwarder;
@BeforeEach
void init() {
when(vorgangService.getById(any())).thenReturn(VorgangTestFactory.create());
eingangForwarderMockedStatic = mockStatic(EingangForwarder.class);
eingangForwarderMockedStatic.when(() -> EingangForwarder.create(any(), any(), any())).thenReturn(eingangForwarder);
when(eingangForwarder.forward(any(), any(), any())).thenReturn(eingangForwarder);
when(eingangForwarder.getForwardFuture()).thenReturn(responseFuture);
when(forwardingRequestMapper.toGrpcRouteForwarding(any(), any())).thenReturn(grpcRouteForwarding);
doNothing().when(service).waitForCompletion(any());
}
@AfterEach
void teardown() {
eingangForwarderMockedStatic.close();
}
@Test
void shouldGetVorgang() {
service.forward(request);
verify(vorgangService).getById(VorgangTestFactory.ID);
}
@Test
void shouldMapToGrpcRouteForwarding() {
service.forward(request);
verify(forwardingRequestMapper).toGrpcRouteForwarding(request, VorgangTestFactory.EINGANG);
}
@Test
void shouldForward() {
service.forward(request);
verify(eingangForwarder).forward(grpcRouteForwarding, List.of(EingangTestFactory.ATTACHMENT), List.of(EingangTestFactory.REPRESENTATION));
}
@Test
void shouldWaitForCompletion() {
service.forward(request);
verify(service).waitForCompletion(responseFuture);
}
}
@Nested
class TestWaitForCompletion {
......@@ -67,7 +128,7 @@ class ForwardingRemoteServiceTest {
@Test
void shouldThrowTechnicalException() {
assertThrows(TechnicalException.class, de.ozgcloud.vorgang.vorgang.redirect.ForwardingRemoteServiceTest.TestWaitForCompletion.this::waitForCompletion);
assertThrows(TechnicalException.class, TestWaitForCompletion.this::waitForCompletion);
}
@Test
......@@ -80,6 +141,17 @@ class ForwardingRemoteServiceTest {
assertThat(Thread.currentThread().isInterrupted()).isTrue();
}
@Test
void shouldCompleteFutureExceptionally() {
try {
waitForCompletion();
} catch (TechnicalException e) {
// expected
}
verify(future).completeExceptionally(exception);
}
}
@Nested
......@@ -95,7 +167,18 @@ class ForwardingRemoteServiceTest {
@Test
void shouldThrowTechnicalException() {
assertThrows(TechnicalException.class, de.ozgcloud.vorgang.vorgang.redirect.ForwardingRemoteServiceTest.TestWaitForCompletion.this::waitForCompletion);
assertThrows(TechnicalException.class, TestWaitForCompletion.this::waitForCompletion);
}
@Test
void shouldCompleteFutureExceptionally() {
try {
waitForCompletion();
} catch (TechnicalException e) {
// expected
}
verify(future).completeExceptionally(exception);
}
}
......@@ -112,7 +195,18 @@ class ForwardingRemoteServiceTest {
@Test
void shouldThrowTechnicalException() {
assertThrows(TechnicalException.class, de.ozgcloud.vorgang.vorgang.redirect.ForwardingRemoteServiceTest.TestWaitForCompletion.this::waitForCompletion);
assertThrows(TechnicalException.class, TestWaitForCompletion.this::waitForCompletion);
}
@Test
void shouldCompleteFutureExceptionally() {
try {
waitForCompletion();
} catch (TechnicalException e) {
// expected
}
verify(future).completeExceptionally(exception);
}
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment