diff --git a/collaboration-manager-server/src/main/java/de/ozgcloud/collaboration/CollaborationGrpcService.java b/collaboration-manager-server/src/main/java/de/ozgcloud/collaboration/CollaborationGrpcService.java index 0bd2b27c88fb91ba2b776fcd10e7c4e73432764f..7f0c1814bb124acd3bf467354c0c6bb95868d48d 100644 --- a/collaboration-manager-server/src/main/java/de/ozgcloud/collaboration/CollaborationGrpcService.java +++ b/collaboration-manager-server/src/main/java/de/ozgcloud/collaboration/CollaborationGrpcService.java @@ -30,10 +30,10 @@ import com.google.protobuf.ByteString; import de.ozgcloud.apilib.file.OzgCloudFileId; import de.ozgcloud.collaboration.common.callcontext.CollaborationManagerCallContextGrpcServerInterceptor; import de.ozgcloud.collaboration.common.file.CollaborationFileService; -import de.ozgcloud.collaboration.common.grpc.GrpcDownloader; import de.ozgcloud.collaboration.vorgang.CollaborationVorgangMapper; import de.ozgcloud.collaboration.vorgang.Vorgang; import de.ozgcloud.collaboration.vorgang.VorgangService; +import de.ozgcloud.common.binaryfile.GrpcBinaryFileServerDownloader; import io.grpc.stub.CallStreamObserver; import io.grpc.stub.StreamObserver; import lombok.RequiredArgsConstructor; @@ -43,8 +43,6 @@ import net.devh.boot.grpc.server.service.GrpcService; @RequiredArgsConstructor public class CollaborationGrpcService extends CollaborationServiceGrpc.CollaborationServiceImplBase { - public static final int CHUNK_SIZE = 255 * 1024; - private final CollaborationVorgangMapper vorgangMapper; private final CollaborationFileService fileService; @@ -71,9 +69,9 @@ public class CollaborationGrpcService extends CollaborationServiceGrpc.Collabora buildDownloader(request.getId(), responseObserver).start(); } - GrpcDownloader<GrpcGetFileContentResponse> buildDownloader(String fileId, + GrpcBinaryFileServerDownloader<GrpcGetFileContentResponse> buildDownloader(String fileId, StreamObserver<GrpcGetFileContentResponse> responseObserver) { - return GrpcDownloader.<GrpcGetFileContentResponse>builder() + return GrpcBinaryFileServerDownloader.<GrpcGetFileContentResponse>builder() .callObserver((CallStreamObserver<GrpcGetFileContentResponse>) responseObserver) .downloadConsumer(outputStream -> fileService.download(OzgCloudFileId.from(fileId), outputStream)) .chunkBuilder(this::buildChunkResponse) diff --git a/collaboration-manager-server/src/main/java/de/ozgcloud/collaboration/common/grpc/ExceptionRunnable.java b/collaboration-manager-server/src/main/java/de/ozgcloud/collaboration/common/grpc/ExceptionRunnable.java deleted file mode 100644 index 0fc69b2046870612318ed74500e3eb83c6884f89..0000000000000000000000000000000000000000 --- a/collaboration-manager-server/src/main/java/de/ozgcloud/collaboration/common/grpc/ExceptionRunnable.java +++ /dev/null @@ -1,7 +0,0 @@ -package de.ozgcloud.collaboration.common.grpc; - -import java.io.IOException; - -interface ExceptionRunnable { - void run() throws IOException; // NOSONAR -} \ No newline at end of file diff --git a/collaboration-manager-server/src/main/java/de/ozgcloud/collaboration/common/grpc/GrpcDownloader.java b/collaboration-manager-server/src/main/java/de/ozgcloud/collaboration/common/grpc/GrpcDownloader.java deleted file mode 100644 index 6d777066a4cea56cb6df3276760a5f44df47b567..0000000000000000000000000000000000000000 --- a/collaboration-manager-server/src/main/java/de/ozgcloud/collaboration/common/grpc/GrpcDownloader.java +++ /dev/null @@ -1,109 +0,0 @@ -package de.ozgcloud.collaboration.common.grpc; - -import java.io.IOException; -import java.io.OutputStream; -import java.io.PipedInputStream; -import java.io.PipedOutputStream; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.function.Consumer; -import java.util.function.Function; - -import org.springframework.core.task.TaskExecutor; - -import com.google.protobuf.ByteString; - -import de.ozgcloud.apilib.file.OzgCloudFileId; -import de.ozgcloud.common.errorhandling.TechnicalException; -import io.grpc.stub.CallStreamObserver; -import lombok.Builder; - -public class GrpcDownloader<T> { - - private static final int CHUNK_SIZE = 255 * 1024; - - private CallStreamObserver<T> callObserver; - private Function<ByteString, T> chunkBuilder; - private Consumer<OutputStream> downloadConsumer; - private TaskExecutor taskExecutor; - - private final byte[] buffer = new byte[GrpcDownloader.CHUNK_SIZE]; - private final AtomicBoolean downloadInProgress = new AtomicBoolean(false); - - private PipedInputStream inputStream; - private PipedOutputStream outputStream; - - @Builder - public GrpcDownloader(CallStreamObserver<T> callObserver, OzgCloudFileId fileId, Function<ByteString, T> chunkBuilder, - Consumer<OutputStream> downloadConsumer, TaskExecutor taskExecutor) { - this.callObserver = callObserver; - this.chunkBuilder = chunkBuilder; - this.downloadConsumer = downloadConsumer; - this.taskExecutor = taskExecutor; - } - - public void start() { - handleSafety(this::doStart); - } - - void doStart() throws IOException { - setupStreams(); - taskExecutor.execute(this::startDownload); - callObserver.setOnReadyHandler(this::onReadyHandler); - } - - void setupStreams() throws IOException { - outputStream = new PipedOutputStream(); - inputStream = new PipedInputStream(GrpcDownloader.CHUNK_SIZE); - outputStream.connect(inputStream); - } - - void startDownload() { - handleSafety(this::doDownload); - } - - void doDownload() throws IOException { - downloadInProgress.set(true); - downloadConsumer.accept(outputStream); - downloadInProgress.set(false); - outputStream.close(); - } - - synchronized void onReadyHandler() { - if (callObserver.isReady()) { - sendChunks(); - } - } - - void sendChunks() { - handleSafety(this::doSendChunks); - } - - void doSendChunks() throws IOException { - int bytesRead; - while (callObserver.isReady() && (bytesRead = inputStream.read(buffer)) != -1) { - callObserver.onNext(chunkBuilder.apply(ByteString.copyFrom(buffer, 0, bytesRead))); - } - if (!downloadInProgress.get()) { - inputStream.close(); - callObserver.onCompleted(); - } - } - - void handleSafety(ExceptionRunnable runnable) { - try { - runnable.run(); - } catch (IOException e) { - try { - inputStream.close(); - outputStream.close(); - } catch (IOException e1) { - throwException(e1); - } - throwException(e); - } - } - - private void throwException(IOException e) { - throw new TechnicalException("Error occurred during downloading file content download.", e); - } -} \ No newline at end of file diff --git a/collaboration-manager-server/src/test/java/de/ozgcloud/collaboration/CollaborationGrpcServiceTest.java b/collaboration-manager-server/src/test/java/de/ozgcloud/collaboration/CollaborationGrpcServiceTest.java index 7614a782ae107dcd1fdd6d3e5c521d6777b98e36..21547a4885edfc8a8129129438870d11217ed6e6 100644 --- a/collaboration-manager-server/src/test/java/de/ozgcloud/collaboration/CollaborationGrpcServiceTest.java +++ b/collaboration-manager-server/src/test/java/de/ozgcloud/collaboration/CollaborationGrpcServiceTest.java @@ -44,13 +44,13 @@ import org.springframework.test.util.ReflectionTestUtils; import com.google.protobuf.ByteString; -import de.ozgcloud.collaboration.common.grpc.GrpcDownloader; import de.ozgcloud.collaboration.vorgang.CollaborationVorgangMapper; import de.ozgcloud.collaboration.vorgang.GrpcVorgangTestFactory; import de.ozgcloud.collaboration.vorgang.Vorgang; import de.ozgcloud.collaboration.vorgang.VorgangService; import de.ozgcloud.collaboration.vorgang.VorgangTestFactory; import de.ozgcloud.command.CommandTestFactory; +import de.ozgcloud.common.binaryfile.GrpcBinaryFileServerDownloader; import io.grpc.stub.CallStreamObserver; import io.grpc.stub.StreamObserver; @@ -156,7 +156,7 @@ class CollaborationGrpcServiceTest { @Mock private StreamObserver<GrpcGetFileContentResponse> responseObserver; @Mock - private GrpcDownloader<GrpcGetFileContentResponse> grpcDownloader; + private GrpcBinaryFileServerDownloader<GrpcGetFileContentResponse> grpcDownloader; private final String fileId = UUID.randomUUID().toString(); private final GrpcGetFileContentRequest request = GrpcGetFileContentRequest.newBuilder().setId(fileId).build(); @@ -219,21 +219,23 @@ class CollaborationGrpcServiceTest { } @SuppressWarnings("unchecked") - private CallStreamObserver<GrpcGetFileContentResponse> getCallObserver(GrpcDownloader<GrpcGetFileContentResponse> downloader) { + private CallStreamObserver<GrpcGetFileContentResponse> getCallObserver( + GrpcBinaryFileServerDownloader<GrpcGetFileContentResponse> downloader) { return (CallStreamObserver<GrpcGetFileContentResponse>) ReflectionTestUtils.getField(downloader, "callObserver"); } @SuppressWarnings("unchecked") - private Consumer<OutputStream> getDownloadConsumer(GrpcDownloader<GrpcGetFileContentResponse> downloader) { + private Consumer<OutputStream> getDownloadConsumer(GrpcBinaryFileServerDownloader<GrpcGetFileContentResponse> downloader) { return (Consumer<OutputStream>) ReflectionTestUtils.getField(downloader, "downloadConsumer"); } @SuppressWarnings("unchecked") - private Function<ByteString, GrpcGetFileContentResponse> getChunkBuilder(GrpcDownloader<GrpcGetFileContentResponse> downloader) { + private Function<ByteString, GrpcGetFileContentResponse> getChunkBuilder( + GrpcBinaryFileServerDownloader<GrpcGetFileContentResponse> downloader) { return (Function<ByteString, GrpcGetFileContentResponse>) ReflectionTestUtils.getField(downloader, "chunkBuilder"); } - private TaskExecutor getTaskExecuter(GrpcDownloader<GrpcGetFileContentResponse> downloader) { + private TaskExecutor getTaskExecuter(GrpcBinaryFileServerDownloader<GrpcGetFileContentResponse> downloader) { return (TaskExecutor) ReflectionTestUtils.getField(downloader, "taskExecutor"); } } diff --git a/collaboration-manager-server/src/test/java/de/ozgcloud/collaboration/common/grpc/GrpcDownloaderTest.java b/collaboration-manager-server/src/test/java/de/ozgcloud/collaboration/common/grpc/GrpcDownloaderTest.java deleted file mode 100644 index 8f088c2287165b77e639ef0dc52db88c5ca69acd..0000000000000000000000000000000000000000 --- a/collaboration-manager-server/src/test/java/de/ozgcloud/collaboration/common/grpc/GrpcDownloaderTest.java +++ /dev/null @@ -1,335 +0,0 @@ -package de.ozgcloud.collaboration.common.grpc; - -import static org.assertj.core.api.Assertions.*; -import static org.mockito.ArgumentMatchers.*; -import static org.mockito.Mockito.*; - -import java.io.IOException; -import java.io.OutputStream; -import java.io.PipedInputStream; -import java.io.PipedOutputStream; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.function.Consumer; -import java.util.function.Function; - -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.DisplayName; -import org.junit.jupiter.api.Nested; -import org.junit.jupiter.api.Test; -import org.mockito.ArgumentCaptor; -import org.mockito.Captor; -import org.mockito.Mock; -import org.mockito.Mockito; -import org.mockito.Spy; -import org.springframework.core.task.TaskExecutor; -import org.springframework.test.util.ReflectionTestUtils; - -import com.google.protobuf.ByteString; - -import de.ozgcloud.apilib.file.OzgCloudFileId; -import de.ozgcloud.collaboration.common.file.FileTestFactory; -import de.ozgcloud.common.errorhandling.TechnicalException; -import io.grpc.stub.ClientCallStreamObserver; -import lombok.SneakyThrows; - -class GrpcDownloaderTest { - - @SuppressWarnings("unchecked") - private ClientCallStreamObserver<GrpcResponseDummy> callObserver = Mockito.mock(ClientCallStreamObserver.class); - @SuppressWarnings("unchecked") - private Function<ByteString, GrpcResponseDummy> chunkBuilder = Mockito.mock(Function.class); - @SuppressWarnings("unchecked") - private Consumer<OutputStream> downloadConsumer = Mockito.mock(Consumer.class); - private TaskExecutor taskExecutor = mock(TaskExecutor.class); - - private OzgCloudFileId fileId = FileTestFactory.ID; - - @Spy - private GrpcDownloader<GrpcResponseDummy> downloader = GrpcDownloader.<GrpcResponseDummy>builder() - .callObserver(callObserver) - .fileId(fileId) - .downloadConsumer(downloadConsumer) - .chunkBuilder(chunkBuilder) - .taskExecutor(taskExecutor) - .build(); - - @DisplayName("Start") - @Nested - class TestStart { - - @SneakyThrows - @Test - void shouldCallDoStart() { - doNothing().when(downloader).doStart(); - - downloader.start(); - - verify(downloader).doStart(); - } - - @SneakyThrows - @Test - void shouldCallHandleSafety() { - doNothing().when(downloader).doStart(); - - downloader.start(); - - verify(downloader).handleSafety(any(ExceptionRunnable.class)); - } - - @DisplayName("do") - @Nested - class TestDoStart { - - @Captor - private ArgumentCaptor<Runnable> runnableCaptor; - - @SneakyThrows - @Test - void shouldSetupStreams() { - downloader.doStart(); - - verify(downloader).setupStreams(); - } - - @SneakyThrows - @Test - void shouldCallTaskExecutor() { - downloader.doStart(); - - verify(taskExecutor).execute(any()); - } - - @SneakyThrows - @Test - void shouldStartDownload() { - doNothing().when(downloader).startDownload(); - - downloader.doStart(); - - verify(taskExecutor).execute(runnableCaptor.capture()); - runnableCaptor.getValue().run(); - verify(downloader).startDownload(); - } - - @SneakyThrows - @Test - void shouldSetOnReadyHandler() { - downloader.doStart(); - - verify(callObserver).setOnReadyHandler(runnableCaptor.capture()); - assertThat(runnableCaptor.getValue()).isNotNull(); - } - } - } - - @DisplayName("Start download") - @Nested - class TestStartDownload { - - @SneakyThrows - @Test - void shouldCallHandleSafety() { - doNothing().when(downloader).doDownload(); - - downloader.startDownload(); - - verify(downloader).handleSafety(any(ExceptionRunnable.class)); - } - - @SneakyThrows - @Test - void shouldCallDoDownoad() { - doNothing().when(downloader).doDownload(); - - downloader.startDownload(); - - verify(downloader).doDownload(); - } - - @DisplayName("do") - @Nested - class TestDoDownload { - - @Mock - private PipedOutputStream outputStream; - - @BeforeEach - void mock() { - ReflectionTestUtils.setField(downloader, "outputStream", outputStream); - } - - @SneakyThrows - @Test - void shouldCallDownloadConsumer() { - downloader.doDownload(); - - verify(downloadConsumer).accept(outputStream); - } - - @SneakyThrows - @Test - void shouldCloseOutputstream() { - downloader.doDownload(); - - verify(outputStream).close(); - } - } - } - - @DisplayName("On ready handler") - @Nested - class TestOnReadyHandler { - - @Test - void shouldSendChunksIfCallObserverIsReady() { - when(callObserver.isReady()).thenReturn(true); - doNothing().when(downloader).sendChunks(); - - downloader.onReadyHandler(); - - verify(downloader).sendChunks(); - } - } - - @DisplayName("Send chunks") - @Nested - class TestSendChunks { - - @SneakyThrows - @Test - void shouldCallHandleSafety() { - doNothing().when(downloader).doSendChunks(); - - downloader.sendChunks(); - - verify(downloader).handleSafety(any(ExceptionRunnable.class)); - } - - @SneakyThrows - @Test - void shouldCallDoDownoad() { - doNothing().when(downloader).doSendChunks(); - - downloader.sendChunks(); - - verify(downloader).doSendChunks(); - } - - @DisplayName("do") - @Nested - class TestDoSendChunks { - - @Mock - private PipedInputStream inputStream; - - private final int data = 20; - - @BeforeEach - void mock() { - ReflectionTestUtils.setField(downloader, "inputStream", inputStream); - } - - @SneakyThrows - @DisplayName("should send next chunk if callObserver is ready and stream already received data") - @Test - void shouldCallOnNext() { - when(callObserver.isReady()).thenReturn(true); - when(inputStream.read(any())).thenReturn(data, -1); - - downloader.doSendChunks(); - - verify(callObserver).onNext(any()); - } - - @SneakyThrows - @DisplayName("should call complete grpc stream if download has finished and stream has no data left") - @Test - void shouldCallOnCompleted() { - ReflectionTestUtils.setField(downloader, "downloadInProgress", new AtomicBoolean(false)); - - downloader.doSendChunks(); - - verify(callObserver).onCompleted(); - } - - @SneakyThrows - @Test - void shouldCloseInputStream() { - ReflectionTestUtils.setField(downloader, "downloadInProgress", new AtomicBoolean(false)); - - downloader.doSendChunks(); - - verify(inputStream).close(); - } - } - } - - @DisplayName("Handle safety") - @Nested - class TestHandleSafety { - - @DisplayName("on exception") - @Nested - class TestOnException { - - @Mock - private PipedOutputStream outputStream; - @Mock - private PipedInputStream inputStream; - - private final IOException exception = new IOException(); - - @SneakyThrows - @BeforeEach - void mock() { - ReflectionTestUtils.setField(downloader, "inputStream", inputStream); - ReflectionTestUtils.setField(downloader, "outputStream", outputStream); - } - - @SneakyThrows - @Test - void shouldThrowTechnicalException() { - assertThatThrownBy(this::handleSafety) - .isInstanceOf(TechnicalException.class) - .extracting(exception -> exception.getCause()).isEqualTo(exception); - } - - @SneakyThrows - @Test - void shouldCloseOutputStream() { - try { - handleSafety(); - } catch (Exception e) { - // do nothing - } - - verify(outputStream).close(); - } - - @SneakyThrows - @Test - void shouldCloseInputStream() { - try { - handleSafety(); - } catch (Exception e) { - // do nothing - } - - verify(inputStream).close(); - } - - private void handleSafety() { - downloader.handleSafety(this::dummyMethodThrowingException); - } - - private void dummyMethodThrowingException() throws IOException { - throw exception; - } - } - - } - - class GrpcResponseDummy { - } -} \ No newline at end of file