Skip to content
Snippets Groups Projects
Commit 9ff7fb3a authored by Felix Reichenbach's avatar Felix Reichenbach
Browse files

OZG-7573 remove waitForFinishedFileUpload

parent bfa38d2f
Branches
Tags
1 merge request!23Ozg 7573 forwarding interface
This commit is part of merge request !23. Comments created here will be created in the context of that merge request.
...@@ -32,7 +32,6 @@ import java.util.concurrent.TimeoutException; ...@@ -32,7 +32,6 @@ import java.util.concurrent.TimeoutException;
import java.util.function.BiFunction; import java.util.function.BiFunction;
import java.util.function.Function; import java.util.function.Function;
import org.apache.commons.io.IOUtils;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import com.google.protobuf.ByteString; import com.google.protobuf.ByteString;
...@@ -109,8 +108,7 @@ class ForwardingRemoteService { ...@@ -109,8 +108,7 @@ class ForwardingRemoteService {
private void sendAttachmentFile(StreamObserver<GrpcRouteForwardingRequest> requestStreamObserver, String groupName, IncomingFile file) { private void sendAttachmentFile(StreamObserver<GrpcRouteForwardingRequest> requestStreamObserver, String groupName, IncomingFile file) {
var fileContentStream = fileService.getUploadedFileStream(file.getId()); var fileContentStream = fileService.getUploadedFileStream(file.getId());
var fileSender = createAttachmentFileSender(requestStreamObserver, groupName, file, fileContentStream).send(); createAttachmentFileSender(requestStreamObserver, groupName, file, fileContentStream).send();
waitForFinishedFileUpload(fileSender, fileContentStream);
} }
FileSender<GrpcRouteForwardingRequest, GrpcRouteForwardingResponse> createAttachmentFileSender( FileSender<GrpcRouteForwardingRequest, GrpcRouteForwardingResponse> createAttachmentFileSender(
...@@ -138,8 +136,7 @@ class ForwardingRemoteService { ...@@ -138,8 +136,7 @@ class ForwardingRemoteService {
void sendRepresentations(List<IncomingFile> representations, StreamObserver<GrpcRouteForwardingRequest> requestObserver) { void sendRepresentations(List<IncomingFile> representations, StreamObserver<GrpcRouteForwardingRequest> requestObserver) {
representations.forEach(representation -> { representations.forEach(representation -> {
var fileContentStream = fileService.getUploadedFileStream(representation.getId()); var fileContentStream = fileService.getUploadedFileStream(representation.getId());
var fileSender = createRepresentationFileSender(requestObserver, representation, fileContentStream).send(); createRepresentationFileSender(requestObserver, representation, fileContentStream).send();
waitForFinishedFileUpload(fileSender, fileContentStream);
}); });
} }
...@@ -187,29 +184,16 @@ class ForwardingRemoteService { ...@@ -187,29 +184,16 @@ class ForwardingRemoteService {
.build(); .build();
} }
void waitForFinishedFileUpload(FileSender<GrpcRouteForwardingRequest, GrpcRouteForwardingResponse> fileSender, InputStream inputStream) { void waitForCompletion(CompletableFuture<Void> responseFuture) {
try { try {
fileSender.getResultFuture().get(TIMEOUT_MINUTES, TimeUnit.MINUTES); responseFuture.get(TIMEOUT_MINUTES, TimeUnit.MINUTES);
} catch (InterruptedException e) { } catch (InterruptedException e) {
Thread.currentThread().interrupt(); Thread.currentThread().interrupt();
fileSender.cancelOnError(e);
throw new TechnicalException("Waiting for finishing file upload was interrupted.", e); throw new TechnicalException("Waiting for finishing file upload was interrupted.", e);
} catch (ExecutionException e) { } catch (ExecutionException e) {
fileSender.cancelOnError(e);
throw new TechnicalException("Error on uploading file content.", e); throw new TechnicalException("Error on uploading file content.", e);
} catch (TimeoutException e) { } catch (TimeoutException e) {
fileSender.cancelOnTimeout();
throw new TechnicalException("Timeout on uploading file content.", e); throw new TechnicalException("Timeout on uploading file content.", e);
} finally {
IOUtils.closeQuietly(inputStream);
}
}
void waitForCompletion(CompletableFuture<Void> responseFuture) {
try {
responseFuture.get();
} catch (Throwable t) {
throw new TechnicalException("Forwarding failed", t);
} }
} }
......
...@@ -320,7 +320,6 @@ class ForwardingRemoteServiceTest { ...@@ -320,7 +320,6 @@ class ForwardingRemoteServiceTest {
when(fileService.getUploadedFileStream(any())).thenReturn(inputStream); when(fileService.getUploadedFileStream(any())).thenReturn(inputStream);
doReturn(fileSender).when(service).createAttachmentFileSender(any(), any(), any(), any()); doReturn(fileSender).when(service).createAttachmentFileSender(any(), any(), any(), any());
when(fileSender.send()).thenReturn(fileSender); when(fileSender.send()).thenReturn(fileSender);
doNothing().when(service).waitForFinishedFileUpload(any(), any());
} }
@Test @Test
...@@ -345,16 +344,6 @@ class ForwardingRemoteServiceTest { ...@@ -345,16 +344,6 @@ class ForwardingRemoteServiceTest {
verify(fileSender).send(); verify(fileSender).send();
} }
@Test
void shouldCallWaitForFinishedFileUploadAfterSend() {
var inOrder = inOrder(fileSender, service);
sendAttachments();
inOrder.verify(fileSender).send();
inOrder.verify(service).waitForFinishedFileUpload(fileSender, inputStream);
}
private void sendAttachments() { private void sendAttachments() {
service.sendAttachments(List.of(attachment), requestObserver); service.sendAttachments(List.of(attachment), requestObserver);
} }
...@@ -485,7 +474,6 @@ class ForwardingRemoteServiceTest { ...@@ -485,7 +474,6 @@ class ForwardingRemoteServiceTest {
when(fileService.getUploadedFileStream(any())).thenReturn(inputStream); when(fileService.getUploadedFileStream(any())).thenReturn(inputStream);
doReturn(fileSender).when(service).createRepresentationFileSender(any(), any(), any()); doReturn(fileSender).when(service).createRepresentationFileSender(any(), any(), any());
when(fileSender.send()).thenReturn(fileSender); when(fileSender.send()).thenReturn(fileSender);
doNothing().when(service).waitForFinishedFileUpload(any(), any());
} }
@Test @Test
...@@ -509,16 +497,6 @@ class ForwardingRemoteServiceTest { ...@@ -509,16 +497,6 @@ class ForwardingRemoteServiceTest {
verify(fileSender).send(); verify(fileSender).send();
} }
@Test
void shouldCallWaitForFinishedFileUploadAfterSend() {
var inOrder = inOrder(fileSender, service);
sendRepresentations();
inOrder.verify(fileSender).send();
inOrder.verify(service).waitForFinishedFileUpload(fileSender, inputStream);
}
private void sendRepresentations() { private void sendRepresentations() {
service.sendRepresentations(List.of(representation), requestObserver); service.sendRepresentations(List.of(representation), requestObserver);
} }
...@@ -710,45 +688,17 @@ class ForwardingRemoteServiceTest { ...@@ -710,45 +688,17 @@ class ForwardingRemoteServiceTest {
} }
@Nested @Nested
class TestWaitForFinishedFileUpload { class TestWaitForCompletion {
@Mock @Mock
private FileSender<GrpcRouteForwardingRequest, GrpcRouteForwardingResponse> fileSender; private CompletableFuture<Void> future;
@Mock
private InputStream inputStream;
@Mock
private CompletableFuture<GrpcRouteForwardingResponse> resultFuture;
@BeforeEach
void mock() {
when(fileSender.getResultFuture()).thenReturn(resultFuture);
}
@Test
void shouldGetResultFuture() {
waitForFinishedFileUpload();
verify(fileSender).getResultFuture();
}
@Test
@SneakyThrows @SneakyThrows
void shouldGetResultFromFuture() {
waitForFinishedFileUpload();
verify(resultFuture).get(2, TimeUnit.MINUTES);
}
@Test @Test
@SneakyThrows void shouldGetFromFuture() {
void shouldCloseInputStream() { waitForCompletion();
try {
waitForFinishedFileUpload();
} catch (TechnicalException e) {
// expected
}
verify(inputStream).close(); verify(future).get(2, TimeUnit.MINUTES);
} }
@Nested @Nested
...@@ -759,47 +709,24 @@ class ForwardingRemoteServiceTest { ...@@ -759,47 +709,24 @@ class ForwardingRemoteServiceTest {
@BeforeEach @BeforeEach
@SneakyThrows @SneakyThrows
void mock() { void mock() {
when(resultFuture.get(anyLong(), any())).thenThrow(exception); when(future.get(anyLong(), any())).thenThrow(exception);
} }
@Test @Test
void shouldThrowTechnicalException() { void shouldThrowTechnicalException() {
assertThrows(TechnicalException.class, () -> waitForFinishedFileUpload()); assertThrows(TechnicalException.class, () -> waitForCompletion());
} }
@Test @Test
void shouldInterruptThread() { void shouldInterruptThread() {
try { try {
waitForFinishedFileUpload(); waitForCompletion();
} catch (TechnicalException e) { } catch (TechnicalException e) {
// expected // expected
} }
assertThat(Thread.currentThread().isInterrupted()).isTrue(); assertThat(Thread.currentThread().isInterrupted()).isTrue();
} }
@Test
void shouldCancelOnError() {
try {
waitForFinishedFileUpload();
} catch (TechnicalException e) {
// expected
}
verify(fileSender).cancelOnError(exception);
}
@Test
@SneakyThrows
void shouldCloseInputStream() {
try {
waitForFinishedFileUpload();
} catch (TechnicalException e) {
// expected
}
verify(inputStream).close();
}
} }
@Nested @Nested
...@@ -810,35 +737,12 @@ class ForwardingRemoteServiceTest { ...@@ -810,35 +737,12 @@ class ForwardingRemoteServiceTest {
@BeforeEach @BeforeEach
@SneakyThrows @SneakyThrows
void mock() { void mock() {
when(resultFuture.get(anyLong(), any())).thenThrow(exception); when(future.get(anyLong(), any())).thenThrow(exception);
} }
@Test @Test
void shouldThrowTechnicalException() { void shouldThrowTechnicalException() {
assertThrows(TechnicalException.class, () -> waitForFinishedFileUpload()); assertThrows(TechnicalException.class, () -> waitForCompletion());
}
@Test
void shouldCancelOnError() {
try {
waitForFinishedFileUpload();
} catch (TechnicalException e) {
// expected
}
verify(fileSender).cancelOnError(exception);
}
@Test
@SneakyThrows
void shouldCloseInputStream() {
try {
waitForFinishedFileUpload();
} catch (TechnicalException e) {
// expected
}
verify(inputStream).close();
} }
} }
...@@ -850,64 +754,13 @@ class ForwardingRemoteServiceTest { ...@@ -850,64 +754,13 @@ class ForwardingRemoteServiceTest {
@BeforeEach @BeforeEach
@SneakyThrows @SneakyThrows
void mock() { void mock() {
when(resultFuture.get(anyLong(), any())).thenThrow(exception); when(future.get(anyLong(), any())).thenThrow(exception);
} }
@Test @Test
void shouldThrowTechnicalException() { void shouldThrowTechnicalException() {
assertThrows(TechnicalException.class, () -> waitForFinishedFileUpload()); assertThrows(TechnicalException.class, () -> waitForCompletion());
}
@Test
void shouldCancelOnTimeout() {
try {
waitForFinishedFileUpload();
} catch (TechnicalException e) {
// expected
}
verify(fileSender).cancelOnTimeout();
} }
@Test
@SneakyThrows
void shouldCloseInputStream() {
try {
waitForFinishedFileUpload();
} catch (TechnicalException e) {
// expected
}
verify(inputStream).close();
}
}
private void waitForFinishedFileUpload() {
service.waitForFinishedFileUpload(fileSender, inputStream);
}
}
@Nested
class TestWaitForCompletion {
@Mock
private CompletableFuture<Void> future;
@SneakyThrows
@Test
void shouldGetFromFuture() {
waitForCompletion();
verify(future).get();
}
@SneakyThrows
@Test
void shouldThrowTechnicalException() {
var exception = new RuntimeException();
when(future.get()).thenThrow(exception);
assertThatThrownBy(this::waitForCompletion).isInstanceOf(TechnicalException.class).hasCause(exception);
} }
private void waitForCompletion() { private void waitForCompletion() {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment