Skip to content
Snippets Groups Projects
Commit d6a5e58a authored by Krzysztof Witukiewicz's avatar Krzysztof Witukiewicz
Browse files

OZG-7573 OZG-7991 Stop transfers in case of error

parent 692dc220
No related branches found
No related tags found
1 merge request!27OZG-7573 Dateien Weiterleiten
......@@ -26,6 +26,7 @@ import de.ozgcloud.vorgang.vorgang.IncomingFileGroup;
import de.ozgcloud.vorgang.vorgang.IncomingFileMapper;
import io.grpc.stub.ClientCallStreamObserver;
import io.grpc.stub.ClientResponseObserver;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
@RequiredArgsConstructor
......@@ -39,9 +40,13 @@ class EingangForwarder {
private ForwardingResponseObserver responseObserver;
private ClientCallStreamObserver<GrpcRouteForwardingRequest> requestObserver;
public CompletableFuture<Void> forward(GrpcRouteForwarding grpcRouteForwarding, List<IncomingFileGroup> attachments,
@Getter
private CompletableFuture<Void> forwardFuture;
public EingangForwarder forward(GrpcRouteForwarding grpcRouteForwarding, List<IncomingFileGroup> attachments,
List<IncomingFile> representations) {
return CompletableFuture.allOf(
forwardFuture = CompletableFuture.allOf(
callService(),
sendRouteForwarding(grpcRouteForwarding)
.thenCompose(ignored -> sendAttachments(attachments))
......@@ -54,6 +59,7 @@ class EingangForwarder {
}
})
);
return this;
}
CompletableFuture<GrpcRouteForwardingResponse> callService() {
......@@ -67,11 +73,15 @@ class EingangForwarder {
CompletableFuture<GrpcRouteForwardingResponse> sendRouteForwarding(GrpcRouteForwarding grpcRouteForwarding) {
CompletableFuture<GrpcRouteForwardingResponse> future = new CompletableFuture<>();
responseObserver.registerOnReadyHandler(() -> {
responseObserver.registerOnReadyHandler(getSendRouteForwardingRunnable(grpcRouteForwarding, future));
return future;
}
Runnable getSendRouteForwardingRunnable(GrpcRouteForwarding grpcRouteForwarding, CompletableFuture<GrpcRouteForwardingResponse> future) {
return () -> {
requestObserver.onNext(GrpcRouteForwardingRequest.newBuilder().setRouteForwarding(grpcRouteForwarding).build());
future.complete(GrpcRouteForwardingResponse.newBuilder().build());
});
return future;
};
}
CompletableFuture<GrpcRouteForwardingResponse> sendAttachments(List<IncomingFileGroup> attachments) {
......@@ -92,10 +102,12 @@ class EingangForwarder {
return ignored -> sendAttachmentFile(groupName, file);
}
private CompletableFuture<GrpcRouteForwardingResponse> sendAttachmentFile(String groupName, IncomingFile file) {
CompletableFuture<GrpcRouteForwardingResponse> sendAttachmentFile(String groupName, IncomingFile file) {
var fileContentStream = fileService.getUploadedFileStream(file.getId());
var sender = createAttachmentFileSender(groupName, file, fileContentStream).send();
return sender.getResultFuture();
var future = sender.getResultFuture();
configureToCancelIfForwardFutureCompleted(future);
return future;
}
StreamingFileSender<GrpcRouteForwardingRequest, GrpcRouteForwardingResponse> createAttachmentFileSender(String groupName,
......@@ -130,14 +142,17 @@ class EingangForwarder {
);
}
private Function<GrpcRouteForwardingResponse, CompletableFuture<GrpcRouteForwardingResponse>> getSendRepresentationFileFunction(IncomingFile file) {
private Function<GrpcRouteForwardingResponse, CompletableFuture<GrpcRouteForwardingResponse>> getSendRepresentationFileFunction(
IncomingFile file) {
return ignored -> sendRepresentationFile(file);
}
private CompletableFuture<GrpcRouteForwardingResponse> sendRepresentationFile(IncomingFile file) {
CompletableFuture<GrpcRouteForwardingResponse> sendRepresentationFile(IncomingFile file) {
var fileContentStream = fileService.getUploadedFileStream(file.getId());
var sender = createRepresentationFileSender(file, fileContentStream).send();
return sender.getResultFuture();
var future = sender.getResultFuture();
configureToCancelIfForwardFutureCompleted(future);
return future;
}
StreamingFileSender<GrpcRouteForwardingRequest, GrpcRouteForwardingResponse> createRepresentationFileSender(IncomingFile file,
......@@ -161,6 +176,14 @@ class EingangForwarder {
.build();
}
void configureToCancelIfForwardFutureCompleted(CompletableFuture<GrpcRouteForwardingResponse> future) {
forwardFuture.whenComplete((result, ex) -> {
if (forwardFuture.isDone() && !future.isDone()) {
future.cancel(true);
}
});
}
GrpcFileContent buildGrpcFileContent(byte[] chunk, int length) {
var fileContentBuilder = GrpcFileContent.newBuilder();
if (length <= 0) {
......@@ -173,7 +196,8 @@ class EingangForwarder {
StreamingFileSender<GrpcRouteForwardingRequest, GrpcRouteForwardingResponse> createSenderWithoutMetadata(
BiFunction<byte[], Integer, GrpcRouteForwardingRequest> chunkBuilder, InputStream fileContentStream) {
return GrpcFileUploadUtils.createStreamSharingSender(chunkBuilder, fileContentStream, requestObserver, responseObserver::registerOnReadyHandler);
return GrpcFileUploadUtils.createStreamSharingSender(chunkBuilder, fileContentStream, requestObserver,
responseObserver::registerOnReadyHandler);
}
@RequiredArgsConstructor
......
......@@ -42,7 +42,7 @@ import net.devh.boot.grpc.client.inject.GrpcClient;
@RequiredArgsConstructor
class ForwardingRemoteService {
private static final int TIMEOUT_MINUTES = 10;
static final int TIMEOUT_MINUTES = 10;
private final VorgangService vorgangService;
private final ForwardingRequestMapper forwardingRequestMapper;
@GrpcClient("forwarder")
......@@ -53,7 +53,8 @@ class ForwardingRemoteService {
public void forward(ForwardingRequest request) {
var eingang = vorgangService.getById(request.getVorgangId()).getEingangs().getFirst();
var grpcRouteForwarding = forwardingRequestMapper.toGrpcRouteForwarding(request, eingang);
var responseFuture = new EingangForwarder(serviceStub, fileService, incomingFileMapper).forward(grpcRouteForwarding, eingang.getAttachments(), eingang.getRepresentations());
var responseFuture = new EingangForwarder(serviceStub, fileService, incomingFileMapper).forward(grpcRouteForwarding, eingang.getAttachments(),
eingang.getRepresentations()).getForwardFuture();
waitForCompletion(responseFuture);
}
......
......@@ -23,15 +23,23 @@
*/
package de.ozgcloud.vorgang.vorgang;
import de.ozgcloud.vorgang.files.FileId;
public class IncomingFileGroupTestFactory {
public static final String NAME = GrpcIncomingFileGroupTestFactory.NAME;
public static final IncomingFile FILE = IncomingFileTestFactory.create();
public static final IncomingFile FILE2 = IncomingFileTestFactory.createBuilder()
.id(FileId.createNew()).build();
public static IncomingFileGroup create() {
return createBuilder().build();
}
public static IncomingFileGroup createWithTwoFiles() {
return createBuilder().file(FILE2).build();
}
public static IncomingFileGroup.IncomingFileGroupBuilder createBuilder() {
return IncomingFileGroup.builder()
.name(NAME)
......
package de.ozgcloud.vorgang.vorgang.redirect;
import static org.assertj.core.api.Assertions.*;
import static org.junit.jupiter.api.Assertions.*;
import static org.mockito.ArgumentMatchers.*;
import static org.mockito.Mockito.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.Spy;
import de.ozgcloud.common.errorhandling.TechnicalException;
import de.ozgcloud.eingang.forwarder.RouteForwardingServiceGrpc;
import de.ozgcloud.vorgang.files.FileService;
import de.ozgcloud.vorgang.vorgang.IncomingFileMapper;
import de.ozgcloud.vorgang.vorgang.VorgangService;
import lombok.SneakyThrows;
class ForwardingRemoteServiceTest {
@Mock
private VorgangService vorgangService;
@Mock
private ForwardingRequestMapper forwardingRequestMapper;
@Mock
private RouteForwardingServiceGrpc.RouteForwardingServiceStub serviceStub;
@Mock
private FileService fileService;
@Mock
private IncomingFileMapper incomingFileMapper;
@InjectMocks
@Spy
private ForwardingRemoteService service;
@Nested
class TestWaitForCompletion {
@Mock
private CompletableFuture<Void> future;
@SneakyThrows
@Test
void shouldGetFromFuture() {
waitForCompletion();
verify(future).get(ForwardingRemoteService.TIMEOUT_MINUTES, TimeUnit.MINUTES);
}
@Nested
class TestOnInterruptedException {
private final InterruptedException exception = new InterruptedException();
@BeforeEach
@SneakyThrows
void mock() {
when(future.get(anyLong(), any())).thenThrow(exception);
}
@Test
void shouldThrowTechnicalException() {
assertThrows(TechnicalException.class, de.ozgcloud.vorgang.vorgang.redirect.ForwardingRemoteServiceTest.TestWaitForCompletion.this::waitForCompletion);
}
@Test
void shouldInterruptThread() {
try {
waitForCompletion();
} catch (TechnicalException e) {
// expected
}
assertThat(Thread.currentThread().isInterrupted()).isTrue();
}
}
@Nested
class TestOnExecutionException {
private final ExecutionException exception = new ExecutionException(new Exception());
@BeforeEach
@SneakyThrows
void mock() {
when(future.get(anyLong(), any())).thenThrow(exception);
}
@Test
void shouldThrowTechnicalException() {
assertThrows(TechnicalException.class, de.ozgcloud.vorgang.vorgang.redirect.ForwardingRemoteServiceTest.TestWaitForCompletion.this::waitForCompletion);
}
}
@Nested
class TestOnTimeoutException {
private final TimeoutException exception = new TimeoutException();
@BeforeEach
@SneakyThrows
void mock() {
when(future.get(anyLong(), any())).thenThrow(exception);
}
@Test
void shouldThrowTechnicalException() {
assertThrows(TechnicalException.class, de.ozgcloud.vorgang.vorgang.redirect.ForwardingRemoteServiceTest.TestWaitForCompletion.this::waitForCompletion);
}
}
private void waitForCompletion() {
service.waitForCompletion(future);
}
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment