From 614ce3aea261faa1be7db748622e3f3063416aae Mon Sep 17 00:00:00 2001 From: OZGCloud <ozgcloud@mgm-tp.com> Date: Mon, 21 Oct 2024 16:27:03 +0200 Subject: [PATCH] OZG-6944 copy GrpcDownloader from collaboration manager --- .../common/grpc/ExceptionRunnable.java | 7 + .../common/grpc/GrpcDownloader.java | 109 ++++++ .../common/grpc/GrpcDownloaderTest.java | 335 ++++++++++++++++++ 3 files changed, 451 insertions(+) create mode 100644 nachrichten-manager-server/src/main/java/de/ozgcloud/nachrichten/common/grpc/ExceptionRunnable.java create mode 100644 nachrichten-manager-server/src/main/java/de/ozgcloud/nachrichten/common/grpc/GrpcDownloader.java create mode 100644 nachrichten-manager-server/src/test/java/de/ozgcloud/nachrichten/common/grpc/GrpcDownloaderTest.java diff --git a/nachrichten-manager-server/src/main/java/de/ozgcloud/nachrichten/common/grpc/ExceptionRunnable.java b/nachrichten-manager-server/src/main/java/de/ozgcloud/nachrichten/common/grpc/ExceptionRunnable.java new file mode 100644 index 0000000..0c22c7f --- /dev/null +++ b/nachrichten-manager-server/src/main/java/de/ozgcloud/nachrichten/common/grpc/ExceptionRunnable.java @@ -0,0 +1,7 @@ +package de.ozgcloud.nachrichten.common.grpc; + +import java.io.IOException; + +interface ExceptionRunnable { + void run() throws IOException; // NOSONAR +} \ No newline at end of file diff --git a/nachrichten-manager-server/src/main/java/de/ozgcloud/nachrichten/common/grpc/GrpcDownloader.java b/nachrichten-manager-server/src/main/java/de/ozgcloud/nachrichten/common/grpc/GrpcDownloader.java new file mode 100644 index 0000000..92d6c2b --- /dev/null +++ b/nachrichten-manager-server/src/main/java/de/ozgcloud/nachrichten/common/grpc/GrpcDownloader.java @@ -0,0 +1,109 @@ +package de.ozgcloud.nachrichten.common.grpc; + +import java.io.IOException; +import java.io.OutputStream; +import java.io.PipedInputStream; +import java.io.PipedOutputStream; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Consumer; +import java.util.function.Function; + +import org.springframework.core.task.TaskExecutor; + +import com.google.protobuf.ByteString; + +import de.ozgcloud.apilib.file.OzgCloudFileId; +import de.ozgcloud.common.errorhandling.TechnicalException; +import io.grpc.stub.CallStreamObserver; +import lombok.Builder; + +public class GrpcDownloader<T> { + + private static final int CHUNK_SIZE = 255 * 1024; + + private CallStreamObserver<T> callObserver; + private Function<ByteString, T> chunkBuilder; + private Consumer<OutputStream> downloadConsumer; + private TaskExecutor taskExecutor; + + private final byte[] buffer = new byte[GrpcDownloader.CHUNK_SIZE]; + private final AtomicBoolean downloadInProgress = new AtomicBoolean(false); + + private PipedInputStream inputStream; + private PipedOutputStream outputStream; + + @Builder + public GrpcDownloader(CallStreamObserver<T> callObserver, OzgCloudFileId fileId, Function<ByteString, T> chunkBuilder, + Consumer<OutputStream> downloadConsumer, TaskExecutor taskExecutor) { + this.callObserver = callObserver; + this.chunkBuilder = chunkBuilder; + this.downloadConsumer = downloadConsumer; + this.taskExecutor = taskExecutor; + } + + public void start() { + handleSafety(this::doStart); + } + + void doStart() throws IOException { + setupStreams(); + taskExecutor.execute(this::startDownload); + callObserver.setOnReadyHandler(this::onReadyHandler); + } + + void setupStreams() throws IOException { + outputStream = new PipedOutputStream(); + inputStream = new PipedInputStream(GrpcDownloader.CHUNK_SIZE); + outputStream.connect(inputStream); + } + + void startDownload() { + handleSafety(this::doDownload); + } + + void doDownload() throws IOException { + downloadInProgress.set(true); + downloadConsumer.accept(outputStream); + downloadInProgress.set(false); + outputStream.close(); + } + + synchronized void onReadyHandler() { + if (callObserver.isReady()) { + sendChunks(); + } + } + + void sendChunks() { + handleSafety(this::doSendChunks); + } + + void doSendChunks() throws IOException { + int bytesRead; + while (callObserver.isReady() && (bytesRead = inputStream.read(buffer)) != -1) { + callObserver.onNext(chunkBuilder.apply(ByteString.copyFrom(buffer, 0, bytesRead))); + } + if (!downloadInProgress.get()) { + inputStream.close(); + callObserver.onCompleted(); + } + } + + void handleSafety(ExceptionRunnable runnable) { + try { + runnable.run(); + } catch (IOException e) { + try { + inputStream.close(); + outputStream.close(); + } catch (IOException e1) { + throwException(e1); + } + throwException(e); + } + } + + private void throwException(IOException e) { + throw new TechnicalException("Error occurred during downloading file content download.", e); + } +} \ No newline at end of file diff --git a/nachrichten-manager-server/src/test/java/de/ozgcloud/nachrichten/common/grpc/GrpcDownloaderTest.java b/nachrichten-manager-server/src/test/java/de/ozgcloud/nachrichten/common/grpc/GrpcDownloaderTest.java new file mode 100644 index 0000000..28880fd --- /dev/null +++ b/nachrichten-manager-server/src/test/java/de/ozgcloud/nachrichten/common/grpc/GrpcDownloaderTest.java @@ -0,0 +1,335 @@ +package de.ozgcloud.nachrichten.common.grpc; + +import static org.assertj.core.api.Assertions.*; +import static org.mockito.ArgumentMatchers.*; +import static org.mockito.Mockito.*; + +import java.io.IOException; +import java.io.OutputStream; +import java.io.PipedInputStream; +import java.io.PipedOutputStream; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Consumer; +import java.util.function.Function; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Nested; +import org.junit.jupiter.api.Test; +import org.mockito.ArgumentCaptor; +import org.mockito.Captor; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.Spy; +import org.springframework.core.task.TaskExecutor; +import org.springframework.test.util.ReflectionTestUtils; + +import com.google.protobuf.ByteString; + +import de.ozgcloud.apilib.file.OzgCloudFileId; +import de.ozgcloud.apilib.file.OzgCloudFileTestFactory; +import de.ozgcloud.common.errorhandling.TechnicalException; +import io.grpc.stub.ClientCallStreamObserver; +import lombok.SneakyThrows; + +class GrpcDownloaderTest { + + @SuppressWarnings("unchecked") + private ClientCallStreamObserver<GrpcResponseDummy> callObserver = Mockito.mock(ClientCallStreamObserver.class); + @SuppressWarnings("unchecked") + private Function<ByteString, GrpcResponseDummy> chunkBuilder = Mockito.mock(Function.class); + @SuppressWarnings("unchecked") + private Consumer<OutputStream> downloadConsumer = Mockito.mock(Consumer.class); + private TaskExecutor taskExecutor = mock(TaskExecutor.class); + + private OzgCloudFileId fileId = OzgCloudFileTestFactory.ID; + + @Spy + private GrpcDownloader<GrpcResponseDummy> downloader = GrpcDownloader.<GrpcResponseDummy>builder() + .callObserver(callObserver) + .fileId(fileId) + .downloadConsumer(downloadConsumer) + .chunkBuilder(chunkBuilder) + .taskExecutor(taskExecutor) + .build(); + + @DisplayName("Start") + @Nested + class TestStart { + + @SneakyThrows + @Test + void shouldCallDoStart() { + doNothing().when(downloader).doStart(); + + downloader.start(); + + verify(downloader).doStart(); + } + + @SneakyThrows + @Test + void shouldCallHandleSafety() { + doNothing().when(downloader).doStart(); + + downloader.start(); + + verify(downloader).handleSafety(any(ExceptionRunnable.class)); + } + + @DisplayName("do") + @Nested + class TestDoStart { + + @Captor + private ArgumentCaptor<Runnable> runnableCaptor; + + @SneakyThrows + @Test + void shouldSetupStreams() { + downloader.doStart(); + + verify(downloader).setupStreams(); + } + + @SneakyThrows + @Test + void shouldCallTaskExecutor() { + downloader.doStart(); + + verify(taskExecutor).execute(any()); + } + + @SneakyThrows + @Test + void shouldStartDownload() { + doNothing().when(downloader).startDownload(); + + downloader.doStart(); + + verify(taskExecutor).execute(runnableCaptor.capture()); + runnableCaptor.getValue().run(); + verify(downloader).startDownload(); + } + + @SneakyThrows + @Test + void shouldSetOnReadyHandler() { + downloader.doStart(); + + verify(callObserver).setOnReadyHandler(runnableCaptor.capture()); + assertThat(runnableCaptor.getValue()).isNotNull(); + } + } + } + + @DisplayName("Start download") + @Nested + class TestStartDownload { + + @SneakyThrows + @Test + void shouldCallHandleSafety() { + doNothing().when(downloader).doDownload(); + + downloader.startDownload(); + + verify(downloader).handleSafety(any(ExceptionRunnable.class)); + } + + @SneakyThrows + @Test + void shouldCallDoDownoad() { + doNothing().when(downloader).doDownload(); + + downloader.startDownload(); + + verify(downloader).doDownload(); + } + + @DisplayName("do") + @Nested + class TestDoDownload { + + @Mock + private PipedOutputStream outputStream; + + @BeforeEach + void mock() { + ReflectionTestUtils.setField(downloader, "outputStream", outputStream); + } + + @SneakyThrows + @Test + void shouldCallDownloadConsumer() { + downloader.doDownload(); + + verify(downloadConsumer).accept(outputStream); + } + + @SneakyThrows + @Test + void shouldCloseOutputstream() { + downloader.doDownload(); + + verify(outputStream).close(); + } + } + } + + @DisplayName("On ready handler") + @Nested + class TestOnReadyHandler { + + @Test + void shouldSendChunksIfCallObserverIsReady() { + when(callObserver.isReady()).thenReturn(true); + doNothing().when(downloader).sendChunks(); + + downloader.onReadyHandler(); + + verify(downloader).sendChunks(); + } + } + + @DisplayName("Send chunks") + @Nested + class TestSendChunks { + + @SneakyThrows + @Test + void shouldCallHandleSafety() { + doNothing().when(downloader).doSendChunks(); + + downloader.sendChunks(); + + verify(downloader).handleSafety(any(ExceptionRunnable.class)); + } + + @SneakyThrows + @Test + void shouldCallDoDownoad() { + doNothing().when(downloader).doSendChunks(); + + downloader.sendChunks(); + + verify(downloader).doSendChunks(); + } + + @DisplayName("do") + @Nested + class TestDoSendChunks { + + @Mock + private PipedInputStream inputStream; + + private final int data = 20; + + @BeforeEach + void mock() { + ReflectionTestUtils.setField(downloader, "inputStream", inputStream); + } + + @SneakyThrows + @DisplayName("should send next chunk if callObserver is ready and stream already received data") + @Test + void shouldCallOnNext() { + when(callObserver.isReady()).thenReturn(true); + when(inputStream.read(any())).thenReturn(data, -1); + + downloader.doSendChunks(); + + verify(callObserver).onNext(any()); + } + + @SneakyThrows + @DisplayName("should call complete grpc stream if download has finished and stream has no data left") + @Test + void shouldCallOnCompleted() { + ReflectionTestUtils.setField(downloader, "downloadInProgress", new AtomicBoolean(false)); + + downloader.doSendChunks(); + + verify(callObserver).onCompleted(); + } + + @SneakyThrows + @Test + void shouldCloseInputStream() { + ReflectionTestUtils.setField(downloader, "downloadInProgress", new AtomicBoolean(false)); + + downloader.doSendChunks(); + + verify(inputStream).close(); + } + } + } + + @DisplayName("Handle safety") + @Nested + class TestHandleSafety { + + @DisplayName("on exception") + @Nested + class TestOnException { + + @Mock + private PipedOutputStream outputStream; + @Mock + private PipedInputStream inputStream; + + private final IOException exception = new IOException(); + + @SneakyThrows + @BeforeEach + void mock() { + ReflectionTestUtils.setField(downloader, "inputStream", inputStream); + ReflectionTestUtils.setField(downloader, "outputStream", outputStream); + } + + @SneakyThrows + @Test + void shouldThrowTechnicalException() { + assertThatThrownBy(this::handleSafety) + .isInstanceOf(TechnicalException.class) + .extracting(exception -> exception.getCause()).isEqualTo(exception); + } + + @SneakyThrows + @Test + void shouldCloseOutputStream() { + try { + handleSafety(); + } catch (Exception e) { + // do nothing + } + + verify(outputStream).close(); + } + + @SneakyThrows + @Test + void shouldCloseInputStream() { + try { + handleSafety(); + } catch (Exception e) { + // do nothing + } + + verify(inputStream).close(); + } + + private void handleSafety() { + downloader.handleSafety(this::dummyMethodThrowingException); + } + + private void dummyMethodThrowingException() throws IOException { + throw exception; + } + } + + } + + class GrpcResponseDummy { + } +} \ No newline at end of file -- GitLab