Skip to content
Snippets Groups Projects
Commit be8b9fb5 authored by OZGCloud's avatar OZGCloud
Browse files

OZG-6161 use GrpcDownloader frpm common lib

parent f61245d7
Branches
Tags
No related merge requests found
......@@ -54,6 +54,10 @@
<groupId>de.ozgcloud.api-lib</groupId>
<artifactId>api-lib-core</artifactId>
</dependency>
<dependency>
<groupId>de.ozgcloud.common</groupId>
<artifactId>ozgcloud-common-lib</artifactId>
</dependency>
<dependency>
<groupId>de.ozgcloud.api-lib</groupId>
<artifactId>api-lib-core</artifactId>
......
package de.ozgcloud.archive.export;
import java.io.BufferedInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.util.UUID;
import org.apache.commons.io.IOUtils;
import org.springframework.core.task.TaskExecutor;
import com.google.protobuf.ByteString;
......@@ -16,8 +11,8 @@ import de.ozgcloud.archive.grpc.export.ExportServiceGrpc.ExportServiceImplBase;
import de.ozgcloud.archive.grpc.export.GrpcExportVorgangRequest;
import de.ozgcloud.archive.grpc.export.GrpcExportVorgangResponse;
import de.ozgcloud.archive.grpc.export.GrpcFile;
import de.ozgcloud.common.errorhandling.TechnicalException;
import io.grpc.Context;
import de.ozgcloud.common.binaryfile.GrpcBinaryFileServerDownloader;
import io.grpc.stub.CallStreamObserver;
import io.grpc.stub.StreamObserver;
import lombok.RequiredArgsConstructor;
import net.devh.boot.grpc.server.service.GrpcService;
......@@ -31,64 +26,42 @@ class ExportGrpcService extends ExportServiceImplBase {
static final int CHUNK_SIZE = 256 * 1024;
private final ExportService exportService;
private final TaskExecutor taskExecutor;
@Override
public void exportVorgang(GrpcExportVorgangRequest request, StreamObserver<GrpcExportVorgangResponse> responseObserver) {
try (var pipedInputStream = new PipedInputStream();
var pipedOutputStream = new PipedOutputStream(pipedInputStream)) {
var fileNameId = createFileNameId();
writeExportData(request.getVorgangId(), fileNameId, pipedOutputStream);
sendExportData(responseObserver, pipedInputStream, buildXdomeaFileName(fileNameId));
} catch (IOException e) {
throw new TechnicalException("Error on exporting Vorgang!", e);
}
sendFileName(responseObserver, fileNameId);
buildExportDownloader(request.getVorgangId(), fileNameId, responseObserver).start();
}
String createFileNameId() {
return UUID.randomUUID().toString();
}
void writeExportData(String vorgangId, String fileNameId, PipedOutputStream pipedOutputStream) {
// The grpcContext has to be passed to the new Thread to preserve the grpc
// header information from the incoming request.
var grpcContext = Context.current();
new Thread(grpcContext.wrap(() -> exportService.writeXdomeaFileContent(vorgangId, fileNameId, pipedOutputStream))).start();
void sendFileName(StreamObserver<GrpcExportVorgangResponse> responseObserver, String fileNameId) {
responseObserver.onNext(GrpcExportVorgangResponse.newBuilder()
.setVorgangFile(GrpcFile.newBuilder().setFileName(buildXdomeaFileName(fileNameId)).build())
.build());
}
String buildXdomeaFileName(String fileNameId) {
return String.format(EXPORT_FILENAME_TEMPLATE, fileNameId);
}
void sendExportData(StreamObserver<GrpcExportVorgangResponse> responseObserver, InputStream fileContent, String fileName) {
sendFileName(responseObserver, fileName);
var fileChunk = new byte[CHUNK_SIZE];
int length;
try (var bufferedInputStream = createBufferedInputStream(fileContent)) {
while ((length = bufferedInputStream.read(fileChunk)) != -1) {
sendChunk(responseObserver, fileChunk, length);
}
responseObserver.onCompleted();
} catch (IOException e) {
handleException(fileContent, e, "Error on sending file!");
}
}
private void sendFileName(StreamObserver<GrpcExportVorgangResponse> responseObserver, String fileName) {
responseObserver.onNext(GrpcExportVorgangResponse.newBuilder().setVorgangFile(GrpcFile.newBuilder().setFileName(fileName).build()).build());
}
InputStream createBufferedInputStream(InputStream fileContent) {
return new BufferedInputStream(fileContent, CHUNK_SIZE);
}
private void sendChunk(StreamObserver<GrpcExportVorgangResponse> responseObserver, byte[] fileChunk, int length) {
responseObserver.onNext(GrpcExportVorgangResponse.newBuilder()
.setVorgangFile(GrpcFile.newBuilder().setFileContent(ByteString.copyFrom(fileChunk, 0, length)).build())
.build());
GrpcBinaryFileServerDownloader<GrpcExportVorgangResponse> buildExportDownloader(String vorgangId, String fileNameId,
StreamObserver<GrpcExportVorgangResponse> responseObserver) {
return GrpcBinaryFileServerDownloader.<GrpcExportVorgangResponse>builder()
.callObserver((CallStreamObserver<GrpcExportVorgangResponse>) responseObserver)
.taskExecutor(taskExecutor)
.downloadConsumer(outputStream -> exportService.writeXdomeaFileContent(vorgangId, fileNameId, outputStream))
.chunkBuilder(this::buildExportVorgangChunkResponse)
.build();
}
private void handleException(InputStream inputStream, IOException e, String message) {
IOUtils.closeQuietly(inputStream);
throw new TechnicalException(message, e);
GrpcExportVorgangResponse buildExportVorgangChunkResponse(ByteString chunk) {
return GrpcExportVorgangResponse.newBuilder()
.setVorgangFile(GrpcFile.newBuilder().setFileContent(chunk).build())
.build();
}
}
package de.ozgcloud.archive.export;
import static org.assertj.core.api.Assertions.*;
import static org.junit.jupiter.api.Assertions.*;
import static org.mockito.ArgumentMatchers.*;
import static org.mockito.Mockito.*;
import java.io.BufferedInputStream;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.io.OutputStream;
import java.util.UUID;
import java.util.function.Consumer;
import java.util.function.Function;
import org.apache.commons.io.IOUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Nested;
......@@ -21,19 +16,19 @@ import org.junit.jupiter.api.Test;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.MockedConstruction;
import org.mockito.MockedStatic;
import org.mockito.Spy;
import org.springframework.core.task.TaskExecutor;
import com.google.protobuf.ByteString;
import com.thedeanda.lorem.LoremIpsum;
import de.ozgcloud.archive.grpc.export.GrpcExportVorgangRequest;
import de.ozgcloud.archive.grpc.export.GrpcExportVorgangResponse;
import de.ozgcloud.archive.grpc.export.GrpcFile;
import de.ozgcloud.archive.vorgang.VorgangWithEingangTestFactory;
import de.ozgcloud.common.errorhandling.TechnicalException;
import io.grpc.Context;
import de.ozgcloud.common.binaryfile.GrpcBinaryFileServerDownloader;
import io.grpc.stub.CallStreamObserver;
import io.grpc.stub.StreamObserver;
import lombok.SneakyThrows;
class ExportGrpcServiceTest {
......@@ -42,70 +37,24 @@ class ExportGrpcServiceTest {
private ExportGrpcService service;
@Mock
private ExportService exportService;
@Mock
private TaskExecutor taskExecutor;
@Nested
class TestExportVorgang {
@Mock
private StreamObserver<GrpcExportVorgangResponse> responseObserver;
@Mock
private GrpcBinaryFileServerDownloader<GrpcExportVorgangResponse> downloader;
private final GrpcExportVorgangRequest request = GrpcExportVorgangRequestTestFactory.create();
private final String fileNameId = UUID.randomUUID().toString();
private final String fileName = LoremIpsum.getInstance().getName();
@BeforeEach
void setUpMocks() {
doReturn(fileNameId).when(service).createFileNameId();
doReturn(fileName).when(service).buildXdomeaFileName(fileNameId);
}
@Nested
class OnNoException {
private PipedInputStream pipedInputStream;
private MockedConstruction<PipedInputStream> mockedConstructionInput;
private PipedOutputStream pipedOutputStream;
private MockedConstruction<PipedOutputStream> mockedConstructionOutput;
private InputStream connectedInput;
@BeforeEach
void setUpMockedConstruction() {
mockedConstructionInput = mockConstruction(PipedInputStream.class, (mock, context) -> {
pipedInputStream = mock;
doNothing().when(service).sendExportData(responseObserver, pipedInputStream, fileName);
});
mockedConstructionOutput = mockConstruction(PipedOutputStream.class, (mock, context) -> {
pipedOutputStream = mock;
connectedInput = (InputStream) context.arguments().get(0);
});
}
@AfterEach
void closeMocks() {
mockedConstructionInput.close();
mockedConstructionOutput.close();
}
@Test
void shouldConstructInputStream() {
callService();
assertThat(mockedConstructionInput.constructed()).hasSize(1);
}
@Test
void shouldConstructOutputStream() {
callService();
assertThat(mockedConstructionOutput.constructed()).hasSize(1);
}
@Test
void shouldConnectPipes() {
callService();
assertThat(connectedInput).isEqualTo(pipedInputStream);
doReturn(downloader).when(service).buildExportDownloader(VorgangWithEingangTestFactory.ID, fileNameId, responseObserver);
}
@Test
......@@ -116,77 +65,29 @@ class ExportGrpcServiceTest {
}
@Test
void shouldWriteExportData() {
callService();
verify(service).writeExportData(VorgangWithEingangTestFactory.ID, fileNameId, pipedOutputStream);
}
@Test
void shouldBuildXdomeaFileName() {
callService();
verify(service).buildXdomeaFileName(fileNameId);
}
@Test
void shouldSendExportData() {
void shouldSendFileName() {
callService();
verify(service).sendExportData(responseObserver, pipedInputStream, fileName);
verify(service).sendFileName(responseObserver, fileNameId);
}
@Test
@SneakyThrows
void shouldCloseOutputStream() {
void shouldBuildExportDownloader() {
callService();
verify(pipedOutputStream).close();
verify(service).buildExportDownloader(VorgangWithEingangTestFactory.ID, fileNameId, responseObserver);
}
@Test
@SneakyThrows
void shouldCloseInputStream() {
void shouldStartDownloader() {
callService();
verify(pipedInputStream).close();
}
}
@Nested
class OnIOException {
@BeforeEach
void setUpMock() {
doNothing().when(service).sendExportData(eq(responseObserver), any(PipedInputStream.class), eq(fileName));
}
@Test
@SneakyThrows
void shouldThrowTechnicalExceptionOnErrorOnInputStream() {
try (var mockedConstructionInput = mockConstruction(PipedInputStream.class,
(mock, context) -> doThrow(IOException.class).when(mock).close())) {
assertThrows(TechnicalException.class, () -> callService());
}
}
@Test
@SneakyThrows
void shouldThrowTechnicalExceptionOnErrorOnOutputStream() {
try (var mockedConstructionInput = mockConstruction(PipedOutputStream.class,
(mock, context) -> doThrow(IOException.class).when(mock).close())) {
assertThrows(TechnicalException.class, () -> callService());
}
}
verify(downloader).start();
}
private void callService() {
service.exportVorgang(request, responseObserver);
}
}
@Nested
......@@ -207,271 +108,145 @@ class ExportGrpcServiceTest {
}
@Nested
class TestWriteExportData {
private MockedConstruction<Thread> mockedConstructionThread;
private Runnable passedRunnable;
private Thread thread;
private final String fileName = LoremIpsum.getInstance().getName();
@Mock
private PipedOutputStream pipedOutputStream;
@BeforeEach
void setUpMock() {
mockedConstructionThread = mockConstruction(Thread.class, (mock, context) -> {
passedRunnable = (Runnable) context.arguments().get(0);
thread = mock;
});
}
@AfterEach
void closeMock() {
mockedConstructionThread.close();
}
@Test
void shouldConstructNewThread() {
callService();
assertThat(mockedConstructionThread.constructed()).hasSize(1);
}
@Test
void shouldStartThread() {
callService();
verify(thread).start();
}
@Nested
class TestRunnable {
class TestSendFileName {
@Mock
private Context grpcContext;
private StreamObserver<GrpcExportVorgangResponse> responseObserver;
@Mock
private Runnable wrapedRunnable;
private GrpcBinaryFileServerDownloader<GrpcExportVorgangResponse> downloader;
@Test
void shouldCallExportService() {
callService();
private final String fileNameId = UUID.randomUUID().toString();
private final String fileName = LoremIpsum.getInstance().getName();
passedRunnable.run();
@Test
void shouldCallBuildXdomeaFileName() {
sendFileName();
verify(exportService).writeXdomeaFileContent(VorgangWithEingangTestFactory.ID, fileName, pipedOutputStream);
verify(service).buildXdomeaFileName(fileNameId);
}
@Test
void shouldPassWrappedRunnableInGrpcContext() {
try (var mockedStaticContext = mockStatic(Context.class)) {
mockedStaticContext.when(Context::current).thenReturn(grpcContext);
when(grpcContext.wrap(any(Runnable.class))).thenReturn(wrapedRunnable);
callService();
assertThat(passedRunnable).isEqualTo(wrapedRunnable);
}
void shouldSendFileName() {
doReturn(fileName).when(service).buildXdomeaFileName(fileNameId);
}
}
sendFileName();
private void callService() {
service.writeExportData(VorgangWithEingangTestFactory.ID, fileName, pipedOutputStream);
}
verify(responseObserver).onNext(argThat((response) -> response.getVorgangFile().getFileName().equals(fileName)));
}
@Nested
class TestBuildXdomeaFileName {
private final String id = UUID.randomUUID().toString();
@Test
void shouldReturnFileName() {
var fileName = service.buildXdomeaFileName(id);
assertThat(fileName).isEqualTo(id + "_Abgabe.Abgabe.0401.xdomea");
private void sendFileName() {
service.sendFileName(responseObserver, fileNameId);
}
}
@Nested
class TestSendExportData {
class TestBuildExportDownloader {
@Mock
private StreamObserver<GrpcExportVorgangResponse> responseObserver;
private CallStreamObserver<GrpcExportVorgangResponse> responseObserver;
private final String fileNameId = UUID.randomUUID().toString();
@Mock
private InputStream xdomeaFileContent;
private OutputStream outputStream;
private final String fileName = LoremIpsum.getInstance().getName();
private MockedConstruction<GrpcBinaryFileServerDownloader> downloaderMockedConstruction; // NOSONAR
private GrpcBinaryFileServerDownloader<GrpcExportVorgangResponse> downloader;
private StreamObserver<GrpcExportVorgangResponse> setResponseObserver;
private TaskExecutor setTaskExecutor;
private Consumer<OutputStream> setDownloadConsumer;
private Function<ByteString, GrpcExportVorgangResponse> setChunkBuilder;
@Nested
class TestWithByteArrayInputStream {
private final byte[] content = FileContentTestFactory.createContentInByte((int) (ExportGrpcService.CHUNK_SIZE * 1.5));
private final InputStream inputStream = new ByteArrayInputStream(content);
private final ByteString chunk = ByteString.copyFromUtf8(LoremIpsum.getInstance().getWords(5));
@SuppressWarnings("unchecked")
@BeforeEach
void setUpMock() {
doReturn(inputStream).when(service).createBufferedInputStream(xdomeaFileContent);
void mock() {
downloaderMockedConstruction = mockConstruction(GrpcBinaryFileServerDownloader.class, (downloader, context) -> {
setResponseObserver = (StreamObserver<GrpcExportVorgangResponse>) context.arguments().get(0);
setChunkBuilder = (Function<ByteString, GrpcExportVorgangResponse>) context.arguments().get(1);
setDownloadConsumer = (Consumer<OutputStream>) context.arguments().get(2);
setTaskExecutor = (TaskExecutor) context.arguments().get(3);
this.downloader = downloader;
});
}
@Test
void shouldSendFileName() {
callService();
verify(responseObserver).onNext(argThat((response) -> response.getVorgangFile().getFileName().equals(fileName)));
@AfterEach
void closeMock() {
downloaderMockedConstruction.close();
}
@Test
void shouldCreateBufferedInputStream() {
callService();
void shouldSetResponseObserver() {
buildExportDownloader();
verify(service).createBufferedInputStream(xdomeaFileContent);
assertThat(setResponseObserver).isEqualTo(responseObserver);
}
@Test
void shouldSendFirstDataChunk() {
callService();
void shouldSetTaskExecutor() {
buildExportDownloader();
verify(responseObserver)
.onNext(argThat((response) -> response.getVorgangFile().getFileContent()
.equals(ByteString.copyFrom(content, 0, ExportGrpcService.CHUNK_SIZE))));
assertThat(setTaskExecutor).isEqualTo(taskExecutor);
}
@Test
void shouldSendSecondDataChunk() {
callService();
void shouldSetDownloadConsumer() {
buildExportDownloader();
verify(responseObserver)
.onNext(argThat((response) -> response.getVorgangFile().getFileContent()
.equals(ByteString.copyFrom(content, ExportGrpcService.CHUNK_SIZE,
content.length - ExportGrpcService.CHUNK_SIZE))));
}
@Test
void shouldComplete() {
callService();
setDownloadConsumer.accept(outputStream);
verify(responseObserver).onCompleted();
}
}
@Nested
class TestWithMockedInputStream {
@Mock
private BufferedInputStream inputStream;
@BeforeEach
void setUpMock() {
doReturn(inputStream).when(service).createBufferedInputStream(xdomeaFileContent);
verify(exportService).writeXdomeaFileContent(VorgangWithEingangTestFactory.ID, fileNameId, outputStream);
}
@Test
@SneakyThrows
void shouldCloseInputStream() {
when(inputStream.read(any())).thenReturn(-1);
callService();
verify(inputStream).close();
}
@Nested
class OnIOException {
private MockedStatic<IOUtils> mockedIOUtils;
void shouldSetChunkBuilder() {
var response = GrpcExportVorgangResponse.newBuilder().setVorgangFile(GrpcFile.newBuilder().setFileContent(chunk).build()).build();
doReturn(response).when(service).buildExportVorgangChunkResponse(chunk);
buildExportDownloader();
@BeforeEach
@SneakyThrows
void setUpMock() {
when(inputStream.read(any())).thenThrow(new IOException());
mockedIOUtils = mockStatic(IOUtils.class);
}
var result = setChunkBuilder.apply(chunk);
@AfterEach
void cleanUp() {
mockedIOUtils.close();
assertThat(result).isEqualTo(response);
}
@Test
void shouldThrowTechnicalException() {
assertThrows(TechnicalException.class,
() -> callService());
}
void shouldReturnDownloader() {
var returnedDownloader = buildExportDownloader();
@Test
void shouldCloseFileContentStreamQuietly() {
try {
callService();
} catch (TechnicalException e) {
assertThat(returnedDownloader).isEqualTo(downloader);
}
mockedIOUtils.verify(() -> IOUtils.closeQuietly(xdomeaFileContent));
}
}
}
private void callService() {
service.sendExportData(responseObserver, xdomeaFileContent, fileName);
private GrpcBinaryFileServerDownloader<GrpcExportVorgangResponse> buildExportDownloader() {
return service.buildExportDownloader(VorgangWithEingangTestFactory.ID, fileNameId, responseObserver);
}
}
@Nested
class TestCreateBufferedInputStream {
private MockedConstruction<BufferedInputStream> mockConstructionBufferedInputStream;
private InputStream passedInputStream;
private BufferedInputStream constructedInputStream;
private int chunkSize;
@Mock
private InputStream fileContent;
@BeforeEach
void setUpMock() {
mockConstructionBufferedInputStream = mockConstruction(BufferedInputStream.class, (mock, context) -> {
passedInputStream = (InputStream) context.arguments().get(0);
chunkSize = (int) context.arguments().get(1);
constructedInputStream = mock;
});
}
class TestBuildExportVorgangChunkResponse {
@AfterEach
void closeMock() {
mockConstructionBufferedInputStream.close();
}
private final ByteString chunk = ByteString.copyFromUtf8(LoremIpsum.getInstance().getWords(5));
@Test
void shouldConstructBufferedInputStream() {
callService();
void shouldReturnResponse() {
var expectedResponse = GrpcExportVorgangResponse.newBuilder().setVorgangFile(GrpcFile.newBuilder().setFileContent(chunk).build()).build();
assertThat(mockConstructionBufferedInputStream.constructed()).hasSize(1);
}
var result = service.buildExportVorgangChunkResponse(chunk);
@Test
void shouldConstructBufferedInputStreamWithFileContent() {
callService();
assertThat(result).isEqualTo(expectedResponse);
assertThat(passedInputStream).isEqualTo(fileContent);
}
}
@Test
void shouldConstructBufferedInputStreamWithChunkSize() {
callService();
@Nested
class TestBuildXdomeaFileName {
assertThat(chunkSize).isEqualTo(ExportGrpcService.CHUNK_SIZE);
}
private final String id = UUID.randomUUID().toString();
@Test
void shouldReturnBufferedInputStream() {
var returnedInputStream = callService();
assertThat(returnedInputStream).isEqualTo(constructedInputStream);
}
void shouldReturnFileName() {
var fileName = service.buildXdomeaFileName(id);
private InputStream callService() {
return service.createBufferedInputStream(fileContent);
assertThat(fileName).isEqualTo(id + "_Abgabe.Abgabe.0401.xdomea");
}
}
}
......@@ -4,6 +4,7 @@ import org.springframework.boot.autoconfigure.ImportAutoConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.TaskExecutor;
import net.devh.boot.grpc.client.autoconfigure.GrpcClientAutoConfiguration;
import net.devh.boot.grpc.server.autoconfigure.GrpcServerAutoConfiguration;
......@@ -19,7 +20,7 @@ import net.devh.boot.grpc.server.autoconfigure.GrpcServerFactoryAutoConfiguratio
public class GrpcIntegrationTestConfiguration {
@Bean
ExportGrpcService grpcExportService(ExportService exportService) {
return new ExportGrpcService(exportService);
ExportGrpcService grpcExportService(ExportService exportService, TaskExecutor taskExecutor) {
return new ExportGrpcService(exportService, taskExecutor);
}
}
......@@ -30,6 +30,7 @@
<api-lib.version>0.11.0</api-lib.version>
<find-and-replace-maven-plugin.version>1.2.0</find-and-replace-maven-plugin.version>
<protoc-jar-plugin.version>3.11.4</protoc-jar-plugin.version>
<ozgcloud-common.version>4.5.0-SNAPSHOT</ozgcloud-common.version>
</properties>
<dependencyManagement>
<dependencies>
......@@ -49,6 +50,11 @@
<artifactId>api-lib-core</artifactId>
<version>${api-lib.version}</version>
</dependency>
<dependency>
<groupId>de.ozgcloud.common</groupId>
<artifactId>ozgcloud-common-lib</artifactId>
<version>${ozgcloud-common.version}</version>
</dependency>
<dependency>
<groupId>de.ozgcloud.api-lib</groupId>
<artifactId>api-lib-core</artifactId>
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment