Skip to content
Snippets Groups Projects
Commit a6b106c9 authored by OZGCloud's avatar OZGCloud
Browse files

allow cancelation grpc file transfer on error/timeout

parent 17c59106
Branches
Tags
No related merge requests found
......@@ -30,7 +30,9 @@ import io.grpc.stub.ClientResponseObserver;
import lombok.AccessLevel;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.extern.log4j.Log4j2;
@Log4j2
@RequiredArgsConstructor(access = AccessLevel.PRIVATE)
public class BinaryFileUploadStreamObserver<ReqT, R> implements ClientResponseObserver<ReqT, R> {
......@@ -61,6 +63,7 @@ public class BinaryFileUploadStreamObserver<ReqT, R> implements ClientResponseOb
@Override
public void onCompleted() {
LOG.debug("Complete future...");
future.complete(response);
}
......
......@@ -62,6 +62,7 @@ public class GrpcFileUploadUtils {
private final BiFunction<byte[], Integer, Q> chunkBuilder;
private final InputStream inputStream;
@Getter
private final CompletableFuture<S> resultFuture = new CompletableFuture<>();
private final Function<StreamObserver<S>, CallStreamObserver<Q>> reqObserverBuilder;
private CallStreamObserver<Q> requestObserver;
......@@ -86,12 +87,22 @@ public class GrpcFileUploadUtils {
return this;
}
public CompletableFuture<S> send() {
public FileSender<Q, S> send() {
LOG.debug("Start sending File.");
var responseObserver = BinaryFileUploadStreamObserver.create(resultFuture, this::sendNext);
requestObserver = reqObserverBuilder.apply(responseObserver);
return resultFuture;
return this;
}
public void cancelOnTimeout() {
resultFuture.cancel(true);
requestObserver.onError(new TechnicalException("Timeout on waiting for upload."));
}
public void cancelOnError(Throwable t) {
resultFuture.cancel(true);
requestObserver.onError(t);
}
void sendNext() {
......@@ -100,9 +111,7 @@ public class GrpcFileUploadUtils {
sendMetaData();
do {
LOG.debug("Sending next chunk.");
// long sentSize =
sendNextChunk();
// checkForEndOfStream(sentSize);
} while (!done.get() && isReady());
LOG.debug("Finished or waiting to become ready.");
}
......
......@@ -49,7 +49,6 @@ import de.itvsh.kop.common.errorhandling.TechnicalException;
import io.grpc.stub.CallStreamObserver;
import io.grpc.stub.StreamObserver;
@Disabled // FIXME
class GrpcFileUploadUtilsTest {
@Spy
......@@ -80,7 +79,7 @@ class GrpcFileUploadUtilsTest {
@Test
void shouldCreateRequestObserver() {
GrpcFileUploadUtils.createSender(chunkBuilder, inputStream, reqObserverBuilder);
GrpcFileUploadUtils.createSender(chunkBuilder, inputStream, reqObserverBuilder).send();
verify(reqObserverBuilder, atLeastOnce()).apply(notNull());
}
......@@ -237,6 +236,7 @@ class GrpcFileUploadUtilsTest {
}
}
@Disabled("unused")
@Nested
class TestReadFromStream {
@Test
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment