Skip to content
Snippets Groups Projects
Commit 133aa19b authored by Krzysztof Witukiewicz's avatar Krzysztof Witukiewicz
Browse files

OZG-7262 OZG-7680 Fix onReadyHandler not finishing

parent aa16948f
Branches
Tags
1 merge request!8Ozg 7262 fix unfinished downloads
......@@ -46,7 +46,10 @@ import lombok.extern.log4j.Log4j2;
@Log4j2
public class GrpcBinaryFileServerDownloader<T> {
private static final int CHUNK_SIZE = 255 * 1024;
static final int CHUNK_SIZE = 255 * 1024;
private static final int END_OF_STREAM = -1;
private static final int NOTHING_READ = 0;
private final CallStreamObserver<T> callObserver;
private final Function<ByteString, T> chunkBuilder;
......@@ -55,7 +58,6 @@ public class GrpcBinaryFileServerDownloader<T> {
private final byte[] buffer = new byte[CHUNK_SIZE];
private final AtomicBoolean started = new AtomicBoolean(false);
private final AtomicBoolean downloadFinished = new AtomicBoolean(false);
private final AtomicBoolean requestFinished = new AtomicBoolean(false);
private final AtomicReference<TechnicalException> downloadError = new AtomicReference<>();
......@@ -118,7 +120,6 @@ public class GrpcBinaryFileServerDownloader<T> {
LOG.debug("Downloading file content...");
downloadConsumer.accept(outputStream);
LOG.debug("Download completed.");
downloadFinished.set(true);
}
synchronized void sendChunks() {
......@@ -130,28 +131,33 @@ public class GrpcBinaryFileServerDownloader<T> {
}
void doSendChunks() throws IOException {
if (requestFinished.get()) {
return;
}
int bytesRead;
while (isReady()) {
if ((bytesRead = inputStream.read(buffer)) == -1) {
tryCompleteRequest();
break;
}
callObserver.onNext(chunkBuilder.apply(ByteString.copyFrom(buffer, 0, bytesRead)));
LOG.debug("Sent {} bytes", bytesRead);
while (canSendChunks()) {
processDataFromInputStream();
}
}
private boolean isReady() {
return callObserver.isReady();
boolean canSendChunks() {
return !requestFinished.get() && callObserver.isReady();
}
void processDataFromInputStream() throws IOException {
var bytesRead = inputStream.read(buffer);
switch (bytesRead) {
case END_OF_STREAM:
completeRequest();
break;
case NOTHING_READ:
break;
default:
callObserver.onNext(chunkBuilder.apply(ByteString.copyFrom(buffer, 0, bytesRead)));
LOG.debug("Sent {} bytes", bytesRead);
}
}
void tryCompleteRequest() {
void completeRequest() {
if (Objects.nonNull(downloadError.get())) {
throw downloadError.get();
} else if (downloadFinished.get()) {
} else {
completeRequestNormally();
}
}
......
/*
* Copyright (C) 2025 Das Land Schleswig-Holstein vertreten durch den
* Ministerpräsidenten des Landes Schleswig-Holstein
* Staatskanzlei
* Abteilung Digitalisierung und zentrales IT-Management der Landesregierung
*
* Lizenziert unter der EUPL, Version 1.2 oder - sobald
* diese von der Europäischen Kommission genehmigt wurden -
* Folgeversionen der EUPL ("Lizenz");
* Sie dürfen dieses Werk ausschließlich gemäß
* dieser Lizenz nutzen.
* Eine Kopie der Lizenz finden Sie hier:
*
* https://joinup.ec.europa.eu/collection/eupl/eupl-text-eupl-12
*
* Sofern nicht durch anwendbare Rechtsvorschriften
* gefordert oder in schriftlicher Form vereinbart, wird
* die unter der Lizenz verbreitete Software "so wie sie
* ist", OHNE JEGLICHE GEWÄHRLEISTUNG ODER BEDINGUNGEN -
* ausdrücklich oder stillschweigend - verbreitet.
* Die sprachspezifischen Genehmigungen und Beschränkungen
* unter der Lizenz sind dem Lizenztext zu entnehmen.
*/
package de.ozgcloud.common.binaryfile;
import static org.assertj.core.api.Assertions.*;
import static org.mockito.Mockito.*;
import java.io.OutputStream;
import java.util.Arrays;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.Captor;
import org.mockito.Mock;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import com.google.protobuf.ByteString;
import de.ozgcloud.common.errorhandling.TechnicalException;
import io.grpc.stub.CallStreamObserver;
import lombok.SneakyThrows;
class GrpcBinaryFileServerDownloaderITCase {
@Mock
private CallStreamObserver<GrpcResponseDummy> callObserver;
private SimpleAsyncTaskExecutor taskExecutor;
private GrpcBinaryFileServerDownloader<GrpcResponseDummy> downloader;
@BeforeEach
void init() {
taskExecutor = new SimpleAsyncTaskExecutor();
when(callObserver.isReady()).thenReturn(true);
}
@AfterEach
void cleanup() {
taskExecutor.close();
}
@Nested
class OnNoError {
private static final int DOWNLOAD_DATA_LENGTH = Double.valueOf(GrpcBinaryFileServerDownloader.CHUNK_SIZE * 1.5).intValue();
@Captor
private ArgumentCaptor<GrpcResponseDummy> captor;
@BeforeEach
void init() {
downloader = spy(downloaderBuilder()
.downloadConsumer(this::downloadData)
.build());
}
@SneakyThrows
@Test
void shouldCallOnCompleted() {
start();
verify(callObserver).onCompleted();
}
@Test
void shouldReadAllData() {
start();
verify(callObserver, times(2)).onNext(captor.capture());
var totalBytesRead = captor.getAllValues().stream().mapToInt(GrpcResponseDummy::bytesRead).sum();
assertThat(totalBytesRead).isEqualTo(DOWNLOAD_DATA_LENGTH);
}
@Test
void shouldCloseStreams() {
start();
verify(downloader).closeInputStream();
verify(downloader).closeOutputStream();
}
@Test
void shouldCompleteIfDownloadConsumerClosedOutputStream() {
downloader = spy(downloaderBuilder()
.downloadConsumer(this::downloadDataAndCloseStream)
.build());
start();
verify(callObserver).onCompleted();
}
@SneakyThrows
private void downloadData(OutputStream outputStream) {
byte[] bytes = new byte[DOWNLOAD_DATA_LENGTH];
Arrays.fill(bytes, (byte) 1);
outputStream.write(bytes);
}
@SneakyThrows
private void downloadDataAndCloseStream(OutputStream outputStream) {
downloadData(outputStream);
outputStream.close();
Thread.sleep(100); // delay, so that the onReadyHandler gets end of stream before this method returns
}
}
@Nested
class OnError {
private final Throwable error = new TechnicalException("error");
@BeforeEach
void init() {
downloader = spy(downloaderBuilder()
.downloadConsumer(this::downloadData)
.build());
}
@Test
void shouldThrowException() {
assertThatThrownBy(GrpcBinaryFileServerDownloaderITCase.this::start).isInstanceOf(TechnicalException.class);
}
@Test
void shouldNotCallOnCompleted() {
catchException(GrpcBinaryFileServerDownloaderITCase.this::start);
verify(callObserver, never()).onCompleted();
}
@Test
void shouldCloseStreams() {
catchException(GrpcBinaryFileServerDownloaderITCase.this::start);
verify(downloader).closeInputStream();
verify(downloader).closeOutputStream();
}
@SneakyThrows
private void downloadData(OutputStream outputStream) {
throw error;
}
}
private GrpcBinaryFileServerDownloader.GrpcBinaryFileServerDownloaderBuilder<GrpcResponseDummy> downloaderBuilder() {
return GrpcBinaryFileServerDownloader.<GrpcResponseDummy>builder()
.taskExecutor(taskExecutor)
.callObserver(callObserver)
.chunkBuilder(this::buildChunk);
}
private GrpcResponseDummy buildChunk(ByteString data) {
return new GrpcResponseDummy(data.size());
}
private void start() {
downloader.start();
downloader.sendChunks();
}
private record GrpcResponseDummy(int bytesRead) {}
}
......@@ -27,6 +27,7 @@ import static org.assertj.core.api.Assertions.*;
import static org.mockito.ArgumentMatchers.*;
import static org.mockito.Mockito.*;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
......@@ -37,11 +38,15 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Stream;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.mockito.ArgumentCaptor;
import org.mockito.Captor;
import org.mockito.Mock;
......@@ -300,18 +305,6 @@ class GrpcBinaryFileServerDownloaderTest {
verify(downloadConsumer).accept(outputStream);
}
@Test
void shouldDownloadFinishedBeInitiallyFalse() {
assertThat(getDownloadFinished()).isFalse();
}
@Test
void shouldSetDownloadFinished() {
downloader.doDownload();
assertThat(getDownloadFinished()).isTrue();
}
}
@Nested
......@@ -362,103 +355,157 @@ class GrpcBinaryFileServerDownloaderTest {
@Nested
class TestDoSendChunks {
@SneakyThrows
@Test
void shouldCheckIfCanSendChunks() {
doReturn(false).when(downloader).canSendChunks();
downloader.doSendChunks();
verify(downloader).canSendChunks();
}
@SneakyThrows
@Test
void shouldNotProcessIfCannotSendChunks() {
doReturn(false).when(downloader).canSendChunks();
downloader.doSendChunks();
verify(downloader, never()).processDataFromInputStream();
}
@SneakyThrows
@Test
void shouldProcessAsLongAsCanSendChunks() {
doReturn(true, true, false).when(downloader).canSendChunks();
doNothing().when(downloader).processDataFromInputStream();
downloader.doSendChunks();
verify(downloader, times(2)).processDataFromInputStream();
}
}
@Nested
class TestProcessDataFromInputStream {
@Mock
private PipedInputStream inputStream;
@Nested
class OnRequestFinished {
class OnEndOfStreamReached {
@SneakyThrows
@BeforeEach
void init() {
setRequestFinishedField(true);
doNothing().when(downloader).completeRequest();
when(inputStream.read(any())).thenReturn(-1);
setInputStreamField(inputStream);
}
@SneakyThrows
@Test
void shouldNotInteractWithCallObserver() {
doSendChunks();
void shouldCompleteRequest() {
downloader.processDataFromInputStream();
verify(downloader).completeRequest();
}
@SneakyThrows
@Test
void shouldNotCallCallObserver() {
downloader.processDataFromInputStream();
verifyNoInteractions(callObserver);
}
}
@Nested
class OnRequestNotFinished {
class OnNoBytesWereReceived {
@Nested
class OnNotReady {
@BeforeEach
void init() {
when(callObserver.isReady()).thenReturn(false);
}
@SneakyThrows
@BeforeEach
void init() {
when(inputStream.read(any())).thenReturn(0);
setInputStreamField(inputStream);
}
@Test
void shouldOnlyCallIsReadyOnObserver() {
doSendChunks();
@SneakyThrows
@Test
void shouldNotCallCallObserver() {
downloader.processDataFromInputStream();
verify(callObserver).isReady();
verifyNoMoreInteractions(callObserver);
}
verifyNoInteractions(callObserver);
}
}
@Nested
class OnReady {
@Nested
class OnBytesWereReceived {
@Mock
private PipedInputStream inputStream;
@Captor
private ArgumentCaptor<ByteString> byteStringCaptor;
@Captor
private ArgumentCaptor<ByteString> byteStringCaptor;
private final int readBytes = 20;
private final byte[] buffer = new byte[readBytes];
private final GrpcResponseDummy grpcResponseDummy = new GrpcResponseDummy();
private final int readBytes = 20;
private final byte[] buffer = new byte[readBytes];
private final GrpcResponseDummy grpcResponseDummy = new GrpcResponseDummy();
@SneakyThrows
@BeforeEach
void mock() {
doNothing().when(downloader).tryCompleteRequest();
when(callObserver.isReady()).thenReturn(true);
when(inputStream.read(any())).thenReturn(readBytes, -1);
setInputStreamField(inputStream);
new Random().nextBytes(buffer);
ReflectionTestUtils.setField(downloader, "buffer", buffer);
}
@SneakyThrows
@BeforeEach
void mock() {
when(inputStream.read(any())).thenReturn(readBytes);
setInputStreamField(inputStream);
new Random().nextBytes(buffer);
ReflectionTestUtils.setField(downloader, "buffer", buffer);
}
@Test
void shouldCallChunkBuilder() {
doSendChunks();
@SneakyThrows
@Test
void shouldCallChunkBuilder() {
downloader.processDataFromInputStream();
verify(chunkBuilder).apply(byteStringCaptor.capture());
assertThat(byteStringCaptor.getValue().toByteArray()).isEqualTo(buffer);
}
verify(chunkBuilder).apply(byteStringCaptor.capture());
assertThat(byteStringCaptor.getValue().toByteArray()).isEqualTo(buffer);
}
@DisplayName("should send next chunk if callObserver is ready and stream already received data")
@Test
void shouldCallOnNext() {
when(chunkBuilder.apply(any())).thenReturn(grpcResponseDummy);
@SneakyThrows
@Test
void shouldCallOnNext() {
when(chunkBuilder.apply(any())).thenReturn(grpcResponseDummy);
doSendChunks();
downloader.processDataFromInputStream();
verify(callObserver).onNext(grpcResponseDummy);
}
verify(callObserver).onNext(grpcResponseDummy);
}
}
}
@DisplayName("should call complete grpc stream if download has finished and stream has no data left")
@Test
void shouldTryCompleteRequest() {
setDownloadFinishedField(true);
@Nested
class TestCanSendChunks {
doSendChunks();
@ParameterizedTest
@MethodSource("provideArguments")
void shouldReturnValue(boolean requestFinished, boolean ready, boolean expected) {
ReflectionTestUtils.setField(downloader, "requestFinished", new AtomicBoolean(requestFinished));
lenient().when(callObserver.isReady()).thenReturn(ready);
verify(downloader).tryCompleteRequest();
}
}
var canSendChunks = downloader.canSendChunks();
assertThat(canSendChunks).isEqualTo(expected);
}
@SneakyThrows
private void doSendChunks() {
downloader.doSendChunks();
private static Stream<Arguments> provideArguments() {
return Stream.of(
Arguments.of(false, false, false),
Arguments.of(false, true, true),
Arguments.of(true, false, false),
Arguments.of(true, true, false)
);
}
}
@Nested
class TestTryCompleteRequest {
class TestCompleteRequest {
@Nested
class OnError {
......@@ -472,56 +519,32 @@ class GrpcBinaryFileServerDownloaderTest {
@Test
void shouldThrowException() {
assertThatThrownBy(downloader::tryCompleteRequest).isSameAs(exception);
assertThatThrownBy(downloader::completeRequest).isSameAs(exception);
}
}
@Nested
class OnDownloadFinished {
class OnNoError {
@BeforeEach
void init() {
setDownloadFinishedField(true);
doNothing().when(downloader).completeRequestNormally();
}
@Test
void shouldNotCompleteRequestWithError() {
downloader.tryCompleteRequest();
downloader.completeRequest();
verify(downloader, never()).completeRequestWithError(any());
}
@Test
void shouldCompleteRequestNormally() {
downloader.tryCompleteRequest();
downloader.completeRequest();
verify(downloader).completeRequestNormally();
}
}
@Nested
class OnDownloadNotFinished {
@BeforeEach
void init() {
setDownloadFinishedField(false);
}
@Test
void shouldNotCompleteRequestNormally() {
downloader.tryCompleteRequest();
verify(downloader, never()).completeRequestNormally();
}
@Test
void shouldNotCompleteRequestWithError() {
downloader.tryCompleteRequest();
verify(downloader, never()).completeRequestWithError(any());
}
}
}
@Nested
......@@ -622,4 +645,62 @@ class GrpcBinaryFileServerDownloaderTest {
private static class GrpcResponseDummy {
}
@Nested
class TestStreams {
private static final int CHUNK_SIZE = 255 * 1024;
private PipedInputStream inputStream;
private PipedOutputStream outputStream;
@SneakyThrows
@BeforeEach
void init() {
outputStream = new PipedOutputStream();
inputStream = new PipedInputStream(CHUNK_SIZE);
outputStream.connect(inputStream);
}
@SneakyThrows
@Test
void shouldReadIncompleteFile() {
var fileBuffer = new byte[CHUNK_SIZE];
var readBuffer = new byte[CHUNK_SIZE];
try (FileInputStream fileInputStream = new FileInputStream("/Users/kwitukiewicz/Documents/books/__Debt__The_First_5_000_Years.pdf")) {
fileInputStream.read(fileBuffer, 0, 255);
outputStream.write(fileBuffer, 0, 1);
var read = inputStream.read(readBuffer, 0, CHUNK_SIZE);
assertThat(read).isEqualTo(1);
}
}
@SneakyThrows
@Test
void shouldReadAfterOutputStreamWasClosed() {
var fileBuffer = new byte[CHUNK_SIZE];
var readBuffer = new byte[CHUNK_SIZE * 2];
try (FileInputStream fileInputStream = new FileInputStream("/Users/kwitukiewicz/Documents/books/__Debt__The_First_5_000_Years.pdf")) {
fileInputStream.read(fileBuffer, 0, fileBuffer.length);
outputStream.write(fileBuffer);
outputStream.close();
var read = inputStream.read(readBuffer);
assertThat(read).isEqualTo(CHUNK_SIZE);
read = inputStream.read(readBuffer);
assertThat(read).isEqualTo(-1);
}
}
@SneakyThrows
@AfterEach
void cleanup() {
outputStream.close();
inputStream.close();
}
}
}
\ No newline at end of file
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment