Skip to content
Snippets Groups Projects
Commit 0850f8a2 authored by Evgeny Bardin's avatar Evgeny Bardin
Browse files

OZG-7426 do file sender trackable

parent 419f2873
Branches
No related tags found
No related merge requests found
...@@ -27,12 +27,14 @@ import java.io.IOException; ...@@ -27,12 +27,14 @@ import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.util.Objects; import java.util.Objects;
import java.util.Optional; import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiFunction; import java.util.function.BiFunction;
import java.util.function.Function; import java.util.function.Function;
import org.apache.commons.io.IOUtils; import org.apache.commons.io.IOUtils;
import org.slf4j.MDC;
import de.ozgcloud.common.errorhandling.TechnicalException; import de.ozgcloud.common.errorhandling.TechnicalException;
import io.grpc.stub.CallStreamObserver; import io.grpc.stub.CallStreamObserver;
...@@ -62,6 +64,7 @@ public class GrpcFileUploadUtils { ...@@ -62,6 +64,7 @@ public class GrpcFileUploadUtils {
private final BiFunction<byte[], Integer, Q> chunkBuilder; private final BiFunction<byte[], Integer, Q> chunkBuilder;
private final InputStream inputStream; private final InputStream inputStream;
private final String fileSenderId = UUID.randomUUID().toString();
@Getter @Getter
private final CompletableFuture<S> resultFuture = new CompletableFuture<>(); private final CompletableFuture<S> resultFuture = new CompletableFuture<>();
private final Function<StreamObserver<S>, CallStreamObserver<Q>> reqObserverBuilder; private final Function<StreamObserver<S>, CallStreamObserver<Q>> reqObserverBuilder;
...@@ -88,7 +91,7 @@ public class GrpcFileUploadUtils { ...@@ -88,7 +91,7 @@ public class GrpcFileUploadUtils {
} }
public FileSender<Q, S> send() { public FileSender<Q, S> send() {
LOG.debug("Start sending File."); LOG.debug("Start sending File. (fileSenderId = {})", fileSenderId);
var responseObserver = BinaryFileUploadStreamObserver.create(resultFuture, this::sendNext); var responseObserver = BinaryFileUploadStreamObserver.create(resultFuture, this::sendNext);
requestObserver = reqObserverBuilder.apply(responseObserver); requestObserver = reqObserverBuilder.apply(responseObserver);
...@@ -96,14 +99,14 @@ public class GrpcFileUploadUtils { ...@@ -96,14 +99,14 @@ public class GrpcFileUploadUtils {
} }
public void cancelOnTimeout() { public void cancelOnTimeout() {
LOG.warn("File transfer canceled on timeout"); LOG.warn("File transfer canceled on timeout (fileSenderId = {})", fileSenderId);
resultFuture.cancel(true); resultFuture.cancel(true);
requestObserver.onError(new TechnicalException("Timeout on waiting for upload.")); requestObserver.onError(new TechnicalException("Timeout on waiting for upload."));
closeStreams(); closeStreams();
} }
public void cancelOnError(Throwable t) { public void cancelOnError(Throwable t) {
LOG.error("File transfer canceled on error.", t); LOG.error("File transfer canceled on error. (fileSenderId = {})", fileSenderId, t);
resultFuture.cancel(true); resultFuture.cancel(true);
requestObserver.onError(t); requestObserver.onError(t);
closeStreams(); closeStreams();
...@@ -113,27 +116,26 @@ public class GrpcFileUploadUtils { ...@@ -113,27 +116,26 @@ public class GrpcFileUploadUtils {
if (done.get()) { if (done.get()) {
return; return;
} }
waitForOberver(); waitForObserver();
sendMetaData(); sendMetaData();
while (!done.get() && isReady()) { while (!done.get() && isReady()) {
LOG.debug("Sending next chunk");
sendNextChunk(); sendNextChunk();
} }
LOG.debug("Finished or waiting to become ready."); LOG.debug("Finished or waiting to become ready. (fileSenderId = {})", fileSenderId);
} }
boolean isReady() { boolean isReady() {
return requestObserver.isReady(); return requestObserver.isReady();
} }
private void waitForOberver() { private void waitForObserver() {
synchronized (this) { synchronized (this) {
while (Objects.isNull(requestObserver)) { while (Objects.isNull(requestObserver)) {
try { try {
LOG.debug("wait for observer"); LOG.debug("wait for observer (fileSenderId = {})", fileSenderId);
wait(300); wait(300);
} catch (InterruptedException e) { } catch (InterruptedException e) {
LOG.error("Error on waiting for request Observer.", e); LOG.error("Error on waiting for request Observer. (fileSenderId = {})", fileSenderId, e);
Thread.currentThread().interrupt(); Thread.currentThread().interrupt();
} }
} }
...@@ -155,19 +157,19 @@ public class GrpcFileUploadUtils { ...@@ -155,19 +157,19 @@ public class GrpcFileUploadUtils {
private void endTransfer() { private void endTransfer() {
requestObserver.onCompleted(); requestObserver.onCompleted();
done.set(true); done.set(true);
LOG.debug("File Transfer done."); LOG.debug("File Transfer done. (fileSenderId = {})", fileSenderId);
closeStreams(); closeStreams();
MDC.remove("fileSenderId");
} }
private void closeStreams() { private void closeStreams() {
LOG.debug("Closing streams"); LOG.debug("Closing streams (fileSenderId = {})", fileSenderId);
IOUtils.closeQuietly(inputStream); IOUtils.closeQuietly(inputStream);
streamReader.close(); streamReader.close();
} }
void sendChunk(byte[] content, int length) { void sendChunk(byte[] content, int length) {
LOG.debug("Sending {} bytes.", length); LOG.trace("Sending {} bytes. (fileSenderId = {})", length, fileSenderId);
var chunk = chunkBuilder.apply(content, length); var chunk = chunkBuilder.apply(content, length);
requestObserver.onNext(chunk); requestObserver.onNext(chunk);
} }
...@@ -185,19 +187,19 @@ public class GrpcFileUploadUtils { ...@@ -185,19 +187,19 @@ public class GrpcFileUploadUtils {
} }
private void doSendMetaData(Q metadata) { private void doSendMetaData(Q metadata) {
LOG.debug("Sending Metadata."); LOG.debug("Sending Metadata. (fileSenderId = {})", fileSenderId);
requestObserver.onNext(metadata); requestObserver.onNext(metadata);
metaDataSent.set(true); metaDataSent.set(true);
} }
void checkForEndOfStream(long sentSize) { void checkForEndOfStream(long sentSize) {
if (sentSize < CHUNK_SIZE) { if (sentSize < CHUNK_SIZE) {
LOG.debug("File Transfer done. Closing stream."); LOG.debug("File Transfer done. Closing stream. (fileSenderId = {})", fileSenderId);
IOUtils.closeQuietly(inputStream); IOUtils.closeQuietly(inputStream);
requestObserver.onCompleted(); requestObserver.onCompleted();
done.set(true); done.set(true);
} else { } else {
LOG.debug("File Transfer not jet done - need to tranfer another chunk."); LOG.debug("File Transfer not jet done - need to tranfer another chunk. (fileSenderId = {})", fileSenderId);
} }
} }
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment