Skip to content
Snippets Groups Projects
Commit 8e5eefbd authored by Krzysztof Witukiewicz's avatar Krzysztof Witukiewicz
Browse files

OZG-7573 OZG-7991 Stop if cancelled

parent 8f8039e7
No related branches found
No related tags found
1 merge request!12Ozg 7573 files weiterleitung bug
...@@ -79,21 +79,33 @@ public abstract class StreamingFileSender<Q, S> { ...@@ -79,21 +79,33 @@ public abstract class StreamingFileSender<Q, S> {
} }
public void cancelOnError(Throwable t) { public void cancelOnError(Throwable t) {
LOG.error("File tranfer canceled on error.", t); LOG.error("File transfer canceled on error.", t);
resultFuture.cancel(true); resultFuture.cancel(true);
closeStreamReader(); closeStreamReader();
} }
void sendNext() { void sendNext() {
if (!done.get()) { if (notFinished()) {
waitForObserver(); waitForObserver();
sendMetaData(); sendMetaData();
do { do {
LOG.debug("Sending next chunk."); LOG.debug("Sending next chunk.");
sendNextChunk(); sendNextChunk();
} while (!done.get() && isReady()); } while (notFinished() && isReady());
LOG.debug("Finished or waiting to become ready."); LOG.debug("Finished or waiting to become ready.");
} }
checkIfFinishedForcefully();
}
private boolean notFinished() {
return !done.get() && !resultFuture.isCancelled();
}
private void checkIfFinishedForcefully() {
if (resultFuture.isCancelled()) {
LOG.warn("File transfer was cancelled");
closeStreamReader();
}
} }
private boolean isReady() { private boolean isReady() {
......
...@@ -29,6 +29,7 @@ import static org.mockito.Mockito.*; ...@@ -29,6 +29,7 @@ import static org.mockito.Mockito.*;
import java.io.ByteArrayInputStream; import java.io.ByteArrayInputStream;
import java.io.InputStream; import java.io.InputStream;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiFunction; import java.util.function.BiFunction;
import org.apache.commons.lang3.RandomUtils; import org.apache.commons.lang3.RandomUtils;
...@@ -71,7 +72,7 @@ class StreamingFileSenderTest { ...@@ -71,7 +72,7 @@ class StreamingFileSenderTest {
@BeforeEach @BeforeEach
void init() { void init() {
ReflectionTestUtils.setField(fileSender, "resultFuture", resultFuture); setResultFutureInFileSender(resultFuture);
} }
@Test @Test
...@@ -98,7 +99,7 @@ class StreamingFileSenderTest { ...@@ -98,7 +99,7 @@ class StreamingFileSenderTest {
@BeforeEach @BeforeEach
void init() { void init() {
ReflectionTestUtils.setField(fileSender, "resultFuture", resultFuture); setResultFutureInFileSender(resultFuture);
} }
@Test @Test
...@@ -119,6 +120,9 @@ class StreamingFileSenderTest { ...@@ -119,6 +120,9 @@ class StreamingFileSenderTest {
@Nested @Nested
class TestSendNext { class TestSendNext {
@Mock
private CompletableFuture<TestResponseType> resultFuture;
@BeforeEach @BeforeEach
void init() { void init() {
fileSender.send(); fileSender.send();
...@@ -137,6 +141,59 @@ class StreamingFileSenderTest { ...@@ -137,6 +141,59 @@ class StreamingFileSenderTest {
verify(fileSender).sendNextChunk(); verify(fileSender).sendNextChunk();
} }
@Test
void shouldNotSendMetaDataIfDone() {
setDoneInFileSender(true);
fileSender.sendNext();
verify(fileSender, never()).sendMetaData();
}
@Test
void shouldNotSendMetaDataIfCancelled() {
fileSender.getResultFuture().cancel(true);
fileSender.sendNext();
verify(fileSender, never()).sendMetaData();
}
@Test
void shouldSendNextChunkUntilDone() {
lenient().when(requestObserver.isReady()).thenReturn(true);
doAnswer(invocation -> {
setDoneInFileSender(true);
return null;
}).when(fileSender).sendNextChunk();
fileSender.sendNext();
verify(fileSender, times(1)).sendNextChunk();
}
@Test
void shouldSendNextChunkUntilCancelled() {
lenient().when(requestObserver.isReady()).thenReturn(true);
doAnswer(invocation -> {
fileSender.getResultFuture().cancel(true);
return null;
}).when(fileSender).sendNextChunk();
fileSender.sendNext();
verify(fileSender, times(1)).sendNextChunk();
}
@Test
void closeStreamReaderIfCancelled() {
fileSender.getResultFuture().cancel(true);
fileSender.sendNext();
verify(fileSender).closeStreamReader();
}
} }
@Nested @Nested
...@@ -289,6 +346,14 @@ class StreamingFileSenderTest { ...@@ -289,6 +346,14 @@ class StreamingFileSenderTest {
} }
} }
private void setResultFutureInFileSender(CompletableFuture<TestResponseType> resultFuture) {
ReflectionTestUtils.setField(fileSender, "resultFuture", resultFuture);
}
private void setDoneInFileSender(boolean done) {
((AtomicBoolean) ReflectionTestUtils.getField(fileSender, null, "done")).set(done);
}
static class TestFileSender extends StreamingFileSender<TestRequestType, TestResponseType> { static class TestFileSender extends StreamingFileSender<TestRequestType, TestResponseType> {
@Getter(AccessLevel.PROTECTED) @Getter(AccessLevel.PROTECTED)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment