From d8ee4094488b2b10379c28d9261afa24e4d6f323 Mon Sep 17 00:00:00 2001 From: OZGCloud <ozgcloud@mgm-tp.com> Date: Tue, 19 Nov 2024 18:33:13 +0100 Subject: [PATCH] OZG-7143 fix download completion issue --- .../GrpcBinaryFileServerDownloader.java | 69 ++++++---- .../GrpcBinaryFileServerDownloaderTest.java | 125 ++++++++++++------ 2 files changed, 130 insertions(+), 64 deletions(-) diff --git a/ozgcloud-common-lib/src/main/java/de/ozgcloud/common/binaryfile/GrpcBinaryFileServerDownloader.java b/ozgcloud-common-lib/src/main/java/de/ozgcloud/common/binaryfile/GrpcBinaryFileServerDownloader.java index 9056318..e19f14e 100644 --- a/ozgcloud-common-lib/src/main/java/de/ozgcloud/common/binaryfile/GrpcBinaryFileServerDownloader.java +++ b/ozgcloud-common-lib/src/main/java/de/ozgcloud/common/binaryfile/GrpcBinaryFileServerDownloader.java @@ -52,7 +52,8 @@ public class GrpcBinaryFileServerDownloader<T> { private final byte[] buffer = new byte[GrpcBinaryFileServerDownloader.CHUNK_SIZE]; private final AtomicBoolean started = new AtomicBoolean(false); - private final AtomicBoolean downloadInProgress = new AtomicBoolean(false); + private final AtomicBoolean downloadFinished = new AtomicBoolean(false); + private final AtomicBoolean sendingFinished = new AtomicBoolean(false); private PipedInputStream inputStream; private PipedOutputStream outputStream; @@ -77,10 +78,10 @@ public class GrpcBinaryFileServerDownloader<T> { } void doStart() { - LOG.debug("[{}] Starting download.", Thread.currentThread().getName()); + LOG.debug("Starting download."); handleSafety(this::setupStreams); taskExecutor.execute(Context.current().wrap(this::startDownload)); - callObserver.setOnReadyHandler(this::onReadyHandler); + callObserver.setOnReadyHandler(this::sendChunks); } void setupStreams() throws IOException { @@ -93,46 +94,62 @@ public class GrpcBinaryFileServerDownloader<T> { handleSafety(this::doDownload); } - void doDownload() throws IOException { - downloadInProgress.set(true); - LOG.debug("[{}] Downloading file content.", Thread.currentThread().getName()); + void doDownload() { + LOG.debug("Downloading file content..."); downloadConsumer.accept(outputStream); - LOG.debug("[{}] Downloading file content finished.", Thread.currentThread().getName()); - downloadInProgress.set(false); - outputStream.close(); + LOG.debug("Download completed."); + downloadFinished.set(true); + closeOutputStream(); + sendChunks(); } - synchronized void onReadyHandler() { - if (callObserver.isReady()) { - sendChunks(); - } - } - - void sendChunks() { + synchronized void sendChunks() { handleSafety(this::doSendChunks); } void doSendChunks() throws IOException { - int bytesRead = 0; - while (callObserver.isReady() && (bytesRead = inputStream.read(buffer)) != -1) { + if (sendingFinished.get()) { + return; + } + int bytesRead; + while (callObserver.isReady()) { + if ((bytesRead = inputStream.read(buffer)) == -1) { + if (downloadFinished.get()) { + completeDownload(); + } + break; + } callObserver.onNext(chunkBuilder.apply(ByteString.copyFrom(buffer, 0, bytesRead))); + LOG.debug("Sent {} bytes", bytesRead); } - LOG.debug("[{}] Sending file content finished. isReady: {}, bytesRead: {}", Thread.currentThread().getName(), callObserver.isReady(), - bytesRead); - if (!downloadInProgress.get()) { - inputStream.close(); - LOG.debug("[{}] Complete request.", Thread.currentThread().getName()); - callObserver.onCompleted(); + } + + void completeDownload() { + LOG.debug("try to complete download"); + if (sendingFinished.getAndSet(true)) { + return; } + closeInputStream(); + LOG.debug("Complete request."); + callObserver.onCompleted(); } void handleSafety(ExceptionalRunnable runnable) { try { runnable.run(); } catch (Exception e) { - IOUtils.closeQuietly(inputStream, e1 -> LOG.error("InputStream cannot be closed.", e1)); - IOUtils.closeQuietly(outputStream, e1 -> LOG.error("OutputStream cannot be closed.", e1)); + closeOutputStream(); + closeInputStream(); throw new TechnicalException("Error occurred during downloading file content download.", e); } } + + void closeOutputStream() { + IOUtils.closeQuietly(outputStream, e -> LOG.error("OutputStream cannot be closed.", e)); + } + + void closeInputStream() { + IOUtils.closeQuietly(inputStream, e -> LOG.error("InputStream cannot be closed.", e)); + } + } \ No newline at end of file diff --git a/ozgcloud-common-lib/src/test/java/de/ozgcloud/common/binaryfile/GrpcBinaryFileServerDownloaderTest.java b/ozgcloud-common-lib/src/test/java/de/ozgcloud/common/binaryfile/GrpcBinaryFileServerDownloaderTest.java index ed67be0..9e42671 100644 --- a/ozgcloud-common-lib/src/test/java/de/ozgcloud/common/binaryfile/GrpcBinaryFileServerDownloaderTest.java +++ b/ozgcloud-common-lib/src/test/java/de/ozgcloud/common/binaryfile/GrpcBinaryFileServerDownloaderTest.java @@ -42,6 +42,7 @@ import java.io.InputStream; import java.io.OutputStream; import java.io.PipedInputStream; import java.io.PipedOutputStream; +import java.util.Random; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Consumer; import java.util.function.Function; @@ -184,9 +185,6 @@ class GrpcBinaryFileServerDownloaderTest { @Nested class TestStartDownload { - @Mock - private Context callContext; - @Captor private ArgumentCaptor<ExceptionalRunnable> runnableCaptor; @@ -233,21 +231,6 @@ class GrpcBinaryFileServerDownloaderTest { } } - @DisplayName("On ready handler") - @Nested - class TestOnReadyHandler { - - @Test - void shouldSendChunksIfCallObserverIsReady() { - when(callObserver.isReady()).thenReturn(true); - doNothing().when(downloader).sendChunks(); - - downloader.onReadyHandler(); - - verify(downloader).sendChunks(); - } - } - @DisplayName("Send chunks") @Nested class TestSendChunks { @@ -264,7 +247,7 @@ class GrpcBinaryFileServerDownloaderTest { @SneakyThrows @Test - void shouldCallDoDownoad() { + void shouldCallDoDownload() { doNothing().when(downloader).doSendChunks(); downloader.sendChunks(); @@ -278,49 +261,101 @@ class GrpcBinaryFileServerDownloaderTest { @Mock private PipedInputStream inputStream; + @Captor + private ArgumentCaptor<ByteString> byteStringCaptor; - private final int data = 20; + private final int readBytes = 20; + private final byte[] buffer = new byte[readBytes]; + private final GrpcResponseDummy grpcResponseDummy = new GrpcResponseDummy(); + @SneakyThrows @BeforeEach void mock() { + when(callObserver.isReady()).thenReturn(true); + when(inputStream.read(any())).thenReturn(readBytes, -1); setInputStreamField(inputStream); + new Random().nextBytes(buffer); + setBufferArray(buffer); + } + + @Test + void shouldCallChunkBuilder() { + doSendChunks(); + + verify(chunkBuilder).apply(byteStringCaptor.capture()); + assertThat(byteStringCaptor.getValue().toByteArray()).isEqualTo(buffer); } - @SneakyThrows @DisplayName("should send next chunk if callObserver is ready and stream already received data") @Test void shouldCallOnNext() { - when(callObserver.isReady()).thenReturn(true); - when(inputStream.read(any())).thenReturn(data, -1); + when(chunkBuilder.apply(any())).thenReturn(grpcResponseDummy); - downloader.doSendChunks(); + doSendChunks(); - verify(callObserver).onNext(any()); + verify(callObserver).onNext(grpcResponseDummy); } - @SneakyThrows @DisplayName("should call complete grpc stream if download has finished and stream has no data left") @Test - void shouldCallOnCompleted() { - setDownloadInProgressField(new AtomicBoolean(false)); + void shouldCallCompleteDownload() { + setDownloadFinishedField(new AtomicBoolean(true)); - downloader.doSendChunks(); + doSendChunks(); - verify(callObserver).onCompleted(); + verify(downloader).completeDownload(); } @SneakyThrows - @Test - void shouldCloseInputStream() { - setDownloadInProgressField(new AtomicBoolean(false)); - + private void doSendChunks() { downloader.doSendChunks(); - - verify(inputStream).close(); } } } + @Nested + class TestCompleteDownload { + + @Mock + private PipedInputStream inputStream; + + @BeforeEach + void mock() { + setSendingFinishedField(new AtomicBoolean(false)); + setDownloadFinishedField(new AtomicBoolean(true)); + setInputStreamField(inputStream); + } + + @Test + void shouldSetSendingFinished() { + downloader.completeDownload(); + + assertThat(getSendingFinished()).isTrue(); + } + + @SneakyThrows + @Test + void shouldCallCloseInputStream() { + downloader.completeDownload(); + + verify(downloader).closeInputStream(); + } + + @Test + void shouldCallOnCompleted() { + downloader.completeDownload(); + + verify(callObserver).onCompleted(); + } + + @SneakyThrows + private boolean getSendingFinished() { + var sendingFinishedField = downloader.getClass().getDeclaredField("sendingFinished"); + sendingFinishedField.setAccessible(true); + return ((AtomicBoolean) sendingFinishedField.get(downloader)).get(); + } + } + @DisplayName("Handle safety") @Nested class TestHandleSafety { @@ -399,12 +434,26 @@ class GrpcBinaryFileServerDownloaderTest { } @SneakyThrows - private void setDownloadInProgressField(AtomicBoolean downloadInProgress) { - var downloadInProgressField = downloader.getClass().getDeclaredField("downloadInProgress"); + private void setDownloadFinishedField(AtomicBoolean downloadInProgress) { + var downloadInProgressField = downloader.getClass().getDeclaredField("downloadFinished"); downloadInProgressField.setAccessible(true); downloadInProgressField.set(downloader, downloadInProgress); } + @SneakyThrows + private void setSendingFinishedField(AtomicBoolean downloadInProgress) { + var downloadInProgressField = downloader.getClass().getDeclaredField("sendingFinished"); + downloadInProgressField.setAccessible(true); + downloadInProgressField.set(downloader, downloadInProgress); + } + + @SneakyThrows + private void setBufferArray(byte[] buffer) { + var downloadInProgressField = downloader.getClass().getDeclaredField("buffer"); + downloadInProgressField.setAccessible(true); + downloadInProgressField.set(downloader, buffer); + } + static class GrpcResponseDummy { } } \ No newline at end of file -- GitLab