From 133aa19b29b5464ade61fe480a63de94e050665f Mon Sep 17 00:00:00 2001 From: Krzysztof Witukiewicz <krzysztof.witukiewicz@mgm-tp.com> Date: Fri, 7 Feb 2025 15:40:39 +0100 Subject: [PATCH 1/3] OZG-7262 OZG-7680 Fix onReadyHandler not finishing --- .../GrpcBinaryFileServerDownloader.java | 42 +-- .../GrpcBinaryFileServerDownloaderITCase.java | 185 +++++++++++ .../GrpcBinaryFileServerDownloaderTest.java | 287 +++++++++++------- 3 files changed, 393 insertions(+), 121 deletions(-) create mode 100644 ozgcloud-common-lib/src/test/java/de/ozgcloud/common/binaryfile/GrpcBinaryFileServerDownloaderITCase.java diff --git a/ozgcloud-common-lib/src/main/java/de/ozgcloud/common/binaryfile/GrpcBinaryFileServerDownloader.java b/ozgcloud-common-lib/src/main/java/de/ozgcloud/common/binaryfile/GrpcBinaryFileServerDownloader.java index 07379d8..7326002 100644 --- a/ozgcloud-common-lib/src/main/java/de/ozgcloud/common/binaryfile/GrpcBinaryFileServerDownloader.java +++ b/ozgcloud-common-lib/src/main/java/de/ozgcloud/common/binaryfile/GrpcBinaryFileServerDownloader.java @@ -46,7 +46,10 @@ import lombok.extern.log4j.Log4j2; @Log4j2 public class GrpcBinaryFileServerDownloader<T> { - private static final int CHUNK_SIZE = 255 * 1024; + static final int CHUNK_SIZE = 255 * 1024; + + private static final int END_OF_STREAM = -1; + private static final int NOTHING_READ = 0; private final CallStreamObserver<T> callObserver; private final Function<ByteString, T> chunkBuilder; @@ -55,7 +58,6 @@ public class GrpcBinaryFileServerDownloader<T> { private final byte[] buffer = new byte[CHUNK_SIZE]; private final AtomicBoolean started = new AtomicBoolean(false); - private final AtomicBoolean downloadFinished = new AtomicBoolean(false); private final AtomicBoolean requestFinished = new AtomicBoolean(false); private final AtomicReference<TechnicalException> downloadError = new AtomicReference<>(); @@ -118,7 +120,6 @@ public class GrpcBinaryFileServerDownloader<T> { LOG.debug("Downloading file content..."); downloadConsumer.accept(outputStream); LOG.debug("Download completed."); - downloadFinished.set(true); } synchronized void sendChunks() { @@ -130,28 +131,33 @@ public class GrpcBinaryFileServerDownloader<T> { } void doSendChunks() throws IOException { - if (requestFinished.get()) { - return; - } - int bytesRead; - while (isReady()) { - if ((bytesRead = inputStream.read(buffer)) == -1) { - tryCompleteRequest(); - break; - } - callObserver.onNext(chunkBuilder.apply(ByteString.copyFrom(buffer, 0, bytesRead))); - LOG.debug("Sent {} bytes", bytesRead); + while (canSendChunks()) { + processDataFromInputStream(); } } - private boolean isReady() { - return callObserver.isReady(); + boolean canSendChunks() { + return !requestFinished.get() && callObserver.isReady(); + } + + void processDataFromInputStream() throws IOException { + var bytesRead = inputStream.read(buffer); + switch (bytesRead) { + case END_OF_STREAM: + completeRequest(); + break; + case NOTHING_READ: + break; + default: + callObserver.onNext(chunkBuilder.apply(ByteString.copyFrom(buffer, 0, bytesRead))); + LOG.debug("Sent {} bytes", bytesRead); + } } - void tryCompleteRequest() { + void completeRequest() { if (Objects.nonNull(downloadError.get())) { throw downloadError.get(); - } else if (downloadFinished.get()) { + } else { completeRequestNormally(); } } diff --git a/ozgcloud-common-lib/src/test/java/de/ozgcloud/common/binaryfile/GrpcBinaryFileServerDownloaderITCase.java b/ozgcloud-common-lib/src/test/java/de/ozgcloud/common/binaryfile/GrpcBinaryFileServerDownloaderITCase.java new file mode 100644 index 0000000..039f873 --- /dev/null +++ b/ozgcloud-common-lib/src/test/java/de/ozgcloud/common/binaryfile/GrpcBinaryFileServerDownloaderITCase.java @@ -0,0 +1,185 @@ +/* + * Copyright (C) 2025 Das Land Schleswig-Holstein vertreten durch den + * Ministerpräsidenten des Landes Schleswig-Holstein + * Staatskanzlei + * Abteilung Digitalisierung und zentrales IT-Management der Landesregierung + * + * Lizenziert unter der EUPL, Version 1.2 oder - sobald + * diese von der Europäischen Kommission genehmigt wurden - + * Folgeversionen der EUPL ("Lizenz"); + * Sie dürfen dieses Werk ausschließlich gemäß + * dieser Lizenz nutzen. + * Eine Kopie der Lizenz finden Sie hier: + * + * https://joinup.ec.europa.eu/collection/eupl/eupl-text-eupl-12 + * + * Sofern nicht durch anwendbare Rechtsvorschriften + * gefordert oder in schriftlicher Form vereinbart, wird + * die unter der Lizenz verbreitete Software "so wie sie + * ist", OHNE JEGLICHE GEWÄHRLEISTUNG ODER BEDINGUNGEN - + * ausdrücklich oder stillschweigend - verbreitet. + * Die sprachspezifischen Genehmigungen und Beschränkungen + * unter der Lizenz sind dem Lizenztext zu entnehmen. + */ +package de.ozgcloud.common.binaryfile; + +import static org.assertj.core.api.Assertions.*; +import static org.mockito.Mockito.*; + +import java.io.OutputStream; +import java.util.Arrays; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Nested; +import org.junit.jupiter.api.Test; +import org.mockito.ArgumentCaptor; +import org.mockito.Captor; +import org.mockito.Mock; +import org.springframework.core.task.SimpleAsyncTaskExecutor; + +import com.google.protobuf.ByteString; + +import de.ozgcloud.common.errorhandling.TechnicalException; +import io.grpc.stub.CallStreamObserver; +import lombok.SneakyThrows; + +class GrpcBinaryFileServerDownloaderITCase { + + @Mock + private CallStreamObserver<GrpcResponseDummy> callObserver; + private SimpleAsyncTaskExecutor taskExecutor; + private GrpcBinaryFileServerDownloader<GrpcResponseDummy> downloader; + + @BeforeEach + void init() { + taskExecutor = new SimpleAsyncTaskExecutor(); + when(callObserver.isReady()).thenReturn(true); + } + + @AfterEach + void cleanup() { + taskExecutor.close(); + } + + @Nested + class OnNoError { + + private static final int DOWNLOAD_DATA_LENGTH = Double.valueOf(GrpcBinaryFileServerDownloader.CHUNK_SIZE * 1.5).intValue(); + @Captor + private ArgumentCaptor<GrpcResponseDummy> captor; + + @BeforeEach + void init() { + downloader = spy(downloaderBuilder() + .downloadConsumer(this::downloadData) + .build()); + } + + @SneakyThrows + @Test + void shouldCallOnCompleted() { + start(); + + verify(callObserver).onCompleted(); + } + + @Test + void shouldReadAllData() { + start(); + + verify(callObserver, times(2)).onNext(captor.capture()); + var totalBytesRead = captor.getAllValues().stream().mapToInt(GrpcResponseDummy::bytesRead).sum(); + assertThat(totalBytesRead).isEqualTo(DOWNLOAD_DATA_LENGTH); + } + + @Test + void shouldCloseStreams() { + start(); + + verify(downloader).closeInputStream(); + verify(downloader).closeOutputStream(); + } + + @Test + void shouldCompleteIfDownloadConsumerClosedOutputStream() { + downloader = spy(downloaderBuilder() + .downloadConsumer(this::downloadDataAndCloseStream) + .build()); + + start(); + + verify(callObserver).onCompleted(); + } + + @SneakyThrows + private void downloadData(OutputStream outputStream) { + byte[] bytes = new byte[DOWNLOAD_DATA_LENGTH]; + Arrays.fill(bytes, (byte) 1); + outputStream.write(bytes); + } + + @SneakyThrows + private void downloadDataAndCloseStream(OutputStream outputStream) { + downloadData(outputStream); + outputStream.close(); + Thread.sleep(100); // delay, so that the onReadyHandler gets end of stream before this method returns + } + } + + @Nested + class OnError { + + private final Throwable error = new TechnicalException("error"); + + @BeforeEach + void init() { + downloader = spy(downloaderBuilder() + .downloadConsumer(this::downloadData) + .build()); + } + + @Test + void shouldThrowException() { + assertThatThrownBy(GrpcBinaryFileServerDownloaderITCase.this::start).isInstanceOf(TechnicalException.class); + } + + @Test + void shouldNotCallOnCompleted() { + catchException(GrpcBinaryFileServerDownloaderITCase.this::start); + + verify(callObserver, never()).onCompleted(); + } + + @Test + void shouldCloseStreams() { + catchException(GrpcBinaryFileServerDownloaderITCase.this::start); + + verify(downloader).closeInputStream(); + verify(downloader).closeOutputStream(); + } + + @SneakyThrows + private void downloadData(OutputStream outputStream) { + throw error; + } + } + + private GrpcBinaryFileServerDownloader.GrpcBinaryFileServerDownloaderBuilder<GrpcResponseDummy> downloaderBuilder() { + return GrpcBinaryFileServerDownloader.<GrpcResponseDummy>builder() + .taskExecutor(taskExecutor) + .callObserver(callObserver) + .chunkBuilder(this::buildChunk); + } + + private GrpcResponseDummy buildChunk(ByteString data) { + return new GrpcResponseDummy(data.size()); + } + + private void start() { + downloader.start(); + downloader.sendChunks(); + } + + private record GrpcResponseDummy(int bytesRead) {} +} diff --git a/ozgcloud-common-lib/src/test/java/de/ozgcloud/common/binaryfile/GrpcBinaryFileServerDownloaderTest.java b/ozgcloud-common-lib/src/test/java/de/ozgcloud/common/binaryfile/GrpcBinaryFileServerDownloaderTest.java index f2f2ef4..e2f860e 100644 --- a/ozgcloud-common-lib/src/test/java/de/ozgcloud/common/binaryfile/GrpcBinaryFileServerDownloaderTest.java +++ b/ozgcloud-common-lib/src/test/java/de/ozgcloud/common/binaryfile/GrpcBinaryFileServerDownloaderTest.java @@ -27,6 +27,7 @@ import static org.assertj.core.api.Assertions.*; import static org.mockito.ArgumentMatchers.*; import static org.mockito.Mockito.*; +import java.io.FileInputStream; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; @@ -37,11 +38,15 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; import java.util.function.Function; +import java.util.stream.Stream; +import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.DisplayName; import org.junit.jupiter.api.Nested; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; import org.mockito.ArgumentCaptor; import org.mockito.Captor; import org.mockito.Mock; @@ -300,18 +305,6 @@ class GrpcBinaryFileServerDownloaderTest { verify(downloadConsumer).accept(outputStream); } - - @Test - void shouldDownloadFinishedBeInitiallyFalse() { - assertThat(getDownloadFinished()).isFalse(); - } - - @Test - void shouldSetDownloadFinished() { - downloader.doDownload(); - - assertThat(getDownloadFinished()).isTrue(); - } } @Nested @@ -362,103 +355,157 @@ class GrpcBinaryFileServerDownloaderTest { @Nested class TestDoSendChunks { + @SneakyThrows + @Test + void shouldCheckIfCanSendChunks() { + doReturn(false).when(downloader).canSendChunks(); + + downloader.doSendChunks(); + + verify(downloader).canSendChunks(); + } + + @SneakyThrows + @Test + void shouldNotProcessIfCannotSendChunks() { + doReturn(false).when(downloader).canSendChunks(); + + downloader.doSendChunks(); + + verify(downloader, never()).processDataFromInputStream(); + } + + @SneakyThrows + @Test + void shouldProcessAsLongAsCanSendChunks() { + doReturn(true, true, false).when(downloader).canSendChunks(); + doNothing().when(downloader).processDataFromInputStream(); + + downloader.doSendChunks(); + + verify(downloader, times(2)).processDataFromInputStream(); + } + } + + @Nested + class TestProcessDataFromInputStream { + + @Mock + private PipedInputStream inputStream; + @Nested - class OnRequestFinished { + class OnEndOfStreamReached { + @SneakyThrows @BeforeEach void init() { - setRequestFinishedField(true); + doNothing().when(downloader).completeRequest(); + when(inputStream.read(any())).thenReturn(-1); + setInputStreamField(inputStream); } + @SneakyThrows @Test - void shouldNotInteractWithCallObserver() { - doSendChunks(); + void shouldCompleteRequest() { + downloader.processDataFromInputStream(); + + verify(downloader).completeRequest(); + } + + @SneakyThrows + @Test + void shouldNotCallCallObserver() { + downloader.processDataFromInputStream(); verifyNoInteractions(callObserver); } } @Nested - class OnRequestNotFinished { + class OnNoBytesWereReceived { - @Nested - class OnNotReady { - - @BeforeEach - void init() { - when(callObserver.isReady()).thenReturn(false); - } + @SneakyThrows + @BeforeEach + void init() { + when(inputStream.read(any())).thenReturn(0); + setInputStreamField(inputStream); + } - @Test - void shouldOnlyCallIsReadyOnObserver() { - doSendChunks(); + @SneakyThrows + @Test + void shouldNotCallCallObserver() { + downloader.processDataFromInputStream(); - verify(callObserver).isReady(); - verifyNoMoreInteractions(callObserver); - } + verifyNoInteractions(callObserver); } + } - @Nested - class OnReady { + @Nested + class OnBytesWereReceived { - @Mock - private PipedInputStream inputStream; - @Captor - private ArgumentCaptor<ByteString> byteStringCaptor; + @Captor + private ArgumentCaptor<ByteString> byteStringCaptor; - private final int readBytes = 20; - private final byte[] buffer = new byte[readBytes]; - private final GrpcResponseDummy grpcResponseDummy = new GrpcResponseDummy(); + private final int readBytes = 20; + private final byte[] buffer = new byte[readBytes]; + private final GrpcResponseDummy grpcResponseDummy = new GrpcResponseDummy(); - @SneakyThrows - @BeforeEach - void mock() { - doNothing().when(downloader).tryCompleteRequest(); - when(callObserver.isReady()).thenReturn(true); - when(inputStream.read(any())).thenReturn(readBytes, -1); - setInputStreamField(inputStream); - new Random().nextBytes(buffer); - ReflectionTestUtils.setField(downloader, "buffer", buffer); - } + @SneakyThrows + @BeforeEach + void mock() { + when(inputStream.read(any())).thenReturn(readBytes); + setInputStreamField(inputStream); + new Random().nextBytes(buffer); + ReflectionTestUtils.setField(downloader, "buffer", buffer); + } - @Test - void shouldCallChunkBuilder() { - doSendChunks(); + @SneakyThrows + @Test + void shouldCallChunkBuilder() { + downloader.processDataFromInputStream(); - verify(chunkBuilder).apply(byteStringCaptor.capture()); - assertThat(byteStringCaptor.getValue().toByteArray()).isEqualTo(buffer); - } + verify(chunkBuilder).apply(byteStringCaptor.capture()); + assertThat(byteStringCaptor.getValue().toByteArray()).isEqualTo(buffer); + } - @DisplayName("should send next chunk if callObserver is ready and stream already received data") - @Test - void shouldCallOnNext() { - when(chunkBuilder.apply(any())).thenReturn(grpcResponseDummy); + @SneakyThrows + @Test + void shouldCallOnNext() { + when(chunkBuilder.apply(any())).thenReturn(grpcResponseDummy); - doSendChunks(); + downloader.processDataFromInputStream(); - verify(callObserver).onNext(grpcResponseDummy); - } + verify(callObserver).onNext(grpcResponseDummy); + } + } + } - @DisplayName("should call complete grpc stream if download has finished and stream has no data left") - @Test - void shouldTryCompleteRequest() { - setDownloadFinishedField(true); + @Nested + class TestCanSendChunks { - doSendChunks(); + @ParameterizedTest + @MethodSource("provideArguments") + void shouldReturnValue(boolean requestFinished, boolean ready, boolean expected) { + ReflectionTestUtils.setField(downloader, "requestFinished", new AtomicBoolean(requestFinished)); + lenient().when(callObserver.isReady()).thenReturn(ready); - verify(downloader).tryCompleteRequest(); - } - } + var canSendChunks = downloader.canSendChunks(); + + assertThat(canSendChunks).isEqualTo(expected); } - @SneakyThrows - private void doSendChunks() { - downloader.doSendChunks(); + private static Stream<Arguments> provideArguments() { + return Stream.of( + Arguments.of(false, false, false), + Arguments.of(false, true, true), + Arguments.of(true, false, false), + Arguments.of(true, true, false) + ); } } @Nested - class TestTryCompleteRequest { + class TestCompleteRequest { @Nested class OnError { @@ -472,56 +519,32 @@ class GrpcBinaryFileServerDownloaderTest { @Test void shouldThrowException() { - assertThatThrownBy(downloader::tryCompleteRequest).isSameAs(exception); + assertThatThrownBy(downloader::completeRequest).isSameAs(exception); } } @Nested - class OnDownloadFinished { + class OnNoError { @BeforeEach void init() { - setDownloadFinishedField(true); doNothing().when(downloader).completeRequestNormally(); } @Test void shouldNotCompleteRequestWithError() { - downloader.tryCompleteRequest(); + downloader.completeRequest(); verify(downloader, never()).completeRequestWithError(any()); } @Test void shouldCompleteRequestNormally() { - downloader.tryCompleteRequest(); + downloader.completeRequest(); verify(downloader).completeRequestNormally(); } } - - @Nested - class OnDownloadNotFinished { - - @BeforeEach - void init() { - setDownloadFinishedField(false); - } - - @Test - void shouldNotCompleteRequestNormally() { - downloader.tryCompleteRequest(); - - verify(downloader, never()).completeRequestNormally(); - } - - @Test - void shouldNotCompleteRequestWithError() { - downloader.tryCompleteRequest(); - - verify(downloader, never()).completeRequestWithError(any()); - } - } } @Nested @@ -622,4 +645,62 @@ class GrpcBinaryFileServerDownloaderTest { private static class GrpcResponseDummy { } + + @Nested + class TestStreams { + + private static final int CHUNK_SIZE = 255 * 1024; + + private PipedInputStream inputStream; + private PipedOutputStream outputStream; + + @SneakyThrows + @BeforeEach + void init() { + outputStream = new PipedOutputStream(); + inputStream = new PipedInputStream(CHUNK_SIZE); + outputStream.connect(inputStream); + } + + @SneakyThrows + @Test + void shouldReadIncompleteFile() { + var fileBuffer = new byte[CHUNK_SIZE]; + var readBuffer = new byte[CHUNK_SIZE]; + try (FileInputStream fileInputStream = new FileInputStream("/Users/kwitukiewicz/Documents/books/__Debt__The_First_5_000_Years.pdf")) { + fileInputStream.read(fileBuffer, 0, 255); + outputStream.write(fileBuffer, 0, 1); + + var read = inputStream.read(readBuffer, 0, CHUNK_SIZE); + + assertThat(read).isEqualTo(1); + } + } + + @SneakyThrows + @Test + void shouldReadAfterOutputStreamWasClosed() { + var fileBuffer = new byte[CHUNK_SIZE]; + var readBuffer = new byte[CHUNK_SIZE * 2]; + try (FileInputStream fileInputStream = new FileInputStream("/Users/kwitukiewicz/Documents/books/__Debt__The_First_5_000_Years.pdf")) { + fileInputStream.read(fileBuffer, 0, fileBuffer.length); + outputStream.write(fileBuffer); + outputStream.close(); + + var read = inputStream.read(readBuffer); + + assertThat(read).isEqualTo(CHUNK_SIZE); + + read = inputStream.read(readBuffer); + assertThat(read).isEqualTo(-1); + } + } + + @SneakyThrows + @AfterEach + void cleanup() { + outputStream.close(); + inputStream.close(); + } + } } \ No newline at end of file -- GitLab From cf0635063f18e2d6a73924291df2709a7fb1e4ad Mon Sep 17 00:00:00 2001 From: Krzysztof Witukiewicz <krzysztof.witukiewicz@mgm-tp.com> Date: Fri, 7 Feb 2025 15:49:37 +0100 Subject: [PATCH 2/3] OZG-7262 OZG-7680 Remove unneeded code --- .../GrpcBinaryFileServerDownloaderTest.java | 70 +------------------ 1 file changed, 1 insertion(+), 69 deletions(-) diff --git a/ozgcloud-common-lib/src/test/java/de/ozgcloud/common/binaryfile/GrpcBinaryFileServerDownloaderTest.java b/ozgcloud-common-lib/src/test/java/de/ozgcloud/common/binaryfile/GrpcBinaryFileServerDownloaderTest.java index e2f860e..ba9b67a 100644 --- a/ozgcloud-common-lib/src/test/java/de/ozgcloud/common/binaryfile/GrpcBinaryFileServerDownloaderTest.java +++ b/ozgcloud-common-lib/src/test/java/de/ozgcloud/common/binaryfile/GrpcBinaryFileServerDownloaderTest.java @@ -27,7 +27,6 @@ import static org.assertj.core.api.Assertions.*; import static org.mockito.ArgumentMatchers.*; import static org.mockito.Mockito.*; -import java.io.FileInputStream; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; @@ -40,7 +39,6 @@ import java.util.function.Consumer; import java.util.function.Function; import java.util.stream.Stream; -import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Nested; import org.junit.jupiter.api.Test; @@ -486,7 +484,7 @@ class GrpcBinaryFileServerDownloaderTest { @ParameterizedTest @MethodSource("provideArguments") void shouldReturnValue(boolean requestFinished, boolean ready, boolean expected) { - ReflectionTestUtils.setField(downloader, "requestFinished", new AtomicBoolean(requestFinished)); + setRequestFinishedField(requestFinished); lenient().when(callObserver.isReady()).thenReturn(ready); var canSendChunks = downloader.canSendChunks(); @@ -635,72 +633,6 @@ class GrpcBinaryFileServerDownloaderTest { return (TechnicalException) ReflectionTestUtils.getField(downloader, "downloadError", AtomicReference.class).get(); } - private void setDownloadFinishedField(boolean downloadFinished) { - ReflectionTestUtils.setField(downloader, "downloadFinished", new AtomicBoolean(downloadFinished)); - } - - private boolean getDownloadFinished() { - return ReflectionTestUtils.getField(downloader, "downloadFinished", AtomicBoolean.class).get(); - } - private static class GrpcResponseDummy { } - - @Nested - class TestStreams { - - private static final int CHUNK_SIZE = 255 * 1024; - - private PipedInputStream inputStream; - private PipedOutputStream outputStream; - - @SneakyThrows - @BeforeEach - void init() { - outputStream = new PipedOutputStream(); - inputStream = new PipedInputStream(CHUNK_SIZE); - outputStream.connect(inputStream); - } - - @SneakyThrows - @Test - void shouldReadIncompleteFile() { - var fileBuffer = new byte[CHUNK_SIZE]; - var readBuffer = new byte[CHUNK_SIZE]; - try (FileInputStream fileInputStream = new FileInputStream("/Users/kwitukiewicz/Documents/books/__Debt__The_First_5_000_Years.pdf")) { - fileInputStream.read(fileBuffer, 0, 255); - outputStream.write(fileBuffer, 0, 1); - - var read = inputStream.read(readBuffer, 0, CHUNK_SIZE); - - assertThat(read).isEqualTo(1); - } - } - - @SneakyThrows - @Test - void shouldReadAfterOutputStreamWasClosed() { - var fileBuffer = new byte[CHUNK_SIZE]; - var readBuffer = new byte[CHUNK_SIZE * 2]; - try (FileInputStream fileInputStream = new FileInputStream("/Users/kwitukiewicz/Documents/books/__Debt__The_First_5_000_Years.pdf")) { - fileInputStream.read(fileBuffer, 0, fileBuffer.length); - outputStream.write(fileBuffer); - outputStream.close(); - - var read = inputStream.read(readBuffer); - - assertThat(read).isEqualTo(CHUNK_SIZE); - - read = inputStream.read(readBuffer); - assertThat(read).isEqualTo(-1); - } - } - - @SneakyThrows - @AfterEach - void cleanup() { - outputStream.close(); - inputStream.close(); - } - } } \ No newline at end of file -- GitLab From c10e6482acea52f680e3db96fd9b554e0266db4a Mon Sep 17 00:00:00 2001 From: Krzysztof Witukiewicz <krzysztof.witukiewicz@mgm-tp.com> Date: Mon, 10 Feb 2025 10:37:28 +0100 Subject: [PATCH 3/3] OZG-7262 OZG-7680 Renaming methods --- .../GrpcBinaryFileServerDownloader.java | 31 ++-- .../GrpcBinaryFileServerDownloaderITCase.java | 2 +- .../GrpcBinaryFileServerDownloaderTest.java | 144 +++++++++--------- 3 files changed, 90 insertions(+), 87 deletions(-) diff --git a/ozgcloud-common-lib/src/main/java/de/ozgcloud/common/binaryfile/GrpcBinaryFileServerDownloader.java b/ozgcloud-common-lib/src/main/java/de/ozgcloud/common/binaryfile/GrpcBinaryFileServerDownloader.java index 7326002..9a41471 100644 --- a/ozgcloud-common-lib/src/main/java/de/ozgcloud/common/binaryfile/GrpcBinaryFileServerDownloader.java +++ b/ozgcloud-common-lib/src/main/java/de/ozgcloud/common/binaryfile/GrpcBinaryFileServerDownloader.java @@ -48,8 +48,8 @@ public class GrpcBinaryFileServerDownloader<T> { static final int CHUNK_SIZE = 255 * 1024; - private static final int END_OF_STREAM = -1; - private static final int NOTHING_READ = 0; + static final int END_OF_STREAM = -1; + static final int NOTHING_READ = 0; private final CallStreamObserver<T> callObserver; private final Function<ByteString, T> chunkBuilder; @@ -126,7 +126,7 @@ public class GrpcBinaryFileServerDownloader<T> { try { doSendChunks(); } catch (Exception e) { - completeRequestWithError(new TechnicalException("Error while sending chunks", e)); + handleError(new TechnicalException("Error while sending chunks", e)); } } @@ -144,36 +144,37 @@ public class GrpcBinaryFileServerDownloader<T> { var bytesRead = inputStream.read(buffer); switch (bytesRead) { case END_OF_STREAM: - completeRequest(); + finishProcessing(); break; case NOTHING_READ: break; default: - callObserver.onNext(chunkBuilder.apply(ByteString.copyFrom(buffer, 0, bytesRead))); - LOG.debug("Sent {} bytes", bytesRead); + sendBytesToCallObserver(bytesRead); } } - void completeRequest() { + void sendBytesToCallObserver(int bytesRead) { + var bytes = ByteString.copyFrom(buffer, 0, bytesRead); + var chunk = chunkBuilder.apply(bytes); + callObserver.onNext(chunk); + LOG.debug("Sent {} bytes", bytesRead); + } + + void finishProcessing() { if (Objects.nonNull(downloadError.get())) { throw downloadError.get(); } else { - completeRequestNormally(); + finishRequest(); + callObserver.onCompleted(); } } - void completeRequestWithError(TechnicalException e) { + void handleError(TechnicalException e) { LOG.debug("Complete download request with error"); finishRequest(); throw e; } - void completeRequestNormally() { - LOG.debug("Complete download request"); - finishRequest(); - callObserver.onCompleted(); - } - private void finishRequest() { requestFinished.set(true); closeInputStream(); diff --git a/ozgcloud-common-lib/src/test/java/de/ozgcloud/common/binaryfile/GrpcBinaryFileServerDownloaderITCase.java b/ozgcloud-common-lib/src/test/java/de/ozgcloud/common/binaryfile/GrpcBinaryFileServerDownloaderITCase.java index 039f873..fed3cc3 100644 --- a/ozgcloud-common-lib/src/test/java/de/ozgcloud/common/binaryfile/GrpcBinaryFileServerDownloaderITCase.java +++ b/ozgcloud-common-lib/src/test/java/de/ozgcloud/common/binaryfile/GrpcBinaryFileServerDownloaderITCase.java @@ -65,7 +65,7 @@ class GrpcBinaryFileServerDownloaderITCase { @Nested class OnNoError { - private static final int DOWNLOAD_DATA_LENGTH = Double.valueOf(GrpcBinaryFileServerDownloader.CHUNK_SIZE * 1.5).intValue(); + private static final int DOWNLOAD_DATA_LENGTH = (int) (GrpcBinaryFileServerDownloader.CHUNK_SIZE * 1.5); @Captor private ArgumentCaptor<GrpcResponseDummy> captor; diff --git a/ozgcloud-common-lib/src/test/java/de/ozgcloud/common/binaryfile/GrpcBinaryFileServerDownloaderTest.java b/ozgcloud-common-lib/src/test/java/de/ozgcloud/common/binaryfile/GrpcBinaryFileServerDownloaderTest.java index ba9b67a..1817466 100644 --- a/ozgcloud-common-lib/src/test/java/de/ozgcloud/common/binaryfile/GrpcBinaryFileServerDownloaderTest.java +++ b/ozgcloud-common-lib/src/test/java/de/ozgcloud/common/binaryfile/GrpcBinaryFileServerDownloaderTest.java @@ -23,6 +23,7 @@ */ package de.ozgcloud.common.binaryfile; +import static de.ozgcloud.common.binaryfile.GrpcBinaryFileServerDownloader.*; import static org.assertj.core.api.Assertions.*; import static org.mockito.ArgumentMatchers.*; import static org.mockito.Mockito.*; @@ -337,14 +338,14 @@ class GrpcBinaryFileServerDownloaderTest { @BeforeEach void init() { doThrow(exception).when(downloader).doSendChunks(); - doNothing().when(downloader).completeRequestWithError(any()); + doNothing().when(downloader).handleError(any()); } @Test - void shouldCompleteRequestWithError() { + void shouldHandleError() { downloader.sendChunks(); - verify(downloader).completeRequestWithError(argumentCaptor.capture()); + verify(downloader).handleError(argumentCaptor.capture()); assertThat(argumentCaptor.getValue()).isInstanceOf(TechnicalException.class).hasCause(exception); } } @@ -397,25 +398,25 @@ class GrpcBinaryFileServerDownloaderTest { @SneakyThrows @BeforeEach void init() { - doNothing().when(downloader).completeRequest(); - when(inputStream.read(any())).thenReturn(-1); + doNothing().when(downloader).finishProcessing(); + when(inputStream.read(any())).thenReturn(END_OF_STREAM); setInputStreamField(inputStream); } @SneakyThrows @Test - void shouldCompleteRequest() { + void shouldFinishProcessing() { downloader.processDataFromInputStream(); - verify(downloader).completeRequest(); + verify(downloader).finishProcessing(); } @SneakyThrows @Test - void shouldNotCallCallObserver() { + void shouldNotSendBytesToCallObserver() { downloader.processDataFromInputStream(); - verifyNoInteractions(callObserver); + verify(downloader, never()).sendBytesToCallObserver(anyInt()); } } @@ -425,56 +426,73 @@ class GrpcBinaryFileServerDownloaderTest { @SneakyThrows @BeforeEach void init() { - when(inputStream.read(any())).thenReturn(0); + when(inputStream.read(any())).thenReturn(NOTHING_READ); setInputStreamField(inputStream); } @SneakyThrows @Test - void shouldNotCallCallObserver() { + void shouldNotSendBytesToCallObserver() { downloader.processDataFromInputStream(); - verifyNoInteractions(callObserver); + verify(downloader, never()).sendBytesToCallObserver(anyInt()); } } @Nested class OnBytesWereReceived { - @Captor - private ArgumentCaptor<ByteString> byteStringCaptor; - - private final int readBytes = 20; - private final byte[] buffer = new byte[readBytes]; - private final GrpcResponseDummy grpcResponseDummy = new GrpcResponseDummy(); + private final int bytesRead = 20; @SneakyThrows @BeforeEach void mock() { - when(inputStream.read(any())).thenReturn(readBytes); + when(inputStream.read(any())).thenReturn(bytesRead); setInputStreamField(inputStream); - new Random().nextBytes(buffer); - ReflectionTestUtils.setField(downloader, "buffer", buffer); } @SneakyThrows @Test - void shouldCallChunkBuilder() { + void shouldSendBytesToCallObserver() { downloader.processDataFromInputStream(); - verify(chunkBuilder).apply(byteStringCaptor.capture()); - assertThat(byteStringCaptor.getValue().toByteArray()).isEqualTo(buffer); + verify(downloader).sendBytesToCallObserver(bytesRead); } + } + } - @SneakyThrows - @Test - void shouldCallOnNext() { - when(chunkBuilder.apply(any())).thenReturn(grpcResponseDummy); + @Nested + class TestSendBytesToCallObserver { - downloader.processDataFromInputStream(); + @Captor + private ArgumentCaptor<ByteString> byteStringCaptor; + private final int bytesRead = 20; + private final byte[] buffer = new byte[bytesRead]; + private final GrpcResponseDummy grpcResponseDummy = new GrpcResponseDummy(); - verify(callObserver).onNext(grpcResponseDummy); - } + @BeforeEach + void init() { + new Random().nextBytes(buffer); + ReflectionTestUtils.setField(downloader, "buffer", buffer); + } + + @SneakyThrows + @Test + void shouldCallChunkBuilder() { + downloader.sendBytesToCallObserver(bytesRead); + + verify(chunkBuilder).apply(byteStringCaptor.capture()); + assertThat(byteStringCaptor.getValue().toByteArray()).isEqualTo(buffer); + } + + @SneakyThrows + @Test + void shouldCallOnNext() { + when(chunkBuilder.apply(any())).thenReturn(grpcResponseDummy); + + downloader.sendBytesToCallObserver(bytesRead); + + verify(callObserver).onNext(grpcResponseDummy); } } @@ -503,7 +521,7 @@ class GrpcBinaryFileServerDownloaderTest { } @Nested - class TestCompleteRequest { + class TestFinishProcessing { @Nested class OnError { @@ -517,7 +535,7 @@ class GrpcBinaryFileServerDownloaderTest { @Test void shouldThrowException() { - assertThatThrownBy(downloader::completeRequest).isSameAs(exception); + assertThatThrownBy(downloader::finishProcessing).isSameAs(exception); } } @@ -526,59 +544,43 @@ class GrpcBinaryFileServerDownloaderTest { @BeforeEach void init() { - doNothing().when(downloader).completeRequestNormally(); + doNothing().when(downloader).closeInputStream(); } @Test void shouldNotCompleteRequestWithError() { - downloader.completeRequest(); + downloader.finishProcessing(); - verify(downloader, never()).completeRequestWithError(any()); + verify(downloader, never()).handleError(any()); } @Test - void shouldCompleteRequestNormally() { - downloader.completeRequest(); + void shouldSetRequestFinished() { + assertThat(getRequestFinished()).isFalse(); - verify(downloader).completeRequestNormally(); - } - } - } + downloader.finishProcessing(); - @Nested - class TestCompleteRequestNormally { - - @BeforeEach - void init() { - doNothing().when(downloader).closeInputStream(); - } - - @Test - void shouldSetRequestFinished() { - assertThat(getRequestFinished()).isFalse(); - - downloader.completeRequestNormally(); - - assertThat(getRequestFinished()).isTrue(); - } + assertThat(getRequestFinished()).isTrue(); + } - @Test - void shouldCloseInputStream() { - downloader.completeRequestNormally(); + @Test + void shouldCloseInputStream() { + downloader.finishProcessing(); - verify(downloader).closeInputStream(); - } + verify(downloader).closeInputStream(); + } - @Test - void shouldNotifyObserver() { - downloader.completeRequestNormally(); + @Test + void shouldNotifyObserver() { + downloader.finishProcessing(); - verify(callObserver).onCompleted(); + verify(callObserver).onCompleted(); + } } } @Nested - class TestCompleteRequestWithError { + class TestHandleError { private final TechnicalException error = new TechnicalException("error"); @@ -591,21 +593,21 @@ class GrpcBinaryFileServerDownloaderTest { void shouldSetRequestFinished() { assertThat(getRequestFinished()).isFalse(); - catchException(() -> downloader.completeRequestWithError(error)); + catchException(() -> downloader.handleError(error)); assertThat(getRequestFinished()).isTrue(); } @Test void shouldCloseInputStream() { - catchException(() -> downloader.completeRequestWithError(error)); + catchException(() -> downloader.handleError(error)); verify(downloader).closeInputStream(); } @Test void shouldThrowException() { - assertThatThrownBy(() -> downloader.completeRequestWithError(error)).isSameAs(error); + assertThatThrownBy(() -> downloader.handleError(error)).isSameAs(error); } } -- GitLab