Skip to content
Snippets Groups Projects
Commit 66e15cd9 authored by OZGCloud's avatar OZGCloud
Browse files

Merge pull request 'GrpcDownloader in common-lib verschieben' (#16) from...

Merge pull request 'GrpcDownloader in common-lib verschieben' (#16) from replace-grpc-downloader into master

Reviewed-on: https://git.ozg-sh.de/ozgcloud-app/collaboration-manager/pulls/16


Reviewed-by: default avatarOZGCloud <ozgcloud@mgm-tp.com>
parents 08b7edbf 7d0151d8
No related branches found
No related tags found
No related merge requests found
......@@ -30,10 +30,10 @@ import com.google.protobuf.ByteString;
import de.ozgcloud.apilib.file.OzgCloudFileId;
import de.ozgcloud.collaboration.common.callcontext.CollaborationManagerCallContextGrpcServerInterceptor;
import de.ozgcloud.collaboration.common.file.CollaborationFileService;
import de.ozgcloud.collaboration.common.grpc.GrpcDownloader;
import de.ozgcloud.collaboration.vorgang.CollaborationVorgangMapper;
import de.ozgcloud.collaboration.vorgang.Vorgang;
import de.ozgcloud.collaboration.vorgang.VorgangService;
import de.ozgcloud.common.binaryfile.GrpcBinaryFileServerDownloader;
import io.grpc.stub.CallStreamObserver;
import io.grpc.stub.StreamObserver;
import lombok.RequiredArgsConstructor;
......@@ -43,8 +43,6 @@ import net.devh.boot.grpc.server.service.GrpcService;
@RequiredArgsConstructor
public class CollaborationGrpcService extends CollaborationServiceGrpc.CollaborationServiceImplBase {
public static final int CHUNK_SIZE = 255 * 1024;
private final CollaborationVorgangMapper vorgangMapper;
private final CollaborationFileService fileService;
......@@ -71,9 +69,9 @@ public class CollaborationGrpcService extends CollaborationServiceGrpc.Collabora
buildDownloader(request.getId(), responseObserver).start();
}
GrpcDownloader<GrpcGetFileContentResponse> buildDownloader(String fileId,
GrpcBinaryFileServerDownloader<GrpcGetFileContentResponse> buildDownloader(String fileId,
StreamObserver<GrpcGetFileContentResponse> responseObserver) {
return GrpcDownloader.<GrpcGetFileContentResponse>builder()
return GrpcBinaryFileServerDownloader.<GrpcGetFileContentResponse>builder()
.callObserver((CallStreamObserver<GrpcGetFileContentResponse>) responseObserver)
.downloadConsumer(outputStream -> fileService.download(OzgCloudFileId.from(fileId), outputStream))
.chunkBuilder(this::buildChunkResponse)
......
package de.ozgcloud.collaboration.common.grpc;
import java.io.IOException;
interface ExceptionRunnable {
void run() throws IOException; // NOSONAR
}
\ No newline at end of file
package de.ozgcloud.collaboration.common.grpc;
import java.io.IOException;
import java.io.OutputStream;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.Function;
import org.springframework.core.task.TaskExecutor;
import com.google.protobuf.ByteString;
import de.ozgcloud.apilib.file.OzgCloudFileId;
import de.ozgcloud.common.errorhandling.TechnicalException;
import io.grpc.stub.CallStreamObserver;
import lombok.Builder;
public class GrpcDownloader<T> {
private static final int CHUNK_SIZE = 255 * 1024;
private CallStreamObserver<T> callObserver;
private Function<ByteString, T> chunkBuilder;
private Consumer<OutputStream> downloadConsumer;
private TaskExecutor taskExecutor;
private final byte[] buffer = new byte[GrpcDownloader.CHUNK_SIZE];
private final AtomicBoolean downloadInProgress = new AtomicBoolean(false);
private PipedInputStream inputStream;
private PipedOutputStream outputStream;
@Builder
public GrpcDownloader(CallStreamObserver<T> callObserver, OzgCloudFileId fileId, Function<ByteString, T> chunkBuilder,
Consumer<OutputStream> downloadConsumer, TaskExecutor taskExecutor) {
this.callObserver = callObserver;
this.chunkBuilder = chunkBuilder;
this.downloadConsumer = downloadConsumer;
this.taskExecutor = taskExecutor;
}
public void start() {
handleSafety(this::doStart);
}
void doStart() throws IOException {
setupStreams();
taskExecutor.execute(this::startDownload);
callObserver.setOnReadyHandler(this::onReadyHandler);
}
void setupStreams() throws IOException {
outputStream = new PipedOutputStream();
inputStream = new PipedInputStream(GrpcDownloader.CHUNK_SIZE);
outputStream.connect(inputStream);
}
void startDownload() {
handleSafety(this::doDownload);
}
void doDownload() throws IOException {
downloadInProgress.set(true);
downloadConsumer.accept(outputStream);
downloadInProgress.set(false);
outputStream.close();
}
synchronized void onReadyHandler() {
if (callObserver.isReady()) {
sendChunks();
}
}
void sendChunks() {
handleSafety(this::doSendChunks);
}
void doSendChunks() throws IOException {
int bytesRead;
while (callObserver.isReady() && (bytesRead = inputStream.read(buffer)) != -1) {
callObserver.onNext(chunkBuilder.apply(ByteString.copyFrom(buffer, 0, bytesRead)));
}
if (!downloadInProgress.get()) {
inputStream.close();
callObserver.onCompleted();
}
}
void handleSafety(ExceptionRunnable runnable) {
try {
runnable.run();
} catch (IOException e) {
try {
inputStream.close();
outputStream.close();
} catch (IOException e1) {
throwException(e1);
}
throwException(e);
}
}
private void throwException(IOException e) {
throw new TechnicalException("Error occurred during downloading file content download.", e);
}
}
\ No newline at end of file
......@@ -44,13 +44,13 @@ import org.springframework.test.util.ReflectionTestUtils;
import com.google.protobuf.ByteString;
import de.ozgcloud.collaboration.common.grpc.GrpcDownloader;
import de.ozgcloud.collaboration.vorgang.CollaborationVorgangMapper;
import de.ozgcloud.collaboration.vorgang.GrpcVorgangTestFactory;
import de.ozgcloud.collaboration.vorgang.Vorgang;
import de.ozgcloud.collaboration.vorgang.VorgangService;
import de.ozgcloud.collaboration.vorgang.VorgangTestFactory;
import de.ozgcloud.command.CommandTestFactory;
import de.ozgcloud.common.binaryfile.GrpcBinaryFileServerDownloader;
import io.grpc.stub.CallStreamObserver;
import io.grpc.stub.StreamObserver;
......@@ -156,7 +156,7 @@ class CollaborationGrpcServiceTest {
@Mock
private StreamObserver<GrpcGetFileContentResponse> responseObserver;
@Mock
private GrpcDownloader<GrpcGetFileContentResponse> grpcDownloader;
private GrpcBinaryFileServerDownloader<GrpcGetFileContentResponse> grpcDownloader;
private final String fileId = UUID.randomUUID().toString();
private final GrpcGetFileContentRequest request = GrpcGetFileContentRequest.newBuilder().setId(fileId).build();
......@@ -219,21 +219,23 @@ class CollaborationGrpcServiceTest {
}
@SuppressWarnings("unchecked")
private CallStreamObserver<GrpcGetFileContentResponse> getCallObserver(GrpcDownloader<GrpcGetFileContentResponse> downloader) {
private CallStreamObserver<GrpcGetFileContentResponse> getCallObserver(
GrpcBinaryFileServerDownloader<GrpcGetFileContentResponse> downloader) {
return (CallStreamObserver<GrpcGetFileContentResponse>) ReflectionTestUtils.getField(downloader, "callObserver");
}
@SuppressWarnings("unchecked")
private Consumer<OutputStream> getDownloadConsumer(GrpcDownloader<GrpcGetFileContentResponse> downloader) {
private Consumer<OutputStream> getDownloadConsumer(GrpcBinaryFileServerDownloader<GrpcGetFileContentResponse> downloader) {
return (Consumer<OutputStream>) ReflectionTestUtils.getField(downloader, "downloadConsumer");
}
@SuppressWarnings("unchecked")
private Function<ByteString, GrpcGetFileContentResponse> getChunkBuilder(GrpcDownloader<GrpcGetFileContentResponse> downloader) {
private Function<ByteString, GrpcGetFileContentResponse> getChunkBuilder(
GrpcBinaryFileServerDownloader<GrpcGetFileContentResponse> downloader) {
return (Function<ByteString, GrpcGetFileContentResponse>) ReflectionTestUtils.getField(downloader, "chunkBuilder");
}
private TaskExecutor getTaskExecuter(GrpcDownloader<GrpcGetFileContentResponse> downloader) {
private TaskExecutor getTaskExecuter(GrpcBinaryFileServerDownloader<GrpcGetFileContentResponse> downloader) {
return (TaskExecutor) ReflectionTestUtils.getField(downloader, "taskExecutor");
}
}
......
package de.ozgcloud.collaboration.common.grpc;
import static org.assertj.core.api.Assertions.*;
import static org.mockito.ArgumentMatchers.*;
import static org.mockito.Mockito.*;
import java.io.IOException;
import java.io.OutputStream;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.Function;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.Captor;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.Spy;
import org.springframework.core.task.TaskExecutor;
import org.springframework.test.util.ReflectionTestUtils;
import com.google.protobuf.ByteString;
import de.ozgcloud.apilib.file.OzgCloudFileId;
import de.ozgcloud.collaboration.common.file.FileTestFactory;
import de.ozgcloud.common.errorhandling.TechnicalException;
import io.grpc.stub.ClientCallStreamObserver;
import lombok.SneakyThrows;
class GrpcDownloaderTest {
@SuppressWarnings("unchecked")
private ClientCallStreamObserver<GrpcResponseDummy> callObserver = Mockito.mock(ClientCallStreamObserver.class);
@SuppressWarnings("unchecked")
private Function<ByteString, GrpcResponseDummy> chunkBuilder = Mockito.mock(Function.class);
@SuppressWarnings("unchecked")
private Consumer<OutputStream> downloadConsumer = Mockito.mock(Consumer.class);
private TaskExecutor taskExecutor = mock(TaskExecutor.class);
private OzgCloudFileId fileId = FileTestFactory.ID;
@Spy
private GrpcDownloader<GrpcResponseDummy> downloader = GrpcDownloader.<GrpcResponseDummy>builder()
.callObserver(callObserver)
.fileId(fileId)
.downloadConsumer(downloadConsumer)
.chunkBuilder(chunkBuilder)
.taskExecutor(taskExecutor)
.build();
@DisplayName("Start")
@Nested
class TestStart {
@SneakyThrows
@Test
void shouldCallDoStart() {
doNothing().when(downloader).doStart();
downloader.start();
verify(downloader).doStart();
}
@SneakyThrows
@Test
void shouldCallHandleSafety() {
doNothing().when(downloader).doStart();
downloader.start();
verify(downloader).handleSafety(any(ExceptionRunnable.class));
}
@DisplayName("do")
@Nested
class TestDoStart {
@Captor
private ArgumentCaptor<Runnable> runnableCaptor;
@SneakyThrows
@Test
void shouldSetupStreams() {
downloader.doStart();
verify(downloader).setupStreams();
}
@SneakyThrows
@Test
void shouldCallTaskExecutor() {
downloader.doStart();
verify(taskExecutor).execute(any());
}
@SneakyThrows
@Test
void shouldStartDownload() {
doNothing().when(downloader).startDownload();
downloader.doStart();
verify(taskExecutor).execute(runnableCaptor.capture());
runnableCaptor.getValue().run();
verify(downloader).startDownload();
}
@SneakyThrows
@Test
void shouldSetOnReadyHandler() {
downloader.doStart();
verify(callObserver).setOnReadyHandler(runnableCaptor.capture());
assertThat(runnableCaptor.getValue()).isNotNull();
}
}
}
@DisplayName("Start download")
@Nested
class TestStartDownload {
@SneakyThrows
@Test
void shouldCallHandleSafety() {
doNothing().when(downloader).doDownload();
downloader.startDownload();
verify(downloader).handleSafety(any(ExceptionRunnable.class));
}
@SneakyThrows
@Test
void shouldCallDoDownoad() {
doNothing().when(downloader).doDownload();
downloader.startDownload();
verify(downloader).doDownload();
}
@DisplayName("do")
@Nested
class TestDoDownload {
@Mock
private PipedOutputStream outputStream;
@BeforeEach
void mock() {
ReflectionTestUtils.setField(downloader, "outputStream", outputStream);
}
@SneakyThrows
@Test
void shouldCallDownloadConsumer() {
downloader.doDownload();
verify(downloadConsumer).accept(outputStream);
}
@SneakyThrows
@Test
void shouldCloseOutputstream() {
downloader.doDownload();
verify(outputStream).close();
}
}
}
@DisplayName("On ready handler")
@Nested
class TestOnReadyHandler {
@Test
void shouldSendChunksIfCallObserverIsReady() {
when(callObserver.isReady()).thenReturn(true);
doNothing().when(downloader).sendChunks();
downloader.onReadyHandler();
verify(downloader).sendChunks();
}
}
@DisplayName("Send chunks")
@Nested
class TestSendChunks {
@SneakyThrows
@Test
void shouldCallHandleSafety() {
doNothing().when(downloader).doSendChunks();
downloader.sendChunks();
verify(downloader).handleSafety(any(ExceptionRunnable.class));
}
@SneakyThrows
@Test
void shouldCallDoDownoad() {
doNothing().when(downloader).doSendChunks();
downloader.sendChunks();
verify(downloader).doSendChunks();
}
@DisplayName("do")
@Nested
class TestDoSendChunks {
@Mock
private PipedInputStream inputStream;
private final int data = 20;
@BeforeEach
void mock() {
ReflectionTestUtils.setField(downloader, "inputStream", inputStream);
}
@SneakyThrows
@DisplayName("should send next chunk if callObserver is ready and stream already received data")
@Test
void shouldCallOnNext() {
when(callObserver.isReady()).thenReturn(true);
when(inputStream.read(any())).thenReturn(data, -1);
downloader.doSendChunks();
verify(callObserver).onNext(any());
}
@SneakyThrows
@DisplayName("should call complete grpc stream if download has finished and stream has no data left")
@Test
void shouldCallOnCompleted() {
ReflectionTestUtils.setField(downloader, "downloadInProgress", new AtomicBoolean(false));
downloader.doSendChunks();
verify(callObserver).onCompleted();
}
@SneakyThrows
@Test
void shouldCloseInputStream() {
ReflectionTestUtils.setField(downloader, "downloadInProgress", new AtomicBoolean(false));
downloader.doSendChunks();
verify(inputStream).close();
}
}
}
@DisplayName("Handle safety")
@Nested
class TestHandleSafety {
@DisplayName("on exception")
@Nested
class TestOnException {
@Mock
private PipedOutputStream outputStream;
@Mock
private PipedInputStream inputStream;
private final IOException exception = new IOException();
@SneakyThrows
@BeforeEach
void mock() {
ReflectionTestUtils.setField(downloader, "inputStream", inputStream);
ReflectionTestUtils.setField(downloader, "outputStream", outputStream);
}
@SneakyThrows
@Test
void shouldThrowTechnicalException() {
assertThatThrownBy(this::handleSafety)
.isInstanceOf(TechnicalException.class)
.extracting(exception -> exception.getCause()).isEqualTo(exception);
}
@SneakyThrows
@Test
void shouldCloseOutputStream() {
try {
handleSafety();
} catch (Exception e) {
// do nothing
}
verify(outputStream).close();
}
@SneakyThrows
@Test
void shouldCloseInputStream() {
try {
handleSafety();
} catch (Exception e) {
// do nothing
}
verify(inputStream).close();
}
private void handleSafety() {
downloader.handleSafety(this::dummyMethodThrowingException);
}
private void dummyMethodThrowingException() throws IOException {
throw exception;
}
}
}
class GrpcResponseDummy {
}
}
\ No newline at end of file
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment