Skip to content
Snippets Groups Projects
Commit da68d4a4 authored by Krzysztof Witukiewicz's avatar Krzysztof Witukiewicz
Browse files

OZG-7262 OZG-7627 Remove CallStreamObserverWrapper

parent d1187a9e
No related branches found
No related tags found
1 merge request!6OZG-7262 OZG-7566 Notify callObserver about error
/*
* Copyright (C) 2025 Das Land Schleswig-Holstein vertreten durch den
* Ministerpräsidenten des Landes Schleswig-Holstein
* Staatskanzlei
* Abteilung Digitalisierung und zentrales IT-Management der Landesregierung
*
* Lizenziert unter der EUPL, Version 1.2 oder - sobald
* diese von der Europäischen Kommission genehmigt wurden -
* Folgeversionen der EUPL ("Lizenz");
* Sie dürfen dieses Werk ausschließlich gemäß
* dieser Lizenz nutzen.
* Eine Kopie der Lizenz finden Sie hier:
*
* https://joinup.ec.europa.eu/collection/eupl/eupl-text-eupl-12
*
* Sofern nicht durch anwendbare Rechtsvorschriften
* gefordert oder in schriftlicher Form vereinbart, wird
* die unter der Lizenz verbreitete Software "so wie sie
* ist", OHNE JEGLICHE GEWÄHRLEISTUNG ODER BEDINGUNGEN -
* ausdrücklich oder stillschweigend - verbreitet.
* Die sprachspezifischen Genehmigungen und Beschränkungen
* unter der Lizenz sind dem Lizenztext zu entnehmen.
*/
package de.ozgcloud.common.binaryfile;
import java.util.function.Supplier;
import de.ozgcloud.common.errorhandling.TechnicalException;
import io.grpc.stub.CallStreamObserver;
import lombok.RequiredArgsConstructor;
@RequiredArgsConstructor
class CallStreamObserverWrapper<V> {
private final CallStreamObserver<V> callStreamObserver;
private boolean failed;
public void setOnReadyHandler(Runnable onReadyHandler) {
callStreamObserver.setOnReadyHandler(onReadyHandler);
}
public synchronized boolean isReady() {
return ifNotFailed(callStreamObserver::isReady);
}
public synchronized void onNext(V value) {
ifNotFailed(() -> callStreamObserver.onNext(value));
}
public synchronized void onError(Throwable t) {
if (!failed) {
callStreamObserver.onError(new BinaryFileDownloadException(t));
failed = true;
} else {
handleIllegalCallAfterError();
}
}
public synchronized void onCompleted() {
ifNotFailed(callStreamObserver::onCompleted);
}
private void ifNotFailed(Runnable runnable) {
if (!failed) {
runnable.run();
} else {
handleIllegalCallAfterError();
}
}
private <T> T ifNotFailed(Supplier<T> supplier) {
return !failed ? supplier.get() : handleIllegalCallAfterError();
}
private <T> T handleIllegalCallAfterError() {
throw new TechnicalException("CallStreamObserver called after error");
}
}
......@@ -28,6 +28,7 @@ import java.io.OutputStream;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Function;
......@@ -45,15 +46,16 @@ public class GrpcBinaryFileServerDownloader<T> {
private static final int CHUNK_SIZE = 255 * 1024;
private final CallStreamObserverWrapper<T> callObserver;
private final CallStreamObserver<T> callObserver;
private final Function<ByteString, T> chunkBuilder;
private final Consumer<OutputStream> downloadConsumer;
private final TaskExecutor taskExecutor;
private final byte[] buffer = new byte[GrpcBinaryFileServerDownloader.CHUNK_SIZE];
private final byte[] buffer = new byte[CHUNK_SIZE];
private final AtomicBoolean started = new AtomicBoolean(false);
private final AtomicBoolean downloadFinished = new AtomicBoolean(false);
private final AtomicBoolean requestFinished = new AtomicBoolean(false);
private final AtomicReference<BinaryFileDownloadException> error = new AtomicReference<>();
private PipedInputStream inputStream;
private PipedOutputStream outputStream;
......@@ -61,7 +63,7 @@ public class GrpcBinaryFileServerDownloader<T> {
@Builder
public GrpcBinaryFileServerDownloader(CallStreamObserver<T> callObserver, Function<ByteString, T> chunkBuilder,
Consumer<OutputStream> downloadConsumer, TaskExecutor taskExecutor) {
this.callObserver = new CallStreamObserverWrapper<>(callObserver);
this.callObserver = callObserver;
this.chunkBuilder = chunkBuilder;
this.downloadConsumer = downloadConsumer;
this.taskExecutor = taskExecutor;
......@@ -88,19 +90,27 @@ public class GrpcBinaryFileServerDownloader<T> {
try {
setupStreams();
} catch (Exception e) {
closeStreams();
closeOutputStream();
closeInputStream();
throw new BinaryFileDownloadException(e);
}
}
void setupStreams() throws IOException {
outputStream = new PipedOutputStream();
inputStream = new PipedInputStream(GrpcBinaryFileServerDownloader.CHUNK_SIZE);
inputStream = new PipedInputStream(CHUNK_SIZE);
outputStream.connect(inputStream);
}
void startDownload() {
withDownloadErrorHandling(this::doDownload);
try {
doDownload();
} catch (Exception e) {
error.set(new BinaryFileDownloadException(e));
} finally {
closeOutputStream();
sendChunks();
}
}
void doDownload() {
......@@ -108,12 +118,14 @@ public class GrpcBinaryFileServerDownloader<T> {
downloadConsumer.accept(outputStream);
LOG.debug("Download completed.");
downloadFinished.set(true);
closeOutputStream();
sendChunks();
}
synchronized void sendChunks() {
withDownloadErrorHandling(this::doSendChunks);
try {
doSendChunks();
} catch (Exception e) {
completeRequestWithError(e);
}
}
void doSendChunks() throws IOException {
......@@ -121,7 +133,11 @@ public class GrpcBinaryFileServerDownloader<T> {
return;
}
int bytesRead;
while (callObserver.isReady()) {
while (isReady()) {
if (error.get() != null) {
completeRequestWithError(error.get());
break;
}
if ((bytesRead = inputStream.read(buffer)) == -1) {
tryCompleteRequest();
break;
......@@ -131,6 +147,10 @@ public class GrpcBinaryFileServerDownloader<T> {
}
}
private boolean isReady() {
return callObserver.isReady();
}
void tryCompleteRequest() {
if (shouldCompleteRequest()) {
completeRequest();
......@@ -147,18 +167,11 @@ public class GrpcBinaryFileServerDownloader<T> {
callObserver.onCompleted();
}
void withDownloadErrorHandling(ExceptionalRunnable runnable) {
try {
runnable.run();
} catch (Exception e) {
closeStreams();
callObserver.onError(e);
}
}
private void closeStreams() {
closeOutputStream();
void completeRequestWithError(Throwable t) {
LOG.debug("Complete download request with error", t);
requestFinished.set(true);
closeInputStream();
callObserver.onError(t);
}
void closeOutputStream() {
......
/*
* Copyright (C) 2025 Das Land Schleswig-Holstein vertreten durch den
* Ministerpräsidenten des Landes Schleswig-Holstein
* Staatskanzlei
* Abteilung Digitalisierung und zentrales IT-Management der Landesregierung
*
* Lizenziert unter der EUPL, Version 1.2 oder - sobald
* diese von der Europäischen Kommission genehmigt wurden -
* Folgeversionen der EUPL ("Lizenz");
* Sie dürfen dieses Werk ausschließlich gemäß
* dieser Lizenz nutzen.
* Eine Kopie der Lizenz finden Sie hier:
*
* https://joinup.ec.europa.eu/collection/eupl/eupl-text-eupl-12
*
* Sofern nicht durch anwendbare Rechtsvorschriften
* gefordert oder in schriftlicher Form vereinbart, wird
* die unter der Lizenz verbreitete Software "so wie sie
* ist", OHNE JEGLICHE GEWÄHRLEISTUNG ODER BEDINGUNGEN -
* ausdrücklich oder stillschweigend - verbreitet.
* Die sprachspezifischen Genehmigungen und Beschränkungen
* unter der Lizenz sind dem Lizenztext zu entnehmen.
*/
package de.ozgcloud.common.binaryfile;
import static org.assertj.core.api.Assertions.*;
import static org.mockito.Mockito.*;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.ArgumentCaptor;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import de.ozgcloud.common.errorhandling.TechnicalException;
import de.ozgcloud.common.test.ReflectionTestUtils;
import io.grpc.stub.CallStreamObserver;
class CallStreamObserverWrapperTest {
@Mock
private CallStreamObserver<GrpcResponseDummy> callObserver;
@InjectMocks
private CallStreamObserverWrapper<GrpcResponseDummy> wrapper;
@Nested
class TestSetOnReadyHandler {
@Test
void shouldForwardToObserver() {
var onReadyHandler = mock(Runnable.class);
wrapper.setOnReadyHandler(onReadyHandler);
verify(callObserver).setOnReadyHandler(onReadyHandler);
}
}
@Nested
class TestIsReady {
@Nested
class OnFailed {
@BeforeEach
void init() {
ReflectionTestUtils.setField(wrapper, "failed", true);
}
@Test
void shouldNotCallObserver() {
catchThrowable(wrapper::isReady);
verifyNoInteractions(callObserver);
}
@Test
void shouldThrowException() {
assertThatThrownBy(wrapper::isReady).isInstanceOf(TechnicalException.class);
}
}
@Nested
class OnNotFailed {
@BeforeEach
void init() {
ReflectionTestUtils.setField(wrapper, "failed", false);
}
@Test
void shouldForwardToObserver() {
wrapper.isReady();
verify(callObserver).isReady();
}
@ParameterizedTest
@ValueSource(booleans = { true, false })
void shouldReturnObserverResult(boolean observerResult) {
when(callObserver.isReady()).thenReturn(observerResult);
var ready = wrapper.isReady();
assertThat(ready).isEqualTo(observerResult);
}
}
}
@Nested
class TestOnNext {
private final GrpcResponseDummy grpccResponse = new GrpcResponseDummy();
@Nested
class OnFailed {
@BeforeEach
void init() {
ReflectionTestUtils.setField(wrapper, "failed", true);
}
@Test
void shouldNotCallObserver() {
catchThrowable(() -> wrapper.onNext(grpccResponse));
verifyNoInteractions(callObserver);
}
@Test
void shouldThrowException() {
assertThatThrownBy(() -> wrapper.onNext(grpccResponse)).isInstanceOf(TechnicalException.class);
}
}
@Nested
class OnNotFailed {
@BeforeEach
void init() {
ReflectionTestUtils.setField(wrapper, "failed", false);
}
@Test
void shouldForwardToObserver() {
wrapper.onNext(grpccResponse);
verify(callObserver).onNext(grpccResponse);
}
}
}
@Nested
class TestOnError {
private final TechnicalException exception = new TechnicalException("dummy");
@Nested
class OnFailed {
@BeforeEach
void init() {
ReflectionTestUtils.setField(wrapper, "failed", true);
}
@Test
void shouldNotCallObserver() {
catchThrowable(() -> wrapper.onError(exception));
verifyNoInteractions(callObserver);
}
@Test
void shouldThrowException() {
assertThatThrownBy(() -> wrapper.onError(exception)).isInstanceOf(TechnicalException.class);
}
}
@Nested
class OnNotFailed {
private ArgumentCaptor<BinaryFileDownloadException> captor = ArgumentCaptor.forClass(BinaryFileDownloadException.class);
@BeforeEach
void init() {
ReflectionTestUtils.setField(wrapper, "failed", false);
}
@Test
void shouldForwardBinaryFileDownloadExceptionToObserver() {
wrapper.onError(exception);
verify(callObserver).onError(captor.capture());
assertThat(captor.getValue()).isInstanceOf(BinaryFileDownloadException.class);
assertThat(captor.getValue().getCause()).isEqualTo(exception);
}
@Test
void shouldSetFailedToTrue() {
wrapper.onError(exception);
assertThat(wrapper).extracting("failed").isEqualTo(true);
}
}
}
@Nested
class TestOnCompleted {
@Nested
class OnFailed {
@BeforeEach
void init() {
ReflectionTestUtils.setField(wrapper, "failed", true);
}
@Test
void shouldNotCallObserver() {
catchThrowable(wrapper::onCompleted);
verifyNoInteractions(callObserver);
}
@Test
void shouldThrowException() {
assertThatThrownBy(wrapper::onCompleted).isInstanceOf(TechnicalException.class);
}
}
@Nested
class OnNotFailed {
@BeforeEach
void init() {
ReflectionTestUtils.setField(wrapper, "failed", false);
}
@Test
void shouldForwardToObserver() {
wrapper.onCompleted();
verify(callObserver).onCompleted();
}
}
}
private static class GrpcResponseDummy {
}
}
......@@ -34,6 +34,7 @@ import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.util.Random;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Function;
......@@ -50,12 +51,13 @@ import com.google.protobuf.ByteString;
import de.ozgcloud.common.errorhandling.TechnicalException;
import de.ozgcloud.common.test.ReflectionTestUtils;
import io.grpc.stub.CallStreamObserver;
import lombok.SneakyThrows;
class GrpcBinaryFileServerDownloaderTest {
@Mock
private CallStreamObserverWrapper<GrpcResponseDummy> callObserverWrapper;
private CallStreamObserver<GrpcResponseDummy> callObserver;
@Mock
private Function<ByteString, GrpcResponseDummy> chunkBuilder;
@Mock
......@@ -67,9 +69,8 @@ class GrpcBinaryFileServerDownloaderTest {
@BeforeEach
void init() {
downloader = spy(GrpcBinaryFileServerDownloader.<GrpcResponseDummy>builder().downloadConsumer(downloadConsumer)
downloader = spy(GrpcBinaryFileServerDownloader.<GrpcResponseDummy>builder().callObserver(callObserver).downloadConsumer(downloadConsumer)
.chunkBuilder(chunkBuilder).taskExecutor(taskExecutor).build());
ReflectionTestUtils.setField(downloader, "callObserver", callObserverWrapper);
}
@Nested
......@@ -152,7 +153,7 @@ class GrpcBinaryFileServerDownloaderTest {
void shouldSetOnReadyHandler() {
downloader.doStart();
verify(callObserverWrapper).setOnReadyHandler(runnableCaptor.capture());
verify(callObserver).setOnReadyHandler(runnableCaptor.capture());
assertThat(runnableCaptor.getValue()).isNotNull();
}
}
......@@ -218,22 +219,86 @@ class GrpcBinaryFileServerDownloaderTest {
@Nested
class TestStartDownload {
@Captor
private ArgumentCaptor<ExceptionalRunnable> runnableCaptor;
@Mock
private PipedOutputStream outputStream;
@BeforeEach
void init() {
setOutputStreamField(outputStream);
}
@SneakyThrows
@Test
void shouldCallDoDownload() {
doNothing().when(downloader).withDownloadErrorHandling(any());
void shouldErrorBeInitiallyNull() {
assertThat(getError()).isNull();
}
@Nested
class OnNoException {
@BeforeEach
void init() {
doNothing().when(downloader).doDownload();
}
@Test
void shouldDoDownload() {
downloader.startDownload();
verify(downloader).withDownloadErrorHandling(runnableCaptor.capture());
runnableCaptor.getValue().run();
verify(downloader).doDownload();
}
@SneakyThrows
@Test
void shouldCloseOutputStream() {
downloader.startDownload();
verify(outputStream).close();
}
@Test
@DisplayName("should send chunks here to not wait for callObserver to change its ready status")
void shouldSendChunks() {
downloader.startDownload();
verify(downloader).sendChunks();
}
}
@Nested
class OnException {
private final TechnicalException exception = new TechnicalException("error");
@BeforeEach
void init() {
doThrow(exception).when(downloader).doDownload();
}
@Test
void shouldSetError() {
downloader.startDownload();
assertThat(getError()).isInstanceOf(BinaryFileDownloadException.class).hasCause(exception);
}
@SneakyThrows
@Test
void shouldCloseOutputStream() {
downloader.startDownload();
verify(outputStream).close();
}
@Test
@DisplayName("should send chunks here to communicate error to callObserver")
void shouldSendChunks() {
downloader.startDownload();
verify(downloader).sendChunks();
}
}
}
@Nested
class TestDoDownload {
......@@ -245,7 +310,6 @@ class GrpcBinaryFileServerDownloaderTest {
setOutputStreamField(outputStream);
}
@SneakyThrows
@Test
void shouldCallDownloadConsumer() {
downloader.doDownload();
......@@ -253,38 +317,136 @@ class GrpcBinaryFileServerDownloaderTest {
verify(downloadConsumer).accept(outputStream);
}
@SneakyThrows
@Test
void shouldCloseOutputStream() {
void shouldDownloadFinishedBeInitiallyFalse() {
assertThat(getDownloadFinished()).isFalse();
}
@Test
void shouldSetDownloadFinished() {
downloader.doDownload();
verify(outputStream).close();
}
assertThat(getDownloadFinished()).isTrue();
}
}
@Nested
class TestSendChunks {
@Captor
private ArgumentCaptor<ExceptionalRunnable> runnableCaptor;
@Nested
class OnNoException {
@SneakyThrows
@Test
void shouldCallWithDownloadErrorHandling() {
doNothing().when(downloader).withDownloadErrorHandling(any());
@BeforeEach
void init() {
doNothing().when(downloader).doSendChunks();
}
@SneakyThrows
@Test
void shouldDoSendChunks() {
downloader.sendChunks();
verify(downloader).withDownloadErrorHandling(runnableCaptor.capture());
runnableCaptor.getValue().run();
verify(downloader).doSendChunks();
}
}
@Nested
class OnException {
private final TechnicalException exception = new TechnicalException("error");
@SneakyThrows
@BeforeEach
void init() {
doThrow(exception).when(downloader).doSendChunks();
}
@Test
void shouldCompleteRequestWithError() {
downloader.sendChunks();
verify(downloader).completeRequestWithError(exception);
}
}
}
@Nested
class TestDoSendChunks {
@Nested
class OnRequestFinished {
@BeforeEach
void init() {
setRequestFinishedField(true);
}
@Test
void shouldNotInteractWithCallObserver() {
doSendChunks();
verifyNoInteractions(callObserver);
}
}
@Nested
class OnRequestNotFinished {
@Nested
class OnNotReady {
@BeforeEach
void init() {
when(callObserver.isReady()).thenReturn(false);
}
@Test
void shouldOnlyCallIsReadyOnObserver() {
doSendChunks();
verify(callObserver).isReady();
verifyNoMoreInteractions(callObserver);
}
}
@Nested
class OnReady {
@BeforeEach
void init() {
when(callObserver.isReady()).thenReturn(true);
}
@Nested
class OnHasError {
private final BinaryFileDownloadException exception = new BinaryFileDownloadException(new TechnicalException("error"));
@BeforeEach
void init() {
setErrorField(exception);
doNothing().when(downloader).completeRequestWithError(any());
}
@Test
void shouldOnlyCallIsReadyOnObserver() {
doSendChunks();
verify(callObserver).isReady();
verifyNoMoreInteractions(callObserver);
}
@Test
void shouldCompleteRequestWithError() {
doSendChunks();
verify(downloader).completeRequestWithError(exception);
}
}
@Nested
class OnHasNoError {
@Mock
private PipedInputStream inputStream;
@Captor
......@@ -298,7 +460,7 @@ class GrpcBinaryFileServerDownloaderTest {
@BeforeEach
void mock() {
doNothing().when(downloader).tryCompleteRequest();
when(callObserverWrapper.isReady()).thenReturn(true);
when(callObserver.isReady()).thenReturn(true);
when(inputStream.read(any())).thenReturn(readBytes, -1);
setInputStreamField(inputStream);
new Random().nextBytes(buffer);
......@@ -320,25 +482,27 @@ class GrpcBinaryFileServerDownloaderTest {
doSendChunks();
verify(callObserverWrapper).onNext(grpcResponseDummy);
verify(callObserver).onNext(grpcResponseDummy);
}
@DisplayName("should call complete grpc stream if download has finished and stream has no data left")
@Test
void shouldCallCompleteDownload() {
void shouldTryCompleteRequest() {
setDownloadFinishedField(true);
doSendChunks();
verify(downloader).tryCompleteRequest();
}
}
}
}
@SneakyThrows
private void doSendChunks() {
downloader.doSendChunks();
}
}
}
@Nested
class TestTryCompleteRequest {
......@@ -453,69 +617,41 @@ class GrpcBinaryFileServerDownloaderTest {
void shouldCallOnCompleted() {
downloader.completeRequest();
verify(callObserverWrapper).onCompleted();
}
verify(callObserver).onCompleted();
}
@Nested
class TestWithDownloadErrorHandling {
@SneakyThrows
@Test
void shouldRunRunnable() {
var runnable = mock(ExceptionalRunnable.class);
downloader.withDownloadErrorHandling(runnable);
verify(runnable).run();
}
@Nested
class OnException {
@Mock
private PipedOutputStream outputStream;
@Mock
private PipedInputStream inputStream;
class TestCompleteRequestWithError {
private final TechnicalException exception = new TechnicalException("dummy");
private final Throwable error = new Throwable();
@BeforeEach
void init() {
setInputStreamField(inputStream);
setOutputStreamField(outputStream);
doNothing().when(downloader).closeInputStream();
}
@SneakyThrows
@Test
void shouldCloseOutputStream() {
withDownloadErrorHandling();
void shouldSetRequestFinished() {
assertThat(getRequestFinished()).isFalse();
verify(outputStream).close();
downloader.completeRequestWithError(error);
assertThat(getRequestFinished()).isTrue();
}
@SneakyThrows
@Test
void shouldCloseInputStream() {
withDownloadErrorHandling();
downloader.completeRequestWithError(error);
verify(inputStream).close();
verify(downloader).closeInputStream();
}
@Test
void shouldNotifyCallObserver() {
withDownloadErrorHandling();
verify(callObserverWrapper).onError(argThat(TechnicalException.class::isInstance));
}
private void withDownloadErrorHandling() {
downloader.withDownloadErrorHandling(this::dummyMethodThrowingException);
}
downloader.completeRequestWithError(error);
private void dummyMethodThrowingException() throws TechnicalException {
throw exception;
}
verify(callObserver).onError(error);
}
}
......@@ -527,10 +663,6 @@ class GrpcBinaryFileServerDownloaderTest {
ReflectionTestUtils.setField(downloader, "inputStream", inputStream);
}
private void setDownloadFinishedField(boolean downloadFinished) {
ReflectionTestUtils.setField(downloader, "downloadFinished", new AtomicBoolean(downloadFinished));
}
private void setRequestFinishedField(boolean requestFinished) {
ReflectionTestUtils.setField(downloader, "requestFinished", new AtomicBoolean(requestFinished));
}
......@@ -539,6 +671,22 @@ class GrpcBinaryFileServerDownloaderTest {
return ReflectionTestUtils.getField(downloader, "requestFinished", AtomicBoolean.class).get();
}
private void setErrorField(BinaryFileDownloadException error) {
ReflectionTestUtils.setField(downloader, "error", new AtomicReference<>(error));
}
private BinaryFileDownloadException getError() {
return (BinaryFileDownloadException) ReflectionTestUtils.getField(downloader, "error", AtomicReference.class).get();
}
private void setDownloadFinishedField(boolean downloadFinished) {
ReflectionTestUtils.setField(downloader, "downloadFinished", new AtomicBoolean(downloadFinished));
}
private boolean getDownloadFinished() {
return ReflectionTestUtils.getField(downloader, "downloadFinished", AtomicBoolean.class).get();
}
private static class GrpcResponseDummy {
}
}
\ No newline at end of file
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment