From 8e5eefbde6274091c01e2c16ce2e48e2e8b716bc Mon Sep 17 00:00:00 2001 From: Krzysztof <krzysztof.witukiewicz@mgm-tp.com> Date: Wed, 2 Apr 2025 15:13:21 +0200 Subject: [PATCH] OZG-7573 OZG-7991 Stop if cancelled --- .../binaryfile/StreamingFileSender.java | 18 ++++- .../binaryfile/StreamingFileSenderTest.java | 69 ++++++++++++++++++- 2 files changed, 82 insertions(+), 5 deletions(-) diff --git a/ozgcloud-common-lib/src/main/java/de/ozgcloud/common/binaryfile/StreamingFileSender.java b/ozgcloud-common-lib/src/main/java/de/ozgcloud/common/binaryfile/StreamingFileSender.java index 89661ba..de78cf1 100644 --- a/ozgcloud-common-lib/src/main/java/de/ozgcloud/common/binaryfile/StreamingFileSender.java +++ b/ozgcloud-common-lib/src/main/java/de/ozgcloud/common/binaryfile/StreamingFileSender.java @@ -79,21 +79,33 @@ public abstract class StreamingFileSender<Q, S> { } public void cancelOnError(Throwable t) { - LOG.error("File tranfer canceled on error.", t); + LOG.error("File transfer canceled on error.", t); resultFuture.cancel(true); closeStreamReader(); } void sendNext() { - if (!done.get()) { + if (notFinished()) { waitForObserver(); sendMetaData(); do { LOG.debug("Sending next chunk."); sendNextChunk(); - } while (!done.get() && isReady()); + } while (notFinished() && isReady()); LOG.debug("Finished or waiting to become ready."); } + checkIfFinishedForcefully(); + } + + private boolean notFinished() { + return !done.get() && !resultFuture.isCancelled(); + } + + private void checkIfFinishedForcefully() { + if (resultFuture.isCancelled()) { + LOG.warn("File transfer was cancelled"); + closeStreamReader(); + } } private boolean isReady() { diff --git a/ozgcloud-common-lib/src/test/java/de/ozgcloud/common/binaryfile/StreamingFileSenderTest.java b/ozgcloud-common-lib/src/test/java/de/ozgcloud/common/binaryfile/StreamingFileSenderTest.java index ba58775..ce12c67 100644 --- a/ozgcloud-common-lib/src/test/java/de/ozgcloud/common/binaryfile/StreamingFileSenderTest.java +++ b/ozgcloud-common-lib/src/test/java/de/ozgcloud/common/binaryfile/StreamingFileSenderTest.java @@ -29,6 +29,7 @@ import static org.mockito.Mockito.*; import java.io.ByteArrayInputStream; import java.io.InputStream; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.BiFunction; import org.apache.commons.lang3.RandomUtils; @@ -71,7 +72,7 @@ class StreamingFileSenderTest { @BeforeEach void init() { - ReflectionTestUtils.setField(fileSender, "resultFuture", resultFuture); + setResultFutureInFileSender(resultFuture); } @Test @@ -98,7 +99,7 @@ class StreamingFileSenderTest { @BeforeEach void init() { - ReflectionTestUtils.setField(fileSender, "resultFuture", resultFuture); + setResultFutureInFileSender(resultFuture); } @Test @@ -119,6 +120,9 @@ class StreamingFileSenderTest { @Nested class TestSendNext { + @Mock + private CompletableFuture<TestResponseType> resultFuture; + @BeforeEach void init() { fileSender.send(); @@ -137,6 +141,59 @@ class StreamingFileSenderTest { verify(fileSender).sendNextChunk(); } + + @Test + void shouldNotSendMetaDataIfDone() { + setDoneInFileSender(true); + + fileSender.sendNext(); + + verify(fileSender, never()).sendMetaData(); + } + + @Test + void shouldNotSendMetaDataIfCancelled() { + fileSender.getResultFuture().cancel(true); + + fileSender.sendNext(); + + verify(fileSender, never()).sendMetaData(); + } + + @Test + void shouldSendNextChunkUntilDone() { + lenient().when(requestObserver.isReady()).thenReturn(true); + doAnswer(invocation -> { + setDoneInFileSender(true); + return null; + }).when(fileSender).sendNextChunk(); + + fileSender.sendNext(); + + verify(fileSender, times(1)).sendNextChunk(); + } + + @Test + void shouldSendNextChunkUntilCancelled() { + lenient().when(requestObserver.isReady()).thenReturn(true); + doAnswer(invocation -> { + fileSender.getResultFuture().cancel(true); + return null; + }).when(fileSender).sendNextChunk(); + + fileSender.sendNext(); + + verify(fileSender, times(1)).sendNextChunk(); + } + + @Test + void closeStreamReaderIfCancelled() { + fileSender.getResultFuture().cancel(true); + + fileSender.sendNext(); + + verify(fileSender).closeStreamReader(); + } } @Nested @@ -289,6 +346,14 @@ class StreamingFileSenderTest { } } + private void setResultFutureInFileSender(CompletableFuture<TestResponseType> resultFuture) { + ReflectionTestUtils.setField(fileSender, "resultFuture", resultFuture); + } + + private void setDoneInFileSender(boolean done) { + ((AtomicBoolean) ReflectionTestUtils.getField(fileSender, null, "done")).set(done); + } + static class TestFileSender extends StreamingFileSender<TestRequestType, TestResponseType> { @Getter(AccessLevel.PROTECTED) -- GitLab