diff --git a/.gitignore b/.gitignore index c10edf4e5214be3a14fbe0fa4512f95f87fd6662..bc1ffde805dc5f657ff8f25a61ccdbe8ca86009f 100644 --- a/.gitignore +++ b/.gitignore @@ -10,3 +10,4 @@ target .idea *.iml +.vscode/settings.json diff --git a/ozgcloud-common-lib/src/main/java/de/ozgcloud/common/binaryfile/GrpcFileUploadUtils.java b/ozgcloud-common-lib/src/main/java/de/ozgcloud/common/binaryfile/GrpcFileUploadUtils.java index e7aae2efb9b16f1150e897a6a5b2414f22e81b41..1cbd50d5e2368639f27448c8f61ba7a47740ecd8 100644 --- a/ozgcloud-common-lib/src/main/java/de/ozgcloud/common/binaryfile/GrpcFileUploadUtils.java +++ b/ozgcloud-common-lib/src/main/java/de/ozgcloud/common/binaryfile/GrpcFileUploadUtils.java @@ -55,7 +55,12 @@ public class GrpcFileUploadUtils { */ public static <Q, S> FileSender<Q, S> createSender(BiFunction<byte[], Integer, Q> chunkBuilder, InputStream inputStream, Function<StreamObserver<S>, CallStreamObserver<Q>> reqObserverBuilder) { - return new FileSender<>(chunkBuilder, reqObserverBuilder, inputStream); + return createSender(chunkBuilder, inputStream, reqObserverBuilder, true); + } + + public static <Q, S> FileSender<Q, S> createSender(BiFunction<byte[], Integer, Q> chunkBuilder, InputStream inputStream, + Function<StreamObserver<S>, CallStreamObserver<Q>> reqObserverBuilder, boolean completeOnFileSent) { + return new FileSender<>(chunkBuilder, reqObserverBuilder, inputStream, completeOnFileSent); } public static class FileSender<Q, S> { @@ -72,12 +77,19 @@ public class GrpcFileUploadUtils { private final AtomicBoolean done = new AtomicBoolean(false); private final StreamReader streamReader; + private final boolean completeOnFileSent; FileSender(BiFunction<byte[], Integer, Q> chunkBuilder, Function<StreamObserver<S>, CallStreamObserver<Q>> reqObserverBuilder, InputStream inputStream) { + this(chunkBuilder, reqObserverBuilder, inputStream, true); + } + + FileSender(BiFunction<byte[], Integer, Q> chunkBuilder, Function<StreamObserver<S>, CallStreamObserver<Q>> reqObserverBuilder, + InputStream inputStream, boolean completeOnFileSent) { this.chunkBuilder = chunkBuilder; this.inputStream = inputStream; this.reqObserverBuilder = reqObserverBuilder; + this.completeOnFileSent = completeOnFileSent; this.streamReader = new StreamReader(this.inputStream); } @@ -140,7 +152,7 @@ public class GrpcFileUploadUtils { } - long sendNextChunk() { + void sendNextChunk() { byte[] contentToSend = streamReader.getNextData(); if (streamReader.getLastReadSize() > 0) { @@ -148,20 +160,25 @@ public class GrpcFileUploadUtils { } else { endTransfer(); } - return contentToSend.length; } private void endTransfer() { - requestObserver.onCompleted(); + if (completeOnFileSent) + requestObserver.onCompleted(); + else + sendEndOfFile(); done.set(true); LOG.debug("File Transfer done."); closeStreams(); } - private void closeStreams() { + private void sendEndOfFile() { + sendChunk(new byte[0], streamReader.getLastReadSize()); + } + + void closeStreams() { LOG.debug("Closing streams"); - IOUtils.closeQuietly(inputStream); streamReader.close(); } diff --git a/ozgcloud-common-lib/src/test/java/de/ozgcloud/common/binaryfile/GrpcFileUploadUtilsTest.java b/ozgcloud-common-lib/src/test/java/de/ozgcloud/common/binaryfile/GrpcFileUploadUtilsTest.java index 83e18f8a06855426be7aeb13e6ecdf3282bb7553..fb77f3ca04c0554238b03cff1f996028d4ae29f6 100644 --- a/ozgcloud-common-lib/src/test/java/de/ozgcloud/common/binaryfile/GrpcFileUploadUtilsTest.java +++ b/ozgcloud-common-lib/src/test/java/de/ozgcloud/common/binaryfile/GrpcFileUploadUtilsTest.java @@ -27,11 +27,13 @@ import static org.assertj.core.api.Assertions.*; import static org.mockito.ArgumentMatchers.*; import static org.mockito.Mockito.*; +import java.io.ByteArrayInputStream; import java.io.IOException; import java.io.InputStream; import java.util.function.BiFunction; import java.util.function.Function; +import org.apache.commons.lang3.RandomUtils; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Nested; @@ -47,6 +49,7 @@ import de.ozgcloud.common.binaryfile.GrpcFileUploadUtils.FileSender; import de.ozgcloud.common.errorhandling.TechnicalException; import io.grpc.stub.CallStreamObserver; import io.grpc.stub.StreamObserver; +import lombok.SneakyThrows; class GrpcFileUploadUtilsTest { @@ -122,6 +125,123 @@ class GrpcFileUploadUtilsTest { } + @Nested + class TestSendNextChunk { + + private final byte[] content = RandomUtils.insecure().randomBytes(GrpcFileUploadUtils.CHUNK_SIZE / 2); + private ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(content); + + @Captor + private ArgumentCaptor<byte[]> chunkCaptor; + + @Nested + class TestOnDataAvailable { + @BeforeEach + void initObserver() { + fileSender = spy(GrpcFileUploadUtils.createSender(chunkBuilder, byteArrayInputStream, reqObserverBuilder)); + fileSender.send(); + } + + @Test + void shouldCallSendChunk() { + fileSender.sendNextChunk(); + + verify(fileSender).sendChunk(chunkCaptor.capture(), eq(content.length)); + assertThat(chunkCaptor.getValue()).contains(content); + } + } + + @Nested + class TestOnNoBytesLeftToRead { + + @Nested + class TestOnCompleteOnFileSent { + private static final boolean COMPLETE_ON_FILE_SENT = true; + + @BeforeEach + void initialize() { + var buffer = new byte[GrpcFileUploadUtils.CHUNK_SIZE]; + byteArrayInputStream.read(buffer, 0, GrpcFileUploadUtils.CHUNK_SIZE); + fileSender = spy(GrpcFileUploadUtils.createSender(chunkBuilder, byteArrayInputStream, reqObserverBuilder, COMPLETE_ON_FILE_SENT)); + fileSender.send(); + } + + @Test + void shouldCallOnCompleted() { + fileSender.sendNextChunk(); + + verify(requestObserver).onCompleted(); + } + + @Test + void shouldNotCallSendChunk() { + fileSender.sendNextChunk(); + + verify(fileSender, never()).sendChunk(any(), anyInt()); + } + + @Test + @SneakyThrows + void shouldCallCloseStreams() { + fileSender.sendNextChunk(); + + verify(fileSender).closeStreams(); + } + } + + @Nested + class TestOnNotCompleteOnFileSent { + private static final boolean COMPLETE_ON_FILE_SENT = false; + + @BeforeEach + void initialize() { + var buffer = new byte[GrpcFileUploadUtils.CHUNK_SIZE]; + byteArrayInputStream.read(buffer, 0, GrpcFileUploadUtils.CHUNK_SIZE); + fileSender = spy(GrpcFileUploadUtils.createSender(chunkBuilder, byteArrayInputStream, reqObserverBuilder, COMPLETE_ON_FILE_SENT)); + fileSender.send(); + } + + @Test + void shouldNotCallOnCompleted() { + fileSender.sendNextChunk(); + + verify(requestObserver, never()).onCompleted(); + } + + @Test + void shouldCallSendChunk() { + fileSender.sendNextChunk(); + + verify(fileSender).sendChunk(chunkCaptor.capture(), eq(-1)); + assertThat(chunkCaptor.getValue()).isEmpty(); + } + + @Test + @SneakyThrows + void shouldCallCloseStreams() { + fileSender.sendNextChunk(); + + verify(fileSender).closeStreams(); + } + } + } + + } + + @Nested + class TestCloseStreams { + + @Test + @SneakyThrows + void shouldCloseInputStream() { + fileSender.send(); + + fileSender.closeStreams(); + + verify(inputStream).close(); + } + } + @Nested class TestSendChunk {