Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found
Select Git revision

Target

Select target project
  • ozg-cloud/lib/common-lib
1 result
Select Git revision
Show changes
Commits on Source (2)
...@@ -64,11 +64,13 @@ class StreamExclusiveFileSender<Q, S> extends StreamingFileSender<Q, S> { ...@@ -64,11 +64,13 @@ class StreamExclusiveFileSender<Q, S> extends StreamingFileSender<Q, S> {
return requestObserver; return requestObserver;
} }
@Override
public void cancelOnTimeout() { public void cancelOnTimeout() {
super.cancelOnTimeout(); super.cancelOnTimeout();
requestObserver.onError(new TechnicalException("Timeout on waiting for upload.")); requestObserver.onError(new TechnicalException("Timeout on waiting for upload."));
} }
@Override
public void cancelOnError(Throwable t) { public void cancelOnError(Throwable t) {
super.cancelOnError(t); super.cancelOnError(t);
requestObserver.onError(t); requestObserver.onError(t);
......
...@@ -79,21 +79,33 @@ public abstract class StreamingFileSender<Q, S> { ...@@ -79,21 +79,33 @@ public abstract class StreamingFileSender<Q, S> {
} }
public void cancelOnError(Throwable t) { public void cancelOnError(Throwable t) {
LOG.error("File tranfer canceled on error.", t); LOG.error("File transfer canceled on error.", t);
resultFuture.cancel(true); resultFuture.cancel(true);
closeStreamReader(); closeStreamReader();
} }
void sendNext() { void sendNext() {
if (!done.get()) { if (notFinished()) {
waitForObserver(); waitForObserver();
sendMetaData(); sendMetaData();
do { do {
LOG.debug("Sending next chunk."); LOG.debug("Sending next chunk.");
sendNextChunk(); sendNextChunk();
} while (!done.get() && isReady()); } while (notFinished() && isReady());
LOG.debug("Finished or waiting to become ready."); LOG.debug("Finished or waiting to become ready.");
} }
checkIfFinishedForcefully();
}
private boolean notFinished() {
return !done.get() && !resultFuture.isCancelled();
}
private void checkIfFinishedForcefully() {
if (resultFuture.isCancelled()) {
LOG.warn("File transfer was cancelled");
closeStreamReader();
}
} }
private boolean isReady() { private boolean isReady() {
......
...@@ -29,6 +29,7 @@ import static org.mockito.Mockito.*; ...@@ -29,6 +29,7 @@ import static org.mockito.Mockito.*;
import java.io.ByteArrayInputStream; import java.io.ByteArrayInputStream;
import java.io.InputStream; import java.io.InputStream;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiFunction; import java.util.function.BiFunction;
import org.apache.commons.lang3.RandomUtils; import org.apache.commons.lang3.RandomUtils;
...@@ -71,7 +72,7 @@ class StreamingFileSenderTest { ...@@ -71,7 +72,7 @@ class StreamingFileSenderTest {
@BeforeEach @BeforeEach
void init() { void init() {
ReflectionTestUtils.setField(fileSender, "resultFuture", resultFuture); setResultFutureInFileSender(resultFuture);
} }
@Test @Test
...@@ -98,7 +99,7 @@ class StreamingFileSenderTest { ...@@ -98,7 +99,7 @@ class StreamingFileSenderTest {
@BeforeEach @BeforeEach
void init() { void init() {
ReflectionTestUtils.setField(fileSender, "resultFuture", resultFuture); setResultFutureInFileSender(resultFuture);
} }
@Test @Test
...@@ -119,6 +120,9 @@ class StreamingFileSenderTest { ...@@ -119,6 +120,9 @@ class StreamingFileSenderTest {
@Nested @Nested
class TestSendNext { class TestSendNext {
@Mock
private CompletableFuture<TestResponseType> resultFuture;
@BeforeEach @BeforeEach
void init() { void init() {
fileSender.send(); fileSender.send();
...@@ -137,6 +141,59 @@ class StreamingFileSenderTest { ...@@ -137,6 +141,59 @@ class StreamingFileSenderTest {
verify(fileSender).sendNextChunk(); verify(fileSender).sendNextChunk();
} }
@Test
void shouldNotSendMetaDataIfDone() {
setDoneInFileSender(true);
fileSender.sendNext();
verify(fileSender, never()).sendMetaData();
}
@Test
void shouldNotSendMetaDataIfCancelled() {
fileSender.getResultFuture().cancel(true);
fileSender.sendNext();
verify(fileSender, never()).sendMetaData();
}
@Test
void shouldSendNextChunkUntilDone() {
lenient().when(requestObserver.isReady()).thenReturn(true);
doAnswer(invocation -> {
setDoneInFileSender(true);
return null;
}).when(fileSender).sendNextChunk();
fileSender.sendNext();
verify(fileSender, times(1)).sendNextChunk();
}
@Test
void shouldSendNextChunkUntilCancelled() {
lenient().when(requestObserver.isReady()).thenReturn(true);
doAnswer(invocation -> {
fileSender.getResultFuture().cancel(true);
return null;
}).when(fileSender).sendNextChunk();
fileSender.sendNext();
verify(fileSender, times(1)).sendNextChunk();
}
@Test
void closeStreamReaderIfCancelled() {
fileSender.getResultFuture().cancel(true);
fileSender.sendNext();
verify(fileSender).closeStreamReader();
}
} }
@Nested @Nested
...@@ -289,6 +346,14 @@ class StreamingFileSenderTest { ...@@ -289,6 +346,14 @@ class StreamingFileSenderTest {
} }
} }
private void setResultFutureInFileSender(CompletableFuture<TestResponseType> resultFuture) {
ReflectionTestUtils.setField(fileSender, "resultFuture", resultFuture);
}
private void setDoneInFileSender(boolean done) {
((AtomicBoolean) ReflectionTestUtils.getField(fileSender, null, "done")).set(done);
}
static class TestFileSender extends StreamingFileSender<TestRequestType, TestResponseType> { static class TestFileSender extends StreamingFileSender<TestRequestType, TestResponseType> {
@Getter(AccessLevel.PROTECTED) @Getter(AccessLevel.PROTECTED)
......