Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • ozg-cloud/lib/common-lib
1 result
Show changes
Commits on Source (2)
......@@ -64,11 +64,13 @@ class StreamExclusiveFileSender<Q, S> extends StreamingFileSender<Q, S> {
return requestObserver;
}
@Override
public void cancelOnTimeout() {
super.cancelOnTimeout();
requestObserver.onError(new TechnicalException("Timeout on waiting for upload."));
}
@Override
public void cancelOnError(Throwable t) {
super.cancelOnError(t);
requestObserver.onError(t);
......
......@@ -79,21 +79,33 @@ public abstract class StreamingFileSender<Q, S> {
}
public void cancelOnError(Throwable t) {
LOG.error("File tranfer canceled on error.", t);
LOG.error("File transfer canceled on error.", t);
resultFuture.cancel(true);
closeStreamReader();
}
void sendNext() {
if (!done.get()) {
if (notFinished()) {
waitForObserver();
sendMetaData();
do {
LOG.debug("Sending next chunk.");
sendNextChunk();
} while (!done.get() && isReady());
} while (notFinished() && isReady());
LOG.debug("Finished or waiting to become ready.");
}
checkIfFinishedForcefully();
}
private boolean notFinished() {
return !done.get() && !resultFuture.isCancelled();
}
private void checkIfFinishedForcefully() {
if (resultFuture.isCancelled()) {
LOG.warn("File transfer was cancelled");
closeStreamReader();
}
}
private boolean isReady() {
......
......@@ -29,6 +29,7 @@ import static org.mockito.Mockito.*;
import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiFunction;
import org.apache.commons.lang3.RandomUtils;
......@@ -71,7 +72,7 @@ class StreamingFileSenderTest {
@BeforeEach
void init() {
ReflectionTestUtils.setField(fileSender, "resultFuture", resultFuture);
setResultFutureInFileSender(resultFuture);
}
@Test
......@@ -98,7 +99,7 @@ class StreamingFileSenderTest {
@BeforeEach
void init() {
ReflectionTestUtils.setField(fileSender, "resultFuture", resultFuture);
setResultFutureInFileSender(resultFuture);
}
@Test
......@@ -119,6 +120,9 @@ class StreamingFileSenderTest {
@Nested
class TestSendNext {
@Mock
private CompletableFuture<TestResponseType> resultFuture;
@BeforeEach
void init() {
fileSender.send();
......@@ -137,6 +141,59 @@ class StreamingFileSenderTest {
verify(fileSender).sendNextChunk();
}
@Test
void shouldNotSendMetaDataIfDone() {
setDoneInFileSender(true);
fileSender.sendNext();
verify(fileSender, never()).sendMetaData();
}
@Test
void shouldNotSendMetaDataIfCancelled() {
fileSender.getResultFuture().cancel(true);
fileSender.sendNext();
verify(fileSender, never()).sendMetaData();
}
@Test
void shouldSendNextChunkUntilDone() {
lenient().when(requestObserver.isReady()).thenReturn(true);
doAnswer(invocation -> {
setDoneInFileSender(true);
return null;
}).when(fileSender).sendNextChunk();
fileSender.sendNext();
verify(fileSender, times(1)).sendNextChunk();
}
@Test
void shouldSendNextChunkUntilCancelled() {
lenient().when(requestObserver.isReady()).thenReturn(true);
doAnswer(invocation -> {
fileSender.getResultFuture().cancel(true);
return null;
}).when(fileSender).sendNextChunk();
fileSender.sendNext();
verify(fileSender, times(1)).sendNextChunk();
}
@Test
void closeStreamReaderIfCancelled() {
fileSender.getResultFuture().cancel(true);
fileSender.sendNext();
verify(fileSender).closeStreamReader();
}
}
@Nested
......@@ -289,6 +346,14 @@ class StreamingFileSenderTest {
}
}
private void setResultFutureInFileSender(CompletableFuture<TestResponseType> resultFuture) {
ReflectionTestUtils.setField(fileSender, "resultFuture", resultFuture);
}
private void setDoneInFileSender(boolean done) {
((AtomicBoolean) ReflectionTestUtils.getField(fileSender, null, "done")).set(done);
}
static class TestFileSender extends StreamingFileSender<TestRequestType, TestResponseType> {
@Getter(AccessLevel.PROTECTED)
......