Skip to content
Snippets Groups Projects
Commit d677219c authored by Krzysztof Witukiewicz's avatar Krzysztof Witukiewicz
Browse files

Merge branch 'OZG-7262-fix-unfinished-downloads' into 'main'

Ozg 7262 fix unfinished downloads

See merge request !8
parents aa16948f c10e6482
Branches
Tags
1 merge request!8Ozg 7262 fix unfinished downloads
......@@ -46,7 +46,10 @@ import lombok.extern.log4j.Log4j2;
@Log4j2
public class GrpcBinaryFileServerDownloader<T> {
private static final int CHUNK_SIZE = 255 * 1024;
static final int CHUNK_SIZE = 255 * 1024;
static final int END_OF_STREAM = -1;
static final int NOTHING_READ = 0;
private final CallStreamObserver<T> callObserver;
private final Function<ByteString, T> chunkBuilder;
......@@ -55,7 +58,6 @@ public class GrpcBinaryFileServerDownloader<T> {
private final byte[] buffer = new byte[CHUNK_SIZE];
private final AtomicBoolean started = new AtomicBoolean(false);
private final AtomicBoolean downloadFinished = new AtomicBoolean(false);
private final AtomicBoolean requestFinished = new AtomicBoolean(false);
private final AtomicReference<TechnicalException> downloadError = new AtomicReference<>();
......@@ -118,56 +120,61 @@ public class GrpcBinaryFileServerDownloader<T> {
LOG.debug("Downloading file content...");
downloadConsumer.accept(outputStream);
LOG.debug("Download completed.");
downloadFinished.set(true);
}
synchronized void sendChunks() {
try {
doSendChunks();
} catch (Exception e) {
completeRequestWithError(new TechnicalException("Error while sending chunks", e));
handleError(new TechnicalException("Error while sending chunks", e));
}
}
void doSendChunks() throws IOException {
if (requestFinished.get()) {
return;
while (canSendChunks()) {
processDataFromInputStream();
}
int bytesRead;
while (isReady()) {
if ((bytesRead = inputStream.read(buffer)) == -1) {
tryCompleteRequest();
}
boolean canSendChunks() {
return !requestFinished.get() && callObserver.isReady();
}
void processDataFromInputStream() throws IOException {
var bytesRead = inputStream.read(buffer);
switch (bytesRead) {
case END_OF_STREAM:
finishProcessing();
break;
}
callObserver.onNext(chunkBuilder.apply(ByteString.copyFrom(buffer, 0, bytesRead)));
LOG.debug("Sent {} bytes", bytesRead);
case NOTHING_READ:
break;
default:
sendBytesToCallObserver(bytesRead);
}
}
private boolean isReady() {
return callObserver.isReady();
void sendBytesToCallObserver(int bytesRead) {
var bytes = ByteString.copyFrom(buffer, 0, bytesRead);
var chunk = chunkBuilder.apply(bytes);
callObserver.onNext(chunk);
LOG.debug("Sent {} bytes", bytesRead);
}
void tryCompleteRequest() {
void finishProcessing() {
if (Objects.nonNull(downloadError.get())) {
throw downloadError.get();
} else if (downloadFinished.get()) {
completeRequestNormally();
} else {
finishRequest();
callObserver.onCompleted();
}
}
void completeRequestWithError(TechnicalException e) {
void handleError(TechnicalException e) {
LOG.debug("Complete download request with error");
finishRequest();
throw e;
}
void completeRequestNormally() {
LOG.debug("Complete download request");
finishRequest();
callObserver.onCompleted();
}
private void finishRequest() {
requestFinished.set(true);
closeInputStream();
......
/*
* Copyright (C) 2025 Das Land Schleswig-Holstein vertreten durch den
* Ministerpräsidenten des Landes Schleswig-Holstein
* Staatskanzlei
* Abteilung Digitalisierung und zentrales IT-Management der Landesregierung
*
* Lizenziert unter der EUPL, Version 1.2 oder - sobald
* diese von der Europäischen Kommission genehmigt wurden -
* Folgeversionen der EUPL ("Lizenz");
* Sie dürfen dieses Werk ausschließlich gemäß
* dieser Lizenz nutzen.
* Eine Kopie der Lizenz finden Sie hier:
*
* https://joinup.ec.europa.eu/collection/eupl/eupl-text-eupl-12
*
* Sofern nicht durch anwendbare Rechtsvorschriften
* gefordert oder in schriftlicher Form vereinbart, wird
* die unter der Lizenz verbreitete Software "so wie sie
* ist", OHNE JEGLICHE GEWÄHRLEISTUNG ODER BEDINGUNGEN -
* ausdrücklich oder stillschweigend - verbreitet.
* Die sprachspezifischen Genehmigungen und Beschränkungen
* unter der Lizenz sind dem Lizenztext zu entnehmen.
*/
package de.ozgcloud.common.binaryfile;
import static org.assertj.core.api.Assertions.*;
import static org.mockito.Mockito.*;
import java.io.OutputStream;
import java.util.Arrays;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.Captor;
import org.mockito.Mock;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import com.google.protobuf.ByteString;
import de.ozgcloud.common.errorhandling.TechnicalException;
import io.grpc.stub.CallStreamObserver;
import lombok.SneakyThrows;
class GrpcBinaryFileServerDownloaderITCase {
@Mock
private CallStreamObserver<GrpcResponseDummy> callObserver;
private SimpleAsyncTaskExecutor taskExecutor;
private GrpcBinaryFileServerDownloader<GrpcResponseDummy> downloader;
@BeforeEach
void init() {
taskExecutor = new SimpleAsyncTaskExecutor();
when(callObserver.isReady()).thenReturn(true);
}
@AfterEach
void cleanup() {
taskExecutor.close();
}
@Nested
class OnNoError {
private static final int DOWNLOAD_DATA_LENGTH = (int) (GrpcBinaryFileServerDownloader.CHUNK_SIZE * 1.5);
@Captor
private ArgumentCaptor<GrpcResponseDummy> captor;
@BeforeEach
void init() {
downloader = spy(downloaderBuilder()
.downloadConsumer(this::downloadData)
.build());
}
@SneakyThrows
@Test
void shouldCallOnCompleted() {
start();
verify(callObserver).onCompleted();
}
@Test
void shouldReadAllData() {
start();
verify(callObserver, times(2)).onNext(captor.capture());
var totalBytesRead = captor.getAllValues().stream().mapToInt(GrpcResponseDummy::bytesRead).sum();
assertThat(totalBytesRead).isEqualTo(DOWNLOAD_DATA_LENGTH);
}
@Test
void shouldCloseStreams() {
start();
verify(downloader).closeInputStream();
verify(downloader).closeOutputStream();
}
@Test
void shouldCompleteIfDownloadConsumerClosedOutputStream() {
downloader = spy(downloaderBuilder()
.downloadConsumer(this::downloadDataAndCloseStream)
.build());
start();
verify(callObserver).onCompleted();
}
@SneakyThrows
private void downloadData(OutputStream outputStream) {
byte[] bytes = new byte[DOWNLOAD_DATA_LENGTH];
Arrays.fill(bytes, (byte) 1);
outputStream.write(bytes);
}
@SneakyThrows
private void downloadDataAndCloseStream(OutputStream outputStream) {
downloadData(outputStream);
outputStream.close();
Thread.sleep(100); // delay, so that the onReadyHandler gets end of stream before this method returns
}
}
@Nested
class OnError {
private final Throwable error = new TechnicalException("error");
@BeforeEach
void init() {
downloader = spy(downloaderBuilder()
.downloadConsumer(this::downloadData)
.build());
}
@Test
void shouldThrowException() {
assertThatThrownBy(GrpcBinaryFileServerDownloaderITCase.this::start).isInstanceOf(TechnicalException.class);
}
@Test
void shouldNotCallOnCompleted() {
catchException(GrpcBinaryFileServerDownloaderITCase.this::start);
verify(callObserver, never()).onCompleted();
}
@Test
void shouldCloseStreams() {
catchException(GrpcBinaryFileServerDownloaderITCase.this::start);
verify(downloader).closeInputStream();
verify(downloader).closeOutputStream();
}
@SneakyThrows
private void downloadData(OutputStream outputStream) {
throw error;
}
}
private GrpcBinaryFileServerDownloader.GrpcBinaryFileServerDownloaderBuilder<GrpcResponseDummy> downloaderBuilder() {
return GrpcBinaryFileServerDownloader.<GrpcResponseDummy>builder()
.taskExecutor(taskExecutor)
.callObserver(callObserver)
.chunkBuilder(this::buildChunk);
}
private GrpcResponseDummy buildChunk(ByteString data) {
return new GrpcResponseDummy(data.size());
}
private void start() {
downloader.start();
downloader.sendChunks();
}
private record GrpcResponseDummy(int bytesRead) {}
}
......@@ -23,6 +23,7 @@
*/
package de.ozgcloud.common.binaryfile;
import static de.ozgcloud.common.binaryfile.GrpcBinaryFileServerDownloader.*;
import static org.assertj.core.api.Assertions.*;
import static org.mockito.ArgumentMatchers.*;
import static org.mockito.Mockito.*;
......@@ -37,11 +38,14 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Stream;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.mockito.ArgumentCaptor;
import org.mockito.Captor;
import org.mockito.Mock;
......@@ -300,18 +304,6 @@ class GrpcBinaryFileServerDownloaderTest {
verify(downloadConsumer).accept(outputStream);
}
@Test
void shouldDownloadFinishedBeInitiallyFalse() {
assertThat(getDownloadFinished()).isFalse();
}
@Test
void shouldSetDownloadFinished() {
downloader.doDownload();
assertThat(getDownloadFinished()).isTrue();
}
}
@Nested
......@@ -346,14 +338,14 @@ class GrpcBinaryFileServerDownloaderTest {
@BeforeEach
void init() {
doThrow(exception).when(downloader).doSendChunks();
doNothing().when(downloader).completeRequestWithError(any());
doNothing().when(downloader).handleError(any());
}
@Test
void shouldCompleteRequestWithError() {
void shouldHandleError() {
downloader.sendChunks();
verify(downloader).completeRequestWithError(argumentCaptor.capture());
verify(downloader).handleError(argumentCaptor.capture());
assertThat(argumentCaptor.getValue()).isInstanceOf(TechnicalException.class).hasCause(exception);
}
}
......@@ -362,103 +354,174 @@ class GrpcBinaryFileServerDownloaderTest {
@Nested
class TestDoSendChunks {
@SneakyThrows
@Test
void shouldCheckIfCanSendChunks() {
doReturn(false).when(downloader).canSendChunks();
downloader.doSendChunks();
verify(downloader).canSendChunks();
}
@SneakyThrows
@Test
void shouldNotProcessIfCannotSendChunks() {
doReturn(false).when(downloader).canSendChunks();
downloader.doSendChunks();
verify(downloader, never()).processDataFromInputStream();
}
@SneakyThrows
@Test
void shouldProcessAsLongAsCanSendChunks() {
doReturn(true, true, false).when(downloader).canSendChunks();
doNothing().when(downloader).processDataFromInputStream();
downloader.doSendChunks();
verify(downloader, times(2)).processDataFromInputStream();
}
}
@Nested
class TestProcessDataFromInputStream {
@Mock
private PipedInputStream inputStream;
@Nested
class OnRequestFinished {
class OnEndOfStreamReached {
@SneakyThrows
@BeforeEach
void init() {
setRequestFinishedField(true);
doNothing().when(downloader).finishProcessing();
when(inputStream.read(any())).thenReturn(END_OF_STREAM);
setInputStreamField(inputStream);
}
@SneakyThrows
@Test
void shouldNotInteractWithCallObserver() {
doSendChunks();
void shouldFinishProcessing() {
downloader.processDataFromInputStream();
verifyNoInteractions(callObserver);
verify(downloader).finishProcessing();
}
@SneakyThrows
@Test
void shouldNotSendBytesToCallObserver() {
downloader.processDataFromInputStream();
verify(downloader, never()).sendBytesToCallObserver(anyInt());
}
}
@Nested
class OnRequestNotFinished {
class OnNoBytesWereReceived {
@Nested
class OnNotReady {
@SneakyThrows
@BeforeEach
void init() {
when(inputStream.read(any())).thenReturn(NOTHING_READ);
setInputStreamField(inputStream);
}
@SneakyThrows
@Test
void shouldNotSendBytesToCallObserver() {
downloader.processDataFromInputStream();
verify(downloader, never()).sendBytesToCallObserver(anyInt());
}
}
@Nested
class OnBytesWereReceived {
private final int bytesRead = 20;
@BeforeEach
void init() {
when(callObserver.isReady()).thenReturn(false);
}
@SneakyThrows
@BeforeEach
void mock() {
when(inputStream.read(any())).thenReturn(bytesRead);
setInputStreamField(inputStream);
}
@Test
void shouldOnlyCallIsReadyOnObserver() {
doSendChunks();
@SneakyThrows
@Test
void shouldSendBytesToCallObserver() {
downloader.processDataFromInputStream();
verify(callObserver).isReady();
verifyNoMoreInteractions(callObserver);
}
verify(downloader).sendBytesToCallObserver(bytesRead);
}
}
}
@Nested
class OnReady {
@Nested
class TestSendBytesToCallObserver {
@Mock
private PipedInputStream inputStream;
@Captor
private ArgumentCaptor<ByteString> byteStringCaptor;
@Captor
private ArgumentCaptor<ByteString> byteStringCaptor;
private final int bytesRead = 20;
private final byte[] buffer = new byte[bytesRead];
private final GrpcResponseDummy grpcResponseDummy = new GrpcResponseDummy();
private final int readBytes = 20;
private final byte[] buffer = new byte[readBytes];
private final GrpcResponseDummy grpcResponseDummy = new GrpcResponseDummy();
@BeforeEach
void init() {
new Random().nextBytes(buffer);
ReflectionTestUtils.setField(downloader, "buffer", buffer);
}
@SneakyThrows
@BeforeEach
void mock() {
doNothing().when(downloader).tryCompleteRequest();
when(callObserver.isReady()).thenReturn(true);
when(inputStream.read(any())).thenReturn(readBytes, -1);
setInputStreamField(inputStream);
new Random().nextBytes(buffer);
ReflectionTestUtils.setField(downloader, "buffer", buffer);
}
@SneakyThrows
@Test
void shouldCallChunkBuilder() {
downloader.sendBytesToCallObserver(bytesRead);
@Test
void shouldCallChunkBuilder() {
doSendChunks();
verify(chunkBuilder).apply(byteStringCaptor.capture());
assertThat(byteStringCaptor.getValue().toByteArray()).isEqualTo(buffer);
}
verify(chunkBuilder).apply(byteStringCaptor.capture());
assertThat(byteStringCaptor.getValue().toByteArray()).isEqualTo(buffer);
}
@SneakyThrows
@Test
void shouldCallOnNext() {
when(chunkBuilder.apply(any())).thenReturn(grpcResponseDummy);
@DisplayName("should send next chunk if callObserver is ready and stream already received data")
@Test
void shouldCallOnNext() {
when(chunkBuilder.apply(any())).thenReturn(grpcResponseDummy);
downloader.sendBytesToCallObserver(bytesRead);
doSendChunks();
verify(callObserver).onNext(grpcResponseDummy);
}
}
verify(callObserver).onNext(grpcResponseDummy);
}
@Nested
class TestCanSendChunks {
@DisplayName("should call complete grpc stream if download has finished and stream has no data left")
@Test
void shouldTryCompleteRequest() {
setDownloadFinishedField(true);
@ParameterizedTest
@MethodSource("provideArguments")
void shouldReturnValue(boolean requestFinished, boolean ready, boolean expected) {
setRequestFinishedField(requestFinished);
lenient().when(callObserver.isReady()).thenReturn(ready);
doSendChunks();
var canSendChunks = downloader.canSendChunks();
verify(downloader).tryCompleteRequest();
}
}
assertThat(canSendChunks).isEqualTo(expected);
}
@SneakyThrows
private void doSendChunks() {
downloader.doSendChunks();
private static Stream<Arguments> provideArguments() {
return Stream.of(
Arguments.of(false, false, false),
Arguments.of(false, true, true),
Arguments.of(true, false, false),
Arguments.of(true, true, false)
);
}
}
@Nested
class TestTryCompleteRequest {
class TestFinishProcessing {
@Nested
class OnError {
......@@ -472,92 +535,52 @@ class GrpcBinaryFileServerDownloaderTest {
@Test
void shouldThrowException() {
assertThatThrownBy(downloader::tryCompleteRequest).isSameAs(exception);
assertThatThrownBy(downloader::finishProcessing).isSameAs(exception);
}
}
@Nested
class OnDownloadFinished {
class OnNoError {
@BeforeEach
void init() {
setDownloadFinishedField(true);
doNothing().when(downloader).completeRequestNormally();
doNothing().when(downloader).closeInputStream();
}
@Test
void shouldNotCompleteRequestWithError() {
downloader.tryCompleteRequest();
downloader.finishProcessing();
verify(downloader, never()).completeRequestWithError(any());
verify(downloader, never()).handleError(any());
}
@Test
void shouldCompleteRequestNormally() {
downloader.tryCompleteRequest();
void shouldSetRequestFinished() {
assertThat(getRequestFinished()).isFalse();
verify(downloader).completeRequestNormally();
}
}
downloader.finishProcessing();
@Nested
class OnDownloadNotFinished {
@BeforeEach
void init() {
setDownloadFinishedField(false);
assertThat(getRequestFinished()).isTrue();
}
@Test
void shouldNotCompleteRequestNormally() {
downloader.tryCompleteRequest();
void shouldCloseInputStream() {
downloader.finishProcessing();
verify(downloader, never()).completeRequestNormally();
verify(downloader).closeInputStream();
}
@Test
void shouldNotCompleteRequestWithError() {
downloader.tryCompleteRequest();
void shouldNotifyObserver() {
downloader.finishProcessing();
verify(downloader, never()).completeRequestWithError(any());
verify(callObserver).onCompleted();
}
}
}
@Nested
class TestCompleteRequestNormally {
@BeforeEach
void init() {
doNothing().when(downloader).closeInputStream();
}
@Test
void shouldSetRequestFinished() {
assertThat(getRequestFinished()).isFalse();
downloader.completeRequestNormally();
assertThat(getRequestFinished()).isTrue();
}
@Test
void shouldCloseInputStream() {
downloader.completeRequestNormally();
verify(downloader).closeInputStream();
}
@Test
void shouldNotifyObserver() {
downloader.completeRequestNormally();
verify(callObserver).onCompleted();
}
}
@Nested
class TestCompleteRequestWithError {
class TestHandleError {
private final TechnicalException error = new TechnicalException("error");
......@@ -570,21 +593,21 @@ class GrpcBinaryFileServerDownloaderTest {
void shouldSetRequestFinished() {
assertThat(getRequestFinished()).isFalse();
catchException(() -> downloader.completeRequestWithError(error));
catchException(() -> downloader.handleError(error));
assertThat(getRequestFinished()).isTrue();
}
@Test
void shouldCloseInputStream() {
catchException(() -> downloader.completeRequestWithError(error));
catchException(() -> downloader.handleError(error));
verify(downloader).closeInputStream();
}
@Test
void shouldThrowException() {
assertThatThrownBy(() -> downloader.completeRequestWithError(error)).isSameAs(error);
assertThatThrownBy(() -> downloader.handleError(error)).isSameAs(error);
}
}
......@@ -612,14 +635,6 @@ class GrpcBinaryFileServerDownloaderTest {
return (TechnicalException) ReflectionTestUtils.getField(downloader, "downloadError", AtomicReference.class).get();
}
private void setDownloadFinishedField(boolean downloadFinished) {
ReflectionTestUtils.setField(downloader, "downloadFinished", new AtomicBoolean(downloadFinished));
}
private boolean getDownloadFinished() {
return ReflectionTestUtils.getField(downloader, "downloadFinished", AtomicBoolean.class).get();
}
private static class GrpcResponseDummy {
}
}
\ No newline at end of file
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment