diff --git a/ozgcloud-common-lib/src/main/java/de/ozgcloud/common/binaryfile/StreamingFileSender.java b/ozgcloud-common-lib/src/main/java/de/ozgcloud/common/binaryfile/StreamingFileSender.java index 89661ba2ca0a58bef7e71f0387129b7221219cf5..de78cf14b005300683dd1d63ed5c7cf4184da010 100644 --- a/ozgcloud-common-lib/src/main/java/de/ozgcloud/common/binaryfile/StreamingFileSender.java +++ b/ozgcloud-common-lib/src/main/java/de/ozgcloud/common/binaryfile/StreamingFileSender.java @@ -79,21 +79,33 @@ public abstract class StreamingFileSender<Q, S> { } public void cancelOnError(Throwable t) { - LOG.error("File tranfer canceled on error.", t); + LOG.error("File transfer canceled on error.", t); resultFuture.cancel(true); closeStreamReader(); } void sendNext() { - if (!done.get()) { + if (notFinished()) { waitForObserver(); sendMetaData(); do { LOG.debug("Sending next chunk."); sendNextChunk(); - } while (!done.get() && isReady()); + } while (notFinished() && isReady()); LOG.debug("Finished or waiting to become ready."); } + checkIfFinishedForcefully(); + } + + private boolean notFinished() { + return !done.get() && !resultFuture.isCancelled(); + } + + private void checkIfFinishedForcefully() { + if (resultFuture.isCancelled()) { + LOG.warn("File transfer was cancelled"); + closeStreamReader(); + } } private boolean isReady() { diff --git a/ozgcloud-common-lib/src/test/java/de/ozgcloud/common/binaryfile/StreamingFileSenderTest.java b/ozgcloud-common-lib/src/test/java/de/ozgcloud/common/binaryfile/StreamingFileSenderTest.java index ba58775d9a8a0316557d7ea7b76b3089a94d1668..ce12c67d1ee3d157c1385d00f5bd5193382ac9ae 100644 --- a/ozgcloud-common-lib/src/test/java/de/ozgcloud/common/binaryfile/StreamingFileSenderTest.java +++ b/ozgcloud-common-lib/src/test/java/de/ozgcloud/common/binaryfile/StreamingFileSenderTest.java @@ -29,6 +29,7 @@ import static org.mockito.Mockito.*; import java.io.ByteArrayInputStream; import java.io.InputStream; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.BiFunction; import org.apache.commons.lang3.RandomUtils; @@ -71,7 +72,7 @@ class StreamingFileSenderTest { @BeforeEach void init() { - ReflectionTestUtils.setField(fileSender, "resultFuture", resultFuture); + setResultFutureInFileSender(resultFuture); } @Test @@ -98,7 +99,7 @@ class StreamingFileSenderTest { @BeforeEach void init() { - ReflectionTestUtils.setField(fileSender, "resultFuture", resultFuture); + setResultFutureInFileSender(resultFuture); } @Test @@ -119,6 +120,9 @@ class StreamingFileSenderTest { @Nested class TestSendNext { + @Mock + private CompletableFuture<TestResponseType> resultFuture; + @BeforeEach void init() { fileSender.send(); @@ -137,6 +141,59 @@ class StreamingFileSenderTest { verify(fileSender).sendNextChunk(); } + + @Test + void shouldNotSendMetaDataIfDone() { + setDoneInFileSender(true); + + fileSender.sendNext(); + + verify(fileSender, never()).sendMetaData(); + } + + @Test + void shouldNotSendMetaDataIfCancelled() { + fileSender.getResultFuture().cancel(true); + + fileSender.sendNext(); + + verify(fileSender, never()).sendMetaData(); + } + + @Test + void shouldSendNextChunkUntilDone() { + lenient().when(requestObserver.isReady()).thenReturn(true); + doAnswer(invocation -> { + setDoneInFileSender(true); + return null; + }).when(fileSender).sendNextChunk(); + + fileSender.sendNext(); + + verify(fileSender, times(1)).sendNextChunk(); + } + + @Test + void shouldSendNextChunkUntilCancelled() { + lenient().when(requestObserver.isReady()).thenReturn(true); + doAnswer(invocation -> { + fileSender.getResultFuture().cancel(true); + return null; + }).when(fileSender).sendNextChunk(); + + fileSender.sendNext(); + + verify(fileSender, times(1)).sendNextChunk(); + } + + @Test + void closeStreamReaderIfCancelled() { + fileSender.getResultFuture().cancel(true); + + fileSender.sendNext(); + + verify(fileSender).closeStreamReader(); + } } @Nested @@ -289,6 +346,14 @@ class StreamingFileSenderTest { } } + private void setResultFutureInFileSender(CompletableFuture<TestResponseType> resultFuture) { + ReflectionTestUtils.setField(fileSender, "resultFuture", resultFuture); + } + + private void setDoneInFileSender(boolean done) { + ((AtomicBoolean) ReflectionTestUtils.getField(fileSender, null, "done")).set(done); + } + static class TestFileSender extends StreamingFileSender<TestRequestType, TestResponseType> { @Getter(AccessLevel.PROTECTED)