Skip to content
Snippets Groups Projects
Commit e20ac1d5 authored by OZGCloud's avatar OZGCloud
Browse files

OZG-6528 cleanup

parent 7b8dc645
No related branches found
No related tags found
No related merge requests found
...@@ -16,26 +16,21 @@ import de.ozgcloud.apilib.file.OzgCloudFileId; ...@@ -16,26 +16,21 @@ import de.ozgcloud.apilib.file.OzgCloudFileId;
import de.ozgcloud.common.errorhandling.TechnicalException; import de.ozgcloud.common.errorhandling.TechnicalException;
import io.grpc.stub.CallStreamObserver; import io.grpc.stub.CallStreamObserver;
import lombok.Builder; import lombok.Builder;
import lombok.Getter;
public class GrpcDownloader<T> { public class GrpcDownloader<T> {
private static final int CHUNK_SIZE = 255 * 1024; private static final int CHUNK_SIZE = 255 * 1024;
@Getter
private CallStreamObserver<T> callObserver; private CallStreamObserver<T> callObserver;
@Getter
private Function<ByteString, T> chunkBuilder; private Function<ByteString, T> chunkBuilder;
@Getter
private Consumer<OutputStream> downloadConsumer; private Consumer<OutputStream> downloadConsumer;
@Getter
private TaskExecutor taskExecutor; private TaskExecutor taskExecutor;
private final byte[] buffer = new byte[GrpcDownloader.CHUNK_SIZE]; private final byte[] buffer = new byte[GrpcDownloader.CHUNK_SIZE];
private final AtomicBoolean downloadInProgress = new AtomicBoolean(false);
private PipedInputStream inputStream; private PipedInputStream inputStream;
private PipedOutputStream outputStream; private PipedOutputStream outputStream;
private AtomicBoolean downloadInProgress = new AtomicBoolean(false);
@Builder @Builder
public GrpcDownloader(CallStreamObserver<T> callObserver, OzgCloudFileId fileId, Function<ByteString, T> chunkBuilder, public GrpcDownloader(CallStreamObserver<T> callObserver, OzgCloudFileId fileId, Function<ByteString, T> chunkBuilder,
...@@ -88,7 +83,7 @@ public class GrpcDownloader<T> { ...@@ -88,7 +83,7 @@ public class GrpcDownloader<T> {
while (callObserver.isReady() && (bytesRead = inputStream.read(buffer)) != -1) { while (callObserver.isReady() && (bytesRead = inputStream.read(buffer)) != -1) {
callObserver.onNext(chunkBuilder.apply(ByteString.copyFrom(buffer, 0, bytesRead))); callObserver.onNext(chunkBuilder.apply(ByteString.copyFrom(buffer, 0, bytesRead)));
} }
if (hasFinishedDownload()) { if (!downloadInProgress.get()) {
inputStream.close(); inputStream.close();
callObserver.onCompleted(); callObserver.onCompleted();
} }
...@@ -109,10 +104,6 @@ public class GrpcDownloader<T> { ...@@ -109,10 +104,6 @@ public class GrpcDownloader<T> {
} }
private void throwException(IOException e) { private void throwException(IOException e) {
throw new TechnicalException("Error occured during downloading file content download.", e); throw new TechnicalException("Error occurred during downloading file content download.", e);
}
private boolean hasFinishedDownload() throws IOException {
return !downloadInProgress.get() && inputStream.available() == 0;
} }
} }
\ No newline at end of file
...@@ -27,7 +27,10 @@ import static org.assertj.core.api.Assertions.*; ...@@ -27,7 +27,10 @@ import static org.assertj.core.api.Assertions.*;
import static org.mockito.ArgumentMatchers.*; import static org.mockito.ArgumentMatchers.*;
import static org.mockito.Mockito.*; import static org.mockito.Mockito.*;
import java.io.OutputStream;
import java.util.UUID; import java.util.UUID;
import java.util.function.Consumer;
import java.util.function.Function;
import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.DisplayName; import org.junit.jupiter.api.DisplayName;
...@@ -37,6 +40,9 @@ import org.mockito.InjectMocks; ...@@ -37,6 +40,9 @@ import org.mockito.InjectMocks;
import org.mockito.Mock; import org.mockito.Mock;
import org.mockito.Spy; import org.mockito.Spy;
import org.springframework.core.task.TaskExecutor; import org.springframework.core.task.TaskExecutor;
import org.springframework.test.util.ReflectionTestUtils;
import com.google.protobuf.ByteString;
import de.ozgcloud.collaboration.common.grpc.GrpcDownloader; import de.ozgcloud.collaboration.common.grpc.GrpcDownloader;
import de.ozgcloud.collaboration.vorgang.CollaborationVorgangMapper; import de.ozgcloud.collaboration.vorgang.CollaborationVorgangMapper;
...@@ -188,28 +194,47 @@ class CollaborationGrpcServiceTest { ...@@ -188,28 +194,47 @@ class CollaborationGrpcServiceTest {
void shouldContainCallObserver() { void shouldContainCallObserver() {
var downloader = grpcService.buildDownloader(fileId, callStreamObserver); var downloader = grpcService.buildDownloader(fileId, callStreamObserver);
assertThat(downloader.getCallObserver()).isEqualTo(callStreamObserver); assertThat(getCallObserver(downloader)).isEqualTo(callStreamObserver);
} }
@Test @Test
void shouldContainDownloadConsumer() { void shouldContainDownloadConsumer() {
var downloader = grpcService.buildDownloader(fileId, callStreamObserver); var downloader = grpcService.buildDownloader(fileId, callStreamObserver);
assertThat(downloader.getDownloadConsumer()).isNotNull(); assertThat(getDownloadConsumer(downloader)).isNotNull();
} }
@Test @Test
void shouldContainChunkBuilder() { void shouldContainChunkBuilder() {
var downloader = grpcService.buildDownloader(fileId, callStreamObserver); var downloader = grpcService.buildDownloader(fileId, callStreamObserver);
assertThat(downloader.getChunkBuilder()).isNotNull(); assertThat(getChunkBuilder(downloader)).isNotNull();
} }
@Test @Test
void shouldContainTaskBuilder() { void shouldContainTaskBuilder() {
var downloader = grpcService.buildDownloader(fileId, callStreamObserver); var downloader = grpcService.buildDownloader(fileId, callStreamObserver);
assertThat(downloader.getTaskExecutor()).isEqualTo(taskExecutor); assertThat(getTaskExecuter(downloader)).isEqualTo(taskExecutor);
}
@SuppressWarnings("unchecked")
private CallStreamObserver<GrpcGetFileContentResponse> getCallObserver(GrpcDownloader<GrpcGetFileContentResponse> downloader) {
return (CallStreamObserver<GrpcGetFileContentResponse>) ReflectionTestUtils.getField(downloader, "callObserver");
}
@SuppressWarnings("unchecked")
private Consumer<OutputStream> getDownloadConsumer(GrpcDownloader<GrpcGetFileContentResponse> downloader) {
return (Consumer<OutputStream>) ReflectionTestUtils.getField(downloader, "downloadConsumer");
}
@SuppressWarnings("unchecked")
private Function<ByteString, GrpcGetFileContentResponse> getChunkBuilder(GrpcDownloader<GrpcGetFileContentResponse> downloader) {
return (Function<ByteString, GrpcGetFileContentResponse>) ReflectionTestUtils.getField(downloader, "chunkBuilder");
}
private TaskExecutor getTaskExecuter(GrpcDownloader<GrpcGetFileContentResponse> downloader) {
return (TaskExecutor) ReflectionTestUtils.getField(downloader, "taskExecutor");
} }
} }
} }
\ No newline at end of file
...@@ -247,7 +247,6 @@ class GrpcDownloaderTest { ...@@ -247,7 +247,6 @@ class GrpcDownloaderTest {
@Test @Test
void shouldCallOnCompleted() { void shouldCallOnCompleted() {
ReflectionTestUtils.setField(downloader, "downloadInProgress", new AtomicBoolean(false)); ReflectionTestUtils.setField(downloader, "downloadInProgress", new AtomicBoolean(false));
when(inputStream.available()).thenReturn(0);
downloader.doSendChunks(); downloader.doSendChunks();
...@@ -258,7 +257,6 @@ class GrpcDownloaderTest { ...@@ -258,7 +257,6 @@ class GrpcDownloaderTest {
@Test @Test
void shouldCloseInputStream() { void shouldCloseInputStream() {
ReflectionTestUtils.setField(downloader, "downloadInProgress", new AtomicBoolean(false)); ReflectionTestUtils.setField(downloader, "downloadInProgress", new AtomicBoolean(false));
when(inputStream.available()).thenReturn(0);
downloader.doSendChunks(); downloader.doSendChunks();
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment