diff --git a/ozgcloud-common-lib/pom.xml b/ozgcloud-common-lib/pom.xml index 20b9fc442de15624c26014c245295b139404a225..16adc23bdedb150d56722e8935381c67729ee865 100644 --- a/ozgcloud-common-lib/pom.xml +++ b/ozgcloud-common-lib/pom.xml @@ -83,6 +83,11 @@ <artifactId>grpc-stub</artifactId> </dependency> + <dependency> + <groupId>com.google.protobuf</groupId> + <artifactId>protobuf-java</artifactId> + </dependency> + <!-- tools --> <dependency> <groupId>org.projectlombok</groupId> diff --git a/ozgcloud-common-lib/src/main/java/de/ozgcloud/common/binaryfile/ExceptionalRunnable.java b/ozgcloud-common-lib/src/main/java/de/ozgcloud/common/binaryfile/ExceptionalRunnable.java new file mode 100644 index 0000000000000000000000000000000000000000..252dae28efbcdf9b01c79379d96056b94d646a20 --- /dev/null +++ b/ozgcloud-common-lib/src/main/java/de/ozgcloud/common/binaryfile/ExceptionalRunnable.java @@ -0,0 +1,31 @@ +/* + * Copyright (C) 2024 Das Land Schleswig-Holstein vertreten durch den + * Ministerpräsidenten des Landes Schleswig-Holstein + * Staatskanzlei + * Abteilung Digitalisierung und zentrales IT-Management der Landesregierung + * + * Lizenziert unter der EUPL, Version 1.2 oder - sobald + * diese von der Europäischen Kommission genehmigt wurden - + * Folgeversionen der EUPL ("Lizenz"); + * Sie dürfen dieses Werk ausschließlich gemäß + * dieser Lizenz nutzen. + * Eine Kopie der Lizenz finden Sie hier: + * + * https://joinup.ec.europa.eu/collection/eupl/eupl-text-eupl-12 + * + * Sofern nicht durch anwendbare Rechtsvorschriften + * gefordert oder in schriftlicher Form vereinbart, wird + * die unter der Lizenz verbreitete Software "so wie sie + * ist", OHNE JEGLICHE GEWÄHRLEISTUNG ODER BEDINGUNGEN - + * ausdrücklich oder stillschweigend - verbreitet. + * Die sprachspezifischen Genehmigungen und Beschränkungen + * unter der Lizenz sind dem Lizenztext zu entnehmen. + */ +package de.ozgcloud.common.binaryfile; + +import java.io.IOException; + +interface ExceptionalRunnable { + + void run() throws IOException; // NOSONAR +} \ No newline at end of file diff --git a/ozgcloud-common-lib/src/main/java/de/ozgcloud/common/binaryfile/GrpcBinaryFileServerDownloader.java b/ozgcloud-common-lib/src/main/java/de/ozgcloud/common/binaryfile/GrpcBinaryFileServerDownloader.java new file mode 100644 index 0000000000000000000000000000000000000000..c85c178eb6ccd550dce8eac3b290e67fd815621d --- /dev/null +++ b/ozgcloud-common-lib/src/main/java/de/ozgcloud/common/binaryfile/GrpcBinaryFileServerDownloader.java @@ -0,0 +1,132 @@ +/* + * Copyright (C) 2024 Das Land Schleswig-Holstein vertreten durch den + * Ministerpräsidenten des Landes Schleswig-Holstein + * Staatskanzlei + * Abteilung Digitalisierung und zentrales IT-Management der Landesregierung + * + * Lizenziert unter der EUPL, Version 1.2 oder - sobald + * diese von der Europäischen Kommission genehmigt wurden - + * Folgeversionen der EUPL ("Lizenz"); + * Sie dürfen dieses Werk ausschließlich gemäß + * dieser Lizenz nutzen. + * Eine Kopie der Lizenz finden Sie hier: + * + * https://joinup.ec.europa.eu/collection/eupl/eupl-text-eupl-12 + * + * Sofern nicht durch anwendbare Rechtsvorschriften + * gefordert oder in schriftlicher Form vereinbart, wird + * die unter der Lizenz verbreitete Software "so wie sie + * ist", OHNE JEGLICHE GEWÄHRLEISTUNG ODER BEDINGUNGEN - + * ausdrücklich oder stillschweigend - verbreitet. + * Die sprachspezifischen Genehmigungen und Beschränkungen + * unter der Lizenz sind dem Lizenztext zu entnehmen. + */ +package de.ozgcloud.common.binaryfile; + +import com.google.protobuf.ByteString; +import de.ozgcloud.common.errorhandling.TechnicalException; +import io.grpc.Context; +import io.grpc.stub.CallStreamObserver; +import lombok.Builder; +import lombok.extern.log4j.Log4j2; +import org.apache.commons.io.IOUtils; +import org.springframework.core.task.TaskExecutor; + +import java.io.IOException; +import java.io.OutputStream; +import java.io.PipedInputStream; +import java.io.PipedOutputStream; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Consumer; +import java.util.function.Function; + +@Log4j2 +public class GrpcBinaryFileServerDownloader<T> { + + private static final int CHUNK_SIZE = 255 * 1024; + + private final CallStreamObserver<T> callObserver; + private final Function<ByteString, T> chunkBuilder; + private final Consumer<OutputStream> downloadConsumer; + private final TaskExecutor taskExecutor; + + private final byte[] buffer = new byte[GrpcBinaryFileServerDownloader.CHUNK_SIZE]; + private final AtomicBoolean started = new AtomicBoolean(false); + private final AtomicBoolean downloadInProgress = new AtomicBoolean(false); + + private PipedInputStream inputStream; + private PipedOutputStream outputStream; + + @Builder + public GrpcBinaryFileServerDownloader(CallStreamObserver<T> callObserver, Function<ByteString, T> chunkBuilder, + Consumer<OutputStream> downloadConsumer, TaskExecutor taskExecutor) { + this.callObserver = callObserver; + this.chunkBuilder = chunkBuilder; + this.downloadConsumer = downloadConsumer; + this.taskExecutor = taskExecutor; + } + + public void start() { + if (isNotStarted()) { + doStart(); + } + } + + boolean isNotStarted() { + return started.compareAndSet(false, true); + } + + void doStart() { + handleSafety(this::setupStreams); + taskExecutor.execute(Context.current().wrap(this::startDownload)); + callObserver.setOnReadyHandler(this::onReadyHandler); + } + + void setupStreams() throws IOException { + outputStream = new PipedOutputStream(); + inputStream = new PipedInputStream(GrpcBinaryFileServerDownloader.CHUNK_SIZE); + outputStream.connect(inputStream); + } + + void startDownload() { + handleSafety(this::doDownload); + } + + void doDownload() throws IOException { + downloadInProgress.set(true); + downloadConsumer.accept(outputStream); + downloadInProgress.set(false); + outputStream.close(); + } + + synchronized void onReadyHandler() { + if (callObserver.isReady()) { + sendChunks(); + } + } + + void sendChunks() { + handleSafety(this::doSendChunks); + } + + void doSendChunks() throws IOException { + int bytesRead; + while (callObserver.isReady() && (bytesRead = inputStream.read(buffer)) != -1) { + callObserver.onNext(chunkBuilder.apply(ByteString.copyFrom(buffer, 0, bytesRead))); + } + if (!downloadInProgress.get()) { + inputStream.close(); + callObserver.onCompleted(); + } + } + + void handleSafety(ExceptionalRunnable runnable) { + try { + runnable.run(); + } catch (IOException e) { + IOUtils.closeQuietly(inputStream, e1 -> LOG.error("InputStream cannot be closed.", e1)); + IOUtils.closeQuietly(outputStream, e1 -> LOG.error("OutputStream cannot be closed.", e1)); + throw new TechnicalException("Error occurred during downloading file content download.", e); + } + } +} \ No newline at end of file diff --git a/ozgcloud-common-lib/src/test/java/de/ozgcloud/common/binaryfile/GrpcBinaryFileServerDownloaderTest.java b/ozgcloud-common-lib/src/test/java/de/ozgcloud/common/binaryfile/GrpcBinaryFileServerDownloaderTest.java new file mode 100644 index 0000000000000000000000000000000000000000..ed67be0343b394aa972998f64641b63ba320e69b --- /dev/null +++ b/ozgcloud-common-lib/src/test/java/de/ozgcloud/common/binaryfile/GrpcBinaryFileServerDownloaderTest.java @@ -0,0 +1,410 @@ +/* + * Copyright (C) 2024 Das Land Schleswig-Holstein vertreten durch den + * Ministerpräsidenten des Landes Schleswig-Holstein + * Staatskanzlei + * Abteilung Digitalisierung und zentrales IT-Management der Landesregierung + * + * Lizenziert unter der EUPL, Version 1.2 oder - sobald + * diese von der Europäischen Kommission genehmigt wurden - + * Folgeversionen der EUPL ("Lizenz"); + * Sie dürfen dieses Werk ausschließlich gemäß + * dieser Lizenz nutzen. + * Eine Kopie der Lizenz finden Sie hier: + * + * https://joinup.ec.europa.eu/collection/eupl/eupl-text-eupl-12 + * + * Sofern nicht durch anwendbare Rechtsvorschriften + * gefordert oder in schriftlicher Form vereinbart, wird + * die unter der Lizenz verbreitete Software "so wie sie + * ist", OHNE JEGLICHE GEWÄHRLEISTUNG ODER BEDINGUNGEN - + * ausdrücklich oder stillschweigend - verbreitet. + * Die sprachspezifischen Genehmigungen und Beschränkungen + * unter der Lizenz sind dem Lizenztext zu entnehmen. + */ +package de.ozgcloud.common.binaryfile; + +import com.google.protobuf.ByteString; +import de.ozgcloud.common.errorhandling.TechnicalException; +import io.grpc.Context; +import io.grpc.stub.CallStreamObserver; +import lombok.SneakyThrows; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Nested; +import org.junit.jupiter.api.Test; +import org.mockito.ArgumentCaptor; +import org.mockito.Captor; +import org.mockito.Mock; +import org.springframework.core.task.TaskExecutor; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.io.PipedInputStream; +import java.io.PipedOutputStream; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Consumer; +import java.util.function.Function; + +import static org.assertj.core.api.Assertions.*; +import static org.mockito.ArgumentMatchers.*; +import static org.mockito.Mockito.*; + +class GrpcBinaryFileServerDownloaderTest { + + @Mock + private CallStreamObserver<GrpcResponseDummy> callObserver; + @Mock + private Function<ByteString, GrpcResponseDummy> chunkBuilder; + @Mock + private Consumer<OutputStream> downloadConsumer; + @Mock + private TaskExecutor taskExecutor; + + private GrpcBinaryFileServerDownloader<GrpcResponseDummy> downloader; + + @BeforeEach + void init() { + downloader = spy(GrpcBinaryFileServerDownloader.<GrpcResponseDummy>builder().callObserver(callObserver).downloadConsumer(downloadConsumer) + .chunkBuilder(chunkBuilder).taskExecutor(taskExecutor).build()); + } + + @DisplayName("Start") + @Nested + class TestStart { + + @BeforeEach + void init() { + doNothing().when(downloader).doStart(); + } + + @Test + void shouldCallDoStart() { + + downloader.start(); + + verify(downloader).doStart(); + } + + @Test + void shouldNotStartWhenStarted() { + downloader.start(); + downloader.start(); + + verify(downloader, times(1)).doStart(); + } + } + + @Nested + class TestIsNotStarted { + + @Test + void shouldReturnTrueIfNotStarted() { + var result = downloader.isNotStarted(); + + assertThat(result).isTrue(); + } + + @Test + void shouldReturnFalseIfStarted() { + downloader.isNotStarted(); + + var result = downloader.isNotStarted(); + + assertThat(result).isFalse(); + } + } + + @DisplayName("do start") + @Nested + class TestDoStart { + + @Mock + private Context callContext; + @Mock + private Runnable wrappedRunnable; + + @Captor + private ArgumentCaptor<Runnable> wrappedRunnableCaptor; + @Captor + private ArgumentCaptor<Runnable> runnableCaptor; + @Captor + private ArgumentCaptor<ExceptionalRunnable> setupStreamCaptor; + + @SneakyThrows + @Test + void shouldCallSetupStreams() { + doNothing().when(downloader).handleSafety(any()); + + downloader.doStart(); + + verify(downloader).handleSafety(setupStreamCaptor.capture()); + setupStreamCaptor.getValue().run(); + verify(downloader).setupStreams(); + } + + @Test + void shouldCallTaskExecutor() { + try (var contextMock = mockStatic(Context.class)) { + contextMock.when(Context::current).thenReturn(callContext); + when(callContext.wrap(any(Runnable.class))).thenReturn(wrappedRunnable); + + downloader.doStart(); + + verify(taskExecutor).execute(wrappedRunnable); + } + } + + @Test + void shouldCallStartDownload() { + try (var contextMock = mockStatic(Context.class)) { + contextMock.when(Context::current).thenReturn(callContext); + when(callContext.wrap(any(Runnable.class))).thenReturn(wrappedRunnable); + doNothing().when(downloader).startDownload(); + + downloader.doStart(); + + verify(callContext).wrap(wrappedRunnableCaptor.capture()); + wrappedRunnableCaptor.getValue().run(); + verify(downloader).startDownload(); + } + } + + @SneakyThrows + @Test + void shouldSetOnReadyHandler() { + downloader.doStart(); + + verify(callObserver).setOnReadyHandler(runnableCaptor.capture()); + assertThat(runnableCaptor.getValue()).isNotNull(); + } + } + + @DisplayName("Start download") + @Nested + class TestStartDownload { + + @Mock + private Context callContext; + + @Captor + private ArgumentCaptor<ExceptionalRunnable> runnableCaptor; + + @SneakyThrows + @Test + void shouldCallDoDownload() { + doNothing().when(downloader).handleSafety(any()); + doNothing().when(downloader).doDownload(); + + downloader.startDownload(); + + verify(downloader).handleSafety(runnableCaptor.capture()); + runnableCaptor.getValue().run(); + verify(downloader).doDownload(); + } + } + + @DisplayName("do") + @Nested + class TestDoDownload { + + @Mock + private PipedOutputStream outputStream; + + @BeforeEach + void mock() { + setOutputStreamField(outputStream); + } + + @SneakyThrows + @Test + void shouldCallDownloadConsumer() { + downloader.doDownload(); + + verify(downloadConsumer).accept(outputStream); + } + + @SneakyThrows + @Test + void shouldCloseOutputStream() { + downloader.doDownload(); + + verify(outputStream).close(); + } + } + + @DisplayName("On ready handler") + @Nested + class TestOnReadyHandler { + + @Test + void shouldSendChunksIfCallObserverIsReady() { + when(callObserver.isReady()).thenReturn(true); + doNothing().when(downloader).sendChunks(); + + downloader.onReadyHandler(); + + verify(downloader).sendChunks(); + } + } + + @DisplayName("Send chunks") + @Nested + class TestSendChunks { + + @SneakyThrows + @Test + void shouldCallHandleSafety() { + doNothing().when(downloader).doSendChunks(); + + downloader.sendChunks(); + + verify(downloader).handleSafety(any(ExceptionalRunnable.class)); + } + + @SneakyThrows + @Test + void shouldCallDoDownoad() { + doNothing().when(downloader).doSendChunks(); + + downloader.sendChunks(); + + verify(downloader).doSendChunks(); + } + + @DisplayName("do") + @Nested + class TestDoSendChunks { + + @Mock + private PipedInputStream inputStream; + + private final int data = 20; + + @BeforeEach + void mock() { + setInputStreamField(inputStream); + } + + @SneakyThrows + @DisplayName("should send next chunk if callObserver is ready and stream already received data") + @Test + void shouldCallOnNext() { + when(callObserver.isReady()).thenReturn(true); + when(inputStream.read(any())).thenReturn(data, -1); + + downloader.doSendChunks(); + + verify(callObserver).onNext(any()); + } + + @SneakyThrows + @DisplayName("should call complete grpc stream if download has finished and stream has no data left") + @Test + void shouldCallOnCompleted() { + setDownloadInProgressField(new AtomicBoolean(false)); + + downloader.doSendChunks(); + + verify(callObserver).onCompleted(); + } + + @SneakyThrows + @Test + void shouldCloseInputStream() { + setDownloadInProgressField(new AtomicBoolean(false)); + + downloader.doSendChunks(); + + verify(inputStream).close(); + } + } + } + + @DisplayName("Handle safety") + @Nested + class TestHandleSafety { + + @DisplayName("on exception") + @Nested + class TestOnException { + + @Mock + private PipedOutputStream outputStream; + @Mock + private PipedInputStream inputStream; + + private final IOException exception = new IOException(); + + @SneakyThrows + @BeforeEach + void mock() { + setInputStreamField(inputStream); + setOutputStreamField(outputStream); + } + + @SneakyThrows + @Test + void shouldThrowTechnicalException() { + assertThatThrownBy(this::handleSafety).isInstanceOf(TechnicalException.class).extracting(Throwable::getCause).isEqualTo(exception); + } + + @SneakyThrows + @Test + void shouldCloseOutputStream() { + try { + handleSafety(); + } catch (Exception e) { + // do nothing + } + + verify(outputStream).close(); + } + + @SneakyThrows + @Test + void shouldCloseInputStream() { + try { + handleSafety(); + } catch (Exception e) { + // do nothing + } + + verify(inputStream).close(); + } + + private void handleSafety() { + downloader.handleSafety(this::dummyMethodThrowingException); + } + + private void dummyMethodThrowingException() throws IOException { + throw exception; + } + } + + } + + @SneakyThrows + private void setOutputStreamField(OutputStream outputStream) { + var outputStreamField = downloader.getClass().getDeclaredField("outputStream"); + outputStreamField.setAccessible(true); + outputStreamField.set(downloader, outputStream); + } + + @SneakyThrows + private void setInputStreamField(InputStream inputStream) { + var inputStreamField = downloader.getClass().getDeclaredField("inputStream"); + inputStreamField.setAccessible(true); + inputStreamField.set(downloader, inputStream); + } + + @SneakyThrows + private void setDownloadInProgressField(AtomicBoolean downloadInProgress) { + var downloadInProgressField = downloader.getClass().getDeclaredField("downloadInProgress"); + downloadInProgressField.setAccessible(true); + downloadInProgressField.set(downloader, downloadInProgress); + } + + static class GrpcResponseDummy { + } +} \ No newline at end of file