diff --git a/ozgcloud-common-lib/src/main/java/de/ozgcloud/common/binaryfile/GrpcBinaryFileServerDownloader.java b/ozgcloud-common-lib/src/main/java/de/ozgcloud/common/binaryfile/GrpcBinaryFileServerDownloader.java index 07379d8833ea981f6d9515d562e0f457d2709743..732600269b23577374e983b60b9b04fb3a649223 100644 --- a/ozgcloud-common-lib/src/main/java/de/ozgcloud/common/binaryfile/GrpcBinaryFileServerDownloader.java +++ b/ozgcloud-common-lib/src/main/java/de/ozgcloud/common/binaryfile/GrpcBinaryFileServerDownloader.java @@ -46,7 +46,10 @@ import lombok.extern.log4j.Log4j2; @Log4j2 public class GrpcBinaryFileServerDownloader<T> { - private static final int CHUNK_SIZE = 255 * 1024; + static final int CHUNK_SIZE = 255 * 1024; + + private static final int END_OF_STREAM = -1; + private static final int NOTHING_READ = 0; private final CallStreamObserver<T> callObserver; private final Function<ByteString, T> chunkBuilder; @@ -55,7 +58,6 @@ public class GrpcBinaryFileServerDownloader<T> { private final byte[] buffer = new byte[CHUNK_SIZE]; private final AtomicBoolean started = new AtomicBoolean(false); - private final AtomicBoolean downloadFinished = new AtomicBoolean(false); private final AtomicBoolean requestFinished = new AtomicBoolean(false); private final AtomicReference<TechnicalException> downloadError = new AtomicReference<>(); @@ -118,7 +120,6 @@ public class GrpcBinaryFileServerDownloader<T> { LOG.debug("Downloading file content..."); downloadConsumer.accept(outputStream); LOG.debug("Download completed."); - downloadFinished.set(true); } synchronized void sendChunks() { @@ -130,28 +131,33 @@ public class GrpcBinaryFileServerDownloader<T> { } void doSendChunks() throws IOException { - if (requestFinished.get()) { - return; - } - int bytesRead; - while (isReady()) { - if ((bytesRead = inputStream.read(buffer)) == -1) { - tryCompleteRequest(); - break; - } - callObserver.onNext(chunkBuilder.apply(ByteString.copyFrom(buffer, 0, bytesRead))); - LOG.debug("Sent {} bytes", bytesRead); + while (canSendChunks()) { + processDataFromInputStream(); } } - private boolean isReady() { - return callObserver.isReady(); + boolean canSendChunks() { + return !requestFinished.get() && callObserver.isReady(); + } + + void processDataFromInputStream() throws IOException { + var bytesRead = inputStream.read(buffer); + switch (bytesRead) { + case END_OF_STREAM: + completeRequest(); + break; + case NOTHING_READ: + break; + default: + callObserver.onNext(chunkBuilder.apply(ByteString.copyFrom(buffer, 0, bytesRead))); + LOG.debug("Sent {} bytes", bytesRead); + } } - void tryCompleteRequest() { + void completeRequest() { if (Objects.nonNull(downloadError.get())) { throw downloadError.get(); - } else if (downloadFinished.get()) { + } else { completeRequestNormally(); } } diff --git a/ozgcloud-common-lib/src/test/java/de/ozgcloud/common/binaryfile/GrpcBinaryFileServerDownloaderITCase.java b/ozgcloud-common-lib/src/test/java/de/ozgcloud/common/binaryfile/GrpcBinaryFileServerDownloaderITCase.java new file mode 100644 index 0000000000000000000000000000000000000000..039f873a97eee187224a497865b55efdc69e7d3b --- /dev/null +++ b/ozgcloud-common-lib/src/test/java/de/ozgcloud/common/binaryfile/GrpcBinaryFileServerDownloaderITCase.java @@ -0,0 +1,185 @@ +/* + * Copyright (C) 2025 Das Land Schleswig-Holstein vertreten durch den + * Ministerpräsidenten des Landes Schleswig-Holstein + * Staatskanzlei + * Abteilung Digitalisierung und zentrales IT-Management der Landesregierung + * + * Lizenziert unter der EUPL, Version 1.2 oder - sobald + * diese von der Europäischen Kommission genehmigt wurden - + * Folgeversionen der EUPL ("Lizenz"); + * Sie dürfen dieses Werk ausschließlich gemäß + * dieser Lizenz nutzen. + * Eine Kopie der Lizenz finden Sie hier: + * + * https://joinup.ec.europa.eu/collection/eupl/eupl-text-eupl-12 + * + * Sofern nicht durch anwendbare Rechtsvorschriften + * gefordert oder in schriftlicher Form vereinbart, wird + * die unter der Lizenz verbreitete Software "so wie sie + * ist", OHNE JEGLICHE GEWÄHRLEISTUNG ODER BEDINGUNGEN - + * ausdrücklich oder stillschweigend - verbreitet. + * Die sprachspezifischen Genehmigungen und Beschränkungen + * unter der Lizenz sind dem Lizenztext zu entnehmen. + */ +package de.ozgcloud.common.binaryfile; + +import static org.assertj.core.api.Assertions.*; +import static org.mockito.Mockito.*; + +import java.io.OutputStream; +import java.util.Arrays; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Nested; +import org.junit.jupiter.api.Test; +import org.mockito.ArgumentCaptor; +import org.mockito.Captor; +import org.mockito.Mock; +import org.springframework.core.task.SimpleAsyncTaskExecutor; + +import com.google.protobuf.ByteString; + +import de.ozgcloud.common.errorhandling.TechnicalException; +import io.grpc.stub.CallStreamObserver; +import lombok.SneakyThrows; + +class GrpcBinaryFileServerDownloaderITCase { + + @Mock + private CallStreamObserver<GrpcResponseDummy> callObserver; + private SimpleAsyncTaskExecutor taskExecutor; + private GrpcBinaryFileServerDownloader<GrpcResponseDummy> downloader; + + @BeforeEach + void init() { + taskExecutor = new SimpleAsyncTaskExecutor(); + when(callObserver.isReady()).thenReturn(true); + } + + @AfterEach + void cleanup() { + taskExecutor.close(); + } + + @Nested + class OnNoError { + + private static final int DOWNLOAD_DATA_LENGTH = Double.valueOf(GrpcBinaryFileServerDownloader.CHUNK_SIZE * 1.5).intValue(); + @Captor + private ArgumentCaptor<GrpcResponseDummy> captor; + + @BeforeEach + void init() { + downloader = spy(downloaderBuilder() + .downloadConsumer(this::downloadData) + .build()); + } + + @SneakyThrows + @Test + void shouldCallOnCompleted() { + start(); + + verify(callObserver).onCompleted(); + } + + @Test + void shouldReadAllData() { + start(); + + verify(callObserver, times(2)).onNext(captor.capture()); + var totalBytesRead = captor.getAllValues().stream().mapToInt(GrpcResponseDummy::bytesRead).sum(); + assertThat(totalBytesRead).isEqualTo(DOWNLOAD_DATA_LENGTH); + } + + @Test + void shouldCloseStreams() { + start(); + + verify(downloader).closeInputStream(); + verify(downloader).closeOutputStream(); + } + + @Test + void shouldCompleteIfDownloadConsumerClosedOutputStream() { + downloader = spy(downloaderBuilder() + .downloadConsumer(this::downloadDataAndCloseStream) + .build()); + + start(); + + verify(callObserver).onCompleted(); + } + + @SneakyThrows + private void downloadData(OutputStream outputStream) { + byte[] bytes = new byte[DOWNLOAD_DATA_LENGTH]; + Arrays.fill(bytes, (byte) 1); + outputStream.write(bytes); + } + + @SneakyThrows + private void downloadDataAndCloseStream(OutputStream outputStream) { + downloadData(outputStream); + outputStream.close(); + Thread.sleep(100); // delay, so that the onReadyHandler gets end of stream before this method returns + } + } + + @Nested + class OnError { + + private final Throwable error = new TechnicalException("error"); + + @BeforeEach + void init() { + downloader = spy(downloaderBuilder() + .downloadConsumer(this::downloadData) + .build()); + } + + @Test + void shouldThrowException() { + assertThatThrownBy(GrpcBinaryFileServerDownloaderITCase.this::start).isInstanceOf(TechnicalException.class); + } + + @Test + void shouldNotCallOnCompleted() { + catchException(GrpcBinaryFileServerDownloaderITCase.this::start); + + verify(callObserver, never()).onCompleted(); + } + + @Test + void shouldCloseStreams() { + catchException(GrpcBinaryFileServerDownloaderITCase.this::start); + + verify(downloader).closeInputStream(); + verify(downloader).closeOutputStream(); + } + + @SneakyThrows + private void downloadData(OutputStream outputStream) { + throw error; + } + } + + private GrpcBinaryFileServerDownloader.GrpcBinaryFileServerDownloaderBuilder<GrpcResponseDummy> downloaderBuilder() { + return GrpcBinaryFileServerDownloader.<GrpcResponseDummy>builder() + .taskExecutor(taskExecutor) + .callObserver(callObserver) + .chunkBuilder(this::buildChunk); + } + + private GrpcResponseDummy buildChunk(ByteString data) { + return new GrpcResponseDummy(data.size()); + } + + private void start() { + downloader.start(); + downloader.sendChunks(); + } + + private record GrpcResponseDummy(int bytesRead) {} +} diff --git a/ozgcloud-common-lib/src/test/java/de/ozgcloud/common/binaryfile/GrpcBinaryFileServerDownloaderTest.java b/ozgcloud-common-lib/src/test/java/de/ozgcloud/common/binaryfile/GrpcBinaryFileServerDownloaderTest.java index f2f2ef48a7818513f9dce804058f25352fa171d5..e2f860ec13170e5bca5718f34f73f1b57105b142 100644 --- a/ozgcloud-common-lib/src/test/java/de/ozgcloud/common/binaryfile/GrpcBinaryFileServerDownloaderTest.java +++ b/ozgcloud-common-lib/src/test/java/de/ozgcloud/common/binaryfile/GrpcBinaryFileServerDownloaderTest.java @@ -27,6 +27,7 @@ import static org.assertj.core.api.Assertions.*; import static org.mockito.ArgumentMatchers.*; import static org.mockito.Mockito.*; +import java.io.FileInputStream; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; @@ -37,11 +38,15 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; import java.util.function.Function; +import java.util.stream.Stream; +import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.DisplayName; import org.junit.jupiter.api.Nested; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; import org.mockito.ArgumentCaptor; import org.mockito.Captor; import org.mockito.Mock; @@ -300,18 +305,6 @@ class GrpcBinaryFileServerDownloaderTest { verify(downloadConsumer).accept(outputStream); } - - @Test - void shouldDownloadFinishedBeInitiallyFalse() { - assertThat(getDownloadFinished()).isFalse(); - } - - @Test - void shouldSetDownloadFinished() { - downloader.doDownload(); - - assertThat(getDownloadFinished()).isTrue(); - } } @Nested @@ -362,103 +355,157 @@ class GrpcBinaryFileServerDownloaderTest { @Nested class TestDoSendChunks { + @SneakyThrows + @Test + void shouldCheckIfCanSendChunks() { + doReturn(false).when(downloader).canSendChunks(); + + downloader.doSendChunks(); + + verify(downloader).canSendChunks(); + } + + @SneakyThrows + @Test + void shouldNotProcessIfCannotSendChunks() { + doReturn(false).when(downloader).canSendChunks(); + + downloader.doSendChunks(); + + verify(downloader, never()).processDataFromInputStream(); + } + + @SneakyThrows + @Test + void shouldProcessAsLongAsCanSendChunks() { + doReturn(true, true, false).when(downloader).canSendChunks(); + doNothing().when(downloader).processDataFromInputStream(); + + downloader.doSendChunks(); + + verify(downloader, times(2)).processDataFromInputStream(); + } + } + + @Nested + class TestProcessDataFromInputStream { + + @Mock + private PipedInputStream inputStream; + @Nested - class OnRequestFinished { + class OnEndOfStreamReached { + @SneakyThrows @BeforeEach void init() { - setRequestFinishedField(true); + doNothing().when(downloader).completeRequest(); + when(inputStream.read(any())).thenReturn(-1); + setInputStreamField(inputStream); } + @SneakyThrows @Test - void shouldNotInteractWithCallObserver() { - doSendChunks(); + void shouldCompleteRequest() { + downloader.processDataFromInputStream(); + + verify(downloader).completeRequest(); + } + + @SneakyThrows + @Test + void shouldNotCallCallObserver() { + downloader.processDataFromInputStream(); verifyNoInteractions(callObserver); } } @Nested - class OnRequestNotFinished { + class OnNoBytesWereReceived { - @Nested - class OnNotReady { - - @BeforeEach - void init() { - when(callObserver.isReady()).thenReturn(false); - } + @SneakyThrows + @BeforeEach + void init() { + when(inputStream.read(any())).thenReturn(0); + setInputStreamField(inputStream); + } - @Test - void shouldOnlyCallIsReadyOnObserver() { - doSendChunks(); + @SneakyThrows + @Test + void shouldNotCallCallObserver() { + downloader.processDataFromInputStream(); - verify(callObserver).isReady(); - verifyNoMoreInteractions(callObserver); - } + verifyNoInteractions(callObserver); } + } - @Nested - class OnReady { + @Nested + class OnBytesWereReceived { - @Mock - private PipedInputStream inputStream; - @Captor - private ArgumentCaptor<ByteString> byteStringCaptor; + @Captor + private ArgumentCaptor<ByteString> byteStringCaptor; - private final int readBytes = 20; - private final byte[] buffer = new byte[readBytes]; - private final GrpcResponseDummy grpcResponseDummy = new GrpcResponseDummy(); + private final int readBytes = 20; + private final byte[] buffer = new byte[readBytes]; + private final GrpcResponseDummy grpcResponseDummy = new GrpcResponseDummy(); - @SneakyThrows - @BeforeEach - void mock() { - doNothing().when(downloader).tryCompleteRequest(); - when(callObserver.isReady()).thenReturn(true); - when(inputStream.read(any())).thenReturn(readBytes, -1); - setInputStreamField(inputStream); - new Random().nextBytes(buffer); - ReflectionTestUtils.setField(downloader, "buffer", buffer); - } + @SneakyThrows + @BeforeEach + void mock() { + when(inputStream.read(any())).thenReturn(readBytes); + setInputStreamField(inputStream); + new Random().nextBytes(buffer); + ReflectionTestUtils.setField(downloader, "buffer", buffer); + } - @Test - void shouldCallChunkBuilder() { - doSendChunks(); + @SneakyThrows + @Test + void shouldCallChunkBuilder() { + downloader.processDataFromInputStream(); - verify(chunkBuilder).apply(byteStringCaptor.capture()); - assertThat(byteStringCaptor.getValue().toByteArray()).isEqualTo(buffer); - } + verify(chunkBuilder).apply(byteStringCaptor.capture()); + assertThat(byteStringCaptor.getValue().toByteArray()).isEqualTo(buffer); + } - @DisplayName("should send next chunk if callObserver is ready and stream already received data") - @Test - void shouldCallOnNext() { - when(chunkBuilder.apply(any())).thenReturn(grpcResponseDummy); + @SneakyThrows + @Test + void shouldCallOnNext() { + when(chunkBuilder.apply(any())).thenReturn(grpcResponseDummy); - doSendChunks(); + downloader.processDataFromInputStream(); - verify(callObserver).onNext(grpcResponseDummy); - } + verify(callObserver).onNext(grpcResponseDummy); + } + } + } - @DisplayName("should call complete grpc stream if download has finished and stream has no data left") - @Test - void shouldTryCompleteRequest() { - setDownloadFinishedField(true); + @Nested + class TestCanSendChunks { - doSendChunks(); + @ParameterizedTest + @MethodSource("provideArguments") + void shouldReturnValue(boolean requestFinished, boolean ready, boolean expected) { + ReflectionTestUtils.setField(downloader, "requestFinished", new AtomicBoolean(requestFinished)); + lenient().when(callObserver.isReady()).thenReturn(ready); - verify(downloader).tryCompleteRequest(); - } - } + var canSendChunks = downloader.canSendChunks(); + + assertThat(canSendChunks).isEqualTo(expected); } - @SneakyThrows - private void doSendChunks() { - downloader.doSendChunks(); + private static Stream<Arguments> provideArguments() { + return Stream.of( + Arguments.of(false, false, false), + Arguments.of(false, true, true), + Arguments.of(true, false, false), + Arguments.of(true, true, false) + ); } } @Nested - class TestTryCompleteRequest { + class TestCompleteRequest { @Nested class OnError { @@ -472,56 +519,32 @@ class GrpcBinaryFileServerDownloaderTest { @Test void shouldThrowException() { - assertThatThrownBy(downloader::tryCompleteRequest).isSameAs(exception); + assertThatThrownBy(downloader::completeRequest).isSameAs(exception); } } @Nested - class OnDownloadFinished { + class OnNoError { @BeforeEach void init() { - setDownloadFinishedField(true); doNothing().when(downloader).completeRequestNormally(); } @Test void shouldNotCompleteRequestWithError() { - downloader.tryCompleteRequest(); + downloader.completeRequest(); verify(downloader, never()).completeRequestWithError(any()); } @Test void shouldCompleteRequestNormally() { - downloader.tryCompleteRequest(); + downloader.completeRequest(); verify(downloader).completeRequestNormally(); } } - - @Nested - class OnDownloadNotFinished { - - @BeforeEach - void init() { - setDownloadFinishedField(false); - } - - @Test - void shouldNotCompleteRequestNormally() { - downloader.tryCompleteRequest(); - - verify(downloader, never()).completeRequestNormally(); - } - - @Test - void shouldNotCompleteRequestWithError() { - downloader.tryCompleteRequest(); - - verify(downloader, never()).completeRequestWithError(any()); - } - } } @Nested @@ -622,4 +645,62 @@ class GrpcBinaryFileServerDownloaderTest { private static class GrpcResponseDummy { } + + @Nested + class TestStreams { + + private static final int CHUNK_SIZE = 255 * 1024; + + private PipedInputStream inputStream; + private PipedOutputStream outputStream; + + @SneakyThrows + @BeforeEach + void init() { + outputStream = new PipedOutputStream(); + inputStream = new PipedInputStream(CHUNK_SIZE); + outputStream.connect(inputStream); + } + + @SneakyThrows + @Test + void shouldReadIncompleteFile() { + var fileBuffer = new byte[CHUNK_SIZE]; + var readBuffer = new byte[CHUNK_SIZE]; + try (FileInputStream fileInputStream = new FileInputStream("/Users/kwitukiewicz/Documents/books/__Debt__The_First_5_000_Years.pdf")) { + fileInputStream.read(fileBuffer, 0, 255); + outputStream.write(fileBuffer, 0, 1); + + var read = inputStream.read(readBuffer, 0, CHUNK_SIZE); + + assertThat(read).isEqualTo(1); + } + } + + @SneakyThrows + @Test + void shouldReadAfterOutputStreamWasClosed() { + var fileBuffer = new byte[CHUNK_SIZE]; + var readBuffer = new byte[CHUNK_SIZE * 2]; + try (FileInputStream fileInputStream = new FileInputStream("/Users/kwitukiewicz/Documents/books/__Debt__The_First_5_000_Years.pdf")) { + fileInputStream.read(fileBuffer, 0, fileBuffer.length); + outputStream.write(fileBuffer); + outputStream.close(); + + var read = inputStream.read(readBuffer); + + assertThat(read).isEqualTo(CHUNK_SIZE); + + read = inputStream.read(readBuffer); + assertThat(read).isEqualTo(-1); + } + } + + @SneakyThrows + @AfterEach + void cleanup() { + outputStream.close(); + inputStream.close(); + } + } } \ No newline at end of file