Skip to content
Snippets Groups Projects

Ozg 7573 files weiterleitung bug

Merged Krzysztof Witukiewicz requested to merge OZG-7573-files-weiterleitung-bug into main
2 unresolved threads
Files
8
@@ -23,225 +23,69 @@
*/
package de.ozgcloud.common.binaryfile;
import java.io.IOException;
import java.io.InputStream;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
import org.apache.commons.io.IOUtils;
import de.ozgcloud.common.errorhandling.TechnicalException;
import io.grpc.stub.CallStreamObserver;
import io.grpc.stub.StreamObserver;
import lombok.AccessLevel;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.NonNull;
import lombok.RequiredArgsConstructor;
import lombok.extern.log4j.Log4j2;
@Log4j2
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public class GrpcFileUploadUtils {
static final int CHUNK_SIZE = 4 * 1024;
/*
* Q = Request Type; S = Response Type
/**
* @param <Q> Request Type
* @param <S> Response Type
* @deprecated use {@link #createStreamExclusiveFileSender(BiFunction, InputStream, Function)} instead
*/
@Deprecated(since = "4.13.0")
public static <Q, S> FileSender<Q, S> createSender(BiFunction<byte[], Integer, Q> chunkBuilder, InputStream inputStream,
Function<StreamObserver<S>, CallStreamObserver<Q>> reqObserverBuilder) {
return createSender(chunkBuilder, inputStream, reqObserverBuilder, true);
return new FileSender<>(chunkBuilder, inputStream, reqObserverBuilder);
}
public static <Q, S> FileSender<Q, S> createSender(BiFunction<byte[], Integer, Q> chunkBuilder, InputStream inputStream,
Function<StreamObserver<S>, CallStreamObserver<Q>> reqObserverBuilder, boolean completeOnFileSent) {
return new FileSender<>(chunkBuilder, reqObserverBuilder, inputStream, completeOnFileSent);
/**
* @param <Q> Request Type
* @param <S> Response Type
*/
public static <Q, S> StreamingFileSender<Q, S> createStreamExclusiveFileSender(BiFunction<byte[], Integer, Q> chunkBuilder, InputStream inputStream,
Function<StreamObserver<S>, CallStreamObserver<Q>> reqObserverBuilder) {
return new StreamExclusiveFileSender<>(chunkBuilder, inputStream, reqObserverBuilder);
}
public static class FileSender<Q, S> {
private final BiFunction<byte[], Integer, Q> chunkBuilder;
private final InputStream inputStream;
@Getter
private final CompletableFuture<S> resultFuture = new CompletableFuture<>();
private final Function<StreamObserver<S>, CallStreamObserver<Q>> reqObserverBuilder;
private CallStreamObserver<Q> requestObserver;
private Optional<Q> metaData = Optional.empty();
private final AtomicBoolean metaDataSent = new AtomicBoolean(false);
private final AtomicBoolean done = new AtomicBoolean(false);
private final StreamReader streamReader;
private final boolean completeOnFileSent;
FileSender(BiFunction<byte[], Integer, Q> chunkBuilder, Function<StreamObserver<S>, CallStreamObserver<Q>> reqObserverBuilder,
InputStream inputStream) {
this(chunkBuilder, reqObserverBuilder, inputStream, true);
}
/**
* @param <Q> Request Type
* @param <S> Response Type
*/
public static <Q, S> StreamingFileSender<Q, S> createStreamSharingSender(BiFunction<byte[], Integer, Q> chunkBuilder, InputStream inputStream,
CallStreamObserver<Q> requestObserver, Consumer<Runnable> onReadyHandlerRegistrar) {
return new StreamSharingFileSender<>(chunkBuilder, inputStream, requestObserver, onReadyHandlerRegistrar);
}
FileSender(BiFunction<byte[], Integer, Q> chunkBuilder, Function<StreamObserver<S>, CallStreamObserver<Q>> reqObserverBuilder,
InputStream inputStream, boolean completeOnFileSent) {
this.chunkBuilder = chunkBuilder;
this.inputStream = inputStream;
this.reqObserverBuilder = reqObserverBuilder;
this.completeOnFileSent = completeOnFileSent;
@Deprecated(since = "4.13.0")
public static class FileSender<Q, S> extends StreamExclusiveFileSender<Q, S> {
this.streamReader = new StreamReader(this.inputStream);
}
public FileSender<Q, S> withMetaData(@NonNull Q metaData) {
this.metaData = Optional.of(metaData);
return this;
FileSender(BiFunction<byte[], Integer, Q> chunkBuilder, InputStream inputStream,
Function<StreamObserver<S>, CallStreamObserver<Q>> reqObserverBuilder) {
super(chunkBuilder, inputStream, reqObserverBuilder);
}
@Override
public FileSender<Q, S> send() {
LOG.debug("Start sending File.");
var responseObserver = BinaryFileUploadStreamObserver.create(resultFuture, this::sendNext);
requestObserver = reqObserverBuilder.apply(responseObserver);
super.send();
return this;
}
public void cancelOnTimeout() {
LOG.warn("File transfer canceled on timeout");
resultFuture.cancel(true);
requestObserver.onError(new TechnicalException("Timeout on waiting for upload."));
closeStreams();
}
public void cancelOnError(Throwable t) {
LOG.error("File tranfer canceled on error.", t);
resultFuture.cancel(true);
requestObserver.onError(t);
closeStreams();
}
void sendNext() {
if (!done.get()) {
waitForOberver();
sendMetaData();
do {
LOG.debug("Sending next chunk.");
sendNextChunk();
} while (!done.get() && isReady());
LOG.debug("Finished or waiting to become ready.");
}
}
private boolean isReady() {
return requestObserver.isReady();
}
private void waitForOberver() {
synchronized (this) {
while (Objects.isNull(requestObserver)) {
try {
LOG.debug("wait for observer");
wait(300);
} catch (InterruptedException e) {
LOG.error("Error on waiting for request Observer.", e);
Thread.currentThread().interrupt();
}
}
}
}
void sendNextChunk() {
byte[] contentToSend = streamReader.getNextData();
if (streamReader.getLastReadSize() > 0) {
sendChunk(contentToSend, streamReader.getLastReadSize());
} else {
endTransfer();
}
}
private void endTransfer() {
if (completeOnFileSent)
requestObserver.onCompleted();
else
sendEndOfFile();
done.set(true);
LOG.debug("File Transfer done.");
closeStreams();
}
private void sendEndOfFile() {
sendChunk(new byte[0], streamReader.getLastReadSize());
}
void closeStreams() {
LOG.debug("Closing streams");
streamReader.close();
}
void sendChunk(byte[] content, int length) {
LOG.debug("Sending {} byte Data.", length);
var chunk = chunkBuilder.apply(content, length);
requestObserver.onNext(chunk);
}
byte[] readFromStream() {
try {
return inputStream.readNBytes(CHUNK_SIZE);
} catch (IOException e) {
throw new TechnicalException("Error on sending a single chunk", e);
}
}
void sendMetaData() {
metaData.filter(md -> !metaDataSent.get()).ifPresent(this::doSendMetaData);
}
private void doSendMetaData(Q metadata) {
LOG.debug("Sending Metadata.");
requestObserver.onNext(metadata);
metaDataSent.set(true);
}
void checkForEndOfStream(long sentSize) {
if (sentSize < CHUNK_SIZE) {
LOG.debug("File Transfer done. Closing stream.");
IOUtils.closeQuietly(inputStream);
requestObserver.onCompleted();
done.set(true);
} else {
LOG.debug("File Transfer not jet done - need to tranfer another chunk.");
}
}
@RequiredArgsConstructor
private class StreamReader {
private final InputStream inStream;
private final byte[] buffer = new byte[CHUNK_SIZE];
@Getter
private int lastReadSize = 0;
@Getter
private final AtomicBoolean done = new AtomicBoolean(false);
byte[] getNextData() {
readNext();
return buffer;
}
void close() {
IOUtils.closeQuietly(inStream);
}
void readNext() {
try {
lastReadSize = inStream.read(buffer, 0, CHUNK_SIZE);
} catch (IOException e) {
throw new TechnicalException("Error on reading a single chunk", e);
}
}
@Override
public FileSender<Q, S> withMetaData(@NonNull Q metaData) {
super.withMetaData(metaData);
return this;
}
}
Loading