diff --git a/vorgang-manager-server/src/main/java/de/ozgcloud/common/binaryfile/BinaryFileUploadStreamObserver.java b/vorgang-manager-server/src/main/java/de/ozgcloud/common/binaryfile/BinaryFileUploadStreamObserver.java deleted file mode 100644 index 44635a997b8eb964568d84da60017ee4acaec15b..0000000000000000000000000000000000000000 --- a/vorgang-manager-server/src/main/java/de/ozgcloud/common/binaryfile/BinaryFileUploadStreamObserver.java +++ /dev/null @@ -1,81 +0,0 @@ -/* - * Copyright (C) 2023 Das Land Schleswig-Holstein vertreten durch den - * Ministerpräsidenten des Landes Schleswig-Holstein - * Staatskanzlei - * Abteilung Digitalisierung und zentrales IT-Management der Landesregierung - * - * Lizenziert unter der EUPL, Version 1.2 oder - sobald - * diese von der Europäischen Kommission genehmigt wurden - - * Folgeversionen der EUPL ("Lizenz"); - * Sie dürfen dieses Werk ausschließlich gemäß - * dieser Lizenz nutzen. - * Eine Kopie der Lizenz finden Sie hier: - * - * https://joinup.ec.europa.eu/collection/eupl/eupl-text-eupl-12 - * - * Sofern nicht durch anwendbare Rechtsvorschriften - * gefordert oder in schriftlicher Form vereinbart, wird - * die unter der Lizenz verbreitete Software "so wie sie - * ist", OHNE JEGLICHE GEWÄHRLEISTUNG ODER BEDINGUNGEN - - * ausdrücklich oder stillschweigend - verbreitet. - * Die sprachspezifischen Genehmigungen und Beschränkungen - * unter der Lizenz sind dem Lizenztext zu entnehmen. - */ -package de.ozgcloud.common.binaryfile; - -import java.util.concurrent.CompletableFuture; - -import io.grpc.stub.ClientCallStreamObserver; -import io.grpc.stub.ClientResponseObserver; -import lombok.AccessLevel; -import lombok.Getter; -import lombok.RequiredArgsConstructor; -import lombok.extern.log4j.Log4j2; - -@Log4j2 -@RequiredArgsConstructor(access = AccessLevel.PRIVATE) -public class BinaryFileUploadStreamObserver<ReqT, R> implements ClientResponseObserver<ReqT, R> { - - private final CompletableFuture<R> future; - private Runnable onReadyHandler; - - public static <ReqT, R> BinaryFileUploadStreamObserver<ReqT, R> create(CompletableFuture<R> future, Runnable onReadyHandler) { - BinaryFileUploadStreamObserver<ReqT, R> instance = create(future); - instance.onReadyHandler = onReadyHandler; - return instance; - } - - public static <ReqT, R> BinaryFileUploadStreamObserver<ReqT, R> create(CompletableFuture<R> future) { - return new BinaryFileUploadStreamObserver<>(future); - } - - @Getter - private R response; - - /* - requestStream is CallStreamObserver - received from Grpc-framework. onReadyHandler calls onNext on this observer - */ - @Override - public void beforeStart(ClientCallStreamObserver<ReqT> requestStreamObserver) { - requestStreamObserver.setOnReadyHandler(onReadyHandler); - } - - @Override - public void onNext(R response) { - this.response = response; - } - - @Override - public void onError(Throwable t) { - LOG.error("Error on uploading file. Completing Future.", t); - future.completeExceptionally(t); - } - - // will it even get called? requestStreamObserver.onCompleted() would need to be called first - @Override - public void onCompleted() { - LOG.debug("Complete future..."); - future.complete(response); - } - -} \ No newline at end of file diff --git a/vorgang-manager-server/src/main/java/de/ozgcloud/common/binaryfile/GrpcFileUploadUtils.java b/vorgang-manager-server/src/main/java/de/ozgcloud/common/binaryfile/GrpcFileUploadUtils.java deleted file mode 100644 index 56b96476db4d54e553f171fb18740957b3f4cbee..0000000000000000000000000000000000000000 --- a/vorgang-manager-server/src/main/java/de/ozgcloud/common/binaryfile/GrpcFileUploadUtils.java +++ /dev/null @@ -1,257 +0,0 @@ -/* - * Copyright (C) 2023 Das Land Schleswig-Holstein vertreten durch den - * Ministerpräsidenten des Landes Schleswig-Holstein - * Staatskanzlei - * Abteilung Digitalisierung und zentrales IT-Management der Landesregierung - * - * Lizenziert unter der EUPL, Version 1.2 oder - sobald - * diese von der Europäischen Kommission genehmigt wurden - - * Folgeversionen der EUPL ("Lizenz"); - * Sie dürfen dieses Werk ausschließlich gemäß - * dieser Lizenz nutzen. - * Eine Kopie der Lizenz finden Sie hier: - * - * https://joinup.ec.europa.eu/collection/eupl/eupl-text-eupl-12 - * - * Sofern nicht durch anwendbare Rechtsvorschriften - * gefordert oder in schriftlicher Form vereinbart, wird - * die unter der Lizenz verbreitete Software "so wie sie - * ist", OHNE JEGLICHE GEWÄHRLEISTUNG ODER BEDINGUNGEN - - * ausdrücklich oder stillschweigend - verbreitet. - * Die sprachspezifischen Genehmigungen und Beschränkungen - * unter der Lizenz sind dem Lizenztext zu entnehmen. - */ -package de.ozgcloud.common.binaryfile; - -import java.io.IOException; -import java.io.InputStream; -import java.util.Objects; -import java.util.Optional; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.function.BiFunction; -import java.util.function.Consumer; -import java.util.function.Function; - -import org.apache.commons.io.IOUtils; - -import de.ozgcloud.common.errorhandling.TechnicalException; -import io.grpc.stub.CallStreamObserver; -import io.grpc.stub.StreamObserver; -import lombok.AccessLevel; -import lombok.Getter; -import lombok.NoArgsConstructor; -import lombok.NonNull; -import lombok.RequiredArgsConstructor; -import lombok.extern.log4j.Log4j2; - -@Log4j2 -@NoArgsConstructor(access = AccessLevel.PRIVATE) -public class GrpcFileUploadUtils { - - static final int CHUNK_SIZE = 4 * 1024; - - /* - * Q = Request Type; S = Response Type - */ - public static <Q, S> FileSender<Q, S> createSender(BiFunction<byte[], Integer, Q> chunkBuilder, InputStream inputStream, - Function<StreamObserver<S>, CallStreamObserver<Q>> reqObserverBuilder) { - return createSender(chunkBuilder, inputStream, reqObserverBuilder, true); - } - - public static <Q, S> FileSender<Q, S> createSender(BiFunction<byte[], Integer, Q> chunkBuilder, InputStream inputStream, - Function<StreamObserver<S>, CallStreamObserver<Q>> reqObserverBuilder, boolean completeOnFileSent) { - return new FileSender<>(chunkBuilder, reqObserverBuilder, inputStream, completeOnFileSent); - } - - public static class FileSender<Q, S> { - private final BiFunction<byte[], Integer, Q> chunkBuilder; - private final InputStream inputStream; - - @Getter - private final CompletableFuture<S> resultFuture = new CompletableFuture<>(); - private final Function<StreamObserver<S>, CallStreamObserver<Q>> reqObserverBuilder; - private CallStreamObserver<Q> requestObserver; - - private Optional<Q> metaData = Optional.empty(); - private final AtomicBoolean metaDataSent = new AtomicBoolean(false); - private final AtomicBoolean done = new AtomicBoolean(false); - - private final StreamReader streamReader; - private final boolean completeOnFileSent; - - FileSender(BiFunction<byte[], Integer, Q> chunkBuilder, Function<StreamObserver<S>, CallStreamObserver<Q>> reqObserverBuilder, - InputStream inputStream, boolean completeOnFileSent) { - this.chunkBuilder = chunkBuilder; - this.inputStream = inputStream; - this.reqObserverBuilder = reqObserverBuilder; - this.completeOnFileSent = completeOnFileSent; - - this.streamReader = new StreamReader(this.inputStream); - } - - public FileSender<Q, S> withMetaData(@NonNull Q metaData) { - this.metaData = Optional.of(metaData); - return this; - } - - public FileSender<Q, S> send(Consumer<Runnable> registerOnReadyHandler) { - LOG.debug("Start sending File."); - - registerOnReadyHandler.accept(this::sendNext); - requestObserver = reqObserverBuilder.apply(null); - - return this; - } - - public FileSender<Q, S> send() { - LOG.debug("Start sending File."); - - // this responseObserver registers also onReadyHandler - var responseObserver = BinaryFileUploadStreamObserver.create(resultFuture, this::sendNext); - requestObserver = reqObserverBuilder.apply(responseObserver); - - return this; - } - - public void cancelOnTimeout() { - LOG.warn("File transfer canceled on timeout"); - resultFuture.cancel(true); - requestObserver.onError(new TechnicalException("Timeout on waiting for upload.")); - closeStreams(); - } - - public void cancelOnError(Throwable t) { - LOG.error("File tranfer canceled on error.", t); - resultFuture.cancel(true); - requestObserver.onError(t); - closeStreams(); - } - - void sendNext() { - if (!done.get()) { - waitForOberver(); - sendMetaData(); - do { - LOG.debug("Sending next chunk."); - sendNextChunk(); - } while (!done.get() && isReady()); - LOG.debug("Finished or waiting to become ready."); - } - } - - private boolean isReady() { - return requestObserver.isReady(); - } - - private void waitForOberver() { - synchronized (this) { - while (Objects.isNull(requestObserver)) { - try { - LOG.debug("wait for observer"); - wait(300); - } catch (InterruptedException e) { - LOG.error("Error on waiting for request Observer.", e); - Thread.currentThread().interrupt(); - } - } - } - - } - - void sendNextChunk() { - byte[] contentToSend = streamReader.getNextData(); - - if (streamReader.getLastReadSize() > 0) { - sendChunk(contentToSend, streamReader.getLastReadSize()); - } else { - endTransfer(); - } - } - - private void endTransfer() { - if (completeOnFileSent) { - requestObserver.onCompleted(); - } else { - sendEndOfFile(); - resultFuture.complete(null); - } - done.set(true); - LOG.debug("File Transfer done."); - closeStreams(); - - } - - private void sendEndOfFile() { - sendChunk(new byte[0], streamReader.getLastReadSize()); - } - - void closeStreams() { - LOG.debug("Closing streams"); - streamReader.close(); - } - - void sendChunk(byte[] content, int length) { - LOG.debug("Sending {} byte Data.", length); - var chunk = chunkBuilder.apply(content, length); - requestObserver.onNext(chunk); - } - - byte[] readFromStream() { - try { - return inputStream.readNBytes(CHUNK_SIZE); - } catch (IOException e) { - throw new TechnicalException("Error on sending a single chunk", e); - } - } - - void sendMetaData() { - metaData.filter(md -> !metaDataSent.get()).ifPresent(this::doSendMetaData); - } - - private void doSendMetaData(Q metadata) { - LOG.debug("Sending Metadata."); - requestObserver.onNext(metadata); - metaDataSent.set(true); - } - - void checkForEndOfStream(long sentSize) { - if (sentSize < CHUNK_SIZE) { - LOG.debug("File Transfer done. Closing stream."); - IOUtils.closeQuietly(inputStream); - requestObserver.onCompleted(); - done.set(true); - } else { - LOG.debug("File Transfer not jet done - need to tranfer another chunk."); - } - } - - @RequiredArgsConstructor - private class StreamReader { - private final InputStream inStream; - private final byte[] buffer = new byte[CHUNK_SIZE]; - @Getter - private int lastReadSize = 0; - @Getter - private final AtomicBoolean done = new AtomicBoolean(false); - - byte[] getNextData() { - readNext(); - return buffer; - } - - void close() { - IOUtils.closeQuietly(inStream); - } - - void readNext() { - try { - lastReadSize = inStream.read(buffer, 0, CHUNK_SIZE); - } catch (IOException e) { - throw new TechnicalException("Error on reading a single chunk", e); - } - } - } - } - -} \ No newline at end of file