diff --git a/ozgcloud-common-lib/src/main/java/de/ozgcloud/common/binaryfile/GrpcBinaryFileServerDownloader.java b/ozgcloud-common-lib/src/main/java/de/ozgcloud/common/binaryfile/GrpcBinaryFileServerDownloader.java index 07379d8833ea981f6d9515d562e0f457d2709743..9a41471b91fb20bc65131a501291f4d55867016e 100644 --- a/ozgcloud-common-lib/src/main/java/de/ozgcloud/common/binaryfile/GrpcBinaryFileServerDownloader.java +++ b/ozgcloud-common-lib/src/main/java/de/ozgcloud/common/binaryfile/GrpcBinaryFileServerDownloader.java @@ -46,7 +46,10 @@ import lombok.extern.log4j.Log4j2; @Log4j2 public class GrpcBinaryFileServerDownloader<T> { - private static final int CHUNK_SIZE = 255 * 1024; + static final int CHUNK_SIZE = 255 * 1024; + + static final int END_OF_STREAM = -1; + static final int NOTHING_READ = 0; private final CallStreamObserver<T> callObserver; private final Function<ByteString, T> chunkBuilder; @@ -55,7 +58,6 @@ public class GrpcBinaryFileServerDownloader<T> { private final byte[] buffer = new byte[CHUNK_SIZE]; private final AtomicBoolean started = new AtomicBoolean(false); - private final AtomicBoolean downloadFinished = new AtomicBoolean(false); private final AtomicBoolean requestFinished = new AtomicBoolean(false); private final AtomicReference<TechnicalException> downloadError = new AtomicReference<>(); @@ -118,56 +120,61 @@ public class GrpcBinaryFileServerDownloader<T> { LOG.debug("Downloading file content..."); downloadConsumer.accept(outputStream); LOG.debug("Download completed."); - downloadFinished.set(true); } synchronized void sendChunks() { try { doSendChunks(); } catch (Exception e) { - completeRequestWithError(new TechnicalException("Error while sending chunks", e)); + handleError(new TechnicalException("Error while sending chunks", e)); } } void doSendChunks() throws IOException { - if (requestFinished.get()) { - return; + while (canSendChunks()) { + processDataFromInputStream(); } - int bytesRead; - while (isReady()) { - if ((bytesRead = inputStream.read(buffer)) == -1) { - tryCompleteRequest(); + } + + boolean canSendChunks() { + return !requestFinished.get() && callObserver.isReady(); + } + + void processDataFromInputStream() throws IOException { + var bytesRead = inputStream.read(buffer); + switch (bytesRead) { + case END_OF_STREAM: + finishProcessing(); break; - } - callObserver.onNext(chunkBuilder.apply(ByteString.copyFrom(buffer, 0, bytesRead))); - LOG.debug("Sent {} bytes", bytesRead); + case NOTHING_READ: + break; + default: + sendBytesToCallObserver(bytesRead); } } - private boolean isReady() { - return callObserver.isReady(); + void sendBytesToCallObserver(int bytesRead) { + var bytes = ByteString.copyFrom(buffer, 0, bytesRead); + var chunk = chunkBuilder.apply(bytes); + callObserver.onNext(chunk); + LOG.debug("Sent {} bytes", bytesRead); } - void tryCompleteRequest() { + void finishProcessing() { if (Objects.nonNull(downloadError.get())) { throw downloadError.get(); - } else if (downloadFinished.get()) { - completeRequestNormally(); + } else { + finishRequest(); + callObserver.onCompleted(); } } - void completeRequestWithError(TechnicalException e) { + void handleError(TechnicalException e) { LOG.debug("Complete download request with error"); finishRequest(); throw e; } - void completeRequestNormally() { - LOG.debug("Complete download request"); - finishRequest(); - callObserver.onCompleted(); - } - private void finishRequest() { requestFinished.set(true); closeInputStream(); diff --git a/ozgcloud-common-lib/src/test/java/de/ozgcloud/common/binaryfile/GrpcBinaryFileServerDownloaderITCase.java b/ozgcloud-common-lib/src/test/java/de/ozgcloud/common/binaryfile/GrpcBinaryFileServerDownloaderITCase.java new file mode 100644 index 0000000000000000000000000000000000000000..fed3cc387ce54f90b1d4dec72c515cc2556f7ee5 --- /dev/null +++ b/ozgcloud-common-lib/src/test/java/de/ozgcloud/common/binaryfile/GrpcBinaryFileServerDownloaderITCase.java @@ -0,0 +1,185 @@ +/* + * Copyright (C) 2025 Das Land Schleswig-Holstein vertreten durch den + * Ministerpräsidenten des Landes Schleswig-Holstein + * Staatskanzlei + * Abteilung Digitalisierung und zentrales IT-Management der Landesregierung + * + * Lizenziert unter der EUPL, Version 1.2 oder - sobald + * diese von der Europäischen Kommission genehmigt wurden - + * Folgeversionen der EUPL ("Lizenz"); + * Sie dürfen dieses Werk ausschließlich gemäß + * dieser Lizenz nutzen. + * Eine Kopie der Lizenz finden Sie hier: + * + * https://joinup.ec.europa.eu/collection/eupl/eupl-text-eupl-12 + * + * Sofern nicht durch anwendbare Rechtsvorschriften + * gefordert oder in schriftlicher Form vereinbart, wird + * die unter der Lizenz verbreitete Software "so wie sie + * ist", OHNE JEGLICHE GEWÄHRLEISTUNG ODER BEDINGUNGEN - + * ausdrücklich oder stillschweigend - verbreitet. + * Die sprachspezifischen Genehmigungen und Beschränkungen + * unter der Lizenz sind dem Lizenztext zu entnehmen. + */ +package de.ozgcloud.common.binaryfile; + +import static org.assertj.core.api.Assertions.*; +import static org.mockito.Mockito.*; + +import java.io.OutputStream; +import java.util.Arrays; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Nested; +import org.junit.jupiter.api.Test; +import org.mockito.ArgumentCaptor; +import org.mockito.Captor; +import org.mockito.Mock; +import org.springframework.core.task.SimpleAsyncTaskExecutor; + +import com.google.protobuf.ByteString; + +import de.ozgcloud.common.errorhandling.TechnicalException; +import io.grpc.stub.CallStreamObserver; +import lombok.SneakyThrows; + +class GrpcBinaryFileServerDownloaderITCase { + + @Mock + private CallStreamObserver<GrpcResponseDummy> callObserver; + private SimpleAsyncTaskExecutor taskExecutor; + private GrpcBinaryFileServerDownloader<GrpcResponseDummy> downloader; + + @BeforeEach + void init() { + taskExecutor = new SimpleAsyncTaskExecutor(); + when(callObserver.isReady()).thenReturn(true); + } + + @AfterEach + void cleanup() { + taskExecutor.close(); + } + + @Nested + class OnNoError { + + private static final int DOWNLOAD_DATA_LENGTH = (int) (GrpcBinaryFileServerDownloader.CHUNK_SIZE * 1.5); + @Captor + private ArgumentCaptor<GrpcResponseDummy> captor; + + @BeforeEach + void init() { + downloader = spy(downloaderBuilder() + .downloadConsumer(this::downloadData) + .build()); + } + + @SneakyThrows + @Test + void shouldCallOnCompleted() { + start(); + + verify(callObserver).onCompleted(); + } + + @Test + void shouldReadAllData() { + start(); + + verify(callObserver, times(2)).onNext(captor.capture()); + var totalBytesRead = captor.getAllValues().stream().mapToInt(GrpcResponseDummy::bytesRead).sum(); + assertThat(totalBytesRead).isEqualTo(DOWNLOAD_DATA_LENGTH); + } + + @Test + void shouldCloseStreams() { + start(); + + verify(downloader).closeInputStream(); + verify(downloader).closeOutputStream(); + } + + @Test + void shouldCompleteIfDownloadConsumerClosedOutputStream() { + downloader = spy(downloaderBuilder() + .downloadConsumer(this::downloadDataAndCloseStream) + .build()); + + start(); + + verify(callObserver).onCompleted(); + } + + @SneakyThrows + private void downloadData(OutputStream outputStream) { + byte[] bytes = new byte[DOWNLOAD_DATA_LENGTH]; + Arrays.fill(bytes, (byte) 1); + outputStream.write(bytes); + } + + @SneakyThrows + private void downloadDataAndCloseStream(OutputStream outputStream) { + downloadData(outputStream); + outputStream.close(); + Thread.sleep(100); // delay, so that the onReadyHandler gets end of stream before this method returns + } + } + + @Nested + class OnError { + + private final Throwable error = new TechnicalException("error"); + + @BeforeEach + void init() { + downloader = spy(downloaderBuilder() + .downloadConsumer(this::downloadData) + .build()); + } + + @Test + void shouldThrowException() { + assertThatThrownBy(GrpcBinaryFileServerDownloaderITCase.this::start).isInstanceOf(TechnicalException.class); + } + + @Test + void shouldNotCallOnCompleted() { + catchException(GrpcBinaryFileServerDownloaderITCase.this::start); + + verify(callObserver, never()).onCompleted(); + } + + @Test + void shouldCloseStreams() { + catchException(GrpcBinaryFileServerDownloaderITCase.this::start); + + verify(downloader).closeInputStream(); + verify(downloader).closeOutputStream(); + } + + @SneakyThrows + private void downloadData(OutputStream outputStream) { + throw error; + } + } + + private GrpcBinaryFileServerDownloader.GrpcBinaryFileServerDownloaderBuilder<GrpcResponseDummy> downloaderBuilder() { + return GrpcBinaryFileServerDownloader.<GrpcResponseDummy>builder() + .taskExecutor(taskExecutor) + .callObserver(callObserver) + .chunkBuilder(this::buildChunk); + } + + private GrpcResponseDummy buildChunk(ByteString data) { + return new GrpcResponseDummy(data.size()); + } + + private void start() { + downloader.start(); + downloader.sendChunks(); + } + + private record GrpcResponseDummy(int bytesRead) {} +} diff --git a/ozgcloud-common-lib/src/test/java/de/ozgcloud/common/binaryfile/GrpcBinaryFileServerDownloaderTest.java b/ozgcloud-common-lib/src/test/java/de/ozgcloud/common/binaryfile/GrpcBinaryFileServerDownloaderTest.java index f2f2ef48a7818513f9dce804058f25352fa171d5..1817466f0245e6a85caf5bc7c5244277c965c119 100644 --- a/ozgcloud-common-lib/src/test/java/de/ozgcloud/common/binaryfile/GrpcBinaryFileServerDownloaderTest.java +++ b/ozgcloud-common-lib/src/test/java/de/ozgcloud/common/binaryfile/GrpcBinaryFileServerDownloaderTest.java @@ -23,6 +23,7 @@ */ package de.ozgcloud.common.binaryfile; +import static de.ozgcloud.common.binaryfile.GrpcBinaryFileServerDownloader.*; import static org.assertj.core.api.Assertions.*; import static org.mockito.ArgumentMatchers.*; import static org.mockito.Mockito.*; @@ -37,11 +38,14 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; import java.util.function.Function; +import java.util.stream.Stream; import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.DisplayName; import org.junit.jupiter.api.Nested; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; import org.mockito.ArgumentCaptor; import org.mockito.Captor; import org.mockito.Mock; @@ -300,18 +304,6 @@ class GrpcBinaryFileServerDownloaderTest { verify(downloadConsumer).accept(outputStream); } - - @Test - void shouldDownloadFinishedBeInitiallyFalse() { - assertThat(getDownloadFinished()).isFalse(); - } - - @Test - void shouldSetDownloadFinished() { - downloader.doDownload(); - - assertThat(getDownloadFinished()).isTrue(); - } } @Nested @@ -346,14 +338,14 @@ class GrpcBinaryFileServerDownloaderTest { @BeforeEach void init() { doThrow(exception).when(downloader).doSendChunks(); - doNothing().when(downloader).completeRequestWithError(any()); + doNothing().when(downloader).handleError(any()); } @Test - void shouldCompleteRequestWithError() { + void shouldHandleError() { downloader.sendChunks(); - verify(downloader).completeRequestWithError(argumentCaptor.capture()); + verify(downloader).handleError(argumentCaptor.capture()); assertThat(argumentCaptor.getValue()).isInstanceOf(TechnicalException.class).hasCause(exception); } } @@ -362,103 +354,174 @@ class GrpcBinaryFileServerDownloaderTest { @Nested class TestDoSendChunks { + @SneakyThrows + @Test + void shouldCheckIfCanSendChunks() { + doReturn(false).when(downloader).canSendChunks(); + + downloader.doSendChunks(); + + verify(downloader).canSendChunks(); + } + + @SneakyThrows + @Test + void shouldNotProcessIfCannotSendChunks() { + doReturn(false).when(downloader).canSendChunks(); + + downloader.doSendChunks(); + + verify(downloader, never()).processDataFromInputStream(); + } + + @SneakyThrows + @Test + void shouldProcessAsLongAsCanSendChunks() { + doReturn(true, true, false).when(downloader).canSendChunks(); + doNothing().when(downloader).processDataFromInputStream(); + + downloader.doSendChunks(); + + verify(downloader, times(2)).processDataFromInputStream(); + } + } + + @Nested + class TestProcessDataFromInputStream { + + @Mock + private PipedInputStream inputStream; + @Nested - class OnRequestFinished { + class OnEndOfStreamReached { + @SneakyThrows @BeforeEach void init() { - setRequestFinishedField(true); + doNothing().when(downloader).finishProcessing(); + when(inputStream.read(any())).thenReturn(END_OF_STREAM); + setInputStreamField(inputStream); } + @SneakyThrows @Test - void shouldNotInteractWithCallObserver() { - doSendChunks(); + void shouldFinishProcessing() { + downloader.processDataFromInputStream(); - verifyNoInteractions(callObserver); + verify(downloader).finishProcessing(); + } + + @SneakyThrows + @Test + void shouldNotSendBytesToCallObserver() { + downloader.processDataFromInputStream(); + + verify(downloader, never()).sendBytesToCallObserver(anyInt()); } } @Nested - class OnRequestNotFinished { + class OnNoBytesWereReceived { - @Nested - class OnNotReady { + @SneakyThrows + @BeforeEach + void init() { + when(inputStream.read(any())).thenReturn(NOTHING_READ); + setInputStreamField(inputStream); + } + + @SneakyThrows + @Test + void shouldNotSendBytesToCallObserver() { + downloader.processDataFromInputStream(); + + verify(downloader, never()).sendBytesToCallObserver(anyInt()); + } + } + + @Nested + class OnBytesWereReceived { + + private final int bytesRead = 20; - @BeforeEach - void init() { - when(callObserver.isReady()).thenReturn(false); - } + @SneakyThrows + @BeforeEach + void mock() { + when(inputStream.read(any())).thenReturn(bytesRead); + setInputStreamField(inputStream); + } - @Test - void shouldOnlyCallIsReadyOnObserver() { - doSendChunks(); + @SneakyThrows + @Test + void shouldSendBytesToCallObserver() { + downloader.processDataFromInputStream(); - verify(callObserver).isReady(); - verifyNoMoreInteractions(callObserver); - } + verify(downloader).sendBytesToCallObserver(bytesRead); } + } + } - @Nested - class OnReady { + @Nested + class TestSendBytesToCallObserver { - @Mock - private PipedInputStream inputStream; - @Captor - private ArgumentCaptor<ByteString> byteStringCaptor; + @Captor + private ArgumentCaptor<ByteString> byteStringCaptor; + private final int bytesRead = 20; + private final byte[] buffer = new byte[bytesRead]; + private final GrpcResponseDummy grpcResponseDummy = new GrpcResponseDummy(); - private final int readBytes = 20; - private final byte[] buffer = new byte[readBytes]; - private final GrpcResponseDummy grpcResponseDummy = new GrpcResponseDummy(); + @BeforeEach + void init() { + new Random().nextBytes(buffer); + ReflectionTestUtils.setField(downloader, "buffer", buffer); + } - @SneakyThrows - @BeforeEach - void mock() { - doNothing().when(downloader).tryCompleteRequest(); - when(callObserver.isReady()).thenReturn(true); - when(inputStream.read(any())).thenReturn(readBytes, -1); - setInputStreamField(inputStream); - new Random().nextBytes(buffer); - ReflectionTestUtils.setField(downloader, "buffer", buffer); - } + @SneakyThrows + @Test + void shouldCallChunkBuilder() { + downloader.sendBytesToCallObserver(bytesRead); - @Test - void shouldCallChunkBuilder() { - doSendChunks(); + verify(chunkBuilder).apply(byteStringCaptor.capture()); + assertThat(byteStringCaptor.getValue().toByteArray()).isEqualTo(buffer); + } - verify(chunkBuilder).apply(byteStringCaptor.capture()); - assertThat(byteStringCaptor.getValue().toByteArray()).isEqualTo(buffer); - } + @SneakyThrows + @Test + void shouldCallOnNext() { + when(chunkBuilder.apply(any())).thenReturn(grpcResponseDummy); - @DisplayName("should send next chunk if callObserver is ready and stream already received data") - @Test - void shouldCallOnNext() { - when(chunkBuilder.apply(any())).thenReturn(grpcResponseDummy); + downloader.sendBytesToCallObserver(bytesRead); - doSendChunks(); + verify(callObserver).onNext(grpcResponseDummy); + } + } - verify(callObserver).onNext(grpcResponseDummy); - } + @Nested + class TestCanSendChunks { - @DisplayName("should call complete grpc stream if download has finished and stream has no data left") - @Test - void shouldTryCompleteRequest() { - setDownloadFinishedField(true); + @ParameterizedTest + @MethodSource("provideArguments") + void shouldReturnValue(boolean requestFinished, boolean ready, boolean expected) { + setRequestFinishedField(requestFinished); + lenient().when(callObserver.isReady()).thenReturn(ready); - doSendChunks(); + var canSendChunks = downloader.canSendChunks(); - verify(downloader).tryCompleteRequest(); - } - } + assertThat(canSendChunks).isEqualTo(expected); } - @SneakyThrows - private void doSendChunks() { - downloader.doSendChunks(); + private static Stream<Arguments> provideArguments() { + return Stream.of( + Arguments.of(false, false, false), + Arguments.of(false, true, true), + Arguments.of(true, false, false), + Arguments.of(true, true, false) + ); } } @Nested - class TestTryCompleteRequest { + class TestFinishProcessing { @Nested class OnError { @@ -472,92 +535,52 @@ class GrpcBinaryFileServerDownloaderTest { @Test void shouldThrowException() { - assertThatThrownBy(downloader::tryCompleteRequest).isSameAs(exception); + assertThatThrownBy(downloader::finishProcessing).isSameAs(exception); } } @Nested - class OnDownloadFinished { + class OnNoError { @BeforeEach void init() { - setDownloadFinishedField(true); - doNothing().when(downloader).completeRequestNormally(); + doNothing().when(downloader).closeInputStream(); } @Test void shouldNotCompleteRequestWithError() { - downloader.tryCompleteRequest(); + downloader.finishProcessing(); - verify(downloader, never()).completeRequestWithError(any()); + verify(downloader, never()).handleError(any()); } @Test - void shouldCompleteRequestNormally() { - downloader.tryCompleteRequest(); + void shouldSetRequestFinished() { + assertThat(getRequestFinished()).isFalse(); - verify(downloader).completeRequestNormally(); - } - } + downloader.finishProcessing(); - @Nested - class OnDownloadNotFinished { - - @BeforeEach - void init() { - setDownloadFinishedField(false); + assertThat(getRequestFinished()).isTrue(); } @Test - void shouldNotCompleteRequestNormally() { - downloader.tryCompleteRequest(); + void shouldCloseInputStream() { + downloader.finishProcessing(); - verify(downloader, never()).completeRequestNormally(); + verify(downloader).closeInputStream(); } @Test - void shouldNotCompleteRequestWithError() { - downloader.tryCompleteRequest(); + void shouldNotifyObserver() { + downloader.finishProcessing(); - verify(downloader, never()).completeRequestWithError(any()); + verify(callObserver).onCompleted(); } } } @Nested - class TestCompleteRequestNormally { - - @BeforeEach - void init() { - doNothing().when(downloader).closeInputStream(); - } - - @Test - void shouldSetRequestFinished() { - assertThat(getRequestFinished()).isFalse(); - - downloader.completeRequestNormally(); - - assertThat(getRequestFinished()).isTrue(); - } - - @Test - void shouldCloseInputStream() { - downloader.completeRequestNormally(); - - verify(downloader).closeInputStream(); - } - - @Test - void shouldNotifyObserver() { - downloader.completeRequestNormally(); - - verify(callObserver).onCompleted(); - } - } - - @Nested - class TestCompleteRequestWithError { + class TestHandleError { private final TechnicalException error = new TechnicalException("error"); @@ -570,21 +593,21 @@ class GrpcBinaryFileServerDownloaderTest { void shouldSetRequestFinished() { assertThat(getRequestFinished()).isFalse(); - catchException(() -> downloader.completeRequestWithError(error)); + catchException(() -> downloader.handleError(error)); assertThat(getRequestFinished()).isTrue(); } @Test void shouldCloseInputStream() { - catchException(() -> downloader.completeRequestWithError(error)); + catchException(() -> downloader.handleError(error)); verify(downloader).closeInputStream(); } @Test void shouldThrowException() { - assertThatThrownBy(() -> downloader.completeRequestWithError(error)).isSameAs(error); + assertThatThrownBy(() -> downloader.handleError(error)).isSameAs(error); } } @@ -612,14 +635,6 @@ class GrpcBinaryFileServerDownloaderTest { return (TechnicalException) ReflectionTestUtils.getField(downloader, "downloadError", AtomicReference.class).get(); } - private void setDownloadFinishedField(boolean downloadFinished) { - ReflectionTestUtils.setField(downloader, "downloadFinished", new AtomicBoolean(downloadFinished)); - } - - private boolean getDownloadFinished() { - return ReflectionTestUtils.getField(downloader, "downloadFinished", AtomicBoolean.class).get(); - } - private static class GrpcResponseDummy { } } \ No newline at end of file