diff --git a/ozgcloud-common-lib/src/main/java/de/ozgcloud/common/binaryfile/GrpcFileUploadUtils.java b/ozgcloud-common-lib/src/main/java/de/ozgcloud/common/binaryfile/GrpcFileUploadUtils.java index 1cbd50d5e2368639f27448c8f61ba7a47740ecd8..b021ed18935637b737f9d473dea168af8b8b9ca3 100644 --- a/ozgcloud-common-lib/src/main/java/de/ozgcloud/common/binaryfile/GrpcFileUploadUtils.java +++ b/ozgcloud-common-lib/src/main/java/de/ozgcloud/common/binaryfile/GrpcFileUploadUtils.java @@ -23,225 +23,69 @@ */ package de.ozgcloud.common.binaryfile; -import java.io.IOException; import java.io.InputStream; -import java.util.Objects; -import java.util.Optional; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.BiFunction; +import java.util.function.Consumer; import java.util.function.Function; -import org.apache.commons.io.IOUtils; - -import de.ozgcloud.common.errorhandling.TechnicalException; import io.grpc.stub.CallStreamObserver; import io.grpc.stub.StreamObserver; import lombok.AccessLevel; -import lombok.Getter; import lombok.NoArgsConstructor; import lombok.NonNull; -import lombok.RequiredArgsConstructor; import lombok.extern.log4j.Log4j2; @Log4j2 @NoArgsConstructor(access = AccessLevel.PRIVATE) public class GrpcFileUploadUtils { - static final int CHUNK_SIZE = 4 * 1024; - - /* - * Q = Request Type; S = Response Type + /** + * @param <Q> Request Type + * @param <S> Response Type + * @deprecated use {@link #createStreamExclusiveFileSender(BiFunction, InputStream, Function)} instead */ + @Deprecated(since = "4.13.0") public static <Q, S> FileSender<Q, S> createSender(BiFunction<byte[], Integer, Q> chunkBuilder, InputStream inputStream, Function<StreamObserver<S>, CallStreamObserver<Q>> reqObserverBuilder) { - return createSender(chunkBuilder, inputStream, reqObserverBuilder, true); + return new FileSender<>(chunkBuilder, inputStream, reqObserverBuilder); } - public static <Q, S> FileSender<Q, S> createSender(BiFunction<byte[], Integer, Q> chunkBuilder, InputStream inputStream, - Function<StreamObserver<S>, CallStreamObserver<Q>> reqObserverBuilder, boolean completeOnFileSent) { - return new FileSender<>(chunkBuilder, reqObserverBuilder, inputStream, completeOnFileSent); + /** + * @param <Q> Request Type + * @param <S> Response Type + */ + public static <Q, S> StreamingFileSender<Q, S> createStreamExclusiveFileSender(BiFunction<byte[], Integer, Q> chunkBuilder, InputStream inputStream, + Function<StreamObserver<S>, CallStreamObserver<Q>> reqObserverBuilder) { + return new StreamExclusiveFileSender<>(chunkBuilder, inputStream, reqObserverBuilder); } - public static class FileSender<Q, S> { - private final BiFunction<byte[], Integer, Q> chunkBuilder; - private final InputStream inputStream; - - @Getter - private final CompletableFuture<S> resultFuture = new CompletableFuture<>(); - private final Function<StreamObserver<S>, CallStreamObserver<Q>> reqObserverBuilder; - private CallStreamObserver<Q> requestObserver; - - private Optional<Q> metaData = Optional.empty(); - private final AtomicBoolean metaDataSent = new AtomicBoolean(false); - private final AtomicBoolean done = new AtomicBoolean(false); - - private final StreamReader streamReader; - private final boolean completeOnFileSent; - - FileSender(BiFunction<byte[], Integer, Q> chunkBuilder, Function<StreamObserver<S>, CallStreamObserver<Q>> reqObserverBuilder, - InputStream inputStream) { - this(chunkBuilder, reqObserverBuilder, inputStream, true); - } + /** + * @param <Q> Request Type + * @param <S> Response Type + */ + public static <Q, S> StreamingFileSender<Q, S> createStreamSharingSender(BiFunction<byte[], Integer, Q> chunkBuilder, InputStream inputStream, + CallStreamObserver<Q> requestObserver, Consumer<Runnable> onReadyHandlerRegistrar) { + return new StreamSharingFileSender<>(chunkBuilder, inputStream, requestObserver, onReadyHandlerRegistrar); + } - FileSender(BiFunction<byte[], Integer, Q> chunkBuilder, Function<StreamObserver<S>, CallStreamObserver<Q>> reqObserverBuilder, - InputStream inputStream, boolean completeOnFileSent) { - this.chunkBuilder = chunkBuilder; - this.inputStream = inputStream; - this.reqObserverBuilder = reqObserverBuilder; - this.completeOnFileSent = completeOnFileSent; + @Deprecated(since = "4.13.0") + public static class FileSender<Q, S> extends StreamExclusiveFileSender<Q, S> { - this.streamReader = new StreamReader(this.inputStream); - } - - public FileSender<Q, S> withMetaData(@NonNull Q metaData) { - this.metaData = Optional.of(metaData); - return this; + FileSender(BiFunction<byte[], Integer, Q> chunkBuilder, InputStream inputStream, + Function<StreamObserver<S>, CallStreamObserver<Q>> reqObserverBuilder) { + super(chunkBuilder, inputStream, reqObserverBuilder); } + @Override public FileSender<Q, S> send() { - LOG.debug("Start sending File."); - var responseObserver = BinaryFileUploadStreamObserver.create(resultFuture, this::sendNext); - requestObserver = reqObserverBuilder.apply(responseObserver); - + super.send(); return this; } - public void cancelOnTimeout() { - LOG.warn("File transfer canceled on timeout"); - resultFuture.cancel(true); - requestObserver.onError(new TechnicalException("Timeout on waiting for upload.")); - closeStreams(); - } - - public void cancelOnError(Throwable t) { - LOG.error("File tranfer canceled on error.", t); - resultFuture.cancel(true); - requestObserver.onError(t); - closeStreams(); - } - - void sendNext() { - if (!done.get()) { - waitForOberver(); - sendMetaData(); - do { - LOG.debug("Sending next chunk."); - sendNextChunk(); - } while (!done.get() && isReady()); - LOG.debug("Finished or waiting to become ready."); - } - } - - private boolean isReady() { - return requestObserver.isReady(); - } - - private void waitForOberver() { - synchronized (this) { - while (Objects.isNull(requestObserver)) { - try { - LOG.debug("wait for observer"); - wait(300); - } catch (InterruptedException e) { - LOG.error("Error on waiting for request Observer.", e); - Thread.currentThread().interrupt(); - } - } - } - - } - - void sendNextChunk() { - byte[] contentToSend = streamReader.getNextData(); - - if (streamReader.getLastReadSize() > 0) { - sendChunk(contentToSend, streamReader.getLastReadSize()); - } else { - endTransfer(); - } - } - - private void endTransfer() { - if (completeOnFileSent) - requestObserver.onCompleted(); - else - sendEndOfFile(); - done.set(true); - LOG.debug("File Transfer done."); - closeStreams(); - - } - - private void sendEndOfFile() { - sendChunk(new byte[0], streamReader.getLastReadSize()); - } - - void closeStreams() { - LOG.debug("Closing streams"); - streamReader.close(); - } - - void sendChunk(byte[] content, int length) { - LOG.debug("Sending {} byte Data.", length); - var chunk = chunkBuilder.apply(content, length); - requestObserver.onNext(chunk); - } - - byte[] readFromStream() { - try { - return inputStream.readNBytes(CHUNK_SIZE); - } catch (IOException e) { - throw new TechnicalException("Error on sending a single chunk", e); - } - } - - void sendMetaData() { - metaData.filter(md -> !metaDataSent.get()).ifPresent(this::doSendMetaData); - } - - private void doSendMetaData(Q metadata) { - LOG.debug("Sending Metadata."); - requestObserver.onNext(metadata); - metaDataSent.set(true); - } - - void checkForEndOfStream(long sentSize) { - if (sentSize < CHUNK_SIZE) { - LOG.debug("File Transfer done. Closing stream."); - IOUtils.closeQuietly(inputStream); - requestObserver.onCompleted(); - done.set(true); - } else { - LOG.debug("File Transfer not jet done - need to tranfer another chunk."); - } - } - - @RequiredArgsConstructor - private class StreamReader { - private final InputStream inStream; - private final byte[] buffer = new byte[CHUNK_SIZE]; - @Getter - private int lastReadSize = 0; - @Getter - private final AtomicBoolean done = new AtomicBoolean(false); - - byte[] getNextData() { - readNext(); - return buffer; - } - - void close() { - IOUtils.closeQuietly(inStream); - } - - void readNext() { - try { - lastReadSize = inStream.read(buffer, 0, CHUNK_SIZE); - } catch (IOException e) { - throw new TechnicalException("Error on reading a single chunk", e); - } - } + @Override + public FileSender<Q, S> withMetaData(@NonNull Q metaData) { + super.withMetaData(metaData); + return this; } } diff --git a/ozgcloud-common-lib/src/main/java/de/ozgcloud/common/binaryfile/StreamExclusiveFileSender.java b/ozgcloud-common-lib/src/main/java/de/ozgcloud/common/binaryfile/StreamExclusiveFileSender.java new file mode 100644 index 0000000000000000000000000000000000000000..4223d4de1b383d657d105d305d11e2d10b35ba54 --- /dev/null +++ b/ozgcloud-common-lib/src/main/java/de/ozgcloud/common/binaryfile/StreamExclusiveFileSender.java @@ -0,0 +1,78 @@ +/* + * Copyright (C) 2025 Das Land Schleswig-Holstein vertreten durch den + * Ministerpräsidenten des Landes Schleswig-Holstein + * Staatskanzlei + * Abteilung Digitalisierung und zentrales IT-Management der Landesregierung + * + * Lizenziert unter der EUPL, Version 1.2 oder - sobald + * diese von der Europäischen Kommission genehmigt wurden - + * Folgeversionen der EUPL ("Lizenz"); + * Sie dürfen dieses Werk ausschließlich gemäß + * dieser Lizenz nutzen. + * Eine Kopie der Lizenz finden Sie hier: + * + * https://joinup.ec.europa.eu/collection/eupl/eupl-text-eupl-12 + * + * Sofern nicht durch anwendbare Rechtsvorschriften + * gefordert oder in schriftlicher Form vereinbart, wird + * die unter der Lizenz verbreitete Software "so wie sie + * ist", OHNE JEGLICHE GEWÄHRLEISTUNG ODER BEDINGUNGEN - + * ausdrücklich oder stillschweigend - verbreitet. + * Die sprachspezifischen Genehmigungen und Beschränkungen + * unter der Lizenz sind dem Lizenztext zu entnehmen. + */ +package de.ozgcloud.common.binaryfile; + +import java.io.InputStream; +import java.util.function.BiFunction; +import java.util.function.Function; + +import de.ozgcloud.common.errorhandling.TechnicalException; +import io.grpc.stub.CallStreamObserver; +import io.grpc.stub.StreamObserver; +import lombok.extern.log4j.Log4j2; + +@Log4j2 +class StreamExclusiveFileSender<Q, S> extends StreamingFileSender<Q, S> { + + private final Function<StreamObserver<S>, CallStreamObserver<Q>> reqObserverBuilder; + private CallStreamObserver<Q> requestObserver; + + StreamExclusiveFileSender(BiFunction<byte[], Integer, Q> chunkBuilder, InputStream inputStream, Function<StreamObserver<S>, CallStreamObserver<Q>> reqObserverBuilder) { + super(chunkBuilder, inputStream); + + this.reqObserverBuilder = reqObserverBuilder; + } + + @Override + public StreamExclusiveFileSender<Q, S> send() { + LOG.debug("Start sending File."); + + var responseObserver = BinaryFileUploadStreamObserver.create(getResultFuture(), this::sendNext); + requestObserver = reqObserverBuilder.apply(responseObserver); + + return this; + } + + @Override + void communicateEndOfTransfer() { + requestObserver.onCompleted(); + } + + @Override + protected CallStreamObserver<Q> getRequestObserver() { + return requestObserver; + } + + @Override + public void cancelOnTimeout() { + super.cancelOnTimeout(); + requestObserver.onError(new TechnicalException("Timeout on waiting for upload.")); + } + + @Override + public void cancelOnError(Throwable t) { + super.cancelOnError(t); + requestObserver.onError(t); + } +} diff --git a/ozgcloud-common-lib/src/main/java/de/ozgcloud/common/binaryfile/StreamSharingFileSender.java b/ozgcloud-common-lib/src/main/java/de/ozgcloud/common/binaryfile/StreamSharingFileSender.java new file mode 100644 index 0000000000000000000000000000000000000000..c5c993a0f14881912f39c3c069912b4027eb81d7 --- /dev/null +++ b/ozgcloud-common-lib/src/main/java/de/ozgcloud/common/binaryfile/StreamSharingFileSender.java @@ -0,0 +1,67 @@ +/* + * Copyright (C) 2025 Das Land Schleswig-Holstein vertreten durch den + * Ministerpräsidenten des Landes Schleswig-Holstein + * Staatskanzlei + * Abteilung Digitalisierung und zentrales IT-Management der Landesregierung + * + * Lizenziert unter der EUPL, Version 1.2 oder - sobald + * diese von der Europäischen Kommission genehmigt wurden - + * Folgeversionen der EUPL ("Lizenz"); + * Sie dürfen dieses Werk ausschließlich gemäß + * dieser Lizenz nutzen. + * Eine Kopie der Lizenz finden Sie hier: + * + * https://joinup.ec.europa.eu/collection/eupl/eupl-text-eupl-12 + * + * Sofern nicht durch anwendbare Rechtsvorschriften + * gefordert oder in schriftlicher Form vereinbart, wird + * die unter der Lizenz verbreitete Software "so wie sie + * ist", OHNE JEGLICHE GEWÄHRLEISTUNG ODER BEDINGUNGEN - + * ausdrücklich oder stillschweigend - verbreitet. + * Die sprachspezifischen Genehmigungen und Beschränkungen + * unter der Lizenz sind dem Lizenztext zu entnehmen. + */ +package de.ozgcloud.common.binaryfile; + +import java.io.InputStream; +import java.util.function.BiFunction; +import java.util.function.Consumer; + +import io.grpc.stub.CallStreamObserver; +import lombok.extern.log4j.Log4j2; + +@Log4j2 +class StreamSharingFileSender<Q, S> extends StreamingFileSender<Q, S> { + + private final CallStreamObserver<Q> requestObserver; + private final Consumer<Runnable> onReadyHandlerRegistrar; + + StreamSharingFileSender(BiFunction<byte[], Integer, Q> chunkBuilder, InputStream inputStream, CallStreamObserver<Q> requestObserver, Consumer<Runnable> onReadyHandlerRegistrar) { + super(chunkBuilder, inputStream); + + this.requestObserver = requestObserver; + this.onReadyHandlerRegistrar = onReadyHandlerRegistrar; + } + + @Override + public StreamSharingFileSender<Q, S> send() { + LOG.debug("Register onReadyHandler and start sending File."); + onReadyHandlerRegistrar.accept(this::sendNext); + return this; + } + + @Override + void communicateEndOfTransfer() { + sendEndOfFile(); + getResultFuture().complete(null); + } + + private void sendEndOfFile() { + sendChunk(new byte[0], -1); + } + + @Override + protected CallStreamObserver<Q> getRequestObserver() { + return requestObserver; + } +} diff --git a/ozgcloud-common-lib/src/main/java/de/ozgcloud/common/binaryfile/StreamingFileSender.java b/ozgcloud-common-lib/src/main/java/de/ozgcloud/common/binaryfile/StreamingFileSender.java new file mode 100644 index 0000000000000000000000000000000000000000..de78cf14b005300683dd1d63ed5c7cf4184da010 --- /dev/null +++ b/ozgcloud-common-lib/src/main/java/de/ozgcloud/common/binaryfile/StreamingFileSender.java @@ -0,0 +1,200 @@ +/* + * Copyright (C) 2025 Das Land Schleswig-Holstein vertreten durch den + * Ministerpräsidenten des Landes Schleswig-Holstein + * Staatskanzlei + * Abteilung Digitalisierung und zentrales IT-Management der Landesregierung + * + * Lizenziert unter der EUPL, Version 1.2 oder - sobald + * diese von der Europäischen Kommission genehmigt wurden - + * Folgeversionen der EUPL ("Lizenz"); + * Sie dürfen dieses Werk ausschließlich gemäß + * dieser Lizenz nutzen. + * Eine Kopie der Lizenz finden Sie hier: + * + * https://joinup.ec.europa.eu/collection/eupl/eupl-text-eupl-12 + * + * Sofern nicht durch anwendbare Rechtsvorschriften + * gefordert oder in schriftlicher Form vereinbart, wird + * die unter der Lizenz verbreitete Software "so wie sie + * ist", OHNE JEGLICHE GEWÄHRLEISTUNG ODER BEDINGUNGEN - + * ausdrücklich oder stillschweigend - verbreitet. + * Die sprachspezifischen Genehmigungen und Beschränkungen + * unter der Lizenz sind dem Lizenztext zu entnehmen. + */ +package de.ozgcloud.common.binaryfile; + +import java.io.IOException; +import java.io.InputStream; +import java.util.Objects; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.BiFunction; + +import org.apache.commons.io.IOUtils; + +import de.ozgcloud.common.errorhandling.TechnicalException; +import io.grpc.stub.CallStreamObserver; +import lombok.AccessLevel; +import lombok.Getter; +import lombok.NonNull; +import lombok.RequiredArgsConstructor; +import lombok.extern.log4j.Log4j2; + +/* + * Q = Request Type; S = Response Type + */ +@Log4j2 +public abstract class StreamingFileSender<Q, S> { + + static final int CHUNK_SIZE = 4 * 1024; + + private final BiFunction<byte[], Integer, Q> chunkBuilder; + + @Getter + private final CompletableFuture<S> resultFuture = new CompletableFuture<>(); + + private Q metaData; + private final AtomicBoolean metaDataSent = new AtomicBoolean(false); + private final AtomicBoolean done = new AtomicBoolean(false); + + @Getter(AccessLevel.PROTECTED) + private final StreamReader streamReader; + + StreamingFileSender(BiFunction<byte[], Integer, Q> chunkBuilder, InputStream inputStream) { + this.chunkBuilder = chunkBuilder; + this.streamReader = new StreamReader(inputStream); + } + + public StreamingFileSender<Q, S> withMetaData(@NonNull Q metaData) { + this.metaData = metaData; + return this; + } + + public abstract StreamingFileSender<Q, S> send(); + + public void cancelOnTimeout() { + LOG.warn("File transfer canceled on timeout"); + resultFuture.cancel(true); + closeStreamReader(); + } + + public void cancelOnError(Throwable t) { + LOG.error("File transfer canceled on error.", t); + resultFuture.cancel(true); + closeStreamReader(); + } + + void sendNext() { + if (notFinished()) { + waitForObserver(); + sendMetaData(); + do { + LOG.debug("Sending next chunk."); + sendNextChunk(); + } while (notFinished() && isReady()); + LOG.debug("Finished or waiting to become ready."); + } + checkIfFinishedForcefully(); + } + + private boolean notFinished() { + return !done.get() && !resultFuture.isCancelled(); + } + + private void checkIfFinishedForcefully() { + if (resultFuture.isCancelled()) { + LOG.warn("File transfer was cancelled"); + closeStreamReader(); + } + } + + private boolean isReady() { + return getRequestObserver().isReady(); + } + + private void waitForObserver() { + synchronized (this) { + while (Objects.isNull(getRequestObserver())) { + try { + LOG.debug("wait for observer"); + wait(300); + } catch (InterruptedException e) { + LOG.error("Error on waiting for request Observer.", e); + Thread.currentThread().interrupt(); + } + } + } + + } + + void sendNextChunk() { + byte[] contentToSend = streamReader.getNextData(); + + if (streamReader.getLastReadSize() > 0) { + sendChunk(contentToSend, streamReader.getLastReadSize()); + } else { + endTransfer(); + } + } + + void endTransfer() { + communicateEndOfTransfer(); + done.set(true); + LOG.debug("File Transfer done."); + closeStreamReader(); + } + + abstract void communicateEndOfTransfer(); + + void closeStreamReader() { + LOG.debug("Closing streams"); + streamReader.close(); + } + + void sendChunk(byte[] content, int length) { + LOG.debug("Sending {} byte Data.", length); + var chunk = chunkBuilder.apply(content, length); + getRequestObserver().onNext(chunk); + } + + void sendMetaData() { + if (metaData != null && !metaDataSent.get()) { + doSendMetaData(metaData); + } + } + + private void doSendMetaData(Q metadata) { + LOG.debug("Sending Metadata."); + getRequestObserver().onNext(metadata); + metaDataSent.set(true); + } + + protected abstract CallStreamObserver<Q> getRequestObserver(); + + @RequiredArgsConstructor + protected static class StreamReader { + private final InputStream inStream; + private final byte[] buffer = new byte[CHUNK_SIZE]; + @Getter + private int lastReadSize = 0; + @Getter + private final AtomicBoolean done = new AtomicBoolean(false); + + byte[] getNextData() { + readNext(); + return buffer; + } + + void close() { + IOUtils.closeQuietly(inStream); + } + + void readNext() { + try { + lastReadSize = inStream.read(buffer, 0, CHUNK_SIZE); + } catch (IOException e) { + throw new TechnicalException("Error on reading a single chunk", e); + } + } + } +} diff --git a/ozgcloud-common-lib/src/test/java/de/ozgcloud/common/binaryfile/GrpcFileUploadUtilsTest.java b/ozgcloud-common-lib/src/test/java/de/ozgcloud/common/binaryfile/GrpcFileUploadUtilsTest.java index fb77f3ca04c0554238b03cff1f996028d4ae29f6..c94b69ec5df2eec920e0797589706ace0f09e6cb 100644 --- a/ozgcloud-common-lib/src/test/java/de/ozgcloud/common/binaryfile/GrpcFileUploadUtilsTest.java +++ b/ozgcloud-common-lib/src/test/java/de/ozgcloud/common/binaryfile/GrpcFileUploadUtilsTest.java @@ -24,289 +24,64 @@ package de.ozgcloud.common.binaryfile; import static org.assertj.core.api.Assertions.*; -import static org.mockito.ArgumentMatchers.*; -import static org.mockito.Mockito.*; -import java.io.ByteArrayInputStream; -import java.io.IOException; import java.io.InputStream; import java.util.function.BiFunction; +import java.util.function.Consumer; import java.util.function.Function; -import org.apache.commons.lang3.RandomUtils; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Nested; import org.junit.jupiter.api.Test; -import org.mockito.ArgumentCaptor; -import org.mockito.Captor; -import org.mockito.InjectMocks; import org.mockito.Mock; import de.ozgcloud.common.binaryfile.BinaryFileTestFactory.TestRequestType; import de.ozgcloud.common.binaryfile.BinaryFileTestFactory.TestResponseType; -import de.ozgcloud.common.binaryfile.GrpcFileUploadUtils.FileSender; -import de.ozgcloud.common.errorhandling.TechnicalException; import io.grpc.stub.CallStreamObserver; import io.grpc.stub.StreamObserver; -import lombok.SneakyThrows; class GrpcFileUploadUtilsTest { - @InjectMocks - private GrpcFileUploadUtils service; - @Mock private BiFunction<byte[], Integer, TestRequestType> chunkBuilder; @Mock + private InputStream inputStream; + @Mock private Function<StreamObserver<TestResponseType>, CallStreamObserver<TestRequestType>> reqObserverBuilder; @Mock private CallStreamObserver<TestRequestType> requestObserver; @Mock - private InputStream inputStream; - - private FileSender<TestRequestType, TestResponseType> fileSender; - - @Mock - private TestRequestType metaData; - - @BeforeEach - void init() { - when(reqObserverBuilder.apply(any())).thenReturn(requestObserver); - fileSender = spy(GrpcFileUploadUtils.createSender(chunkBuilder, inputStream, reqObserverBuilder)); - } + private Consumer<Runnable> onReadyHandlerRegistrar; @Nested - class TestCreateFileSender { + class TestCreateSender { @Test - void shouldCreateRequestObserver() { - GrpcFileUploadUtils.createSender(chunkBuilder, inputStream, reqObserverBuilder).send(); + void shouldReturnInstanceOfStreamExclusiveFileSender() { + var createdSender = GrpcFileUploadUtils.createSender(chunkBuilder, inputStream, reqObserverBuilder); - verify(reqObserverBuilder, atLeastOnce()).apply(notNull()); + assertThat(createdSender).isInstanceOf(StreamExclusiveFileSender.class); } } @Nested - class TestSendBinaryFile { - - @Captor - private ArgumentCaptor<Runnable> runnableCaptor; + class TestCreateStreamExclusiveFileSender { @Test - void shouldReturnSenderWithFuture() { - var result = fileSender.send(); + void shouldReturnInstanceOfStreamExclusiveFileSender() { + var createdSender = GrpcFileUploadUtils.createStreamExclusiveFileSender(chunkBuilder, inputStream, reqObserverBuilder); - assertThat(result).isNotNull().extracting(FileSender::getResultFuture).isNotNull(); + assertThat(createdSender).isInstanceOf(StreamExclusiveFileSender.class); } } @Nested - class TestSendNext { - - @BeforeEach - void initObserver() { - fileSender.send(); - } + class TestCreateStreamSharingSender { @Test - void shouldCallSendMetaData() { - fileSender.sendNext(); - - verify(fileSender).sendMetaData(); - } - - @Test - void shouldSendNextChunk() { - fileSender.sendNext(); - - verify(fileSender).sendNextChunk(); - } - - } - - @Nested - class TestSendNextChunk { - - private final byte[] content = RandomUtils.insecure().randomBytes(GrpcFileUploadUtils.CHUNK_SIZE / 2); - private ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(content); + void shouldReturnInstanceOfStreamSharingSender() { + var createdSender = GrpcFileUploadUtils.createStreamSharingSender(chunkBuilder, inputStream, requestObserver, onReadyHandlerRegistrar); - @Captor - private ArgumentCaptor<byte[]> chunkCaptor; - - @Nested - class TestOnDataAvailable { - @BeforeEach - void initObserver() { - fileSender = spy(GrpcFileUploadUtils.createSender(chunkBuilder, byteArrayInputStream, reqObserverBuilder)); - fileSender.send(); - } - - @Test - void shouldCallSendChunk() { - fileSender.sendNextChunk(); - - verify(fileSender).sendChunk(chunkCaptor.capture(), eq(content.length)); - assertThat(chunkCaptor.getValue()).contains(content); - } + assertThat(createdSender).isInstanceOf(StreamSharingFileSender.class); } - - @Nested - class TestOnNoBytesLeftToRead { - - @Nested - class TestOnCompleteOnFileSent { - private static final boolean COMPLETE_ON_FILE_SENT = true; - - @BeforeEach - void initialize() { - var buffer = new byte[GrpcFileUploadUtils.CHUNK_SIZE]; - byteArrayInputStream.read(buffer, 0, GrpcFileUploadUtils.CHUNK_SIZE); - fileSender = spy(GrpcFileUploadUtils.createSender(chunkBuilder, byteArrayInputStream, reqObserverBuilder, COMPLETE_ON_FILE_SENT)); - fileSender.send(); - } - - @Test - void shouldCallOnCompleted() { - fileSender.sendNextChunk(); - - verify(requestObserver).onCompleted(); - } - - @Test - void shouldNotCallSendChunk() { - fileSender.sendNextChunk(); - - verify(fileSender, never()).sendChunk(any(), anyInt()); - } - - @Test - @SneakyThrows - void shouldCallCloseStreams() { - fileSender.sendNextChunk(); - - verify(fileSender).closeStreams(); - } - } - - @Nested - class TestOnNotCompleteOnFileSent { - private static final boolean COMPLETE_ON_FILE_SENT = false; - - @BeforeEach - void initialize() { - var buffer = new byte[GrpcFileUploadUtils.CHUNK_SIZE]; - byteArrayInputStream.read(buffer, 0, GrpcFileUploadUtils.CHUNK_SIZE); - fileSender = spy(GrpcFileUploadUtils.createSender(chunkBuilder, byteArrayInputStream, reqObserverBuilder, COMPLETE_ON_FILE_SENT)); - fileSender.send(); - } - - @Test - void shouldNotCallOnCompleted() { - fileSender.sendNextChunk(); - - verify(requestObserver, never()).onCompleted(); - } - - @Test - void shouldCallSendChunk() { - fileSender.sendNextChunk(); - - verify(fileSender).sendChunk(chunkCaptor.capture(), eq(-1)); - assertThat(chunkCaptor.getValue()).isEmpty(); - } - - @Test - @SneakyThrows - void shouldCallCloseStreams() { - fileSender.sendNextChunk(); - - verify(fileSender).closeStreams(); - } - } - } - } - - @Nested - class TestCloseStreams { - - @Test - @SneakyThrows - void shouldCloseInputStream() { - fileSender.send(); - - fileSender.closeStreams(); - - verify(inputStream).close(); - } - } - - @Nested - class TestSendChunk { - - private static final byte[] CHUNK_PART = "ChunkPartContent".getBytes(); - - @BeforeEach - void initObserver() { - fileSender.send(); - } - - @Test - void shouldApplyBuildChunk() throws IOException { - fileSender.sendChunk(CHUNK_PART, 5); - - verify(chunkBuilder).apply(CHUNK_PART, 5); - } - - @Test - void shouldCallOnNext() throws IOException { - fileSender.sendChunk(CHUNK_PART, 5); - - verify(requestObserver).onNext(any()); - } - } - - @Nested - class TestSendMetaData { - - @BeforeEach - void initObserver() { - fileSender.send(); - } - - @Test - void shouldNotSendWithoutMetadata() { - fileSender.sendMetaData(); - - verify(requestObserver, never()).onNext(any()); - } - - @Test - void shouldSendMetadata() { - fileSender.withMetaData(metaData).sendMetaData(); - - verify(requestObserver).onNext(metaData); - } - - @Test - void shouldSendMetadataOnlyOnce() { - fileSender.withMetaData(metaData).sendMetaData(); - fileSender.sendMetaData(); - - verify(requestObserver).onNext(metaData); - } - } - - @Disabled("unused") - @Nested - class TestReadFromStream { - @Test - void shouldThrowException() throws IOException { - doThrow(IOException.class).when(inputStream).read(any(), anyInt(), anyInt()); - - assertThatThrownBy(() -> fileSender.readFromStream()).isInstanceOf(TechnicalException.class); - } - } - } \ No newline at end of file diff --git a/ozgcloud-common-lib/src/test/java/de/ozgcloud/common/binaryfile/StreamExclusiveFileSenderTest.java b/ozgcloud-common-lib/src/test/java/de/ozgcloud/common/binaryfile/StreamExclusiveFileSenderTest.java new file mode 100644 index 0000000000000000000000000000000000000000..6ac54ef835b35e685a9b67b89fdba6fc5ed482a0 --- /dev/null +++ b/ozgcloud-common-lib/src/test/java/de/ozgcloud/common/binaryfile/StreamExclusiveFileSenderTest.java @@ -0,0 +1,195 @@ +/* + * Copyright (C) 2025 Das Land Schleswig-Holstein vertreten durch den + * Ministerpräsidenten des Landes Schleswig-Holstein + * Staatskanzlei + * Abteilung Digitalisierung und zentrales IT-Management der Landesregierung + * + * Lizenziert unter der EUPL, Version 1.2 oder - sobald + * diese von der Europäischen Kommission genehmigt wurden - + * Folgeversionen der EUPL ("Lizenz"); + * Sie dürfen dieses Werk ausschließlich gemäß + * dieser Lizenz nutzen. + * Eine Kopie der Lizenz finden Sie hier: + * + * https://joinup.ec.europa.eu/collection/eupl/eupl-text-eupl-12 + * + * Sofern nicht durch anwendbare Rechtsvorschriften + * gefordert oder in schriftlicher Form vereinbart, wird + * die unter der Lizenz verbreitete Software "so wie sie + * ist", OHNE JEGLICHE GEWÄHRLEISTUNG ODER BEDINGUNGEN - + * ausdrücklich oder stillschweigend - verbreitet. + * Die sprachspezifischen Genehmigungen und Beschränkungen + * unter der Lizenz sind dem Lizenztext zu entnehmen. + */ +package de.ozgcloud.common.binaryfile; + +import static org.mockito.Mockito.*; + +import java.io.InputStream; +import java.util.concurrent.CompletableFuture; +import java.util.function.BiFunction; +import java.util.function.Function; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Nested; +import org.junit.jupiter.api.Test; +import org.mockito.ArgumentCaptor; +import org.mockito.Captor; +import org.mockito.InjectMocks; +import org.mockito.Mock; +import org.mockito.MockedStatic; +import org.mockito.Spy; +import org.springframework.test.util.ReflectionTestUtils; + +import de.ozgcloud.common.binaryfile.BinaryFileTestFactory.TestRequestType; +import de.ozgcloud.common.binaryfile.BinaryFileTestFactory.TestResponseType; +import de.ozgcloud.common.errorhandling.TechnicalException; +import io.grpc.stub.CallStreamObserver; +import io.grpc.stub.StreamObserver; + +class StreamExclusiveFileSenderTest { + + @Mock + private BiFunction<byte[], Integer, TestRequestType> chunkBuilder; + @Mock + private InputStream inputStream; + @Mock + private Function<StreamObserver<TestResponseType>, CallStreamObserver<TestRequestType>> reqObserverBuilder; + @Spy + @InjectMocks + private StreamExclusiveFileSender<TestRequestType, TestResponseType> fileSender; + + @Nested + class TestSend { + + private final CompletableFuture<TestResponseType> resultFuture = CompletableFuture.completedFuture(new TestResponseType()); + @Captor + private ArgumentCaptor<Runnable> onReadyHandlerCaptor; + @Mock + private BinaryFileUploadStreamObserver<TestRequestType, TestResponseType> responseObserver; + + @BeforeEach + void init() { + doReturn(resultFuture).when(fileSender).getResultFuture(); + } + + @SuppressWarnings("rawtypes") + @Test + void shouldCreateResponseObserver() { + try (MockedStatic<BinaryFileUploadStreamObserver> mocked = mockStatic(BinaryFileUploadStreamObserver.class)) { + doNothing().when(fileSender).sendNext(); + + fileSender.send(); + + mocked.verify(() -> BinaryFileUploadStreamObserver.create(same(resultFuture), onReadyHandlerCaptor.capture())); + verifyCallsSendNext(onReadyHandlerCaptor.getValue()); + } + } + + @SuppressWarnings("rawtypes") + @Test + void shouldBuildRequestObserver() { + try (MockedStatic<BinaryFileUploadStreamObserver> mocked = mockStatic(BinaryFileUploadStreamObserver.class)) { + mocked.when(() -> BinaryFileUploadStreamObserver.create(any(), any())).thenReturn(responseObserver); + + fileSender.send(); + + verify(reqObserverBuilder).apply(responseObserver); + } + } + + private void verifyCallsSendNext(Runnable runnable) { + runnable.run(); + verify(fileSender).sendNext(); + } + } + + @Nested + class TestCommunicateEndOfTransfer { + + @Mock + private CallStreamObserver<TestRequestType> requestObserver; + + @Test + void shouldCallOnCompleted() { + ReflectionTestUtils.setField(fileSender, "requestObserver", requestObserver); + + fileSender.communicateEndOfTransfer(); + + verify(requestObserver).onCompleted(); + } + } + + @Nested + class TestCancelOnTimeout { + + @Mock + private CompletableFuture<TestResponseType> resultFuture; + @Mock + private CallStreamObserver<TestRequestType> requestObserver; + + @BeforeEach + void init() { + ReflectionTestUtils.setField(fileSender, "resultFuture", resultFuture); + ReflectionTestUtils.setField(fileSender, "requestObserver", requestObserver); + } + + @Test + void shouldCancelResultFuture() { + fileSender.cancelOnTimeout(); + + verify(resultFuture).cancel(true); + } + + @Test + void shouldCallOnError() { + fileSender.cancelOnTimeout(); + + verify(requestObserver).onError(any(TechnicalException.class)); + } + + @Test + void shouldCloseStreamReader() { + fileSender.cancelOnTimeout(); + + verify(fileSender).closeStreamReader(); + } + } + + @Nested + class TestCancelOnError { + + @Mock + private CompletableFuture<TestResponseType> resultFuture; + @Mock + private CallStreamObserver<TestRequestType> requestObserver; + private final Throwable error = new Throwable(); + + @BeforeEach + void init() { + ReflectionTestUtils.setField(fileSender, "resultFuture", resultFuture); + ReflectionTestUtils.setField(fileSender, "requestObserver", requestObserver); + } + + @Test + void shouldCancelResultFuture() { + fileSender.cancelOnError(error); + + verify(resultFuture).cancel(true); + } + + @Test + void shouldCallOnError() { + fileSender.cancelOnError(error); + + verify(requestObserver).onError(error); + } + + @Test + void shouldCloseStreamReader() { + fileSender.cancelOnError(error); + + verify(fileSender).closeStreamReader(); + } + } +} diff --git a/ozgcloud-common-lib/src/test/java/de/ozgcloud/common/binaryfile/StreamSharingFileSenderTest.java b/ozgcloud-common-lib/src/test/java/de/ozgcloud/common/binaryfile/StreamSharingFileSenderTest.java new file mode 100644 index 0000000000000000000000000000000000000000..6c2225c9e742c6fa41e8390f386dc7f4bbad7bd7 --- /dev/null +++ b/ozgcloud-common-lib/src/test/java/de/ozgcloud/common/binaryfile/StreamSharingFileSenderTest.java @@ -0,0 +1,105 @@ +/* + * Copyright (C) 2025 Das Land Schleswig-Holstein vertreten durch den + * Ministerpräsidenten des Landes Schleswig-Holstein + * Staatskanzlei + * Abteilung Digitalisierung und zentrales IT-Management der Landesregierung + * + * Lizenziert unter der EUPL, Version 1.2 oder - sobald + * diese von der Europäischen Kommission genehmigt wurden - + * Folgeversionen der EUPL ("Lizenz"); + * Sie dürfen dieses Werk ausschließlich gemäß + * dieser Lizenz nutzen. + * Eine Kopie der Lizenz finden Sie hier: + * + * https://joinup.ec.europa.eu/collection/eupl/eupl-text-eupl-12 + * + * Sofern nicht durch anwendbare Rechtsvorschriften + * gefordert oder in schriftlicher Form vereinbart, wird + * die unter der Lizenz verbreitete Software "so wie sie + * ist", OHNE JEGLICHE GEWÄHRLEISTUNG ODER BEDINGUNGEN - + * ausdrücklich oder stillschweigend - verbreitet. + * Die sprachspezifischen Genehmigungen und Beschränkungen + * unter der Lizenz sind dem Lizenztext zu entnehmen. + */ +package de.ozgcloud.common.binaryfile; + +import static org.assertj.core.api.Assertions.*; +import static org.mockito.Mockito.*; + +import java.io.InputStream; +import java.util.function.BiFunction; +import java.util.function.Consumer; + +import org.junit.jupiter.api.Nested; +import org.junit.jupiter.api.Test; +import org.mockito.ArgumentCaptor; +import org.mockito.Captor; +import org.mockito.InjectMocks; +import org.mockito.Mock; +import org.mockito.Spy; + +import de.ozgcloud.common.binaryfile.BinaryFileTestFactory.TestRequestType; +import de.ozgcloud.common.binaryfile.BinaryFileTestFactory.TestResponseType; +import io.grpc.stub.CallStreamObserver; + +class StreamSharingFileSenderTest { + + @Mock + private BiFunction<byte[], Integer, TestRequestType> chunkBuilder; + @Mock + private InputStream inputStream; + @Mock + private CallStreamObserver<TestRequestType> requestObserver; + @Mock + private Consumer<Runnable> onReadyHandlerRegistrar; + @Spy + @InjectMocks + private StreamSharingFileSender<TestRequestType, TestResponseType> fileSender; + + @Nested + class TestSend { + + @Captor + private ArgumentCaptor<Runnable> onReadyHandlerCaptor; + + @Test + void shouldRegisterOnReadyHandler() { + doNothing().when(fileSender).sendNext(); + + fileSender.send(); + + verify(onReadyHandlerRegistrar).accept(onReadyHandlerCaptor.capture()); + verifyCallsSendNext(onReadyHandlerCaptor.getValue()); + } + + @Test + void shouldReturnThis() { + var obj = fileSender.send(); + + assertThat(obj).isSameAs(fileSender); + } + + private void verifyCallsSendNext(Runnable runnable) { + runnable.run(); + verify(fileSender).sendNext(); + } + } + + @Nested + class TestCommunicateEndOfTransfer { + + @Test + void shouldSendEmptyChunk() { + fileSender.communicateEndOfTransfer(); + + verify(fileSender).sendChunk(new byte[0], -1); + } + + @Test + void shouldCompleteResultFuture() { + fileSender.communicateEndOfTransfer(); + + assertThat(fileSender.getResultFuture().isDone()).isTrue(); + } + } +} diff --git a/ozgcloud-common-lib/src/test/java/de/ozgcloud/common/binaryfile/StreamingFileSenderTest.java b/ozgcloud-common-lib/src/test/java/de/ozgcloud/common/binaryfile/StreamingFileSenderTest.java new file mode 100644 index 0000000000000000000000000000000000000000..ce12c67d1ee3d157c1385d00f5bd5193382ac9ae --- /dev/null +++ b/ozgcloud-common-lib/src/test/java/de/ozgcloud/common/binaryfile/StreamingFileSenderTest.java @@ -0,0 +1,377 @@ +/* + * Copyright (C) 2025 Das Land Schleswig-Holstein vertreten durch den + * Ministerpräsidenten des Landes Schleswig-Holstein + * Staatskanzlei + * Abteilung Digitalisierung und zentrales IT-Management der Landesregierung + * + * Lizenziert unter der EUPL, Version 1.2 oder - sobald + * diese von der Europäischen Kommission genehmigt wurden - + * Folgeversionen der EUPL ("Lizenz"); + * Sie dürfen dieses Werk ausschließlich gemäß + * dieser Lizenz nutzen. + * Eine Kopie der Lizenz finden Sie hier: + * + * https://joinup.ec.europa.eu/collection/eupl/eupl-text-eupl-12 + * + * Sofern nicht durch anwendbare Rechtsvorschriften + * gefordert oder in schriftlicher Form vereinbart, wird + * die unter der Lizenz verbreitete Software "so wie sie + * ist", OHNE JEGLICHE GEWÄHRLEISTUNG ODER BEDINGUNGEN - + * ausdrücklich oder stillschweigend - verbreitet. + * Die sprachspezifischen Genehmigungen und Beschränkungen + * unter der Lizenz sind dem Lizenztext zu entnehmen. + */ +package de.ozgcloud.common.binaryfile; + +import static org.assertj.core.api.Assertions.*; +import static org.mockito.Mockito.*; + +import java.io.ByteArrayInputStream; +import java.io.InputStream; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.BiFunction; + +import org.apache.commons.lang3.RandomUtils; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Nested; +import org.junit.jupiter.api.Test; +import org.mockito.ArgumentCaptor; +import org.mockito.Captor; +import org.mockito.Mock; +import org.springframework.test.util.ReflectionTestUtils; + +import de.ozgcloud.common.binaryfile.BinaryFileTestFactory.TestRequestType; +import de.ozgcloud.common.binaryfile.BinaryFileTestFactory.TestResponseType; +import io.grpc.stub.CallStreamObserver; +import lombok.AccessLevel; +import lombok.Getter; +import lombok.SneakyThrows; + +class StreamingFileSenderTest { + + @Mock + private BiFunction<byte[], Integer, TestRequestType> chunkBuilder; + @Mock + private InputStream inputStream; + @Mock + private CallStreamObserver<TestRequestType> requestObserver; + + private TestFileSender fileSender; + + @BeforeEach + void init() { + fileSender = spy(new TestFileSender(chunkBuilder, inputStream, requestObserver)); + } + + @Nested + class TestCancelOnTimeout { + + @Mock + private CompletableFuture<TestResponseType> resultFuture; + + @BeforeEach + void init() { + setResultFutureInFileSender(resultFuture); + } + + @Test + void shouldCancelResultFuture() { + fileSender.cancelOnTimeout(); + + verify(resultFuture).cancel(true); + } + + @Test + void shouldCloseStreams() { + fileSender.cancelOnTimeout(); + + verify(fileSender).closeStreamReader(); + } + } + + @Nested + class TestCancelOnError { + + @Mock + private CompletableFuture<TestResponseType> resultFuture; + private final Throwable error = new Throwable(); + + @BeforeEach + void init() { + setResultFutureInFileSender(resultFuture); + } + + @Test + void shouldCancelResultFuture() { + fileSender.cancelOnError(error); + + verify(resultFuture).cancel(true); + } + + @Test + void shouldCloseStreams() { + fileSender.cancelOnError(error); + + verify(fileSender).closeStreamReader(); + } + } + + @Nested + class TestSendNext { + + @Mock + private CompletableFuture<TestResponseType> resultFuture; + + @BeforeEach + void init() { + fileSender.send(); + } + + @Test + void shouldSendMetaData() { + fileSender.sendNext(); + + verify(fileSender).sendMetaData(); + } + + @Test + void shouldSendNextChunk() { + fileSender.sendNext(); + + verify(fileSender).sendNextChunk(); + } + + @Test + void shouldNotSendMetaDataIfDone() { + setDoneInFileSender(true); + + fileSender.sendNext(); + + verify(fileSender, never()).sendMetaData(); + } + + @Test + void shouldNotSendMetaDataIfCancelled() { + fileSender.getResultFuture().cancel(true); + + fileSender.sendNext(); + + verify(fileSender, never()).sendMetaData(); + } + + @Test + void shouldSendNextChunkUntilDone() { + lenient().when(requestObserver.isReady()).thenReturn(true); + doAnswer(invocation -> { + setDoneInFileSender(true); + return null; + }).when(fileSender).sendNextChunk(); + + fileSender.sendNext(); + + verify(fileSender, times(1)).sendNextChunk(); + } + + @Test + void shouldSendNextChunkUntilCancelled() { + lenient().when(requestObserver.isReady()).thenReturn(true); + doAnswer(invocation -> { + fileSender.getResultFuture().cancel(true); + return null; + }).when(fileSender).sendNextChunk(); + + fileSender.sendNext(); + + verify(fileSender, times(1)).sendNextChunk(); + } + + @Test + void closeStreamReaderIfCancelled() { + fileSender.getResultFuture().cancel(true); + + fileSender.sendNext(); + + verify(fileSender).closeStreamReader(); + } + } + + @Nested + class TestSendNextChunk { + + private final byte[] content = RandomUtils.insecure().randomBytes(StreamingFileSender.CHUNK_SIZE / 2); + private final ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(content); + + @Captor + private ArgumentCaptor<byte[]> chunkCaptor; + + @Nested + class TestOnDataAvailable { + + @BeforeEach + void init() { + fileSender = spy(new TestFileSender(chunkBuilder, byteArrayInputStream, requestObserver)); + } + + @Test + void shouldSendChunk() { + fileSender.sendNextChunk(); + + verify(fileSender).sendChunk(chunkCaptor.capture(), eq(content.length)); + assertThat(chunkCaptor.getValue()).contains(content); + } + } + + @Nested + class TestOnNoBytesLeftToRead { + + @BeforeEach + void init() { + var buffer = new byte[StreamingFileSender.CHUNK_SIZE]; + byteArrayInputStream.read(buffer, 0, StreamingFileSender.CHUNK_SIZE); + fileSender = spy(new TestFileSender(chunkBuilder, byteArrayInputStream, requestObserver)); + } + + @Test + void shouldNotSendChunk() { + fileSender.sendNextChunk(); + + verify(fileSender, never()).sendChunk(any(), anyInt()); + } + + @Test + void shouldEndTransfer() { + fileSender.sendNextChunk(); + + verify(fileSender).endTransfer(); + } + } + } + + @Nested + class TestEndTransfer { + + @Test + void shouldCommunicateEndOfTransfer() { + fileSender.endTransfer(); + + verify(fileSender).communicateEndOfTransfer(); + } + + @Test + void shouldSetDoneToTrue() { + fileSender.endTransfer(); + + fileSender.sendNext(); + verify(fileSender, never()).sendNextChunk(); + } + + @Test + void shouldCloseStreamReader() { + fileSender.endTransfer(); + + verify(fileSender).closeStreamReader(); + } + } + + @Nested + class TestCloseStreamReader { + + @Test + @SneakyThrows + void shouldCloseInputStream() { + fileSender.closeStreamReader(); + + verify(inputStream).close(); + } + } + + @Nested + class TestSendChunk { + + private static final byte[] CHUNK_PART = "ChunkPartContent".getBytes(); + + @BeforeEach + void initObserver() { + fileSender.send(); + } + + @Test + void shouldApplyBuildChunk() { + fileSender.sendChunk(CHUNK_PART, 5); + + verify(chunkBuilder).apply(CHUNK_PART, 5); + } + + @Test + void shouldCallOnNext() { + fileSender.sendChunk(CHUNK_PART, 5); + + verify(requestObserver).onNext(any()); + } + } + + @Nested + class TestSendMetaData { + + @Mock + private TestRequestType metaData; + + @BeforeEach + void initObserver() { + fileSender.send(); + } + + @Test + void shouldNotSendWithoutMetadata() { + fileSender.sendMetaData(); + + verify(requestObserver, never()).onNext(any()); + } + + @Test + void shouldSendMetadata() { + fileSender.withMetaData(metaData).sendMetaData(); + + verify(requestObserver).onNext(metaData); + } + + @Test + void shouldSendMetadataOnlyOnce() { + fileSender.withMetaData(metaData).sendMetaData(); + + fileSender.sendMetaData(); + + verify(requestObserver).onNext(metaData); + } + } + + private void setResultFutureInFileSender(CompletableFuture<TestResponseType> resultFuture) { + ReflectionTestUtils.setField(fileSender, "resultFuture", resultFuture); + } + + private void setDoneInFileSender(boolean done) { + ((AtomicBoolean) ReflectionTestUtils.getField(fileSender, null, "done")).set(done); + } + + static class TestFileSender extends StreamingFileSender<TestRequestType, TestResponseType> { + + @Getter(AccessLevel.PROTECTED) + private final CallStreamObserver<TestRequestType> requestObserver; + + TestFileSender(BiFunction<byte[], Integer, TestRequestType> chunkBuilder, InputStream inputStream, + CallStreamObserver<TestRequestType> requestObserver) { + super(chunkBuilder, inputStream); + this.requestObserver = requestObserver; + } + + @Override + public StreamingFileSender<TestRequestType, TestResponseType> send() { + return this; + } + + @Override + void communicateEndOfTransfer() { + } + } +}