Skip to content
Snippets Groups Projects
Commit dbc3503c authored by Krzysztof Witukiewicz's avatar Krzysztof Witukiewicz
Browse files

OZG-7573 OZG-7991 Notify requestObserver about error in StreamingFileSender

parent 7c8cf453
Branches
Tags
1 merge request!13OZG-7573 OZG-7991 Notify requestObserver about error in StreamingFileSender
......@@ -27,7 +27,6 @@ import java.io.InputStream;
import java.util.function.BiFunction;
import java.util.function.Function;
import de.ozgcloud.common.errorhandling.TechnicalException;
import io.grpc.stub.CallStreamObserver;
import io.grpc.stub.StreamObserver;
import lombok.extern.log4j.Log4j2;
......@@ -63,16 +62,4 @@ class StreamExclusiveFileSender<Q, S> extends StreamingFileSender<Q, S> {
protected CallStreamObserver<Q> getRequestObserver() {
return requestObserver;
}
@Override
public void cancelOnTimeout() {
super.cancelOnTimeout();
requestObserver.onError(new TechnicalException("Timeout on waiting for upload."));
}
@Override
public void cancelOnError(Throwable t) {
super.cancelOnError(t);
requestObserver.onError(t);
}
}
......@@ -75,12 +75,14 @@ public abstract class StreamingFileSender<Q, S> {
public void cancelOnTimeout() {
LOG.warn("File transfer canceled on timeout");
resultFuture.cancel(true);
getRequestObserver().onError(new TechnicalException("Timeout on waiting for upload."));
closeStreamReader();
}
public void cancelOnError(Throwable t) {
LOG.error("File transfer canceled on error.", t);
resultFuture.cancel(true);
getRequestObserver().onError(t);
closeStreamReader();
}
......
......@@ -43,7 +43,6 @@ import org.springframework.test.util.ReflectionTestUtils;
import de.ozgcloud.common.binaryfile.BinaryFileTestFactory.TestRequestType;
import de.ozgcloud.common.binaryfile.BinaryFileTestFactory.TestResponseType;
import de.ozgcloud.common.errorhandling.TechnicalException;
import io.grpc.stub.CallStreamObserver;
import io.grpc.stub.StreamObserver;
......@@ -119,77 +118,4 @@ class StreamExclusiveFileSenderTest {
verify(requestObserver).onCompleted();
}
}
@Nested
class TestCancelOnTimeout {
@Mock
private CompletableFuture<TestResponseType> resultFuture;
@Mock
private CallStreamObserver<TestRequestType> requestObserver;
@BeforeEach
void init() {
ReflectionTestUtils.setField(fileSender, "resultFuture", resultFuture);
ReflectionTestUtils.setField(fileSender, "requestObserver", requestObserver);
}
@Test
void shouldCancelResultFuture() {
fileSender.cancelOnTimeout();
verify(resultFuture).cancel(true);
}
@Test
void shouldCallOnError() {
fileSender.cancelOnTimeout();
verify(requestObserver).onError(any(TechnicalException.class));
}
@Test
void shouldCloseStreamReader() {
fileSender.cancelOnTimeout();
verify(fileSender).closeStreamReader();
}
}
@Nested
class TestCancelOnError {
@Mock
private CompletableFuture<TestResponseType> resultFuture;
@Mock
private CallStreamObserver<TestRequestType> requestObserver;
private final Throwable error = new Throwable();
@BeforeEach
void init() {
ReflectionTestUtils.setField(fileSender, "resultFuture", resultFuture);
ReflectionTestUtils.setField(fileSender, "requestObserver", requestObserver);
}
@Test
void shouldCancelResultFuture() {
fileSender.cancelOnError(error);
verify(resultFuture).cancel(true);
}
@Test
void shouldCallOnError() {
fileSender.cancelOnError(error);
verify(requestObserver).onError(error);
}
@Test
void shouldCloseStreamReader() {
fileSender.cancelOnError(error);
verify(fileSender).closeStreamReader();
}
}
}
......@@ -43,6 +43,7 @@ import org.springframework.test.util.ReflectionTestUtils;
import de.ozgcloud.common.binaryfile.BinaryFileTestFactory.TestRequestType;
import de.ozgcloud.common.binaryfile.BinaryFileTestFactory.TestResponseType;
import de.ozgcloud.common.errorhandling.TechnicalException;
import io.grpc.stub.CallStreamObserver;
import lombok.AccessLevel;
import lombok.Getter;
......@@ -82,6 +83,13 @@ class StreamingFileSenderTest {
verify(resultFuture).cancel(true);
}
@Test
void shouldCallOnError() {
fileSender.cancelOnTimeout();
verify(requestObserver).onError(any(TechnicalException.class));
}
@Test
void shouldCloseStreams() {
fileSender.cancelOnTimeout();
......@@ -109,6 +117,13 @@ class StreamingFileSenderTest {
verify(resultFuture).cancel(true);
}
@Test
void shouldCallOnError() {
fileSender.cancelOnError(error);
verify(requestObserver).onError(error);
}
@Test
void shouldCloseStreams() {
fileSender.cancelOnError(error);
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment