From 18ec16871e41279509743f869fbdca2145fbe293 Mon Sep 17 00:00:00 2001 From: Krzysztof <krzysztof.witukiewicz@mgm-tp.com> Date: Tue, 1 Apr 2025 13:03:24 +0200 Subject: [PATCH 01/12] OZG-7573 OZG-7991 File streaming using existing stream --- .../binaryfile/GrpcFileUploadUtils.java | 191 +------------ .../binaryfile/StreamExclusiveFileSender.java | 81 ++++++ .../binaryfile/StreamSharingFileSender.java | 67 +++++ .../binaryfile/StreamingFileSender.java | 176 ++++++++++++ .../binaryfile/GrpcFileUploadUtilsTest.java | 256 ++++------------- .../StreamExclusiveFileSenderTest.java | 195 +++++++++++++ .../StreamSharingFileSenderTest.java | 105 +++++++ .../binaryfile/StreamingFileSenderTest.java | 257 ++++++++++++++++++ 8 files changed, 949 insertions(+), 379 deletions(-) create mode 100644 ozgcloud-common-lib/src/main/java/de/ozgcloud/common/binaryfile/StreamExclusiveFileSender.java create mode 100644 ozgcloud-common-lib/src/main/java/de/ozgcloud/common/binaryfile/StreamSharingFileSender.java create mode 100644 ozgcloud-common-lib/src/main/java/de/ozgcloud/common/binaryfile/StreamingFileSender.java create mode 100644 ozgcloud-common-lib/src/test/java/de/ozgcloud/common/binaryfile/StreamExclusiveFileSenderTest.java create mode 100644 ozgcloud-common-lib/src/test/java/de/ozgcloud/common/binaryfile/StreamSharingFileSenderTest.java create mode 100644 ozgcloud-common-lib/src/test/java/de/ozgcloud/common/binaryfile/StreamingFileSenderTest.java diff --git a/ozgcloud-common-lib/src/main/java/de/ozgcloud/common/binaryfile/GrpcFileUploadUtils.java b/ozgcloud-common-lib/src/main/java/de/ozgcloud/common/binaryfile/GrpcFileUploadUtils.java index 1cbd50d..1b7f1c1 100644 --- a/ozgcloud-common-lib/src/main/java/de/ozgcloud/common/binaryfile/GrpcFileUploadUtils.java +++ b/ozgcloud-common-lib/src/main/java/de/ozgcloud/common/binaryfile/GrpcFileUploadUtils.java @@ -23,225 +23,60 @@ */ package de.ozgcloud.common.binaryfile; -import java.io.IOException; import java.io.InputStream; -import java.util.Objects; -import java.util.Optional; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.BiFunction; +import java.util.function.Consumer; import java.util.function.Function; -import org.apache.commons.io.IOUtils; - -import de.ozgcloud.common.errorhandling.TechnicalException; import io.grpc.stub.CallStreamObserver; import io.grpc.stub.StreamObserver; import lombok.AccessLevel; -import lombok.Getter; import lombok.NoArgsConstructor; import lombok.NonNull; -import lombok.RequiredArgsConstructor; import lombok.extern.log4j.Log4j2; @Log4j2 @NoArgsConstructor(access = AccessLevel.PRIVATE) public class GrpcFileUploadUtils { - static final int CHUNK_SIZE = 4 * 1024; - /* * Q = Request Type; S = Response Type */ public static <Q, S> FileSender<Q, S> createSender(BiFunction<byte[], Integer, Q> chunkBuilder, InputStream inputStream, Function<StreamObserver<S>, CallStreamObserver<Q>> reqObserverBuilder) { - return createSender(chunkBuilder, inputStream, reqObserverBuilder, true); + return new FileSender<>(chunkBuilder, inputStream, reqObserverBuilder); } - public static <Q, S> FileSender<Q, S> createSender(BiFunction<byte[], Integer, Q> chunkBuilder, InputStream inputStream, - Function<StreamObserver<S>, CallStreamObserver<Q>> reqObserverBuilder, boolean completeOnFileSent) { - return new FileSender<>(chunkBuilder, reqObserverBuilder, inputStream, completeOnFileSent); + public static <Q, S> StreamingFileSender<Q, S> createStreamSharingSender(BiFunction<byte[], Integer, Q> chunkBuilder, InputStream inputStream, + CallStreamObserver<Q> requestObserver, Consumer<Runnable> onReadyHandlerRegistrar) { + return new StreamSharingFileSender<>(chunkBuilder, inputStream, requestObserver, onReadyHandlerRegistrar); } public static class FileSender<Q, S> { - private final BiFunction<byte[], Integer, Q> chunkBuilder; - private final InputStream inputStream; - @Getter - private final CompletableFuture<S> resultFuture = new CompletableFuture<>(); - private final Function<StreamObserver<S>, CallStreamObserver<Q>> reqObserverBuilder; - private CallStreamObserver<Q> requestObserver; + private final StreamExclusiveFileSender<Q, S> sender; - private Optional<Q> metaData = Optional.empty(); - private final AtomicBoolean metaDataSent = new AtomicBoolean(false); - private final AtomicBoolean done = new AtomicBoolean(false); - - private final StreamReader streamReader; - private final boolean completeOnFileSent; - - FileSender(BiFunction<byte[], Integer, Q> chunkBuilder, Function<StreamObserver<S>, CallStreamObserver<Q>> reqObserverBuilder, - InputStream inputStream) { - this(chunkBuilder, reqObserverBuilder, inputStream, true); - } - - FileSender(BiFunction<byte[], Integer, Q> chunkBuilder, Function<StreamObserver<S>, CallStreamObserver<Q>> reqObserverBuilder, - InputStream inputStream, boolean completeOnFileSent) { - this.chunkBuilder = chunkBuilder; - this.inputStream = inputStream; - this.reqObserverBuilder = reqObserverBuilder; - this.completeOnFileSent = completeOnFileSent; - - this.streamReader = new StreamReader(this.inputStream); + FileSender(BiFunction<byte[], Integer, Q> chunkBuilder, InputStream inputStream, + Function<StreamObserver<S>, CallStreamObserver<Q>> reqObserverBuilder) { + this.sender = new StreamExclusiveFileSender<>(chunkBuilder, inputStream, reqObserverBuilder); } public FileSender<Q, S> withMetaData(@NonNull Q metaData) { - this.metaData = Optional.of(metaData); + sender.withMetaData(metaData); return this; } public FileSender<Q, S> send() { - LOG.debug("Start sending File."); - var responseObserver = BinaryFileUploadStreamObserver.create(resultFuture, this::sendNext); - requestObserver = reqObserverBuilder.apply(responseObserver); - + sender.send(); return this; } public void cancelOnTimeout() { - LOG.warn("File transfer canceled on timeout"); - resultFuture.cancel(true); - requestObserver.onError(new TechnicalException("Timeout on waiting for upload.")); - closeStreams(); + sender.cancelOnTimeout(); } public void cancelOnError(Throwable t) { - LOG.error("File tranfer canceled on error.", t); - resultFuture.cancel(true); - requestObserver.onError(t); - closeStreams(); - } - - void sendNext() { - if (!done.get()) { - waitForOberver(); - sendMetaData(); - do { - LOG.debug("Sending next chunk."); - sendNextChunk(); - } while (!done.get() && isReady()); - LOG.debug("Finished or waiting to become ready."); - } - } - - private boolean isReady() { - return requestObserver.isReady(); - } - - private void waitForOberver() { - synchronized (this) { - while (Objects.isNull(requestObserver)) { - try { - LOG.debug("wait for observer"); - wait(300); - } catch (InterruptedException e) { - LOG.error("Error on waiting for request Observer.", e); - Thread.currentThread().interrupt(); - } - } - } - - } - - void sendNextChunk() { - byte[] contentToSend = streamReader.getNextData(); - - if (streamReader.getLastReadSize() > 0) { - sendChunk(contentToSend, streamReader.getLastReadSize()); - } else { - endTransfer(); - } - } - - private void endTransfer() { - if (completeOnFileSent) - requestObserver.onCompleted(); - else - sendEndOfFile(); - done.set(true); - LOG.debug("File Transfer done."); - closeStreams(); - - } - - private void sendEndOfFile() { - sendChunk(new byte[0], streamReader.getLastReadSize()); - } - - void closeStreams() { - LOG.debug("Closing streams"); - streamReader.close(); - } - - void sendChunk(byte[] content, int length) { - LOG.debug("Sending {} byte Data.", length); - var chunk = chunkBuilder.apply(content, length); - requestObserver.onNext(chunk); - } - - byte[] readFromStream() { - try { - return inputStream.readNBytes(CHUNK_SIZE); - } catch (IOException e) { - throw new TechnicalException("Error on sending a single chunk", e); - } - } - - void sendMetaData() { - metaData.filter(md -> !metaDataSent.get()).ifPresent(this::doSendMetaData); - } - - private void doSendMetaData(Q metadata) { - LOG.debug("Sending Metadata."); - requestObserver.onNext(metadata); - metaDataSent.set(true); - } - - void checkForEndOfStream(long sentSize) { - if (sentSize < CHUNK_SIZE) { - LOG.debug("File Transfer done. Closing stream."); - IOUtils.closeQuietly(inputStream); - requestObserver.onCompleted(); - done.set(true); - } else { - LOG.debug("File Transfer not jet done - need to tranfer another chunk."); - } - } - - @RequiredArgsConstructor - private class StreamReader { - private final InputStream inStream; - private final byte[] buffer = new byte[CHUNK_SIZE]; - @Getter - private int lastReadSize = 0; - @Getter - private final AtomicBoolean done = new AtomicBoolean(false); - - byte[] getNextData() { - readNext(); - return buffer; - } - - void close() { - IOUtils.closeQuietly(inStream); - } - - void readNext() { - try { - lastReadSize = inStream.read(buffer, 0, CHUNK_SIZE); - } catch (IOException e) { - throw new TechnicalException("Error on reading a single chunk", e); - } - } + sender.cancelOnError(t); } } diff --git a/ozgcloud-common-lib/src/main/java/de/ozgcloud/common/binaryfile/StreamExclusiveFileSender.java b/ozgcloud-common-lib/src/main/java/de/ozgcloud/common/binaryfile/StreamExclusiveFileSender.java new file mode 100644 index 0000000..8bcf0b3 --- /dev/null +++ b/ozgcloud-common-lib/src/main/java/de/ozgcloud/common/binaryfile/StreamExclusiveFileSender.java @@ -0,0 +1,81 @@ +/* + * Copyright (C) 2025 Das Land Schleswig-Holstein vertreten durch den + * Ministerpräsidenten des Landes Schleswig-Holstein + * Staatskanzlei + * Abteilung Digitalisierung und zentrales IT-Management der Landesregierung + * + * Lizenziert unter der EUPL, Version 1.2 oder - sobald + * diese von der Europäischen Kommission genehmigt wurden - + * Folgeversionen der EUPL ("Lizenz"); + * Sie dürfen dieses Werk ausschließlich gemäß + * dieser Lizenz nutzen. + * Eine Kopie der Lizenz finden Sie hier: + * + * https://joinup.ec.europa.eu/collection/eupl/eupl-text-eupl-12 + * + * Sofern nicht durch anwendbare Rechtsvorschriften + * gefordert oder in schriftlicher Form vereinbart, wird + * die unter der Lizenz verbreitete Software "so wie sie + * ist", OHNE JEGLICHE GEWÄHRLEISTUNG ODER BEDINGUNGEN - + * ausdrücklich oder stillschweigend - verbreitet. + * Die sprachspezifischen Genehmigungen und Beschränkungen + * unter der Lizenz sind dem Lizenztext zu entnehmen. + */ +package de.ozgcloud.common.binaryfile; + +import java.io.InputStream; +import java.util.function.BiFunction; +import java.util.function.Function; + +import de.ozgcloud.common.errorhandling.TechnicalException; +import io.grpc.stub.CallStreamObserver; +import io.grpc.stub.StreamObserver; +import lombok.extern.log4j.Log4j2; + +@Log4j2 +class StreamExclusiveFileSender<Q, S> extends StreamingFileSender<Q, S> { + + private final Function<StreamObserver<S>, CallStreamObserver<Q>> reqObserverBuilder; + private CallStreamObserver<Q> requestObserver; + + StreamExclusiveFileSender(BiFunction<byte[], Integer, Q> chunkBuilder, InputStream inputStream, Function<StreamObserver<S>, CallStreamObserver<Q>> reqObserverBuilder) { + super(chunkBuilder, inputStream); + + this.reqObserverBuilder = reqObserverBuilder; + } + + @Override + public StreamExclusiveFileSender<Q, S> send() { + LOG.debug("Start sending File."); + + // this responseObserver registers also onReadyHandler + var responseObserver = BinaryFileUploadStreamObserver.create(getResultFuture(), this::sendNext); + requestObserver = reqObserverBuilder.apply(responseObserver); + + return this; + } + + @Override + void communicateEndOfTransfer() { + requestObserver.onCompleted(); + } + + @Override + protected CallStreamObserver<Q> getRequestObserver() { + return requestObserver; + } + + public void cancelOnTimeout() { + LOG.warn("File transfer canceled on timeout"); + getResultFuture().cancel(true); + requestObserver.onError(new TechnicalException("Timeout on waiting for upload.")); + closeStreams(); + } + + public void cancelOnError(Throwable t) { + LOG.error("File tranfer canceled on error.", t); + getResultFuture().cancel(true); + requestObserver.onError(t); + closeStreams(); + } +} diff --git a/ozgcloud-common-lib/src/main/java/de/ozgcloud/common/binaryfile/StreamSharingFileSender.java b/ozgcloud-common-lib/src/main/java/de/ozgcloud/common/binaryfile/StreamSharingFileSender.java new file mode 100644 index 0000000..c5c993a --- /dev/null +++ b/ozgcloud-common-lib/src/main/java/de/ozgcloud/common/binaryfile/StreamSharingFileSender.java @@ -0,0 +1,67 @@ +/* + * Copyright (C) 2025 Das Land Schleswig-Holstein vertreten durch den + * Ministerpräsidenten des Landes Schleswig-Holstein + * Staatskanzlei + * Abteilung Digitalisierung und zentrales IT-Management der Landesregierung + * + * Lizenziert unter der EUPL, Version 1.2 oder - sobald + * diese von der Europäischen Kommission genehmigt wurden - + * Folgeversionen der EUPL ("Lizenz"); + * Sie dürfen dieses Werk ausschließlich gemäß + * dieser Lizenz nutzen. + * Eine Kopie der Lizenz finden Sie hier: + * + * https://joinup.ec.europa.eu/collection/eupl/eupl-text-eupl-12 + * + * Sofern nicht durch anwendbare Rechtsvorschriften + * gefordert oder in schriftlicher Form vereinbart, wird + * die unter der Lizenz verbreitete Software "so wie sie + * ist", OHNE JEGLICHE GEWÄHRLEISTUNG ODER BEDINGUNGEN - + * ausdrücklich oder stillschweigend - verbreitet. + * Die sprachspezifischen Genehmigungen und Beschränkungen + * unter der Lizenz sind dem Lizenztext zu entnehmen. + */ +package de.ozgcloud.common.binaryfile; + +import java.io.InputStream; +import java.util.function.BiFunction; +import java.util.function.Consumer; + +import io.grpc.stub.CallStreamObserver; +import lombok.extern.log4j.Log4j2; + +@Log4j2 +class StreamSharingFileSender<Q, S> extends StreamingFileSender<Q, S> { + + private final CallStreamObserver<Q> requestObserver; + private final Consumer<Runnable> onReadyHandlerRegistrar; + + StreamSharingFileSender(BiFunction<byte[], Integer, Q> chunkBuilder, InputStream inputStream, CallStreamObserver<Q> requestObserver, Consumer<Runnable> onReadyHandlerRegistrar) { + super(chunkBuilder, inputStream); + + this.requestObserver = requestObserver; + this.onReadyHandlerRegistrar = onReadyHandlerRegistrar; + } + + @Override + public StreamSharingFileSender<Q, S> send() { + LOG.debug("Register onReadyHandler and start sending File."); + onReadyHandlerRegistrar.accept(this::sendNext); + return this; + } + + @Override + void communicateEndOfTransfer() { + sendEndOfFile(); + getResultFuture().complete(null); + } + + private void sendEndOfFile() { + sendChunk(new byte[0], -1); + } + + @Override + protected CallStreamObserver<Q> getRequestObserver() { + return requestObserver; + } +} diff --git a/ozgcloud-common-lib/src/main/java/de/ozgcloud/common/binaryfile/StreamingFileSender.java b/ozgcloud-common-lib/src/main/java/de/ozgcloud/common/binaryfile/StreamingFileSender.java new file mode 100644 index 0000000..926ca00 --- /dev/null +++ b/ozgcloud-common-lib/src/main/java/de/ozgcloud/common/binaryfile/StreamingFileSender.java @@ -0,0 +1,176 @@ +/* + * Copyright (C) 2025 Das Land Schleswig-Holstein vertreten durch den + * Ministerpräsidenten des Landes Schleswig-Holstein + * Staatskanzlei + * Abteilung Digitalisierung und zentrales IT-Management der Landesregierung + * + * Lizenziert unter der EUPL, Version 1.2 oder - sobald + * diese von der Europäischen Kommission genehmigt wurden - + * Folgeversionen der EUPL ("Lizenz"); + * Sie dürfen dieses Werk ausschließlich gemäß + * dieser Lizenz nutzen. + * Eine Kopie der Lizenz finden Sie hier: + * + * https://joinup.ec.europa.eu/collection/eupl/eupl-text-eupl-12 + * + * Sofern nicht durch anwendbare Rechtsvorschriften + * gefordert oder in schriftlicher Form vereinbart, wird + * die unter der Lizenz verbreitete Software "so wie sie + * ist", OHNE JEGLICHE GEWÄHRLEISTUNG ODER BEDINGUNGEN - + * ausdrücklich oder stillschweigend - verbreitet. + * Die sprachspezifischen Genehmigungen und Beschränkungen + * unter der Lizenz sind dem Lizenztext zu entnehmen. + */ +package de.ozgcloud.common.binaryfile; + +import java.io.IOException; +import java.io.InputStream; +import java.util.Objects; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.BiFunction; + +import org.apache.commons.io.IOUtils; + +import de.ozgcloud.common.errorhandling.TechnicalException; +import io.grpc.stub.CallStreamObserver; +import lombok.AccessLevel; +import lombok.Getter; +import lombok.NonNull; +import lombok.RequiredArgsConstructor; +import lombok.extern.log4j.Log4j2; + +/* + * Q = Request Type; S = Response Type + */ +@Log4j2 +public abstract class StreamingFileSender<Q, S> { + + static final int CHUNK_SIZE = 4 * 1024; + + private final BiFunction<byte[], Integer, Q> chunkBuilder; + + @Getter + private final CompletableFuture<S> resultFuture = new CompletableFuture<>(); + + private Q metaData; + private final AtomicBoolean metaDataSent = new AtomicBoolean(false); + private final AtomicBoolean done = new AtomicBoolean(false); + + @Getter(AccessLevel.PROTECTED) + private final StreamReader streamReader; + + StreamingFileSender(BiFunction<byte[], Integer, Q> chunkBuilder, InputStream inputStream) { + this.chunkBuilder = chunkBuilder; + this.streamReader = new StreamReader(inputStream); + } + + public StreamingFileSender<Q, S> withMetaData(@NonNull Q metaData) { + this.metaData = metaData; + return this; + } + + public abstract StreamingFileSender<Q, S> send(); + + void sendNext() { + if (!done.get()) { + waitForOberver(); + sendMetaData(); + do { + LOG.debug("Sending next chunk."); + sendNextChunk(); + } while (!done.get() && isReady()); + LOG.debug("Finished or waiting to become ready."); + } + } + + private boolean isReady() { + return getRequestObserver().isReady(); + } + + private void waitForOberver() { + synchronized (this) { + while (Objects.isNull(getRequestObserver())) { + try { + LOG.debug("wait for observer"); + wait(300); + } catch (InterruptedException e) { + LOG.error("Error on waiting for request Observer.", e); + Thread.currentThread().interrupt(); + } + } + } + + } + + void sendNextChunk() { + byte[] contentToSend = streamReader.getNextData(); + + if (streamReader.getLastReadSize() > 0) { + sendChunk(contentToSend, streamReader.getLastReadSize()); + } else { + endTransfer(); + } + } + + protected void endTransfer() { + communicateEndOfTransfer(); + done.set(true); + LOG.debug("File Transfer done."); + closeStreams(); + } + + abstract void communicateEndOfTransfer(); + + void closeStreams() { + LOG.debug("Closing streams"); + streamReader.close(); + } + + void sendChunk(byte[] content, int length) { + LOG.debug("Sending {} byte Data.", length); + var chunk = chunkBuilder.apply(content, length); + getRequestObserver().onNext(chunk); + } + + void sendMetaData() { + if (metaData != null && !metaDataSent.get()) { + doSendMetaData(metaData); + } + } + + private void doSendMetaData(Q metadata) { + LOG.debug("Sending Metadata."); + getRequestObserver().onNext(metadata); + metaDataSent.set(true); + } + + protected abstract CallStreamObserver<Q> getRequestObserver(); + + @RequiredArgsConstructor + protected static class StreamReader { + private final InputStream inStream; + private final byte[] buffer = new byte[CHUNK_SIZE]; + @Getter + private int lastReadSize = 0; + @Getter + private final AtomicBoolean done = new AtomicBoolean(false); + + byte[] getNextData() { + readNext(); + return buffer; + } + + void close() { + IOUtils.closeQuietly(inStream); + } + + void readNext() { + try { + lastReadSize = inStream.read(buffer, 0, CHUNK_SIZE); + } catch (IOException e) { + throw new TechnicalException("Error on reading a single chunk", e); + } + } + } +} diff --git a/ozgcloud-common-lib/src/test/java/de/ozgcloud/common/binaryfile/GrpcFileUploadUtilsTest.java b/ozgcloud-common-lib/src/test/java/de/ozgcloud/common/binaryfile/GrpcFileUploadUtilsTest.java index fb77f3c..82c4698 100644 --- a/ozgcloud-common-lib/src/test/java/de/ozgcloud/common/binaryfile/GrpcFileUploadUtilsTest.java +++ b/ozgcloud-common-lib/src/test/java/de/ozgcloud/common/binaryfile/GrpcFileUploadUtilsTest.java @@ -24,289 +24,143 @@ package de.ozgcloud.common.binaryfile; import static org.assertj.core.api.Assertions.*; -import static org.mockito.ArgumentMatchers.*; import static org.mockito.Mockito.*; -import java.io.ByteArrayInputStream; -import java.io.IOException; import java.io.InputStream; import java.util.function.BiFunction; +import java.util.function.Consumer; import java.util.function.Function; -import org.apache.commons.lang3.RandomUtils; import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Nested; import org.junit.jupiter.api.Test; -import org.mockito.ArgumentCaptor; -import org.mockito.Captor; -import org.mockito.InjectMocks; import org.mockito.Mock; import de.ozgcloud.common.binaryfile.BinaryFileTestFactory.TestRequestType; import de.ozgcloud.common.binaryfile.BinaryFileTestFactory.TestResponseType; import de.ozgcloud.common.binaryfile.GrpcFileUploadUtils.FileSender; -import de.ozgcloud.common.errorhandling.TechnicalException; +import de.ozgcloud.common.test.ReflectionTestUtils; import io.grpc.stub.CallStreamObserver; import io.grpc.stub.StreamObserver; -import lombok.SneakyThrows; class GrpcFileUploadUtilsTest { - @InjectMocks - private GrpcFileUploadUtils service; - @Mock private BiFunction<byte[], Integer, TestRequestType> chunkBuilder; @Mock + private InputStream inputStream; + @Mock private Function<StreamObserver<TestResponseType>, CallStreamObserver<TestRequestType>> reqObserverBuilder; @Mock private CallStreamObserver<TestRequestType> requestObserver; @Mock - private InputStream inputStream; - - private FileSender<TestRequestType, TestResponseType> fileSender; - - @Mock - private TestRequestType metaData; - - @BeforeEach - void init() { - when(reqObserverBuilder.apply(any())).thenReturn(requestObserver); - fileSender = spy(GrpcFileUploadUtils.createSender(chunkBuilder, inputStream, reqObserverBuilder)); - } + private Consumer<Runnable> onReadyHandlerRegistrar; @Nested - class TestCreateFileSender { + class TestCreateSender { @Test - void shouldCreateRequestObserver() { - GrpcFileUploadUtils.createSender(chunkBuilder, inputStream, reqObserverBuilder).send(); + void shouldReturnInstanceOfFileSender() { + var createdSender = GrpcFileUploadUtils.createSender(chunkBuilder, inputStream, reqObserverBuilder); - verify(reqObserverBuilder, atLeastOnce()).apply(notNull()); + assertThat(createdSender).isInstanceOf(FileSender.class); } } @Nested - class TestSendBinaryFile { - - @Captor - private ArgumentCaptor<Runnable> runnableCaptor; + class TestCreateStreamSharingSender { @Test - void shouldReturnSenderWithFuture() { - var result = fileSender.send(); + void shouldReturnInstanceOfStreamSharingSender() { + var createdSender = GrpcFileUploadUtils.createStreamSharingSender(chunkBuilder, inputStream, requestObserver, onReadyHandlerRegistrar); - assertThat(result).isNotNull().extracting(FileSender::getResultFuture).isNotNull(); + assertThat(createdSender).isInstanceOf(StreamSharingFileSender.class); } } @Nested - class TestSendNext { - - @BeforeEach - void initObserver() { - fileSender.send(); - } - - @Test - void shouldCallSendMetaData() { - fileSender.sendNext(); + class TestFileSender { - verify(fileSender).sendMetaData(); - } + private final FileSender<TestRequestType, TestResponseType> fileSender = new FileSender<>(chunkBuilder, inputStream, reqObserverBuilder); + @Mock + private StreamExclusiveFileSender<TestRequestType, TestResponseType> streamExclusiveFileSender; @Test - void shouldSendNextChunk() { - fileSender.sendNext(); + void shouldCreateStreamExclusiveFileSender() { + var internalFileSender = ReflectionTestUtils.getField(fileSender, "sender", StreamExclusiveFileSender.class); - verify(fileSender).sendNextChunk(); + assertThat(internalFileSender).isInstanceOf(StreamExclusiveFileSender.class); } - } - - @Nested - class TestSendNextChunk { - - private final byte[] content = RandomUtils.insecure().randomBytes(GrpcFileUploadUtils.CHUNK_SIZE / 2); - private ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(content); - - @Captor - private ArgumentCaptor<byte[]> chunkCaptor; - @Nested - class TestOnDataAvailable { - @BeforeEach - void initObserver() { - fileSender = spy(GrpcFileUploadUtils.createSender(chunkBuilder, byteArrayInputStream, reqObserverBuilder)); - fileSender.send(); - } - - @Test - void shouldCallSendChunk() { - fileSender.sendNextChunk(); + class TestMethods { - verify(fileSender).sendChunk(chunkCaptor.capture(), eq(content.length)); - assertThat(chunkCaptor.getValue()).contains(content); + @BeforeEach + void init() { + ReflectionTestUtils.setField(fileSender, "sender", streamExclusiveFileSender); } - } - - @Nested - class TestOnNoBytesLeftToRead { @Nested - class TestOnCompleteOnFileSent { - private static final boolean COMPLETE_ON_FILE_SENT = true; - - @BeforeEach - void initialize() { - var buffer = new byte[GrpcFileUploadUtils.CHUNK_SIZE]; - byteArrayInputStream.read(buffer, 0, GrpcFileUploadUtils.CHUNK_SIZE); - fileSender = spy(GrpcFileUploadUtils.createSender(chunkBuilder, byteArrayInputStream, reqObserverBuilder, COMPLETE_ON_FILE_SENT)); - fileSender.send(); - } + class TestWithMetaData { - @Test - void shouldCallOnCompleted() { - fileSender.sendNextChunk(); - - verify(requestObserver).onCompleted(); - } + private final TestRequestType request = new TestRequestType(); @Test - void shouldNotCallSendChunk() { - fileSender.sendNextChunk(); + void shouldDelegate() { + fileSender.withMetaData(request); - verify(fileSender, never()).sendChunk(any(), anyInt()); + verify(streamExclusiveFileSender).withMetaData(request); } @Test - @SneakyThrows - void shouldCallCloseStreams() { - fileSender.sendNextChunk(); + void shouldReturnItself() { + var senderWithMetaData = fileSender.withMetaData(request); - verify(fileSender).closeStreams(); + assertThat(senderWithMetaData).isSameAs(fileSender); } } @Nested - class TestOnNotCompleteOnFileSent { - private static final boolean COMPLETE_ON_FILE_SENT = false; - - @BeforeEach - void initialize() { - var buffer = new byte[GrpcFileUploadUtils.CHUNK_SIZE]; - byteArrayInputStream.read(buffer, 0, GrpcFileUploadUtils.CHUNK_SIZE); - fileSender = spy(GrpcFileUploadUtils.createSender(chunkBuilder, byteArrayInputStream, reqObserverBuilder, COMPLETE_ON_FILE_SENT)); - fileSender.send(); - } + class TestSend { @Test - void shouldNotCallOnCompleted() { - fileSender.sendNextChunk(); + void shouldDelegate() { + fileSender.send(); - verify(requestObserver, never()).onCompleted(); + verify(streamExclusiveFileSender).send(); } @Test - void shouldCallSendChunk() { - fileSender.sendNextChunk(); + void shouldReturnItself() { + var returnedSender = fileSender.send(); - verify(fileSender).sendChunk(chunkCaptor.capture(), eq(-1)); - assertThat(chunkCaptor.getValue()).isEmpty(); + assertThat(returnedSender).isSameAs(fileSender); } + } + + @Nested + class TestCancelOnTimeout { @Test - @SneakyThrows - void shouldCallCloseStreams() { - fileSender.sendNextChunk(); + void shouldDelegate() { + fileSender.cancelOnTimeout(); - verify(fileSender).closeStreams(); + verify(streamExclusiveFileSender).cancelOnTimeout(); } } - } - - } - - @Nested - class TestCloseStreams { - - @Test - @SneakyThrows - void shouldCloseInputStream() { - fileSender.send(); - - fileSender.closeStreams(); - - verify(inputStream).close(); - } - } - - @Nested - class TestSendChunk { - - private static final byte[] CHUNK_PART = "ChunkPartContent".getBytes(); - - @BeforeEach - void initObserver() { - fileSender.send(); - } - - @Test - void shouldApplyBuildChunk() throws IOException { - fileSender.sendChunk(CHUNK_PART, 5); - verify(chunkBuilder).apply(CHUNK_PART, 5); - } - - @Test - void shouldCallOnNext() throws IOException { - fileSender.sendChunk(CHUNK_PART, 5); - - verify(requestObserver).onNext(any()); - } - } - - @Nested - class TestSendMetaData { - - @BeforeEach - void initObserver() { - fileSender.send(); - } - - @Test - void shouldNotSendWithoutMetadata() { - fileSender.sendMetaData(); - - verify(requestObserver, never()).onNext(any()); - } - - @Test - void shouldSendMetadata() { - fileSender.withMetaData(metaData).sendMetaData(); - - verify(requestObserver).onNext(metaData); - } - - @Test - void shouldSendMetadataOnlyOnce() { - fileSender.withMetaData(metaData).sendMetaData(); - fileSender.sendMetaData(); + @Nested + class TestCancelOnError { - verify(requestObserver).onNext(metaData); - } - } + @Test + void shouldDelegate() { + var error = new Throwable(); - @Disabled("unused") - @Nested - class TestReadFromStream { - @Test - void shouldThrowException() throws IOException { - doThrow(IOException.class).when(inputStream).read(any(), anyInt(), anyInt()); + fileSender.cancelOnError(error); - assertThatThrownBy(() -> fileSender.readFromStream()).isInstanceOf(TechnicalException.class); + verify(streamExclusiveFileSender).cancelOnError(error); + } + } } } - } \ No newline at end of file diff --git a/ozgcloud-common-lib/src/test/java/de/ozgcloud/common/binaryfile/StreamExclusiveFileSenderTest.java b/ozgcloud-common-lib/src/test/java/de/ozgcloud/common/binaryfile/StreamExclusiveFileSenderTest.java new file mode 100644 index 0000000..04b347c --- /dev/null +++ b/ozgcloud-common-lib/src/test/java/de/ozgcloud/common/binaryfile/StreamExclusiveFileSenderTest.java @@ -0,0 +1,195 @@ +/* + * Copyright (C) 2025 Das Land Schleswig-Holstein vertreten durch den + * Ministerpräsidenten des Landes Schleswig-Holstein + * Staatskanzlei + * Abteilung Digitalisierung und zentrales IT-Management der Landesregierung + * + * Lizenziert unter der EUPL, Version 1.2 oder - sobald + * diese von der Europäischen Kommission genehmigt wurden - + * Folgeversionen der EUPL ("Lizenz"); + * Sie dürfen dieses Werk ausschließlich gemäß + * dieser Lizenz nutzen. + * Eine Kopie der Lizenz finden Sie hier: + * + * https://joinup.ec.europa.eu/collection/eupl/eupl-text-eupl-12 + * + * Sofern nicht durch anwendbare Rechtsvorschriften + * gefordert oder in schriftlicher Form vereinbart, wird + * die unter der Lizenz verbreitete Software "so wie sie + * ist", OHNE JEGLICHE GEWÄHRLEISTUNG ODER BEDINGUNGEN - + * ausdrücklich oder stillschweigend - verbreitet. + * Die sprachspezifischen Genehmigungen und Beschränkungen + * unter der Lizenz sind dem Lizenztext zu entnehmen. + */ +package de.ozgcloud.common.binaryfile; + +import static org.mockito.Mockito.*; + +import java.io.InputStream; +import java.util.concurrent.CompletableFuture; +import java.util.function.BiFunction; +import java.util.function.Function; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Nested; +import org.junit.jupiter.api.Test; +import org.mockito.ArgumentCaptor; +import org.mockito.Captor; +import org.mockito.InjectMocks; +import org.mockito.Mock; +import org.mockito.MockedStatic; +import org.mockito.Spy; + +import de.ozgcloud.common.binaryfile.BinaryFileTestFactory.TestRequestType; +import de.ozgcloud.common.binaryfile.BinaryFileTestFactory.TestResponseType; +import de.ozgcloud.common.errorhandling.TechnicalException; +import de.ozgcloud.common.test.ReflectionTestUtils; +import io.grpc.stub.CallStreamObserver; +import io.grpc.stub.StreamObserver; + +class StreamExclusiveFileSenderTest { + + @Mock + private BiFunction<byte[], Integer, TestRequestType> chunkBuilder; + @Mock + private InputStream inputStream; + @Mock + private Function<StreamObserver<TestResponseType>, CallStreamObserver<TestRequestType>> reqObserverBuilder; + @Spy + @InjectMocks + private StreamExclusiveFileSender<TestRequestType, TestResponseType> fileSender; + + @Nested + class TestSend { + + private final CompletableFuture<TestResponseType> resultFuture = CompletableFuture.completedFuture(new TestResponseType()); + @Captor + private ArgumentCaptor<Runnable> onReadyHandlerCaptor; + @Mock + private BinaryFileUploadStreamObserver<TestRequestType, TestResponseType> responseObserver; + + @BeforeEach + void init() { + doReturn(resultFuture).when(fileSender).getResultFuture(); + } + + @SuppressWarnings("rawtypes") + @Test + void shouldCreateResponseObserver() { + try (MockedStatic<BinaryFileUploadStreamObserver> mocked = mockStatic(BinaryFileUploadStreamObserver.class)) { + doNothing().when(fileSender).sendNext(); + + fileSender.send(); + + mocked.verify(() -> BinaryFileUploadStreamObserver.create(same(resultFuture), onReadyHandlerCaptor.capture())); + verifyCallsSendNext(onReadyHandlerCaptor.getValue()); + } + } + + @SuppressWarnings("rawtypes") + @Test + void shouldBuildRequestObserver() { + try (MockedStatic<BinaryFileUploadStreamObserver> mocked = mockStatic(BinaryFileUploadStreamObserver.class)) { + mocked.when(() -> BinaryFileUploadStreamObserver.create(any(), any())).thenReturn(responseObserver); + + fileSender.send(); + + verify(reqObserverBuilder).apply(responseObserver); + } + } + + private void verifyCallsSendNext(Runnable runnable) { + runnable.run(); + verify(fileSender).sendNext(); + } + } + + @Nested + class TestCommunicateEndOfTransfer { + + @Mock + private CallStreamObserver<TestRequestType> requestObserver; + + @Test + void shouldCallOnCompleted() { + ReflectionTestUtils.setField(fileSender, "requestObserver", requestObserver); + + fileSender.communicateEndOfTransfer(); + + verify(requestObserver).onCompleted(); + } + } + + @Nested + class TestCancelOnTimeout { + + @Mock + private CompletableFuture<TestResponseType> resultFuture; + @Mock + private CallStreamObserver<TestRequestType> requestObserver; + + @BeforeEach + void init() { + doReturn(resultFuture).when(fileSender).getResultFuture(); + ReflectionTestUtils.setField(fileSender, "requestObserver", requestObserver); + } + + @Test + void shouldCancelResultFuture() { + fileSender.cancelOnTimeout(); + + verify(resultFuture).cancel(true); + } + + @Test + void shouldCallOnError() { + fileSender.cancelOnTimeout(); + + verify(requestObserver).onError(any(TechnicalException.class)); + } + + @Test + void shouldCloseStreams() { + fileSender.cancelOnTimeout(); + + verify(fileSender).closeStreams(); + } + } + + @Nested + class TestCancelOnError { + + @Mock + private CompletableFuture<TestResponseType> resultFuture; + @Mock + private CallStreamObserver<TestRequestType> requestObserver; + private final Throwable error = new Throwable(); + + @BeforeEach + void init() { + doReturn(resultFuture).when(fileSender).getResultFuture(); + ReflectionTestUtils.setField(fileSender, "requestObserver", requestObserver); + } + + @Test + void shouldCancelResultFuture() { + fileSender.cancelOnError(error); + + verify(resultFuture).cancel(true); + } + + @Test + void shouldCallOnError() { + fileSender.cancelOnError(error); + + verify(requestObserver).onError(error); + } + + @Test + void shouldCloseStreams() { + fileSender.cancelOnError(error); + + verify(fileSender).closeStreams(); + } + } +} diff --git a/ozgcloud-common-lib/src/test/java/de/ozgcloud/common/binaryfile/StreamSharingFileSenderTest.java b/ozgcloud-common-lib/src/test/java/de/ozgcloud/common/binaryfile/StreamSharingFileSenderTest.java new file mode 100644 index 0000000..6c2225c --- /dev/null +++ b/ozgcloud-common-lib/src/test/java/de/ozgcloud/common/binaryfile/StreamSharingFileSenderTest.java @@ -0,0 +1,105 @@ +/* + * Copyright (C) 2025 Das Land Schleswig-Holstein vertreten durch den + * Ministerpräsidenten des Landes Schleswig-Holstein + * Staatskanzlei + * Abteilung Digitalisierung und zentrales IT-Management der Landesregierung + * + * Lizenziert unter der EUPL, Version 1.2 oder - sobald + * diese von der Europäischen Kommission genehmigt wurden - + * Folgeversionen der EUPL ("Lizenz"); + * Sie dürfen dieses Werk ausschließlich gemäß + * dieser Lizenz nutzen. + * Eine Kopie der Lizenz finden Sie hier: + * + * https://joinup.ec.europa.eu/collection/eupl/eupl-text-eupl-12 + * + * Sofern nicht durch anwendbare Rechtsvorschriften + * gefordert oder in schriftlicher Form vereinbart, wird + * die unter der Lizenz verbreitete Software "so wie sie + * ist", OHNE JEGLICHE GEWÄHRLEISTUNG ODER BEDINGUNGEN - + * ausdrücklich oder stillschweigend - verbreitet. + * Die sprachspezifischen Genehmigungen und Beschränkungen + * unter der Lizenz sind dem Lizenztext zu entnehmen. + */ +package de.ozgcloud.common.binaryfile; + +import static org.assertj.core.api.Assertions.*; +import static org.mockito.Mockito.*; + +import java.io.InputStream; +import java.util.function.BiFunction; +import java.util.function.Consumer; + +import org.junit.jupiter.api.Nested; +import org.junit.jupiter.api.Test; +import org.mockito.ArgumentCaptor; +import org.mockito.Captor; +import org.mockito.InjectMocks; +import org.mockito.Mock; +import org.mockito.Spy; + +import de.ozgcloud.common.binaryfile.BinaryFileTestFactory.TestRequestType; +import de.ozgcloud.common.binaryfile.BinaryFileTestFactory.TestResponseType; +import io.grpc.stub.CallStreamObserver; + +class StreamSharingFileSenderTest { + + @Mock + private BiFunction<byte[], Integer, TestRequestType> chunkBuilder; + @Mock + private InputStream inputStream; + @Mock + private CallStreamObserver<TestRequestType> requestObserver; + @Mock + private Consumer<Runnable> onReadyHandlerRegistrar; + @Spy + @InjectMocks + private StreamSharingFileSender<TestRequestType, TestResponseType> fileSender; + + @Nested + class TestSend { + + @Captor + private ArgumentCaptor<Runnable> onReadyHandlerCaptor; + + @Test + void shouldRegisterOnReadyHandler() { + doNothing().when(fileSender).sendNext(); + + fileSender.send(); + + verify(onReadyHandlerRegistrar).accept(onReadyHandlerCaptor.capture()); + verifyCallsSendNext(onReadyHandlerCaptor.getValue()); + } + + @Test + void shouldReturnThis() { + var obj = fileSender.send(); + + assertThat(obj).isSameAs(fileSender); + } + + private void verifyCallsSendNext(Runnable runnable) { + runnable.run(); + verify(fileSender).sendNext(); + } + } + + @Nested + class TestCommunicateEndOfTransfer { + + @Test + void shouldSendEmptyChunk() { + fileSender.communicateEndOfTransfer(); + + verify(fileSender).sendChunk(new byte[0], -1); + } + + @Test + void shouldCompleteResultFuture() { + fileSender.communicateEndOfTransfer(); + + assertThat(fileSender.getResultFuture().isDone()).isTrue(); + } + } +} diff --git a/ozgcloud-common-lib/src/test/java/de/ozgcloud/common/binaryfile/StreamingFileSenderTest.java b/ozgcloud-common-lib/src/test/java/de/ozgcloud/common/binaryfile/StreamingFileSenderTest.java new file mode 100644 index 0000000..711461a --- /dev/null +++ b/ozgcloud-common-lib/src/test/java/de/ozgcloud/common/binaryfile/StreamingFileSenderTest.java @@ -0,0 +1,257 @@ +/* + * Copyright (C) 2025 Das Land Schleswig-Holstein vertreten durch den + * Ministerpräsidenten des Landes Schleswig-Holstein + * Staatskanzlei + * Abteilung Digitalisierung und zentrales IT-Management der Landesregierung + * + * Lizenziert unter der EUPL, Version 1.2 oder - sobald + * diese von der Europäischen Kommission genehmigt wurden - + * Folgeversionen der EUPL ("Lizenz"); + * Sie dürfen dieses Werk ausschließlich gemäß + * dieser Lizenz nutzen. + * Eine Kopie der Lizenz finden Sie hier: + * + * https://joinup.ec.europa.eu/collection/eupl/eupl-text-eupl-12 + * + * Sofern nicht durch anwendbare Rechtsvorschriften + * gefordert oder in schriftlicher Form vereinbart, wird + * die unter der Lizenz verbreitete Software "so wie sie + * ist", OHNE JEGLICHE GEWÄHRLEISTUNG ODER BEDINGUNGEN - + * ausdrücklich oder stillschweigend - verbreitet. + * Die sprachspezifischen Genehmigungen und Beschränkungen + * unter der Lizenz sind dem Lizenztext zu entnehmen. + */ +package de.ozgcloud.common.binaryfile; + +import static org.assertj.core.api.Assertions.*; +import static org.mockito.Mockito.*; + +import java.io.ByteArrayInputStream; +import java.io.InputStream; +import java.util.function.BiFunction; + +import org.apache.commons.lang3.RandomUtils; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Nested; +import org.junit.jupiter.api.Test; +import org.mockito.ArgumentCaptor; +import org.mockito.Captor; +import org.mockito.Mock; + +import de.ozgcloud.common.binaryfile.BinaryFileTestFactory.TestRequestType; +import de.ozgcloud.common.binaryfile.BinaryFileTestFactory.TestResponseType; +import io.grpc.stub.CallStreamObserver; +import lombok.AccessLevel; +import lombok.Getter; +import lombok.SneakyThrows; + +class StreamingFileSenderTest { + + @Mock + private BiFunction<byte[], Integer, TestRequestType> chunkBuilder; + @Mock + private InputStream inputStream; + @Mock + private CallStreamObserver<TestRequestType> requestObserver; + + private TestFileSender fileSender; + + @BeforeEach + void init() { + fileSender = spy(new TestFileSender(chunkBuilder, inputStream, requestObserver)); + } + + @Nested + class TestSendNext { + + @BeforeEach + void init() { + fileSender.send(); + } + + @Test + void shouldCallSendMetaData() { + fileSender.sendNext(); + + verify(fileSender).sendMetaData(); + } + + @Test + void shouldSendNextChunk() { + fileSender.sendNext(); + + verify(fileSender).sendNextChunk(); + } + } + + @Nested + class TestSendNextChunk { + + private final byte[] content = RandomUtils.insecure().randomBytes(StreamingFileSender.CHUNK_SIZE / 2); + private final ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(content); + + @Captor + private ArgumentCaptor<byte[]> chunkCaptor; + + @Nested + class TestOnDataAvailable { + + @BeforeEach + void init() { + fileSender = spy(new TestFileSender(chunkBuilder, byteArrayInputStream, requestObserver)); + } + + @Test + void shouldCallSendChunk() { + fileSender.sendNextChunk(); + + verify(fileSender).sendChunk(chunkCaptor.capture(), eq(content.length)); + assertThat(chunkCaptor.getValue()).contains(content); + } + } + + @Nested + class TestOnNoBytesLeftToRead { + + @BeforeEach + void init() { + var buffer = new byte[StreamingFileSender.CHUNK_SIZE]; + byteArrayInputStream.read(buffer, 0, StreamingFileSender.CHUNK_SIZE); + fileSender = spy(new TestFileSender(chunkBuilder, byteArrayInputStream, requestObserver)); + } + + @Test + void shouldNotCallSendChunk() { + fileSender.sendNextChunk(); + + verify(fileSender, never()).sendChunk(any(), anyInt()); + } + + @Test + @SneakyThrows + void shouldCallCloseStreams() { + fileSender.sendNextChunk(); + + verify(fileSender).closeStreams(); + } + } + } + + @Nested + class TestEndTransfer { + + @Test + void shouldCommunicateEndOfTransfer() { + fileSender.endTransfer(); + + verify(fileSender).communicateEndOfTransfer(); + } + + @Test + void shouldSetDoneToTrue() { + fileSender.endTransfer(); + + fileSender.sendNext(); + verify(fileSender, never()).sendNextChunk(); + } + + @Test + void shouldCloseStreams() { + fileSender.endTransfer(); + + verify(fileSender).closeStreams(); + } + } + + @Nested + class TestCloseStreams { + + @Test + @SneakyThrows + void shouldCloseInputStream() { + fileSender.closeStreams(); + + verify(inputStream).close(); + } + } + + @Nested + class TestSendChunk { + + private static final byte[] CHUNK_PART = "ChunkPartContent".getBytes(); + + @BeforeEach + void initObserver() { + fileSender.send(); + } + + @Test + void shouldApplyBuildChunk() { + fileSender.sendChunk(CHUNK_PART, 5); + + verify(chunkBuilder).apply(CHUNK_PART, 5); + } + + @Test + void shouldCallOnNext() { + fileSender.sendChunk(CHUNK_PART, 5); + + verify(requestObserver).onNext(any()); + } + } + + @Nested + class TestSendMetaData { + + @Mock + private TestRequestType metaData; + + @BeforeEach + void initObserver() { + fileSender.send(); + } + + @Test + void shouldNotSendWithoutMetadata() { + fileSender.sendMetaData(); + + verify(requestObserver, never()).onNext(any()); + } + + @Test + void shouldSendMetadata() { + fileSender.withMetaData(metaData).sendMetaData(); + + verify(requestObserver).onNext(metaData); + } + + @Test + void shouldSendMetadataOnlyOnce() { + fileSender.withMetaData(metaData).sendMetaData(); + fileSender.sendMetaData(); + + verify(requestObserver).onNext(metaData); + } + } + + static class TestFileSender extends StreamingFileSender<TestRequestType, TestResponseType> { + + @Getter(AccessLevel.PROTECTED) + private final CallStreamObserver<TestRequestType> requestObserver; + + TestFileSender(BiFunction<byte[], Integer, TestRequestType> chunkBuilder, InputStream inputStream, + CallStreamObserver<TestRequestType> requestObserver) { + super(chunkBuilder, inputStream); + this.requestObserver = requestObserver; + } + + @Override + public StreamingFileSender<TestRequestType, TestResponseType> send() { + return this; + } + + @Override + void communicateEndOfTransfer() { + } + } +} -- GitLab From 158e157be4ca026b280f44b4518a41770e441bb3 Mon Sep 17 00:00:00 2001 From: Krzysztof <krzysztof.witukiewicz@mgm-tp.com> Date: Tue, 1 Apr 2025 13:40:10 +0200 Subject: [PATCH 02/12] OZG-7573 OZG-7991 Make FileSender extend (not delegate to) StreamExclusiveFileSender --- .../binaryfile/GrpcFileUploadUtils.java | 46 ++++----- .../binaryfile/GrpcFileUploadUtilsTest.java | 99 ++----------------- 2 files changed, 31 insertions(+), 114 deletions(-) diff --git a/ozgcloud-common-lib/src/main/java/de/ozgcloud/common/binaryfile/GrpcFileUploadUtils.java b/ozgcloud-common-lib/src/main/java/de/ozgcloud/common/binaryfile/GrpcFileUploadUtils.java index 1b7f1c1..f66eb63 100644 --- a/ozgcloud-common-lib/src/main/java/de/ozgcloud/common/binaryfile/GrpcFileUploadUtils.java +++ b/ozgcloud-common-lib/src/main/java/de/ozgcloud/common/binaryfile/GrpcFileUploadUtils.java @@ -32,51 +32,47 @@ import io.grpc.stub.CallStreamObserver; import io.grpc.stub.StreamObserver; import lombok.AccessLevel; import lombok.NoArgsConstructor; -import lombok.NonNull; import lombok.extern.log4j.Log4j2; @Log4j2 @NoArgsConstructor(access = AccessLevel.PRIVATE) public class GrpcFileUploadUtils { - /* - * Q = Request Type; S = Response Type + /** + * @param <Q> Request Type + * @param <S> Response Type + * @deprecated use {@link #createStreamExclusiveFileSender(BiFunction, InputStream, Function)} instead */ + @Deprecated(since = "4.13.0") public static <Q, S> FileSender<Q, S> createSender(BiFunction<byte[], Integer, Q> chunkBuilder, InputStream inputStream, Function<StreamObserver<S>, CallStreamObserver<Q>> reqObserverBuilder) { return new FileSender<>(chunkBuilder, inputStream, reqObserverBuilder); } + /** + * @param <Q> Request Type + * @param <S> Response Type + */ + public static <Q, S> StreamingFileSender<Q, S> createStreamExclusiveFileSender(BiFunction<byte[], Integer, Q> chunkBuilder, InputStream inputStream, + Function<StreamObserver<S>, CallStreamObserver<Q>> reqObserverBuilder) { + return new StreamExclusiveFileSender<>(chunkBuilder, inputStream, reqObserverBuilder); + } + + /** + * @param <Q> Request Type + * @param <S> Response Type + */ public static <Q, S> StreamingFileSender<Q, S> createStreamSharingSender(BiFunction<byte[], Integer, Q> chunkBuilder, InputStream inputStream, CallStreamObserver<Q> requestObserver, Consumer<Runnable> onReadyHandlerRegistrar) { return new StreamSharingFileSender<>(chunkBuilder, inputStream, requestObserver, onReadyHandlerRegistrar); } - public static class FileSender<Q, S> { - - private final StreamExclusiveFileSender<Q, S> sender; + // for backwards compatibility + public static class FileSender<Q, S> extends StreamExclusiveFileSender<Q, S> { FileSender(BiFunction<byte[], Integer, Q> chunkBuilder, InputStream inputStream, Function<StreamObserver<S>, CallStreamObserver<Q>> reqObserverBuilder) { - this.sender = new StreamExclusiveFileSender<>(chunkBuilder, inputStream, reqObserverBuilder); - } - - public FileSender<Q, S> withMetaData(@NonNull Q metaData) { - sender.withMetaData(metaData); - return this; - } - - public FileSender<Q, S> send() { - sender.send(); - return this; - } - - public void cancelOnTimeout() { - sender.cancelOnTimeout(); - } - - public void cancelOnError(Throwable t) { - sender.cancelOnError(t); + super(chunkBuilder, inputStream, reqObserverBuilder); } } diff --git a/ozgcloud-common-lib/src/test/java/de/ozgcloud/common/binaryfile/GrpcFileUploadUtilsTest.java b/ozgcloud-common-lib/src/test/java/de/ozgcloud/common/binaryfile/GrpcFileUploadUtilsTest.java index 82c4698..c94b69e 100644 --- a/ozgcloud-common-lib/src/test/java/de/ozgcloud/common/binaryfile/GrpcFileUploadUtilsTest.java +++ b/ozgcloud-common-lib/src/test/java/de/ozgcloud/common/binaryfile/GrpcFileUploadUtilsTest.java @@ -24,22 +24,18 @@ package de.ozgcloud.common.binaryfile; import static org.assertj.core.api.Assertions.*; -import static org.mockito.Mockito.*; import java.io.InputStream; import java.util.function.BiFunction; import java.util.function.Consumer; import java.util.function.Function; -import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Nested; import org.junit.jupiter.api.Test; import org.mockito.Mock; import de.ozgcloud.common.binaryfile.BinaryFileTestFactory.TestRequestType; import de.ozgcloud.common.binaryfile.BinaryFileTestFactory.TestResponseType; -import de.ozgcloud.common.binaryfile.GrpcFileUploadUtils.FileSender; -import de.ozgcloud.common.test.ReflectionTestUtils; import io.grpc.stub.CallStreamObserver; import io.grpc.stub.StreamObserver; @@ -60,107 +56,32 @@ class GrpcFileUploadUtilsTest { class TestCreateSender { @Test - void shouldReturnInstanceOfFileSender() { + void shouldReturnInstanceOfStreamExclusiveFileSender() { var createdSender = GrpcFileUploadUtils.createSender(chunkBuilder, inputStream, reqObserverBuilder); - assertThat(createdSender).isInstanceOf(FileSender.class); + assertThat(createdSender).isInstanceOf(StreamExclusiveFileSender.class); } } @Nested - class TestCreateStreamSharingSender { + class TestCreateStreamExclusiveFileSender { @Test - void shouldReturnInstanceOfStreamSharingSender() { - var createdSender = GrpcFileUploadUtils.createStreamSharingSender(chunkBuilder, inputStream, requestObserver, onReadyHandlerRegistrar); + void shouldReturnInstanceOfStreamExclusiveFileSender() { + var createdSender = GrpcFileUploadUtils.createStreamExclusiveFileSender(chunkBuilder, inputStream, reqObserverBuilder); - assertThat(createdSender).isInstanceOf(StreamSharingFileSender.class); + assertThat(createdSender).isInstanceOf(StreamExclusiveFileSender.class); } } @Nested - class TestFileSender { - - private final FileSender<TestRequestType, TestResponseType> fileSender = new FileSender<>(chunkBuilder, inputStream, reqObserverBuilder); - @Mock - private StreamExclusiveFileSender<TestRequestType, TestResponseType> streamExclusiveFileSender; + class TestCreateStreamSharingSender { @Test - void shouldCreateStreamExclusiveFileSender() { - var internalFileSender = ReflectionTestUtils.getField(fileSender, "sender", StreamExclusiveFileSender.class); - - assertThat(internalFileSender).isInstanceOf(StreamExclusiveFileSender.class); - } - - @Nested - class TestMethods { - - @BeforeEach - void init() { - ReflectionTestUtils.setField(fileSender, "sender", streamExclusiveFileSender); - } - - @Nested - class TestWithMetaData { - - private final TestRequestType request = new TestRequestType(); - - @Test - void shouldDelegate() { - fileSender.withMetaData(request); - - verify(streamExclusiveFileSender).withMetaData(request); - } - - @Test - void shouldReturnItself() { - var senderWithMetaData = fileSender.withMetaData(request); - - assertThat(senderWithMetaData).isSameAs(fileSender); - } - } - - @Nested - class TestSend { - - @Test - void shouldDelegate() { - fileSender.send(); - - verify(streamExclusiveFileSender).send(); - } - - @Test - void shouldReturnItself() { - var returnedSender = fileSender.send(); - - assertThat(returnedSender).isSameAs(fileSender); - } - } - - @Nested - class TestCancelOnTimeout { - - @Test - void shouldDelegate() { - fileSender.cancelOnTimeout(); - - verify(streamExclusiveFileSender).cancelOnTimeout(); - } - } - - @Nested - class TestCancelOnError { - - @Test - void shouldDelegate() { - var error = new Throwable(); - - fileSender.cancelOnError(error); + void shouldReturnInstanceOfStreamSharingSender() { + var createdSender = GrpcFileUploadUtils.createStreamSharingSender(chunkBuilder, inputStream, requestObserver, onReadyHandlerRegistrar); - verify(streamExclusiveFileSender).cancelOnError(error); - } - } + assertThat(createdSender).isInstanceOf(StreamSharingFileSender.class); } } } \ No newline at end of file -- GitLab From 4d3043cbd427093b7d3c2f2412fbb8fc21a216bd Mon Sep 17 00:00:00 2001 From: Krzysztof <krzysztof.witukiewicz@mgm-tp.com> Date: Tue, 1 Apr 2025 14:01:57 +0200 Subject: [PATCH 03/12] OZG-7573 OZG-7991 Make FileSender override methods that return this --- .../common/binaryfile/GrpcFileUploadUtils.java | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/ozgcloud-common-lib/src/main/java/de/ozgcloud/common/binaryfile/GrpcFileUploadUtils.java b/ozgcloud-common-lib/src/main/java/de/ozgcloud/common/binaryfile/GrpcFileUploadUtils.java index f66eb63..74cd1c2 100644 --- a/ozgcloud-common-lib/src/main/java/de/ozgcloud/common/binaryfile/GrpcFileUploadUtils.java +++ b/ozgcloud-common-lib/src/main/java/de/ozgcloud/common/binaryfile/GrpcFileUploadUtils.java @@ -32,6 +32,7 @@ import io.grpc.stub.CallStreamObserver; import io.grpc.stub.StreamObserver; import lombok.AccessLevel; import lombok.NoArgsConstructor; +import lombok.NonNull; import lombok.extern.log4j.Log4j2; @Log4j2 @@ -74,6 +75,18 @@ public class GrpcFileUploadUtils { Function<StreamObserver<S>, CallStreamObserver<Q>> reqObserverBuilder) { super(chunkBuilder, inputStream, reqObserverBuilder); } + + @Override + public FileSender<Q, S> send() { + super.send(); + return this; + } + + @Override + public FileSender<Q, S> withMetaData(@NonNull Q metaData) { + super.withMetaData(metaData); + return this; + } } } \ No newline at end of file -- GitLab From 0317820ed5df3af8d5d231d881e7754133e87e8c Mon Sep 17 00:00:00 2001 From: Krzysztof <krzysztof.witukiewicz@mgm-tp.com> Date: Tue, 1 Apr 2025 17:58:41 +0200 Subject: [PATCH 04/12] OZG-7573 OZG-7991 Mark FileSender-class deprecated --- .../java/de/ozgcloud/common/binaryfile/GrpcFileUploadUtils.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ozgcloud-common-lib/src/main/java/de/ozgcloud/common/binaryfile/GrpcFileUploadUtils.java b/ozgcloud-common-lib/src/main/java/de/ozgcloud/common/binaryfile/GrpcFileUploadUtils.java index 74cd1c2..b021ed1 100644 --- a/ozgcloud-common-lib/src/main/java/de/ozgcloud/common/binaryfile/GrpcFileUploadUtils.java +++ b/ozgcloud-common-lib/src/main/java/de/ozgcloud/common/binaryfile/GrpcFileUploadUtils.java @@ -68,7 +68,7 @@ public class GrpcFileUploadUtils { return new StreamSharingFileSender<>(chunkBuilder, inputStream, requestObserver, onReadyHandlerRegistrar); } - // for backwards compatibility + @Deprecated(since = "4.13.0") public static class FileSender<Q, S> extends StreamExclusiveFileSender<Q, S> { FileSender(BiFunction<byte[], Integer, Q> chunkBuilder, InputStream inputStream, -- GitLab From a0d677b3bff35dd101f28016fad733737f338a7c Mon Sep 17 00:00:00 2001 From: Krzysztof <krzysztof.witukiewicz@mgm-tp.com> Date: Tue, 1 Apr 2025 17:59:15 +0200 Subject: [PATCH 05/12] OZG-7573 OZG-7991 Move cancelOnTimeout() and cancelOnError() to StreamingFileSender --- .../binaryfile/StreamExclusiveFileSender.java | 8 +-- .../binaryfile/StreamingFileSender.java | 12 ++++ .../StreamExclusiveFileSenderTest.java | 6 +- .../binaryfile/StreamingFileSenderTest.java | 55 +++++++++++++++++++ 4 files changed, 72 insertions(+), 9 deletions(-) diff --git a/ozgcloud-common-lib/src/main/java/de/ozgcloud/common/binaryfile/StreamExclusiveFileSender.java b/ozgcloud-common-lib/src/main/java/de/ozgcloud/common/binaryfile/StreamExclusiveFileSender.java index 8bcf0b3..92f9170 100644 --- a/ozgcloud-common-lib/src/main/java/de/ozgcloud/common/binaryfile/StreamExclusiveFileSender.java +++ b/ozgcloud-common-lib/src/main/java/de/ozgcloud/common/binaryfile/StreamExclusiveFileSender.java @@ -66,16 +66,12 @@ class StreamExclusiveFileSender<Q, S> extends StreamingFileSender<Q, S> { } public void cancelOnTimeout() { - LOG.warn("File transfer canceled on timeout"); - getResultFuture().cancel(true); + super.cancelOnTimeout(); requestObserver.onError(new TechnicalException("Timeout on waiting for upload.")); - closeStreams(); } public void cancelOnError(Throwable t) { - LOG.error("File tranfer canceled on error.", t); - getResultFuture().cancel(true); + super.cancelOnError(t); requestObserver.onError(t); - closeStreams(); } } diff --git a/ozgcloud-common-lib/src/main/java/de/ozgcloud/common/binaryfile/StreamingFileSender.java b/ozgcloud-common-lib/src/main/java/de/ozgcloud/common/binaryfile/StreamingFileSender.java index 926ca00..f70efbc 100644 --- a/ozgcloud-common-lib/src/main/java/de/ozgcloud/common/binaryfile/StreamingFileSender.java +++ b/ozgcloud-common-lib/src/main/java/de/ozgcloud/common/binaryfile/StreamingFileSender.java @@ -72,6 +72,18 @@ public abstract class StreamingFileSender<Q, S> { public abstract StreamingFileSender<Q, S> send(); + public void cancelOnTimeout() { + LOG.warn("File transfer canceled on timeout"); + resultFuture.cancel(true); + closeStreams(); + } + + public void cancelOnError(Throwable t) { + LOG.error("File tranfer canceled on error.", t); + resultFuture.cancel(true); + closeStreams(); + } + void sendNext() { if (!done.get()) { waitForOberver(); diff --git a/ozgcloud-common-lib/src/test/java/de/ozgcloud/common/binaryfile/StreamExclusiveFileSenderTest.java b/ozgcloud-common-lib/src/test/java/de/ozgcloud/common/binaryfile/StreamExclusiveFileSenderTest.java index 04b347c..880ac38 100644 --- a/ozgcloud-common-lib/src/test/java/de/ozgcloud/common/binaryfile/StreamExclusiveFileSenderTest.java +++ b/ozgcloud-common-lib/src/test/java/de/ozgcloud/common/binaryfile/StreamExclusiveFileSenderTest.java @@ -39,11 +39,11 @@ import org.mockito.InjectMocks; import org.mockito.Mock; import org.mockito.MockedStatic; import org.mockito.Spy; +import org.springframework.test.util.ReflectionTestUtils; import de.ozgcloud.common.binaryfile.BinaryFileTestFactory.TestRequestType; import de.ozgcloud.common.binaryfile.BinaryFileTestFactory.TestResponseType; import de.ozgcloud.common.errorhandling.TechnicalException; -import de.ozgcloud.common.test.ReflectionTestUtils; import io.grpc.stub.CallStreamObserver; import io.grpc.stub.StreamObserver; @@ -130,7 +130,7 @@ class StreamExclusiveFileSenderTest { @BeforeEach void init() { - doReturn(resultFuture).when(fileSender).getResultFuture(); + ReflectionTestUtils.setField(fileSender, "resultFuture", resultFuture); ReflectionTestUtils.setField(fileSender, "requestObserver", requestObserver); } @@ -167,7 +167,7 @@ class StreamExclusiveFileSenderTest { @BeforeEach void init() { - doReturn(resultFuture).when(fileSender).getResultFuture(); + ReflectionTestUtils.setField(fileSender, "resultFuture", resultFuture); ReflectionTestUtils.setField(fileSender, "requestObserver", requestObserver); } diff --git a/ozgcloud-common-lib/src/test/java/de/ozgcloud/common/binaryfile/StreamingFileSenderTest.java b/ozgcloud-common-lib/src/test/java/de/ozgcloud/common/binaryfile/StreamingFileSenderTest.java index 711461a..47ddd15 100644 --- a/ozgcloud-common-lib/src/test/java/de/ozgcloud/common/binaryfile/StreamingFileSenderTest.java +++ b/ozgcloud-common-lib/src/test/java/de/ozgcloud/common/binaryfile/StreamingFileSenderTest.java @@ -28,6 +28,7 @@ import static org.mockito.Mockito.*; import java.io.ByteArrayInputStream; import java.io.InputStream; +import java.util.concurrent.CompletableFuture; import java.util.function.BiFunction; import org.apache.commons.lang3.RandomUtils; @@ -37,6 +38,7 @@ import org.junit.jupiter.api.Test; import org.mockito.ArgumentCaptor; import org.mockito.Captor; import org.mockito.Mock; +import org.springframework.test.util.ReflectionTestUtils; import de.ozgcloud.common.binaryfile.BinaryFileTestFactory.TestRequestType; import de.ozgcloud.common.binaryfile.BinaryFileTestFactory.TestResponseType; @@ -61,6 +63,59 @@ class StreamingFileSenderTest { fileSender = spy(new TestFileSender(chunkBuilder, inputStream, requestObserver)); } + @Nested + class TestCancelOnTimeout { + + @Mock + private CompletableFuture<TestResponseType> resultFuture; + + @BeforeEach + void init() { + ReflectionTestUtils.setField(fileSender, "resultFuture", resultFuture); + } + + @Test + void shouldCancelResultFuture() { + fileSender.cancelOnTimeout(); + + verify(resultFuture).cancel(true); + } + + @Test + void shouldCloseStreams() { + fileSender.cancelOnTimeout(); + + verify(fileSender).closeStreams(); + } + } + + @Nested + class TestCancelOnError { + + @Mock + private CompletableFuture<TestResponseType> resultFuture; + private final Throwable error = new Throwable(); + + @BeforeEach + void init() { + ReflectionTestUtils.setField(fileSender, "resultFuture", resultFuture); + } + + @Test + void shouldCancelResultFuture() { + fileSender.cancelOnError(error); + + verify(resultFuture).cancel(true); + } + + @Test + void shouldCloseStreams() { + fileSender.cancelOnError(error); + + verify(fileSender).closeStreams(); + } + } + @Nested class TestSendNext { -- GitLab From fbceb90140a6507afca8081a5478a2997153bb38 Mon Sep 17 00:00:00 2001 From: Krzysztof <krzysztof.witukiewicz@mgm-tp.com> Date: Wed, 2 Apr 2025 09:27:44 +0200 Subject: [PATCH 06/12] OZG-7573 OZG-7991 Rename methode closeStreams() --- .../common/binaryfile/StreamingFileSender.java | 8 ++++---- .../binaryfile/StreamExclusiveFileSenderTest.java | 8 ++++---- .../common/binaryfile/StreamingFileSenderTest.java | 12 ++++++------ 3 files changed, 14 insertions(+), 14 deletions(-) diff --git a/ozgcloud-common-lib/src/main/java/de/ozgcloud/common/binaryfile/StreamingFileSender.java b/ozgcloud-common-lib/src/main/java/de/ozgcloud/common/binaryfile/StreamingFileSender.java index f70efbc..9d0440e 100644 --- a/ozgcloud-common-lib/src/main/java/de/ozgcloud/common/binaryfile/StreamingFileSender.java +++ b/ozgcloud-common-lib/src/main/java/de/ozgcloud/common/binaryfile/StreamingFileSender.java @@ -75,13 +75,13 @@ public abstract class StreamingFileSender<Q, S> { public void cancelOnTimeout() { LOG.warn("File transfer canceled on timeout"); resultFuture.cancel(true); - closeStreams(); + closeStreamReader(); } public void cancelOnError(Throwable t) { LOG.error("File tranfer canceled on error.", t); resultFuture.cancel(true); - closeStreams(); + closeStreamReader(); } void sendNext() { @@ -129,12 +129,12 @@ public abstract class StreamingFileSender<Q, S> { communicateEndOfTransfer(); done.set(true); LOG.debug("File Transfer done."); - closeStreams(); + closeStreamReader(); } abstract void communicateEndOfTransfer(); - void closeStreams() { + void closeStreamReader() { LOG.debug("Closing streams"); streamReader.close(); } diff --git a/ozgcloud-common-lib/src/test/java/de/ozgcloud/common/binaryfile/StreamExclusiveFileSenderTest.java b/ozgcloud-common-lib/src/test/java/de/ozgcloud/common/binaryfile/StreamExclusiveFileSenderTest.java index 880ac38..6ac54ef 100644 --- a/ozgcloud-common-lib/src/test/java/de/ozgcloud/common/binaryfile/StreamExclusiveFileSenderTest.java +++ b/ozgcloud-common-lib/src/test/java/de/ozgcloud/common/binaryfile/StreamExclusiveFileSenderTest.java @@ -149,10 +149,10 @@ class StreamExclusiveFileSenderTest { } @Test - void shouldCloseStreams() { + void shouldCloseStreamReader() { fileSender.cancelOnTimeout(); - verify(fileSender).closeStreams(); + verify(fileSender).closeStreamReader(); } } @@ -186,10 +186,10 @@ class StreamExclusiveFileSenderTest { } @Test - void shouldCloseStreams() { + void shouldCloseStreamReader() { fileSender.cancelOnError(error); - verify(fileSender).closeStreams(); + verify(fileSender).closeStreamReader(); } } } diff --git a/ozgcloud-common-lib/src/test/java/de/ozgcloud/common/binaryfile/StreamingFileSenderTest.java b/ozgcloud-common-lib/src/test/java/de/ozgcloud/common/binaryfile/StreamingFileSenderTest.java index 47ddd15..f369aab 100644 --- a/ozgcloud-common-lib/src/test/java/de/ozgcloud/common/binaryfile/StreamingFileSenderTest.java +++ b/ozgcloud-common-lib/src/test/java/de/ozgcloud/common/binaryfile/StreamingFileSenderTest.java @@ -85,7 +85,7 @@ class StreamingFileSenderTest { void shouldCloseStreams() { fileSender.cancelOnTimeout(); - verify(fileSender).closeStreams(); + verify(fileSender).closeStreamReader(); } } @@ -112,7 +112,7 @@ class StreamingFileSenderTest { void shouldCloseStreams() { fileSender.cancelOnError(error); - verify(fileSender).closeStreams(); + verify(fileSender).closeStreamReader(); } } @@ -187,7 +187,7 @@ class StreamingFileSenderTest { void shouldCallCloseStreams() { fileSender.sendNextChunk(); - verify(fileSender).closeStreams(); + verify(fileSender).closeStreamReader(); } } } @@ -214,17 +214,17 @@ class StreamingFileSenderTest { void shouldCloseStreams() { fileSender.endTransfer(); - verify(fileSender).closeStreams(); + verify(fileSender).closeStreamReader(); } } @Nested - class TestCloseStreams { + class TestCloseStreamReader { @Test @SneakyThrows void shouldCloseInputStream() { - fileSender.closeStreams(); + fileSender.closeStreamReader(); verify(inputStream).close(); } -- GitLab From 83a363dd3b995a9b53b0017eb22e59014532879d Mon Sep 17 00:00:00 2001 From: Krzysztof <krzysztof.witukiewicz@mgm-tp.com> Date: Wed, 2 Apr 2025 09:35:10 +0200 Subject: [PATCH 07/12] OZG-7573 OZG-7991 Remove comment --- .../de/ozgcloud/common/binaryfile/StreamExclusiveFileSender.java | 1 - 1 file changed, 1 deletion(-) diff --git a/ozgcloud-common-lib/src/main/java/de/ozgcloud/common/binaryfile/StreamExclusiveFileSender.java b/ozgcloud-common-lib/src/main/java/de/ozgcloud/common/binaryfile/StreamExclusiveFileSender.java index 92f9170..8051e11 100644 --- a/ozgcloud-common-lib/src/main/java/de/ozgcloud/common/binaryfile/StreamExclusiveFileSender.java +++ b/ozgcloud-common-lib/src/main/java/de/ozgcloud/common/binaryfile/StreamExclusiveFileSender.java @@ -48,7 +48,6 @@ class StreamExclusiveFileSender<Q, S> extends StreamingFileSender<Q, S> { public StreamExclusiveFileSender<Q, S> send() { LOG.debug("Start sending File."); - // this responseObserver registers also onReadyHandler var responseObserver = BinaryFileUploadStreamObserver.create(getResultFuture(), this::sendNext); requestObserver = reqObserverBuilder.apply(responseObserver); -- GitLab From 36622779b59dd5df9e0a76e240ba35796d991afa Mon Sep 17 00:00:00 2001 From: Krzysztof <krzysztof.witukiewicz@mgm-tp.com> Date: Wed, 2 Apr 2025 09:42:29 +0200 Subject: [PATCH 08/12] OZG-7573 OZG-7991 Verify calling endTransfer() in test --- .../common/binaryfile/StreamingFileSender.java | 2 +- .../common/binaryfile/StreamingFileSenderTest.java | 13 ++++++------- 2 files changed, 7 insertions(+), 8 deletions(-) diff --git a/ozgcloud-common-lib/src/main/java/de/ozgcloud/common/binaryfile/StreamingFileSender.java b/ozgcloud-common-lib/src/main/java/de/ozgcloud/common/binaryfile/StreamingFileSender.java index 9d0440e..dc2af67 100644 --- a/ozgcloud-common-lib/src/main/java/de/ozgcloud/common/binaryfile/StreamingFileSender.java +++ b/ozgcloud-common-lib/src/main/java/de/ozgcloud/common/binaryfile/StreamingFileSender.java @@ -125,7 +125,7 @@ public abstract class StreamingFileSender<Q, S> { } } - protected void endTransfer() { + void endTransfer() { communicateEndOfTransfer(); done.set(true); LOG.debug("File Transfer done."); diff --git a/ozgcloud-common-lib/src/test/java/de/ozgcloud/common/binaryfile/StreamingFileSenderTest.java b/ozgcloud-common-lib/src/test/java/de/ozgcloud/common/binaryfile/StreamingFileSenderTest.java index f369aab..32bd933 100644 --- a/ozgcloud-common-lib/src/test/java/de/ozgcloud/common/binaryfile/StreamingFileSenderTest.java +++ b/ozgcloud-common-lib/src/test/java/de/ozgcloud/common/binaryfile/StreamingFileSenderTest.java @@ -125,7 +125,7 @@ class StreamingFileSenderTest { } @Test - void shouldCallSendMetaData() { + void shouldSendMetaData() { fileSender.sendNext(); verify(fileSender).sendMetaData(); @@ -157,7 +157,7 @@ class StreamingFileSenderTest { } @Test - void shouldCallSendChunk() { + void shouldSendChunk() { fileSender.sendNextChunk(); verify(fileSender).sendChunk(chunkCaptor.capture(), eq(content.length)); @@ -176,18 +176,17 @@ class StreamingFileSenderTest { } @Test - void shouldNotCallSendChunk() { + void shouldNotSendChunk() { fileSender.sendNextChunk(); verify(fileSender, never()).sendChunk(any(), anyInt()); } @Test - @SneakyThrows - void shouldCallCloseStreams() { + void shouldEndTransfer() { fileSender.sendNextChunk(); - verify(fileSender).closeStreamReader(); + verify(fileSender).endTransfer(); } } } @@ -211,7 +210,7 @@ class StreamingFileSenderTest { } @Test - void shouldCloseStreams() { + void shouldCloseStreamReader() { fileSender.endTransfer(); verify(fileSender).closeStreamReader(); -- GitLab From 7b7c478dfe4f9c2f6f4d06c1c6312aadf2b11951 Mon Sep 17 00:00:00 2001 From: Krzysztof <krzysztof.witukiewicz@mgm-tp.com> Date: Wed, 2 Apr 2025 09:43:40 +0200 Subject: [PATCH 09/12] OZG-7573 OZG-7991 Improve formatting --- .../de/ozgcloud/common/binaryfile/StreamingFileSenderTest.java | 1 + 1 file changed, 1 insertion(+) diff --git a/ozgcloud-common-lib/src/test/java/de/ozgcloud/common/binaryfile/StreamingFileSenderTest.java b/ozgcloud-common-lib/src/test/java/de/ozgcloud/common/binaryfile/StreamingFileSenderTest.java index 32bd933..ba58775 100644 --- a/ozgcloud-common-lib/src/test/java/de/ozgcloud/common/binaryfile/StreamingFileSenderTest.java +++ b/ozgcloud-common-lib/src/test/java/de/ozgcloud/common/binaryfile/StreamingFileSenderTest.java @@ -282,6 +282,7 @@ class StreamingFileSenderTest { @Test void shouldSendMetadataOnlyOnce() { fileSender.withMetaData(metaData).sendMetaData(); + fileSender.sendMetaData(); verify(requestObserver).onNext(metaData); -- GitLab From 8f8039e7c46b37400ee166f4b6f0ec3dce431447 Mon Sep 17 00:00:00 2001 From: Krzysztof <krzysztof.witukiewicz@mgm-tp.com> Date: Wed, 2 Apr 2025 10:28:44 +0200 Subject: [PATCH 10/12] OZG-7573 OZG-7991 Fix spelling --- .../de/ozgcloud/common/binaryfile/StreamingFileSender.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/ozgcloud-common-lib/src/main/java/de/ozgcloud/common/binaryfile/StreamingFileSender.java b/ozgcloud-common-lib/src/main/java/de/ozgcloud/common/binaryfile/StreamingFileSender.java index dc2af67..89661ba 100644 --- a/ozgcloud-common-lib/src/main/java/de/ozgcloud/common/binaryfile/StreamingFileSender.java +++ b/ozgcloud-common-lib/src/main/java/de/ozgcloud/common/binaryfile/StreamingFileSender.java @@ -86,7 +86,7 @@ public abstract class StreamingFileSender<Q, S> { void sendNext() { if (!done.get()) { - waitForOberver(); + waitForObserver(); sendMetaData(); do { LOG.debug("Sending next chunk."); @@ -100,7 +100,7 @@ public abstract class StreamingFileSender<Q, S> { return getRequestObserver().isReady(); } - private void waitForOberver() { + private void waitForObserver() { synchronized (this) { while (Objects.isNull(getRequestObserver())) { try { -- GitLab From 8e5eefbde6274091c01e2c16ce2e48e2e8b716bc Mon Sep 17 00:00:00 2001 From: Krzysztof <krzysztof.witukiewicz@mgm-tp.com> Date: Wed, 2 Apr 2025 15:13:21 +0200 Subject: [PATCH 11/12] OZG-7573 OZG-7991 Stop if cancelled --- .../binaryfile/StreamingFileSender.java | 18 ++++- .../binaryfile/StreamingFileSenderTest.java | 69 ++++++++++++++++++- 2 files changed, 82 insertions(+), 5 deletions(-) diff --git a/ozgcloud-common-lib/src/main/java/de/ozgcloud/common/binaryfile/StreamingFileSender.java b/ozgcloud-common-lib/src/main/java/de/ozgcloud/common/binaryfile/StreamingFileSender.java index 89661ba..de78cf1 100644 --- a/ozgcloud-common-lib/src/main/java/de/ozgcloud/common/binaryfile/StreamingFileSender.java +++ b/ozgcloud-common-lib/src/main/java/de/ozgcloud/common/binaryfile/StreamingFileSender.java @@ -79,21 +79,33 @@ public abstract class StreamingFileSender<Q, S> { } public void cancelOnError(Throwable t) { - LOG.error("File tranfer canceled on error.", t); + LOG.error("File transfer canceled on error.", t); resultFuture.cancel(true); closeStreamReader(); } void sendNext() { - if (!done.get()) { + if (notFinished()) { waitForObserver(); sendMetaData(); do { LOG.debug("Sending next chunk."); sendNextChunk(); - } while (!done.get() && isReady()); + } while (notFinished() && isReady()); LOG.debug("Finished or waiting to become ready."); } + checkIfFinishedForcefully(); + } + + private boolean notFinished() { + return !done.get() && !resultFuture.isCancelled(); + } + + private void checkIfFinishedForcefully() { + if (resultFuture.isCancelled()) { + LOG.warn("File transfer was cancelled"); + closeStreamReader(); + } } private boolean isReady() { diff --git a/ozgcloud-common-lib/src/test/java/de/ozgcloud/common/binaryfile/StreamingFileSenderTest.java b/ozgcloud-common-lib/src/test/java/de/ozgcloud/common/binaryfile/StreamingFileSenderTest.java index ba58775..ce12c67 100644 --- a/ozgcloud-common-lib/src/test/java/de/ozgcloud/common/binaryfile/StreamingFileSenderTest.java +++ b/ozgcloud-common-lib/src/test/java/de/ozgcloud/common/binaryfile/StreamingFileSenderTest.java @@ -29,6 +29,7 @@ import static org.mockito.Mockito.*; import java.io.ByteArrayInputStream; import java.io.InputStream; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.BiFunction; import org.apache.commons.lang3.RandomUtils; @@ -71,7 +72,7 @@ class StreamingFileSenderTest { @BeforeEach void init() { - ReflectionTestUtils.setField(fileSender, "resultFuture", resultFuture); + setResultFutureInFileSender(resultFuture); } @Test @@ -98,7 +99,7 @@ class StreamingFileSenderTest { @BeforeEach void init() { - ReflectionTestUtils.setField(fileSender, "resultFuture", resultFuture); + setResultFutureInFileSender(resultFuture); } @Test @@ -119,6 +120,9 @@ class StreamingFileSenderTest { @Nested class TestSendNext { + @Mock + private CompletableFuture<TestResponseType> resultFuture; + @BeforeEach void init() { fileSender.send(); @@ -137,6 +141,59 @@ class StreamingFileSenderTest { verify(fileSender).sendNextChunk(); } + + @Test + void shouldNotSendMetaDataIfDone() { + setDoneInFileSender(true); + + fileSender.sendNext(); + + verify(fileSender, never()).sendMetaData(); + } + + @Test + void shouldNotSendMetaDataIfCancelled() { + fileSender.getResultFuture().cancel(true); + + fileSender.sendNext(); + + verify(fileSender, never()).sendMetaData(); + } + + @Test + void shouldSendNextChunkUntilDone() { + lenient().when(requestObserver.isReady()).thenReturn(true); + doAnswer(invocation -> { + setDoneInFileSender(true); + return null; + }).when(fileSender).sendNextChunk(); + + fileSender.sendNext(); + + verify(fileSender, times(1)).sendNextChunk(); + } + + @Test + void shouldSendNextChunkUntilCancelled() { + lenient().when(requestObserver.isReady()).thenReturn(true); + doAnswer(invocation -> { + fileSender.getResultFuture().cancel(true); + return null; + }).when(fileSender).sendNextChunk(); + + fileSender.sendNext(); + + verify(fileSender, times(1)).sendNextChunk(); + } + + @Test + void closeStreamReaderIfCancelled() { + fileSender.getResultFuture().cancel(true); + + fileSender.sendNext(); + + verify(fileSender).closeStreamReader(); + } } @Nested @@ -289,6 +346,14 @@ class StreamingFileSenderTest { } } + private void setResultFutureInFileSender(CompletableFuture<TestResponseType> resultFuture) { + ReflectionTestUtils.setField(fileSender, "resultFuture", resultFuture); + } + + private void setDoneInFileSender(boolean done) { + ((AtomicBoolean) ReflectionTestUtils.getField(fileSender, null, "done")).set(done); + } + static class TestFileSender extends StreamingFileSender<TestRequestType, TestResponseType> { @Getter(AccessLevel.PROTECTED) -- GitLab From bea6456cc191b87f7b2f9c466199fb2a1be779c6 Mon Sep 17 00:00:00 2001 From: Krzysztof <krzysztof.witukiewicz@mgm-tp.com> Date: Wed, 2 Apr 2025 15:14:55 +0200 Subject: [PATCH 12/12] OZG-7573 OZG-7991 Fix sonarqube warnings --- .../ozgcloud/common/binaryfile/StreamExclusiveFileSender.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/ozgcloud-common-lib/src/main/java/de/ozgcloud/common/binaryfile/StreamExclusiveFileSender.java b/ozgcloud-common-lib/src/main/java/de/ozgcloud/common/binaryfile/StreamExclusiveFileSender.java index 8051e11..4223d4d 100644 --- a/ozgcloud-common-lib/src/main/java/de/ozgcloud/common/binaryfile/StreamExclusiveFileSender.java +++ b/ozgcloud-common-lib/src/main/java/de/ozgcloud/common/binaryfile/StreamExclusiveFileSender.java @@ -64,11 +64,13 @@ class StreamExclusiveFileSender<Q, S> extends StreamingFileSender<Q, S> { return requestObserver; } + @Override public void cancelOnTimeout() { super.cancelOnTimeout(); requestObserver.onError(new TechnicalException("Timeout on waiting for upload.")); } + @Override public void cancelOnError(Throwable t) { super.cancelOnError(t); requestObserver.onError(t); -- GitLab