Skip to content
Snippets Groups Projects
Commit 96621d6b authored by OZGCloud's avatar OZGCloud
Browse files

Merge pull request 'OZG-7143 GrpcBinaryFileServerDownloader läuft ins Timeout'...

Merge pull request 'OZG-7143 GrpcBinaryFileServerDownloader läuft ins Timeout' (#37) from OZG-7143-download-timeout into master

Reviewed-on: https://git.ozg-sh.de/ozgcloud-lib/common-lib/pulls/37


Reviewed-by: default avatarOZGCloud <ozgcloud@mgm-tp.com>
parents 65647fb4 58a8df02
Branches
Tags
No related merge requests found
......@@ -2,7 +2,9 @@
## Changelog
### 4.6.0-SNAPSHOT
### 4.7.0-SNAPSHOT
### 4.6.0
* Update [OZGCloud License Generator](ozgcloud-common-license/readme.md)
* `GrpcUtil` erweitern: mehr Schlüssel hinzufügen und Methoden, um diese Schlüssel aus gRPC-Metadaten zu extrahieren.
......
......@@ -145,6 +145,11 @@
</dependency>
<!-- test -->
<dependency>
<groupId>de.ozgcloud.common</groupId>
<artifactId>ozgcloud-common-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
......
......@@ -51,7 +51,8 @@ public class GrpcBinaryFileServerDownloader<T> {
private final byte[] buffer = new byte[GrpcBinaryFileServerDownloader.CHUNK_SIZE];
private final AtomicBoolean started = new AtomicBoolean(false);
private final AtomicBoolean downloadInProgress = new AtomicBoolean(false);
private final AtomicBoolean downloadFinished = new AtomicBoolean(false);
private final AtomicBoolean requestFinished = new AtomicBoolean(false);
private PipedInputStream inputStream;
private PipedOutputStream outputStream;
......@@ -76,9 +77,10 @@ public class GrpcBinaryFileServerDownloader<T> {
}
void doStart() {
LOG.debug("Starting download.");
handleSafety(this::setupStreams);
taskExecutor.execute(this::startDownload);
callObserver.setOnReadyHandler(this::onReadyHandler);
callObserver.setOnReadyHandler(this::sendChunks);
}
void setupStreams() throws IOException {
......@@ -91,41 +93,66 @@ public class GrpcBinaryFileServerDownloader<T> {
handleSafety(this::doDownload);
}
void doDownload() throws IOException {
downloadInProgress.set(true);
void doDownload() {
LOG.debug("Downloading file content...");
downloadConsumer.accept(outputStream);
downloadInProgress.set(false);
outputStream.close();
}
synchronized void onReadyHandler() {
if (callObserver.isReady()) {
LOG.debug("Download completed.");
downloadFinished.set(true);
closeOutputStream();
sendChunks();
}
}
void sendChunks() {
synchronized void sendChunks() {
handleSafety(this::doSendChunks);
}
void doSendChunks() throws IOException {
if (requestFinished.get()) {
return;
}
int bytesRead;
while (callObserver.isReady() && (bytesRead = inputStream.read(buffer)) != -1) {
while (callObserver.isReady()) {
if ((bytesRead = inputStream.read(buffer)) == -1) {
tryCompleteRequest();
break;
}
callObserver.onNext(chunkBuilder.apply(ByteString.copyFrom(buffer, 0, bytesRead)));
LOG.debug("Sent {} bytes", bytesRead);
}
}
void tryCompleteRequest() {
if (shouldCompleteRequest()) {
completeRequest();
}
if (!downloadInProgress.get()) {
inputStream.close();
callObserver.onCompleted();
}
boolean shouldCompleteRequest() {
return downloadFinished.get() && requestFinished.compareAndSet(false, true);
}
void completeRequest() {
LOG.debug("Complete download request");
closeInputStream();
callObserver.onCompleted();
}
void handleSafety(ExceptionalRunnable runnable) {
try {
runnable.run();
} catch (Exception e) {
IOUtils.closeQuietly(inputStream, e1 -> LOG.error("InputStream cannot be closed.", e1));
IOUtils.closeQuietly(outputStream, e1 -> LOG.error("OutputStream cannot be closed.", e1));
closeOutputStream();
closeInputStream();
throw new TechnicalException("Error occurred during downloading file content download.", e);
}
}
void closeOutputStream() {
IOUtils.closeQuietly(outputStream, e -> LOG.error("OutputStream cannot be closed.", e));
}
void closeInputStream() {
IOUtils.closeQuietly(inputStream, e -> LOG.error("InputStream cannot be closed.", e));
}
}
\ No newline at end of file
......@@ -25,6 +25,7 @@ package de.ozgcloud.common.binaryfile;
import com.google.protobuf.ByteString;
import de.ozgcloud.common.errorhandling.TechnicalException;
import de.ozgcloud.common.test.ReflectionTestUtils;
import io.grpc.Context;
import io.grpc.stub.CallStreamObserver;
import lombok.SneakyThrows;
......@@ -42,6 +43,7 @@ import java.io.InputStream;
import java.io.OutputStream;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.util.Random;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.Function;
......@@ -161,9 +163,6 @@ class GrpcBinaryFileServerDownloaderTest {
@Nested
class TestStartDownload {
@Mock
private Context callContext;
@Captor
private ArgumentCaptor<ExceptionalRunnable> runnableCaptor;
......@@ -210,21 +209,6 @@ class GrpcBinaryFileServerDownloaderTest {
}
}
@DisplayName("On ready handler")
@Nested
class TestOnReadyHandler {
@Test
void shouldSendChunksIfCallObserverIsReady() {
when(callObserver.isReady()).thenReturn(true);
doNothing().when(downloader).sendChunks();
downloader.onReadyHandler();
verify(downloader).sendChunks();
}
}
@DisplayName("Send chunks")
@Nested
class TestSendChunks {
......@@ -241,7 +225,7 @@ class GrpcBinaryFileServerDownloaderTest {
@SneakyThrows
@Test
void shouldCallDoDownoad() {
void shouldCallDoDownload() {
doNothing().when(downloader).doSendChunks();
downloader.sendChunks();
......@@ -255,46 +239,178 @@ class GrpcBinaryFileServerDownloaderTest {
@Mock
private PipedInputStream inputStream;
@Captor
private ArgumentCaptor<ByteString> byteStringCaptor;
private final int data = 20;
private final int readBytes = 20;
private final byte[] buffer = new byte[readBytes];
private final GrpcResponseDummy grpcResponseDummy = new GrpcResponseDummy();
@SneakyThrows
@BeforeEach
void mock() {
doNothing().when(downloader).tryCompleteRequest();
when(callObserver.isReady()).thenReturn(true);
when(inputStream.read(any())).thenReturn(readBytes, -1);
setInputStreamField(inputStream);
new Random().nextBytes(buffer);
ReflectionTestUtils.setField(downloader, "buffer", buffer);
}
@Test
void shouldCallChunkBuilder() {
doSendChunks();
verify(chunkBuilder).apply(byteStringCaptor.capture());
assertThat(byteStringCaptor.getValue().toByteArray()).isEqualTo(buffer);
}
@SneakyThrows
@DisplayName("should send next chunk if callObserver is ready and stream already received data")
@Test
void shouldCallOnNext() {
when(callObserver.isReady()).thenReturn(true);
when(inputStream.read(any())).thenReturn(data, -1);
when(chunkBuilder.apply(any())).thenReturn(grpcResponseDummy);
downloader.doSendChunks();
doSendChunks();
verify(callObserver).onNext(any());
verify(callObserver).onNext(grpcResponseDummy);
}
@SneakyThrows
@DisplayName("should call complete grpc stream if download has finished and stream has no data left")
@Test
void shouldCallOnCompleted() {
setDownloadInProgressField(new AtomicBoolean(false));
void shouldCallCompleteDownload() {
setDownloadFinishedField(true);
doSendChunks();
verify(downloader).tryCompleteRequest();
}
@SneakyThrows
private void doSendChunks() {
downloader.doSendChunks();
}
}
}
verify(callObserver).onCompleted();
@Nested
class TestTryCompleteRequest {
@Test
void shouldCallShouldCompleteRequest() {
downloader.tryCompleteRequest();
verify(downloader).shouldCompleteRequest();
}
@Test
void shouldCallCompleteRequest() {
doReturn(true).when(downloader).shouldCompleteRequest();
downloader.tryCompleteRequest();
verify(downloader).completeRequest();
}
@Test
void shouldNotCallCompleteRequest() {
doReturn(false).when(downloader).shouldCompleteRequest();
downloader.tryCompleteRequest();
verify(downloader, never()).completeRequest();
}
}
@Nested
class TestShouldCompleteRequest {
@Nested
class TestWhenDownloadFinished {
@BeforeEach
void init() {
setDownloadFinishedField(true);
}
@Test
void shouldReturnTrue() {
var result = downloader.shouldCompleteRequest();
assertThat(result).isTrue();
}
@Test
void shouldReturnFalseIfRequestFinished() {
setRequestFinishedField(true);
var result = downloader.shouldCompleteRequest();
assertThat(result).isFalse();
}
@Test
void shouldUpdateRequestFinished() {
downloader.shouldCompleteRequest();
assertThat(getRequestFinished()).isTrue();
}
}
@Nested
class TestWhenDownloadRunning {
@BeforeEach
void init() {
setDownloadFinishedField(false);
}
@Test
void shouldReturnFalse() {
var result = downloader.shouldCompleteRequest();
assertThat(result).isFalse();
}
@Test
void shouldNotUpdateRequestFinished() {
downloader.shouldCompleteRequest();
assertThat(getRequestFinished()).isFalse();
}
}
}
@Nested
class TestCompleteRequest {
@Mock
private PipedInputStream inputStream;
@BeforeEach
void mock() {
setRequestFinishedField(false);
setDownloadFinishedField(true);
setInputStreamField(inputStream);
}
@SneakyThrows
@Test
void shouldCloseInputStream() {
setDownloadInProgressField(new AtomicBoolean(false));
void shouldCallCloseInputStream() {
downloader.completeRequest();
downloader.doSendChunks();
verify(downloader).closeInputStream();
}
verify(inputStream).close();
@Test
void shouldCallOnCompleted() {
downloader.completeRequest();
verify(callObserver).onCompleted();
}
@SneakyThrows
private boolean getRequestFinished() {
return ReflectionTestUtils.getField(downloader, "requestFinished", AtomicBoolean.class).get();
}
}
......@@ -361,25 +477,24 @@ class GrpcBinaryFileServerDownloaderTest {
}
@SneakyThrows
private void setOutputStreamField(OutputStream outputStream) {
var outputStreamField = downloader.getClass().getDeclaredField("outputStream");
outputStreamField.setAccessible(true);
outputStreamField.set(downloader, outputStream);
ReflectionTestUtils.setField(downloader, "outputStream", outputStream);
}
@SneakyThrows
private void setInputStreamField(InputStream inputStream) {
var inputStreamField = downloader.getClass().getDeclaredField("inputStream");
inputStreamField.setAccessible(true);
inputStreamField.set(downloader, inputStream);
ReflectionTestUtils.setField(downloader, "inputStream", inputStream);
}
@SneakyThrows
private void setDownloadInProgressField(AtomicBoolean downloadInProgress) {
var downloadInProgressField = downloader.getClass().getDeclaredField("downloadInProgress");
downloadInProgressField.setAccessible(true);
downloadInProgressField.set(downloader, downloadInProgress);
private void setDownloadFinishedField(boolean downloadFinished) {
ReflectionTestUtils.setField(downloader, "downloadFinished", new AtomicBoolean(downloadFinished));
}
private void setRequestFinishedField(boolean requestFinished) {
ReflectionTestUtils.setField(downloader, "requestFinished", new AtomicBoolean(requestFinished));
}
private boolean getRequestFinished() {
return ReflectionTestUtils.getField(downloader, "requestFinished", AtomicBoolean.class).get();
}
static class GrpcResponseDummy {
......
/*
* Copyright (C) 2024 Das Land Schleswig-Holstein vertreten durch den
* Ministerpräsidenten des Landes Schleswig-Holstein
* Staatskanzlei
* Abteilung Digitalisierung und zentrales IT-Management der Landesregierung
*
* Lizenziert unter der EUPL, Version 1.2 oder - sobald
* diese von der Europäischen Kommission genehmigt wurden -
* Folgeversionen der EUPL ("Lizenz");
* Sie dürfen dieses Werk ausschließlich gemäß
* dieser Lizenz nutzen.
* Eine Kopie der Lizenz finden Sie hier:
*
* https://joinup.ec.europa.eu/collection/eupl/eupl-text-eupl-12
*
* Sofern nicht durch anwendbare Rechtsvorschriften
* gefordert oder in schriftlicher Form vereinbart, wird
* die unter der Lizenz verbreitete Software "so wie sie
* ist", OHNE JEGLICHE GEWÄHRLEISTUNG ODER BEDINGUNGEN -
* ausdrücklich oder stillschweigend - verbreitet.
* Die sprachspezifischen Genehmigungen und Beschränkungen
* unter der Lizenz sind dem Lizenztext zu entnehmen.
*/
package de.ozgcloud.common.test;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
import lombok.SneakyThrows;
import java.lang.reflect.Field;
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public class ReflectionTestUtils {
@SneakyThrows
public static <T> T getField(Object target, String fieldName, Class<T> fieldType) {
return fieldType.cast(getDeclaredField(target, fieldName).get(target));
}
@SneakyThrows
public static void setField(Object target, String fieldName, Object value) {
getDeclaredField(target, fieldName).set(target, value); // NOSONAR
}
@SneakyThrows
private static Field getDeclaredField(Object target, String fieldName) {
var field = target.getClass().getDeclaredField(fieldName);
field.setAccessible(true); // NOSONAR
return field;
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment