Skip to content
Snippets Groups Projects
Commit ac2444a7 authored by Krzysztof Witukiewicz's avatar Krzysztof Witukiewicz
Browse files

Revert "OZG-7573 OZG-7991 Copy GrpcFileUploadUtils from common-lib (only temporarily)"

This reverts commit adfe531b.
parent b85abcac
No related branches found
No related tags found
1 merge request!27OZG-7573 Dateien Weiterleiten
/*
* Copyright (C) 2023 Das Land Schleswig-Holstein vertreten durch den
* Ministerpräsidenten des Landes Schleswig-Holstein
* Staatskanzlei
* Abteilung Digitalisierung und zentrales IT-Management der Landesregierung
*
* Lizenziert unter der EUPL, Version 1.2 oder - sobald
* diese von der Europäischen Kommission genehmigt wurden -
* Folgeversionen der EUPL ("Lizenz");
* Sie dürfen dieses Werk ausschließlich gemäß
* dieser Lizenz nutzen.
* Eine Kopie der Lizenz finden Sie hier:
*
* https://joinup.ec.europa.eu/collection/eupl/eupl-text-eupl-12
*
* Sofern nicht durch anwendbare Rechtsvorschriften
* gefordert oder in schriftlicher Form vereinbart, wird
* die unter der Lizenz verbreitete Software "so wie sie
* ist", OHNE JEGLICHE GEWÄHRLEISTUNG ODER BEDINGUNGEN -
* ausdrücklich oder stillschweigend - verbreitet.
* Die sprachspezifischen Genehmigungen und Beschränkungen
* unter der Lizenz sind dem Lizenztext zu entnehmen.
*/
package de.ozgcloud.common.binaryfile;
import java.util.concurrent.CompletableFuture;
import io.grpc.stub.ClientCallStreamObserver;
import io.grpc.stub.ClientResponseObserver;
import lombok.AccessLevel;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.extern.log4j.Log4j2;
@Log4j2
@RequiredArgsConstructor(access = AccessLevel.PRIVATE)
public class BinaryFileUploadStreamObserver<ReqT, R> implements ClientResponseObserver<ReqT, R> {
private final CompletableFuture<R> future;
private Runnable onReadyHandler;
public static <ReqT, R> BinaryFileUploadStreamObserver<ReqT, R> create(CompletableFuture<R> future, Runnable onReadyHandler) {
BinaryFileUploadStreamObserver<ReqT, R> instance = create(future);
instance.onReadyHandler = onReadyHandler;
return instance;
}
public static <ReqT, R> BinaryFileUploadStreamObserver<ReqT, R> create(CompletableFuture<R> future) {
return new BinaryFileUploadStreamObserver<>(future);
}
@Getter
private R response;
/*
requestStream is CallStreamObserver - received from Grpc-framework. onReadyHandler calls onNext on this observer
*/
@Override
public void beforeStart(ClientCallStreamObserver<ReqT> requestStreamObserver) {
requestStreamObserver.setOnReadyHandler(onReadyHandler);
}
@Override
public void onNext(R response) {
this.response = response;
}
@Override
public void onError(Throwable t) {
LOG.error("Error on uploading file. Completing Future.", t);
future.completeExceptionally(t);
}
// will it even get called? requestStreamObserver.onCompleted() would need to be called first
@Override
public void onCompleted() {
LOG.debug("Complete future...");
future.complete(response);
}
}
\ No newline at end of file
/*
* Copyright (C) 2023 Das Land Schleswig-Holstein vertreten durch den
* Ministerpräsidenten des Landes Schleswig-Holstein
* Staatskanzlei
* Abteilung Digitalisierung und zentrales IT-Management der Landesregierung
*
* Lizenziert unter der EUPL, Version 1.2 oder - sobald
* diese von der Europäischen Kommission genehmigt wurden -
* Folgeversionen der EUPL ("Lizenz");
* Sie dürfen dieses Werk ausschließlich gemäß
* dieser Lizenz nutzen.
* Eine Kopie der Lizenz finden Sie hier:
*
* https://joinup.ec.europa.eu/collection/eupl/eupl-text-eupl-12
*
* Sofern nicht durch anwendbare Rechtsvorschriften
* gefordert oder in schriftlicher Form vereinbart, wird
* die unter der Lizenz verbreitete Software "so wie sie
* ist", OHNE JEGLICHE GEWÄHRLEISTUNG ODER BEDINGUNGEN -
* ausdrücklich oder stillschweigend - verbreitet.
* Die sprachspezifischen Genehmigungen und Beschränkungen
* unter der Lizenz sind dem Lizenztext zu entnehmen.
*/
package de.ozgcloud.common.binaryfile;
import java.io.IOException;
import java.io.InputStream;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
import org.apache.commons.io.IOUtils;
import de.ozgcloud.common.errorhandling.TechnicalException;
import io.grpc.stub.CallStreamObserver;
import io.grpc.stub.StreamObserver;
import lombok.AccessLevel;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.NonNull;
import lombok.RequiredArgsConstructor;
import lombok.extern.log4j.Log4j2;
@Log4j2
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public class GrpcFileUploadUtils {
static final int CHUNK_SIZE = 4 * 1024;
/*
* Q = Request Type; S = Response Type
*/
public static <Q, S> FileSender<Q, S> createSender(BiFunction<byte[], Integer, Q> chunkBuilder, InputStream inputStream,
Function<StreamObserver<S>, CallStreamObserver<Q>> reqObserverBuilder) {
return createSender(chunkBuilder, inputStream, reqObserverBuilder, true);
}
public static <Q, S> FileSender<Q, S> createSender(BiFunction<byte[], Integer, Q> chunkBuilder, InputStream inputStream,
Function<StreamObserver<S>, CallStreamObserver<Q>> reqObserverBuilder, boolean completeOnFileSent) {
return new FileSender<>(chunkBuilder, reqObserverBuilder, inputStream, completeOnFileSent);
}
public static class FileSender<Q, S> {
private final BiFunction<byte[], Integer, Q> chunkBuilder;
private final InputStream inputStream;
@Getter
private final CompletableFuture<S> resultFuture = new CompletableFuture<>();
private final Function<StreamObserver<S>, CallStreamObserver<Q>> reqObserverBuilder;
private CallStreamObserver<Q> requestObserver;
private Optional<Q> metaData = Optional.empty();
private final AtomicBoolean metaDataSent = new AtomicBoolean(false);
private final AtomicBoolean done = new AtomicBoolean(false);
private final StreamReader streamReader;
private final boolean completeOnFileSent;
FileSender(BiFunction<byte[], Integer, Q> chunkBuilder, Function<StreamObserver<S>, CallStreamObserver<Q>> reqObserverBuilder,
InputStream inputStream, boolean completeOnFileSent) {
this.chunkBuilder = chunkBuilder;
this.inputStream = inputStream;
this.reqObserverBuilder = reqObserverBuilder;
this.completeOnFileSent = completeOnFileSent;
this.streamReader = new StreamReader(this.inputStream);
}
public FileSender<Q, S> withMetaData(@NonNull Q metaData) {
this.metaData = Optional.of(metaData);
return this;
}
public FileSender<Q, S> send(Consumer<Runnable> registerOnReadyHandler) {
LOG.debug("Start sending File.");
registerOnReadyHandler.accept(this::sendNext);
requestObserver = reqObserverBuilder.apply(null);
return this;
}
public FileSender<Q, S> send() {
LOG.debug("Start sending File.");
// this responseObserver registers also onReadyHandler
var responseObserver = BinaryFileUploadStreamObserver.create(resultFuture, this::sendNext);
requestObserver = reqObserverBuilder.apply(responseObserver);
return this;
}
public void cancelOnTimeout() {
LOG.warn("File transfer canceled on timeout");
resultFuture.cancel(true);
requestObserver.onError(new TechnicalException("Timeout on waiting for upload."));
closeStreams();
}
public void cancelOnError(Throwable t) {
LOG.error("File tranfer canceled on error.", t);
resultFuture.cancel(true);
requestObserver.onError(t);
closeStreams();
}
void sendNext() {
if (!done.get()) {
waitForOberver();
sendMetaData();
do {
LOG.debug("Sending next chunk.");
sendNextChunk();
} while (!done.get() && isReady());
LOG.debug("Finished or waiting to become ready.");
}
}
private boolean isReady() {
return requestObserver.isReady();
}
private void waitForOberver() {
synchronized (this) {
while (Objects.isNull(requestObserver)) {
try {
LOG.debug("wait for observer");
wait(300);
} catch (InterruptedException e) {
LOG.error("Error on waiting for request Observer.", e);
Thread.currentThread().interrupt();
}
}
}
}
void sendNextChunk() {
byte[] contentToSend = streamReader.getNextData();
if (streamReader.getLastReadSize() > 0) {
sendChunk(contentToSend, streamReader.getLastReadSize());
} else {
endTransfer();
}
}
private void endTransfer() {
if (completeOnFileSent) {
requestObserver.onCompleted();
} else {
sendEndOfFile();
resultFuture.complete(null);
}
done.set(true);
LOG.debug("File Transfer done.");
closeStreams();
}
private void sendEndOfFile() {
sendChunk(new byte[0], streamReader.getLastReadSize());
}
void closeStreams() {
LOG.debug("Closing streams");
streamReader.close();
}
void sendChunk(byte[] content, int length) {
LOG.debug("Sending {} byte Data.", length);
var chunk = chunkBuilder.apply(content, length);
requestObserver.onNext(chunk);
}
byte[] readFromStream() {
try {
return inputStream.readNBytes(CHUNK_SIZE);
} catch (IOException e) {
throw new TechnicalException("Error on sending a single chunk", e);
}
}
void sendMetaData() {
metaData.filter(md -> !metaDataSent.get()).ifPresent(this::doSendMetaData);
}
private void doSendMetaData(Q metadata) {
LOG.debug("Sending Metadata.");
requestObserver.onNext(metadata);
metaDataSent.set(true);
}
void checkForEndOfStream(long sentSize) {
if (sentSize < CHUNK_SIZE) {
LOG.debug("File Transfer done. Closing stream.");
IOUtils.closeQuietly(inputStream);
requestObserver.onCompleted();
done.set(true);
} else {
LOG.debug("File Transfer not jet done - need to tranfer another chunk.");
}
}
@RequiredArgsConstructor
private class StreamReader {
private final InputStream inStream;
private final byte[] buffer = new byte[CHUNK_SIZE];
@Getter
private int lastReadSize = 0;
@Getter
private final AtomicBoolean done = new AtomicBoolean(false);
byte[] getNextData() {
readNext();
return buffer;
}
void close() {
IOUtils.closeQuietly(inStream);
}
void readNext() {
try {
lastReadSize = inStream.read(buffer, 0, CHUNK_SIZE);
} catch (IOException e) {
throw new TechnicalException("Error on reading a single chunk", e);
}
}
}
}
}
\ No newline at end of file
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment