Skip to content
Snippets Groups Projects
Commit e0ca755f authored by OZGCloud's avatar OZGCloud
Browse files

Merge pull request 'GrpcDownloader in common-lib verschieben' (#32) from...

Merge pull request 'GrpcDownloader in common-lib verschieben' (#32) from create-grpc-downloader into master

Reviewed-on: https://git.ozg-sh.de/ozgcloud-lib/common-lib/pulls/32


Reviewed-by: default avatarOZGCloud <ozgcloud@mgm-tp.com>
parents 3bd0742e ae8fd085
No related branches found
No related tags found
No related merge requests found
...@@ -83,6 +83,11 @@ ...@@ -83,6 +83,11 @@
<artifactId>grpc-stub</artifactId> <artifactId>grpc-stub</artifactId>
</dependency> </dependency>
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
</dependency>
<!-- tools --> <!-- tools -->
<dependency> <dependency>
<groupId>org.projectlombok</groupId> <groupId>org.projectlombok</groupId>
......
/*
* Copyright (C) 2024 Das Land Schleswig-Holstein vertreten durch den
* Ministerpräsidenten des Landes Schleswig-Holstein
* Staatskanzlei
* Abteilung Digitalisierung und zentrales IT-Management der Landesregierung
*
* Lizenziert unter der EUPL, Version 1.2 oder - sobald
* diese von der Europäischen Kommission genehmigt wurden -
* Folgeversionen der EUPL ("Lizenz");
* Sie dürfen dieses Werk ausschließlich gemäß
* dieser Lizenz nutzen.
* Eine Kopie der Lizenz finden Sie hier:
*
* https://joinup.ec.europa.eu/collection/eupl/eupl-text-eupl-12
*
* Sofern nicht durch anwendbare Rechtsvorschriften
* gefordert oder in schriftlicher Form vereinbart, wird
* die unter der Lizenz verbreitete Software "so wie sie
* ist", OHNE JEGLICHE GEWÄHRLEISTUNG ODER BEDINGUNGEN -
* ausdrücklich oder stillschweigend - verbreitet.
* Die sprachspezifischen Genehmigungen und Beschränkungen
* unter der Lizenz sind dem Lizenztext zu entnehmen.
*/
package de.ozgcloud.common.binaryfile;
import java.io.IOException;
interface ExceptionalRunnable {
void run() throws IOException; // NOSONAR
}
\ No newline at end of file
/*
* Copyright (C) 2024 Das Land Schleswig-Holstein vertreten durch den
* Ministerpräsidenten des Landes Schleswig-Holstein
* Staatskanzlei
* Abteilung Digitalisierung und zentrales IT-Management der Landesregierung
*
* Lizenziert unter der EUPL, Version 1.2 oder - sobald
* diese von der Europäischen Kommission genehmigt wurden -
* Folgeversionen der EUPL ("Lizenz");
* Sie dürfen dieses Werk ausschließlich gemäß
* dieser Lizenz nutzen.
* Eine Kopie der Lizenz finden Sie hier:
*
* https://joinup.ec.europa.eu/collection/eupl/eupl-text-eupl-12
*
* Sofern nicht durch anwendbare Rechtsvorschriften
* gefordert oder in schriftlicher Form vereinbart, wird
* die unter der Lizenz verbreitete Software "so wie sie
* ist", OHNE JEGLICHE GEWÄHRLEISTUNG ODER BEDINGUNGEN -
* ausdrücklich oder stillschweigend - verbreitet.
* Die sprachspezifischen Genehmigungen und Beschränkungen
* unter der Lizenz sind dem Lizenztext zu entnehmen.
*/
package de.ozgcloud.common.binaryfile;
import com.google.protobuf.ByteString;
import de.ozgcloud.common.errorhandling.TechnicalException;
import io.grpc.Context;
import io.grpc.stub.CallStreamObserver;
import lombok.Builder;
import lombok.extern.log4j.Log4j2;
import org.apache.commons.io.IOUtils;
import org.springframework.core.task.TaskExecutor;
import java.io.IOException;
import java.io.OutputStream;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.Function;
@Log4j2
public class GrpcBinaryFileServerDownloader<T> {
private static final int CHUNK_SIZE = 255 * 1024;
private final CallStreamObserver<T> callObserver;
private final Function<ByteString, T> chunkBuilder;
private final Consumer<OutputStream> downloadConsumer;
private final TaskExecutor taskExecutor;
private final byte[] buffer = new byte[GrpcBinaryFileServerDownloader.CHUNK_SIZE];
private final AtomicBoolean started = new AtomicBoolean(false);
private final AtomicBoolean downloadInProgress = new AtomicBoolean(false);
private PipedInputStream inputStream;
private PipedOutputStream outputStream;
@Builder
public GrpcBinaryFileServerDownloader(CallStreamObserver<T> callObserver, Function<ByteString, T> chunkBuilder,
Consumer<OutputStream> downloadConsumer, TaskExecutor taskExecutor) {
this.callObserver = callObserver;
this.chunkBuilder = chunkBuilder;
this.downloadConsumer = downloadConsumer;
this.taskExecutor = taskExecutor;
}
public void start() {
if (isNotStarted()) {
doStart();
}
}
boolean isNotStarted() {
return started.compareAndSet(false, true);
}
void doStart() {
handleSafety(this::setupStreams);
taskExecutor.execute(Context.current().wrap(this::startDownload));
callObserver.setOnReadyHandler(this::onReadyHandler);
}
void setupStreams() throws IOException {
outputStream = new PipedOutputStream();
inputStream = new PipedInputStream(GrpcBinaryFileServerDownloader.CHUNK_SIZE);
outputStream.connect(inputStream);
}
void startDownload() {
handleSafety(this::doDownload);
}
void doDownload() throws IOException {
downloadInProgress.set(true);
downloadConsumer.accept(outputStream);
downloadInProgress.set(false);
outputStream.close();
}
synchronized void onReadyHandler() {
if (callObserver.isReady()) {
sendChunks();
}
}
void sendChunks() {
handleSafety(this::doSendChunks);
}
void doSendChunks() throws IOException {
int bytesRead;
while (callObserver.isReady() && (bytesRead = inputStream.read(buffer)) != -1) {
callObserver.onNext(chunkBuilder.apply(ByteString.copyFrom(buffer, 0, bytesRead)));
}
if (!downloadInProgress.get()) {
inputStream.close();
callObserver.onCompleted();
}
}
void handleSafety(ExceptionalRunnable runnable) {
try {
runnable.run();
} catch (IOException e) {
IOUtils.closeQuietly(inputStream, e1 -> LOG.error("InputStream cannot be closed.", e1));
IOUtils.closeQuietly(outputStream, e1 -> LOG.error("OutputStream cannot be closed.", e1));
throw new TechnicalException("Error occurred during downloading file content download.", e);
}
}
}
\ No newline at end of file
/*
* Copyright (C) 2024 Das Land Schleswig-Holstein vertreten durch den
* Ministerpräsidenten des Landes Schleswig-Holstein
* Staatskanzlei
* Abteilung Digitalisierung und zentrales IT-Management der Landesregierung
*
* Lizenziert unter der EUPL, Version 1.2 oder - sobald
* diese von der Europäischen Kommission genehmigt wurden -
* Folgeversionen der EUPL ("Lizenz");
* Sie dürfen dieses Werk ausschließlich gemäß
* dieser Lizenz nutzen.
* Eine Kopie der Lizenz finden Sie hier:
*
* https://joinup.ec.europa.eu/collection/eupl/eupl-text-eupl-12
*
* Sofern nicht durch anwendbare Rechtsvorschriften
* gefordert oder in schriftlicher Form vereinbart, wird
* die unter der Lizenz verbreitete Software "so wie sie
* ist", OHNE JEGLICHE GEWÄHRLEISTUNG ODER BEDINGUNGEN -
* ausdrücklich oder stillschweigend - verbreitet.
* Die sprachspezifischen Genehmigungen und Beschränkungen
* unter der Lizenz sind dem Lizenztext zu entnehmen.
*/
package de.ozgcloud.common.binaryfile;
import com.google.protobuf.ByteString;
import de.ozgcloud.common.errorhandling.TechnicalException;
import io.grpc.Context;
import io.grpc.stub.CallStreamObserver;
import lombok.SneakyThrows;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.Captor;
import org.mockito.Mock;
import org.springframework.core.task.TaskExecutor;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.Function;
import static org.assertj.core.api.Assertions.*;
import static org.mockito.ArgumentMatchers.*;
import static org.mockito.Mockito.*;
class GrpcBinaryFileServerDownloaderTest {
@Mock
private CallStreamObserver<GrpcResponseDummy> callObserver;
@Mock
private Function<ByteString, GrpcResponseDummy> chunkBuilder;
@Mock
private Consumer<OutputStream> downloadConsumer;
@Mock
private TaskExecutor taskExecutor;
private GrpcBinaryFileServerDownloader<GrpcResponseDummy> downloader;
@BeforeEach
void init() {
downloader = spy(GrpcBinaryFileServerDownloader.<GrpcResponseDummy>builder().callObserver(callObserver).downloadConsumer(downloadConsumer)
.chunkBuilder(chunkBuilder).taskExecutor(taskExecutor).build());
}
@DisplayName("Start")
@Nested
class TestStart {
@BeforeEach
void init() {
doNothing().when(downloader).doStart();
}
@Test
void shouldCallDoStart() {
downloader.start();
verify(downloader).doStart();
}
@Test
void shouldNotStartWhenStarted() {
downloader.start();
downloader.start();
verify(downloader, times(1)).doStart();
}
}
@Nested
class TestIsNotStarted {
@Test
void shouldReturnTrueIfNotStarted() {
var result = downloader.isNotStarted();
assertThat(result).isTrue();
}
@Test
void shouldReturnFalseIfStarted() {
downloader.isNotStarted();
var result = downloader.isNotStarted();
assertThat(result).isFalse();
}
}
@DisplayName("do start")
@Nested
class TestDoStart {
@Mock
private Context callContext;
@Mock
private Runnable wrappedRunnable;
@Captor
private ArgumentCaptor<Runnable> wrappedRunnableCaptor;
@Captor
private ArgumentCaptor<Runnable> runnableCaptor;
@Captor
private ArgumentCaptor<ExceptionalRunnable> setupStreamCaptor;
@SneakyThrows
@Test
void shouldCallSetupStreams() {
doNothing().when(downloader).handleSafety(any());
downloader.doStart();
verify(downloader).handleSafety(setupStreamCaptor.capture());
setupStreamCaptor.getValue().run();
verify(downloader).setupStreams();
}
@Test
void shouldCallTaskExecutor() {
try (var contextMock = mockStatic(Context.class)) {
contextMock.when(Context::current).thenReturn(callContext);
when(callContext.wrap(any(Runnable.class))).thenReturn(wrappedRunnable);
downloader.doStart();
verify(taskExecutor).execute(wrappedRunnable);
}
}
@Test
void shouldCallStartDownload() {
try (var contextMock = mockStatic(Context.class)) {
contextMock.when(Context::current).thenReturn(callContext);
when(callContext.wrap(any(Runnable.class))).thenReturn(wrappedRunnable);
doNothing().when(downloader).startDownload();
downloader.doStart();
verify(callContext).wrap(wrappedRunnableCaptor.capture());
wrappedRunnableCaptor.getValue().run();
verify(downloader).startDownload();
}
}
@SneakyThrows
@Test
void shouldSetOnReadyHandler() {
downloader.doStart();
verify(callObserver).setOnReadyHandler(runnableCaptor.capture());
assertThat(runnableCaptor.getValue()).isNotNull();
}
}
@DisplayName("Start download")
@Nested
class TestStartDownload {
@Mock
private Context callContext;
@Captor
private ArgumentCaptor<ExceptionalRunnable> runnableCaptor;
@SneakyThrows
@Test
void shouldCallDoDownload() {
doNothing().when(downloader).handleSafety(any());
doNothing().when(downloader).doDownload();
downloader.startDownload();
verify(downloader).handleSafety(runnableCaptor.capture());
runnableCaptor.getValue().run();
verify(downloader).doDownload();
}
}
@DisplayName("do")
@Nested
class TestDoDownload {
@Mock
private PipedOutputStream outputStream;
@BeforeEach
void mock() {
setOutputStreamField(outputStream);
}
@SneakyThrows
@Test
void shouldCallDownloadConsumer() {
downloader.doDownload();
verify(downloadConsumer).accept(outputStream);
}
@SneakyThrows
@Test
void shouldCloseOutputStream() {
downloader.doDownload();
verify(outputStream).close();
}
}
@DisplayName("On ready handler")
@Nested
class TestOnReadyHandler {
@Test
void shouldSendChunksIfCallObserverIsReady() {
when(callObserver.isReady()).thenReturn(true);
doNothing().when(downloader).sendChunks();
downloader.onReadyHandler();
verify(downloader).sendChunks();
}
}
@DisplayName("Send chunks")
@Nested
class TestSendChunks {
@SneakyThrows
@Test
void shouldCallHandleSafety() {
doNothing().when(downloader).doSendChunks();
downloader.sendChunks();
verify(downloader).handleSafety(any(ExceptionalRunnable.class));
}
@SneakyThrows
@Test
void shouldCallDoDownoad() {
doNothing().when(downloader).doSendChunks();
downloader.sendChunks();
verify(downloader).doSendChunks();
}
@DisplayName("do")
@Nested
class TestDoSendChunks {
@Mock
private PipedInputStream inputStream;
private final int data = 20;
@BeforeEach
void mock() {
setInputStreamField(inputStream);
}
@SneakyThrows
@DisplayName("should send next chunk if callObserver is ready and stream already received data")
@Test
void shouldCallOnNext() {
when(callObserver.isReady()).thenReturn(true);
when(inputStream.read(any())).thenReturn(data, -1);
downloader.doSendChunks();
verify(callObserver).onNext(any());
}
@SneakyThrows
@DisplayName("should call complete grpc stream if download has finished and stream has no data left")
@Test
void shouldCallOnCompleted() {
setDownloadInProgressField(new AtomicBoolean(false));
downloader.doSendChunks();
verify(callObserver).onCompleted();
}
@SneakyThrows
@Test
void shouldCloseInputStream() {
setDownloadInProgressField(new AtomicBoolean(false));
downloader.doSendChunks();
verify(inputStream).close();
}
}
}
@DisplayName("Handle safety")
@Nested
class TestHandleSafety {
@DisplayName("on exception")
@Nested
class TestOnException {
@Mock
private PipedOutputStream outputStream;
@Mock
private PipedInputStream inputStream;
private final IOException exception = new IOException();
@SneakyThrows
@BeforeEach
void mock() {
setInputStreamField(inputStream);
setOutputStreamField(outputStream);
}
@SneakyThrows
@Test
void shouldThrowTechnicalException() {
assertThatThrownBy(this::handleSafety).isInstanceOf(TechnicalException.class).extracting(Throwable::getCause).isEqualTo(exception);
}
@SneakyThrows
@Test
void shouldCloseOutputStream() {
try {
handleSafety();
} catch (Exception e) {
// do nothing
}
verify(outputStream).close();
}
@SneakyThrows
@Test
void shouldCloseInputStream() {
try {
handleSafety();
} catch (Exception e) {
// do nothing
}
verify(inputStream).close();
}
private void handleSafety() {
downloader.handleSafety(this::dummyMethodThrowingException);
}
private void dummyMethodThrowingException() throws IOException {
throw exception;
}
}
}
@SneakyThrows
private void setOutputStreamField(OutputStream outputStream) {
var outputStreamField = downloader.getClass().getDeclaredField("outputStream");
outputStreamField.setAccessible(true);
outputStreamField.set(downloader, outputStream);
}
@SneakyThrows
private void setInputStreamField(InputStream inputStream) {
var inputStreamField = downloader.getClass().getDeclaredField("inputStream");
inputStreamField.setAccessible(true);
inputStreamField.set(downloader, inputStream);
}
@SneakyThrows
private void setDownloadInProgressField(AtomicBoolean downloadInProgress) {
var downloadInProgressField = downloader.getClass().getDeclaredField("downloadInProgress");
downloadInProgressField.setAccessible(true);
downloadInProgressField.set(downloader, downloadInProgress);
}
static class GrpcResponseDummy {
}
}
\ No newline at end of file
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment