Skip to content
Snippets Groups Projects
Commit 7c8cf453 authored by Krzysztof Witukiewicz's avatar Krzysztof Witukiewicz
Browse files

Merge branch 'OZG-7573-files-weiterleitung-bug' into 'main'

Ozg 7573 files weiterleitung bug

See merge request !12
parents 0702c099 bea6456c
No related branches found
No related tags found
1 merge request!12Ozg 7573 files weiterleitung bug
Showing
with 1071 additions and 430 deletions
...@@ -23,225 +23,69 @@ ...@@ -23,225 +23,69 @@
*/ */
package de.ozgcloud.common.binaryfile; package de.ozgcloud.common.binaryfile;
import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiFunction; import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function; import java.util.function.Function;
import org.apache.commons.io.IOUtils;
import de.ozgcloud.common.errorhandling.TechnicalException;
import io.grpc.stub.CallStreamObserver; import io.grpc.stub.CallStreamObserver;
import io.grpc.stub.StreamObserver; import io.grpc.stub.StreamObserver;
import lombok.AccessLevel; import lombok.AccessLevel;
import lombok.Getter;
import lombok.NoArgsConstructor; import lombok.NoArgsConstructor;
import lombok.NonNull; import lombok.NonNull;
import lombok.RequiredArgsConstructor;
import lombok.extern.log4j.Log4j2; import lombok.extern.log4j.Log4j2;
@Log4j2 @Log4j2
@NoArgsConstructor(access = AccessLevel.PRIVATE) @NoArgsConstructor(access = AccessLevel.PRIVATE)
public class GrpcFileUploadUtils { public class GrpcFileUploadUtils {
static final int CHUNK_SIZE = 4 * 1024; /**
* @param <Q> Request Type
/* * @param <S> Response Type
* Q = Request Type; S = Response Type * @deprecated use {@link #createStreamExclusiveFileSender(BiFunction, InputStream, Function)} instead
*/ */
@Deprecated(since = "4.13.0")
public static <Q, S> FileSender<Q, S> createSender(BiFunction<byte[], Integer, Q> chunkBuilder, InputStream inputStream, public static <Q, S> FileSender<Q, S> createSender(BiFunction<byte[], Integer, Q> chunkBuilder, InputStream inputStream,
Function<StreamObserver<S>, CallStreamObserver<Q>> reqObserverBuilder) { Function<StreamObserver<S>, CallStreamObserver<Q>> reqObserverBuilder) {
return createSender(chunkBuilder, inputStream, reqObserverBuilder, true); return new FileSender<>(chunkBuilder, inputStream, reqObserverBuilder);
} }
public static <Q, S> FileSender<Q, S> createSender(BiFunction<byte[], Integer, Q> chunkBuilder, InputStream inputStream, /**
Function<StreamObserver<S>, CallStreamObserver<Q>> reqObserverBuilder, boolean completeOnFileSent) { * @param <Q> Request Type
return new FileSender<>(chunkBuilder, reqObserverBuilder, inputStream, completeOnFileSent); * @param <S> Response Type
*/
public static <Q, S> StreamingFileSender<Q, S> createStreamExclusiveFileSender(BiFunction<byte[], Integer, Q> chunkBuilder, InputStream inputStream,
Function<StreamObserver<S>, CallStreamObserver<Q>> reqObserverBuilder) {
return new StreamExclusiveFileSender<>(chunkBuilder, inputStream, reqObserverBuilder);
} }
public static class FileSender<Q, S> { /**
private final BiFunction<byte[], Integer, Q> chunkBuilder; * @param <Q> Request Type
private final InputStream inputStream; * @param <S> Response Type
*/
@Getter public static <Q, S> StreamingFileSender<Q, S> createStreamSharingSender(BiFunction<byte[], Integer, Q> chunkBuilder, InputStream inputStream,
private final CompletableFuture<S> resultFuture = new CompletableFuture<>(); CallStreamObserver<Q> requestObserver, Consumer<Runnable> onReadyHandlerRegistrar) {
private final Function<StreamObserver<S>, CallStreamObserver<Q>> reqObserverBuilder; return new StreamSharingFileSender<>(chunkBuilder, inputStream, requestObserver, onReadyHandlerRegistrar);
private CallStreamObserver<Q> requestObserver; }
private Optional<Q> metaData = Optional.empty();
private final AtomicBoolean metaDataSent = new AtomicBoolean(false);
private final AtomicBoolean done = new AtomicBoolean(false);
private final StreamReader streamReader;
private final boolean completeOnFileSent;
FileSender(BiFunction<byte[], Integer, Q> chunkBuilder, Function<StreamObserver<S>, CallStreamObserver<Q>> reqObserverBuilder,
InputStream inputStream) {
this(chunkBuilder, reqObserverBuilder, inputStream, true);
}
FileSender(BiFunction<byte[], Integer, Q> chunkBuilder, Function<StreamObserver<S>, CallStreamObserver<Q>> reqObserverBuilder, @Deprecated(since = "4.13.0")
InputStream inputStream, boolean completeOnFileSent) { public static class FileSender<Q, S> extends StreamExclusiveFileSender<Q, S> {
this.chunkBuilder = chunkBuilder;
this.inputStream = inputStream;
this.reqObserverBuilder = reqObserverBuilder;
this.completeOnFileSent = completeOnFileSent;
this.streamReader = new StreamReader(this.inputStream); FileSender(BiFunction<byte[], Integer, Q> chunkBuilder, InputStream inputStream,
} Function<StreamObserver<S>, CallStreamObserver<Q>> reqObserverBuilder) {
super(chunkBuilder, inputStream, reqObserverBuilder);
public FileSender<Q, S> withMetaData(@NonNull Q metaData) {
this.metaData = Optional.of(metaData);
return this;
} }
@Override
public FileSender<Q, S> send() { public FileSender<Q, S> send() {
LOG.debug("Start sending File."); super.send();
var responseObserver = BinaryFileUploadStreamObserver.create(resultFuture, this::sendNext);
requestObserver = reqObserverBuilder.apply(responseObserver);
return this; return this;
} }
public void cancelOnTimeout() { @Override
LOG.warn("File transfer canceled on timeout"); public FileSender<Q, S> withMetaData(@NonNull Q metaData) {
resultFuture.cancel(true); super.withMetaData(metaData);
requestObserver.onError(new TechnicalException("Timeout on waiting for upload.")); return this;
closeStreams();
}
public void cancelOnError(Throwable t) {
LOG.error("File tranfer canceled on error.", t);
resultFuture.cancel(true);
requestObserver.onError(t);
closeStreams();
}
void sendNext() {
if (!done.get()) {
waitForOberver();
sendMetaData();
do {
LOG.debug("Sending next chunk.");
sendNextChunk();
} while (!done.get() && isReady());
LOG.debug("Finished or waiting to become ready.");
}
}
private boolean isReady() {
return requestObserver.isReady();
}
private void waitForOberver() {
synchronized (this) {
while (Objects.isNull(requestObserver)) {
try {
LOG.debug("wait for observer");
wait(300);
} catch (InterruptedException e) {
LOG.error("Error on waiting for request Observer.", e);
Thread.currentThread().interrupt();
}
}
}
}
void sendNextChunk() {
byte[] contentToSend = streamReader.getNextData();
if (streamReader.getLastReadSize() > 0) {
sendChunk(contentToSend, streamReader.getLastReadSize());
} else {
endTransfer();
}
}
private void endTransfer() {
if (completeOnFileSent)
requestObserver.onCompleted();
else
sendEndOfFile();
done.set(true);
LOG.debug("File Transfer done.");
closeStreams();
}
private void sendEndOfFile() {
sendChunk(new byte[0], streamReader.getLastReadSize());
}
void closeStreams() {
LOG.debug("Closing streams");
streamReader.close();
}
void sendChunk(byte[] content, int length) {
LOG.debug("Sending {} byte Data.", length);
var chunk = chunkBuilder.apply(content, length);
requestObserver.onNext(chunk);
}
byte[] readFromStream() {
try {
return inputStream.readNBytes(CHUNK_SIZE);
} catch (IOException e) {
throw new TechnicalException("Error on sending a single chunk", e);
}
}
void sendMetaData() {
metaData.filter(md -> !metaDataSent.get()).ifPresent(this::doSendMetaData);
}
private void doSendMetaData(Q metadata) {
LOG.debug("Sending Metadata.");
requestObserver.onNext(metadata);
metaDataSent.set(true);
}
void checkForEndOfStream(long sentSize) {
if (sentSize < CHUNK_SIZE) {
LOG.debug("File Transfer done. Closing stream.");
IOUtils.closeQuietly(inputStream);
requestObserver.onCompleted();
done.set(true);
} else {
LOG.debug("File Transfer not jet done - need to tranfer another chunk.");
}
}
@RequiredArgsConstructor
private class StreamReader {
private final InputStream inStream;
private final byte[] buffer = new byte[CHUNK_SIZE];
@Getter
private int lastReadSize = 0;
@Getter
private final AtomicBoolean done = new AtomicBoolean(false);
byte[] getNextData() {
readNext();
return buffer;
}
void close() {
IOUtils.closeQuietly(inStream);
}
void readNext() {
try {
lastReadSize = inStream.read(buffer, 0, CHUNK_SIZE);
} catch (IOException e) {
throw new TechnicalException("Error on reading a single chunk", e);
}
}
} }
} }
......
/*
* Copyright (C) 2025 Das Land Schleswig-Holstein vertreten durch den
* Ministerpräsidenten des Landes Schleswig-Holstein
* Staatskanzlei
* Abteilung Digitalisierung und zentrales IT-Management der Landesregierung
*
* Lizenziert unter der EUPL, Version 1.2 oder - sobald
* diese von der Europäischen Kommission genehmigt wurden -
* Folgeversionen der EUPL ("Lizenz");
* Sie dürfen dieses Werk ausschließlich gemäß
* dieser Lizenz nutzen.
* Eine Kopie der Lizenz finden Sie hier:
*
* https://joinup.ec.europa.eu/collection/eupl/eupl-text-eupl-12
*
* Sofern nicht durch anwendbare Rechtsvorschriften
* gefordert oder in schriftlicher Form vereinbart, wird
* die unter der Lizenz verbreitete Software "so wie sie
* ist", OHNE JEGLICHE GEWÄHRLEISTUNG ODER BEDINGUNGEN -
* ausdrücklich oder stillschweigend - verbreitet.
* Die sprachspezifischen Genehmigungen und Beschränkungen
* unter der Lizenz sind dem Lizenztext zu entnehmen.
*/
package de.ozgcloud.common.binaryfile;
import java.io.InputStream;
import java.util.function.BiFunction;
import java.util.function.Function;
import de.ozgcloud.common.errorhandling.TechnicalException;
import io.grpc.stub.CallStreamObserver;
import io.grpc.stub.StreamObserver;
import lombok.extern.log4j.Log4j2;
@Log4j2
class StreamExclusiveFileSender<Q, S> extends StreamingFileSender<Q, S> {
private final Function<StreamObserver<S>, CallStreamObserver<Q>> reqObserverBuilder;
private CallStreamObserver<Q> requestObserver;
StreamExclusiveFileSender(BiFunction<byte[], Integer, Q> chunkBuilder, InputStream inputStream, Function<StreamObserver<S>, CallStreamObserver<Q>> reqObserverBuilder) {
super(chunkBuilder, inputStream);
this.reqObserverBuilder = reqObserverBuilder;
}
@Override
public StreamExclusiveFileSender<Q, S> send() {
LOG.debug("Start sending File.");
var responseObserver = BinaryFileUploadStreamObserver.create(getResultFuture(), this::sendNext);
requestObserver = reqObserverBuilder.apply(responseObserver);
return this;
}
@Override
void communicateEndOfTransfer() {
requestObserver.onCompleted();
}
@Override
protected CallStreamObserver<Q> getRequestObserver() {
return requestObserver;
}
@Override
public void cancelOnTimeout() {
super.cancelOnTimeout();
requestObserver.onError(new TechnicalException("Timeout on waiting for upload."));
}
@Override
public void cancelOnError(Throwable t) {
super.cancelOnError(t);
requestObserver.onError(t);
}
}
/*
* Copyright (C) 2025 Das Land Schleswig-Holstein vertreten durch den
* Ministerpräsidenten des Landes Schleswig-Holstein
* Staatskanzlei
* Abteilung Digitalisierung und zentrales IT-Management der Landesregierung
*
* Lizenziert unter der EUPL, Version 1.2 oder - sobald
* diese von der Europäischen Kommission genehmigt wurden -
* Folgeversionen der EUPL ("Lizenz");
* Sie dürfen dieses Werk ausschließlich gemäß
* dieser Lizenz nutzen.
* Eine Kopie der Lizenz finden Sie hier:
*
* https://joinup.ec.europa.eu/collection/eupl/eupl-text-eupl-12
*
* Sofern nicht durch anwendbare Rechtsvorschriften
* gefordert oder in schriftlicher Form vereinbart, wird
* die unter der Lizenz verbreitete Software "so wie sie
* ist", OHNE JEGLICHE GEWÄHRLEISTUNG ODER BEDINGUNGEN -
* ausdrücklich oder stillschweigend - verbreitet.
* Die sprachspezifischen Genehmigungen und Beschränkungen
* unter der Lizenz sind dem Lizenztext zu entnehmen.
*/
package de.ozgcloud.common.binaryfile;
import java.io.InputStream;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import io.grpc.stub.CallStreamObserver;
import lombok.extern.log4j.Log4j2;
@Log4j2
class StreamSharingFileSender<Q, S> extends StreamingFileSender<Q, S> {
private final CallStreamObserver<Q> requestObserver;
private final Consumer<Runnable> onReadyHandlerRegistrar;
StreamSharingFileSender(BiFunction<byte[], Integer, Q> chunkBuilder, InputStream inputStream, CallStreamObserver<Q> requestObserver, Consumer<Runnable> onReadyHandlerRegistrar) {
super(chunkBuilder, inputStream);
this.requestObserver = requestObserver;
this.onReadyHandlerRegistrar = onReadyHandlerRegistrar;
}
@Override
public StreamSharingFileSender<Q, S> send() {
LOG.debug("Register onReadyHandler and start sending File.");
onReadyHandlerRegistrar.accept(this::sendNext);
return this;
}
@Override
void communicateEndOfTransfer() {
sendEndOfFile();
getResultFuture().complete(null);
}
private void sendEndOfFile() {
sendChunk(new byte[0], -1);
}
@Override
protected CallStreamObserver<Q> getRequestObserver() {
return requestObserver;
}
}
/*
* Copyright (C) 2025 Das Land Schleswig-Holstein vertreten durch den
* Ministerpräsidenten des Landes Schleswig-Holstein
* Staatskanzlei
* Abteilung Digitalisierung und zentrales IT-Management der Landesregierung
*
* Lizenziert unter der EUPL, Version 1.2 oder - sobald
* diese von der Europäischen Kommission genehmigt wurden -
* Folgeversionen der EUPL ("Lizenz");
* Sie dürfen dieses Werk ausschließlich gemäß
* dieser Lizenz nutzen.
* Eine Kopie der Lizenz finden Sie hier:
*
* https://joinup.ec.europa.eu/collection/eupl/eupl-text-eupl-12
*
* Sofern nicht durch anwendbare Rechtsvorschriften
* gefordert oder in schriftlicher Form vereinbart, wird
* die unter der Lizenz verbreitete Software "so wie sie
* ist", OHNE JEGLICHE GEWÄHRLEISTUNG ODER BEDINGUNGEN -
* ausdrücklich oder stillschweigend - verbreitet.
* Die sprachspezifischen Genehmigungen und Beschränkungen
* unter der Lizenz sind dem Lizenztext zu entnehmen.
*/
package de.ozgcloud.common.binaryfile;
import java.io.IOException;
import java.io.InputStream;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiFunction;
import org.apache.commons.io.IOUtils;
import de.ozgcloud.common.errorhandling.TechnicalException;
import io.grpc.stub.CallStreamObserver;
import lombok.AccessLevel;
import lombok.Getter;
import lombok.NonNull;
import lombok.RequiredArgsConstructor;
import lombok.extern.log4j.Log4j2;
/*
* Q = Request Type; S = Response Type
*/
@Log4j2
public abstract class StreamingFileSender<Q, S> {
static final int CHUNK_SIZE = 4 * 1024;
private final BiFunction<byte[], Integer, Q> chunkBuilder;
@Getter
private final CompletableFuture<S> resultFuture = new CompletableFuture<>();
private Q metaData;
private final AtomicBoolean metaDataSent = new AtomicBoolean(false);
private final AtomicBoolean done = new AtomicBoolean(false);
@Getter(AccessLevel.PROTECTED)
private final StreamReader streamReader;
StreamingFileSender(BiFunction<byte[], Integer, Q> chunkBuilder, InputStream inputStream) {
this.chunkBuilder = chunkBuilder;
this.streamReader = new StreamReader(inputStream);
}
public StreamingFileSender<Q, S> withMetaData(@NonNull Q metaData) {
this.metaData = metaData;
return this;
}
public abstract StreamingFileSender<Q, S> send();
public void cancelOnTimeout() {
LOG.warn("File transfer canceled on timeout");
resultFuture.cancel(true);
closeStreamReader();
}
public void cancelOnError(Throwable t) {
LOG.error("File transfer canceled on error.", t);
resultFuture.cancel(true);
closeStreamReader();
}
void sendNext() {
if (notFinished()) {
waitForObserver();
sendMetaData();
do {
LOG.debug("Sending next chunk.");
sendNextChunk();
} while (notFinished() && isReady());
LOG.debug("Finished or waiting to become ready.");
}
checkIfFinishedForcefully();
}
private boolean notFinished() {
return !done.get() && !resultFuture.isCancelled();
}
private void checkIfFinishedForcefully() {
if (resultFuture.isCancelled()) {
LOG.warn("File transfer was cancelled");
closeStreamReader();
}
}
private boolean isReady() {
return getRequestObserver().isReady();
}
private void waitForObserver() {
synchronized (this) {
while (Objects.isNull(getRequestObserver())) {
try {
LOG.debug("wait for observer");
wait(300);
} catch (InterruptedException e) {
LOG.error("Error on waiting for request Observer.", e);
Thread.currentThread().interrupt();
}
}
}
}
void sendNextChunk() {
byte[] contentToSend = streamReader.getNextData();
if (streamReader.getLastReadSize() > 0) {
sendChunk(contentToSend, streamReader.getLastReadSize());
} else {
endTransfer();
}
}
void endTransfer() {
communicateEndOfTransfer();
done.set(true);
LOG.debug("File Transfer done.");
closeStreamReader();
}
abstract void communicateEndOfTransfer();
void closeStreamReader() {
LOG.debug("Closing streams");
streamReader.close();
}
void sendChunk(byte[] content, int length) {
LOG.debug("Sending {} byte Data.", length);
var chunk = chunkBuilder.apply(content, length);
getRequestObserver().onNext(chunk);
}
void sendMetaData() {
if (metaData != null && !metaDataSent.get()) {
doSendMetaData(metaData);
}
}
private void doSendMetaData(Q metadata) {
LOG.debug("Sending Metadata.");
getRequestObserver().onNext(metadata);
metaDataSent.set(true);
}
protected abstract CallStreamObserver<Q> getRequestObserver();
@RequiredArgsConstructor
protected static class StreamReader {
private final InputStream inStream;
private final byte[] buffer = new byte[CHUNK_SIZE];
@Getter
private int lastReadSize = 0;
@Getter
private final AtomicBoolean done = new AtomicBoolean(false);
byte[] getNextData() {
readNext();
return buffer;
}
void close() {
IOUtils.closeQuietly(inStream);
}
void readNext() {
try {
lastReadSize = inStream.read(buffer, 0, CHUNK_SIZE);
} catch (IOException e) {
throw new TechnicalException("Error on reading a single chunk", e);
}
}
}
}
...@@ -24,289 +24,64 @@ ...@@ -24,289 +24,64 @@
package de.ozgcloud.common.binaryfile; package de.ozgcloud.common.binaryfile;
import static org.assertj.core.api.Assertions.*; import static org.assertj.core.api.Assertions.*;
import static org.mockito.ArgumentMatchers.*;
import static org.mockito.Mockito.*;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.util.function.BiFunction; import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function; import java.util.function.Function;
import org.apache.commons.lang3.RandomUtils;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Nested; import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.Captor;
import org.mockito.InjectMocks;
import org.mockito.Mock; import org.mockito.Mock;
import de.ozgcloud.common.binaryfile.BinaryFileTestFactory.TestRequestType; import de.ozgcloud.common.binaryfile.BinaryFileTestFactory.TestRequestType;
import de.ozgcloud.common.binaryfile.BinaryFileTestFactory.TestResponseType; import de.ozgcloud.common.binaryfile.BinaryFileTestFactory.TestResponseType;
import de.ozgcloud.common.binaryfile.GrpcFileUploadUtils.FileSender;
import de.ozgcloud.common.errorhandling.TechnicalException;
import io.grpc.stub.CallStreamObserver; import io.grpc.stub.CallStreamObserver;
import io.grpc.stub.StreamObserver; import io.grpc.stub.StreamObserver;
import lombok.SneakyThrows;
class GrpcFileUploadUtilsTest { class GrpcFileUploadUtilsTest {
@InjectMocks
private GrpcFileUploadUtils service;
@Mock @Mock
private BiFunction<byte[], Integer, TestRequestType> chunkBuilder; private BiFunction<byte[], Integer, TestRequestType> chunkBuilder;
@Mock @Mock
private InputStream inputStream;
@Mock
private Function<StreamObserver<TestResponseType>, CallStreamObserver<TestRequestType>> reqObserverBuilder; private Function<StreamObserver<TestResponseType>, CallStreamObserver<TestRequestType>> reqObserverBuilder;
@Mock @Mock
private CallStreamObserver<TestRequestType> requestObserver; private CallStreamObserver<TestRequestType> requestObserver;
@Mock @Mock
private InputStream inputStream; private Consumer<Runnable> onReadyHandlerRegistrar;
private FileSender<TestRequestType, TestResponseType> fileSender;
@Mock
private TestRequestType metaData;
@BeforeEach
void init() {
when(reqObserverBuilder.apply(any())).thenReturn(requestObserver);
fileSender = spy(GrpcFileUploadUtils.createSender(chunkBuilder, inputStream, reqObserverBuilder));
}
@Nested @Nested
class TestCreateFileSender { class TestCreateSender {
@Test @Test
void shouldCreateRequestObserver() { void shouldReturnInstanceOfStreamExclusiveFileSender() {
GrpcFileUploadUtils.createSender(chunkBuilder, inputStream, reqObserverBuilder).send(); var createdSender = GrpcFileUploadUtils.createSender(chunkBuilder, inputStream, reqObserverBuilder);
verify(reqObserverBuilder, atLeastOnce()).apply(notNull()); assertThat(createdSender).isInstanceOf(StreamExclusiveFileSender.class);
} }
} }
@Nested @Nested
class TestSendBinaryFile { class TestCreateStreamExclusiveFileSender {
@Captor
private ArgumentCaptor<Runnable> runnableCaptor;
@Test @Test
void shouldReturnSenderWithFuture() { void shouldReturnInstanceOfStreamExclusiveFileSender() {
var result = fileSender.send(); var createdSender = GrpcFileUploadUtils.createStreamExclusiveFileSender(chunkBuilder, inputStream, reqObserverBuilder);
assertThat(result).isNotNull().extracting(FileSender::getResultFuture).isNotNull(); assertThat(createdSender).isInstanceOf(StreamExclusiveFileSender.class);
} }
} }
@Nested @Nested
class TestSendNext { class TestCreateStreamSharingSender {
@BeforeEach
void initObserver() {
fileSender.send();
}
@Test @Test
void shouldCallSendMetaData() { void shouldReturnInstanceOfStreamSharingSender() {
fileSender.sendNext(); var createdSender = GrpcFileUploadUtils.createStreamSharingSender(chunkBuilder, inputStream, requestObserver, onReadyHandlerRegistrar);
verify(fileSender).sendMetaData();
}
@Test
void shouldSendNextChunk() {
fileSender.sendNext();
verify(fileSender).sendNextChunk();
}
}
@Nested
class TestSendNextChunk {
private final byte[] content = RandomUtils.insecure().randomBytes(GrpcFileUploadUtils.CHUNK_SIZE / 2);
private ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(content);
@Captor assertThat(createdSender).isInstanceOf(StreamSharingFileSender.class);
private ArgumentCaptor<byte[]> chunkCaptor;
@Nested
class TestOnDataAvailable {
@BeforeEach
void initObserver() {
fileSender = spy(GrpcFileUploadUtils.createSender(chunkBuilder, byteArrayInputStream, reqObserverBuilder));
fileSender.send();
}
@Test
void shouldCallSendChunk() {
fileSender.sendNextChunk();
verify(fileSender).sendChunk(chunkCaptor.capture(), eq(content.length));
assertThat(chunkCaptor.getValue()).contains(content);
}
} }
@Nested
class TestOnNoBytesLeftToRead {
@Nested
class TestOnCompleteOnFileSent {
private static final boolean COMPLETE_ON_FILE_SENT = true;
@BeforeEach
void initialize() {
var buffer = new byte[GrpcFileUploadUtils.CHUNK_SIZE];
byteArrayInputStream.read(buffer, 0, GrpcFileUploadUtils.CHUNK_SIZE);
fileSender = spy(GrpcFileUploadUtils.createSender(chunkBuilder, byteArrayInputStream, reqObserverBuilder, COMPLETE_ON_FILE_SENT));
fileSender.send();
}
@Test
void shouldCallOnCompleted() {
fileSender.sendNextChunk();
verify(requestObserver).onCompleted();
}
@Test
void shouldNotCallSendChunk() {
fileSender.sendNextChunk();
verify(fileSender, never()).sendChunk(any(), anyInt());
}
@Test
@SneakyThrows
void shouldCallCloseStreams() {
fileSender.sendNextChunk();
verify(fileSender).closeStreams();
}
}
@Nested
class TestOnNotCompleteOnFileSent {
private static final boolean COMPLETE_ON_FILE_SENT = false;
@BeforeEach
void initialize() {
var buffer = new byte[GrpcFileUploadUtils.CHUNK_SIZE];
byteArrayInputStream.read(buffer, 0, GrpcFileUploadUtils.CHUNK_SIZE);
fileSender = spy(GrpcFileUploadUtils.createSender(chunkBuilder, byteArrayInputStream, reqObserverBuilder, COMPLETE_ON_FILE_SENT));
fileSender.send();
}
@Test
void shouldNotCallOnCompleted() {
fileSender.sendNextChunk();
verify(requestObserver, never()).onCompleted();
}
@Test
void shouldCallSendChunk() {
fileSender.sendNextChunk();
verify(fileSender).sendChunk(chunkCaptor.capture(), eq(-1));
assertThat(chunkCaptor.getValue()).isEmpty();
}
@Test
@SneakyThrows
void shouldCallCloseStreams() {
fileSender.sendNextChunk();
verify(fileSender).closeStreams();
}
}
}
} }
@Nested
class TestCloseStreams {
@Test
@SneakyThrows
void shouldCloseInputStream() {
fileSender.send();
fileSender.closeStreams();
verify(inputStream).close();
}
}
@Nested
class TestSendChunk {
private static final byte[] CHUNK_PART = "ChunkPartContent".getBytes();
@BeforeEach
void initObserver() {
fileSender.send();
}
@Test
void shouldApplyBuildChunk() throws IOException {
fileSender.sendChunk(CHUNK_PART, 5);
verify(chunkBuilder).apply(CHUNK_PART, 5);
}
@Test
void shouldCallOnNext() throws IOException {
fileSender.sendChunk(CHUNK_PART, 5);
verify(requestObserver).onNext(any());
}
}
@Nested
class TestSendMetaData {
@BeforeEach
void initObserver() {
fileSender.send();
}
@Test
void shouldNotSendWithoutMetadata() {
fileSender.sendMetaData();
verify(requestObserver, never()).onNext(any());
}
@Test
void shouldSendMetadata() {
fileSender.withMetaData(metaData).sendMetaData();
verify(requestObserver).onNext(metaData);
}
@Test
void shouldSendMetadataOnlyOnce() {
fileSender.withMetaData(metaData).sendMetaData();
fileSender.sendMetaData();
verify(requestObserver).onNext(metaData);
}
}
@Disabled("unused")
@Nested
class TestReadFromStream {
@Test
void shouldThrowException() throws IOException {
doThrow(IOException.class).when(inputStream).read(any(), anyInt(), anyInt());
assertThatThrownBy(() -> fileSender.readFromStream()).isInstanceOf(TechnicalException.class);
}
}
} }
\ No newline at end of file
/*
* Copyright (C) 2025 Das Land Schleswig-Holstein vertreten durch den
* Ministerpräsidenten des Landes Schleswig-Holstein
* Staatskanzlei
* Abteilung Digitalisierung und zentrales IT-Management der Landesregierung
*
* Lizenziert unter der EUPL, Version 1.2 oder - sobald
* diese von der Europäischen Kommission genehmigt wurden -
* Folgeversionen der EUPL ("Lizenz");
* Sie dürfen dieses Werk ausschließlich gemäß
* dieser Lizenz nutzen.
* Eine Kopie der Lizenz finden Sie hier:
*
* https://joinup.ec.europa.eu/collection/eupl/eupl-text-eupl-12
*
* Sofern nicht durch anwendbare Rechtsvorschriften
* gefordert oder in schriftlicher Form vereinbart, wird
* die unter der Lizenz verbreitete Software "so wie sie
* ist", OHNE JEGLICHE GEWÄHRLEISTUNG ODER BEDINGUNGEN -
* ausdrücklich oder stillschweigend - verbreitet.
* Die sprachspezifischen Genehmigungen und Beschränkungen
* unter der Lizenz sind dem Lizenztext zu entnehmen.
*/
package de.ozgcloud.common.binaryfile;
import static org.mockito.Mockito.*;
import java.io.InputStream;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiFunction;
import java.util.function.Function;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.Captor;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.MockedStatic;
import org.mockito.Spy;
import org.springframework.test.util.ReflectionTestUtils;
import de.ozgcloud.common.binaryfile.BinaryFileTestFactory.TestRequestType;
import de.ozgcloud.common.binaryfile.BinaryFileTestFactory.TestResponseType;
import de.ozgcloud.common.errorhandling.TechnicalException;
import io.grpc.stub.CallStreamObserver;
import io.grpc.stub.StreamObserver;
class StreamExclusiveFileSenderTest {
@Mock
private BiFunction<byte[], Integer, TestRequestType> chunkBuilder;
@Mock
private InputStream inputStream;
@Mock
private Function<StreamObserver<TestResponseType>, CallStreamObserver<TestRequestType>> reqObserverBuilder;
@Spy
@InjectMocks
private StreamExclusiveFileSender<TestRequestType, TestResponseType> fileSender;
@Nested
class TestSend {
private final CompletableFuture<TestResponseType> resultFuture = CompletableFuture.completedFuture(new TestResponseType());
@Captor
private ArgumentCaptor<Runnable> onReadyHandlerCaptor;
@Mock
private BinaryFileUploadStreamObserver<TestRequestType, TestResponseType> responseObserver;
@BeforeEach
void init() {
doReturn(resultFuture).when(fileSender).getResultFuture();
}
@SuppressWarnings("rawtypes")
@Test
void shouldCreateResponseObserver() {
try (MockedStatic<BinaryFileUploadStreamObserver> mocked = mockStatic(BinaryFileUploadStreamObserver.class)) {
doNothing().when(fileSender).sendNext();
fileSender.send();
mocked.verify(() -> BinaryFileUploadStreamObserver.create(same(resultFuture), onReadyHandlerCaptor.capture()));
verifyCallsSendNext(onReadyHandlerCaptor.getValue());
}
}
@SuppressWarnings("rawtypes")
@Test
void shouldBuildRequestObserver() {
try (MockedStatic<BinaryFileUploadStreamObserver> mocked = mockStatic(BinaryFileUploadStreamObserver.class)) {
mocked.when(() -> BinaryFileUploadStreamObserver.create(any(), any())).thenReturn(responseObserver);
fileSender.send();
verify(reqObserverBuilder).apply(responseObserver);
}
}
private void verifyCallsSendNext(Runnable runnable) {
runnable.run();
verify(fileSender).sendNext();
}
}
@Nested
class TestCommunicateEndOfTransfer {
@Mock
private CallStreamObserver<TestRequestType> requestObserver;
@Test
void shouldCallOnCompleted() {
ReflectionTestUtils.setField(fileSender, "requestObserver", requestObserver);
fileSender.communicateEndOfTransfer();
verify(requestObserver).onCompleted();
}
}
@Nested
class TestCancelOnTimeout {
@Mock
private CompletableFuture<TestResponseType> resultFuture;
@Mock
private CallStreamObserver<TestRequestType> requestObserver;
@BeforeEach
void init() {
ReflectionTestUtils.setField(fileSender, "resultFuture", resultFuture);
ReflectionTestUtils.setField(fileSender, "requestObserver", requestObserver);
}
@Test
void shouldCancelResultFuture() {
fileSender.cancelOnTimeout();
verify(resultFuture).cancel(true);
}
@Test
void shouldCallOnError() {
fileSender.cancelOnTimeout();
verify(requestObserver).onError(any(TechnicalException.class));
}
@Test
void shouldCloseStreamReader() {
fileSender.cancelOnTimeout();
verify(fileSender).closeStreamReader();
}
}
@Nested
class TestCancelOnError {
@Mock
private CompletableFuture<TestResponseType> resultFuture;
@Mock
private CallStreamObserver<TestRequestType> requestObserver;
private final Throwable error = new Throwable();
@BeforeEach
void init() {
ReflectionTestUtils.setField(fileSender, "resultFuture", resultFuture);
ReflectionTestUtils.setField(fileSender, "requestObserver", requestObserver);
}
@Test
void shouldCancelResultFuture() {
fileSender.cancelOnError(error);
verify(resultFuture).cancel(true);
}
@Test
void shouldCallOnError() {
fileSender.cancelOnError(error);
verify(requestObserver).onError(error);
}
@Test
void shouldCloseStreamReader() {
fileSender.cancelOnError(error);
verify(fileSender).closeStreamReader();
}
}
}
/*
* Copyright (C) 2025 Das Land Schleswig-Holstein vertreten durch den
* Ministerpräsidenten des Landes Schleswig-Holstein
* Staatskanzlei
* Abteilung Digitalisierung und zentrales IT-Management der Landesregierung
*
* Lizenziert unter der EUPL, Version 1.2 oder - sobald
* diese von der Europäischen Kommission genehmigt wurden -
* Folgeversionen der EUPL ("Lizenz");
* Sie dürfen dieses Werk ausschließlich gemäß
* dieser Lizenz nutzen.
* Eine Kopie der Lizenz finden Sie hier:
*
* https://joinup.ec.europa.eu/collection/eupl/eupl-text-eupl-12
*
* Sofern nicht durch anwendbare Rechtsvorschriften
* gefordert oder in schriftlicher Form vereinbart, wird
* die unter der Lizenz verbreitete Software "so wie sie
* ist", OHNE JEGLICHE GEWÄHRLEISTUNG ODER BEDINGUNGEN -
* ausdrücklich oder stillschweigend - verbreitet.
* Die sprachspezifischen Genehmigungen und Beschränkungen
* unter der Lizenz sind dem Lizenztext zu entnehmen.
*/
package de.ozgcloud.common.binaryfile;
import static org.assertj.core.api.Assertions.*;
import static org.mockito.Mockito.*;
import java.io.InputStream;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.Captor;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.Spy;
import de.ozgcloud.common.binaryfile.BinaryFileTestFactory.TestRequestType;
import de.ozgcloud.common.binaryfile.BinaryFileTestFactory.TestResponseType;
import io.grpc.stub.CallStreamObserver;
class StreamSharingFileSenderTest {
@Mock
private BiFunction<byte[], Integer, TestRequestType> chunkBuilder;
@Mock
private InputStream inputStream;
@Mock
private CallStreamObserver<TestRequestType> requestObserver;
@Mock
private Consumer<Runnable> onReadyHandlerRegistrar;
@Spy
@InjectMocks
private StreamSharingFileSender<TestRequestType, TestResponseType> fileSender;
@Nested
class TestSend {
@Captor
private ArgumentCaptor<Runnable> onReadyHandlerCaptor;
@Test
void shouldRegisterOnReadyHandler() {
doNothing().when(fileSender).sendNext();
fileSender.send();
verify(onReadyHandlerRegistrar).accept(onReadyHandlerCaptor.capture());
verifyCallsSendNext(onReadyHandlerCaptor.getValue());
}
@Test
void shouldReturnThis() {
var obj = fileSender.send();
assertThat(obj).isSameAs(fileSender);
}
private void verifyCallsSendNext(Runnable runnable) {
runnable.run();
verify(fileSender).sendNext();
}
}
@Nested
class TestCommunicateEndOfTransfer {
@Test
void shouldSendEmptyChunk() {
fileSender.communicateEndOfTransfer();
verify(fileSender).sendChunk(new byte[0], -1);
}
@Test
void shouldCompleteResultFuture() {
fileSender.communicateEndOfTransfer();
assertThat(fileSender.getResultFuture().isDone()).isTrue();
}
}
}
/*
* Copyright (C) 2025 Das Land Schleswig-Holstein vertreten durch den
* Ministerpräsidenten des Landes Schleswig-Holstein
* Staatskanzlei
* Abteilung Digitalisierung und zentrales IT-Management der Landesregierung
*
* Lizenziert unter der EUPL, Version 1.2 oder - sobald
* diese von der Europäischen Kommission genehmigt wurden -
* Folgeversionen der EUPL ("Lizenz");
* Sie dürfen dieses Werk ausschließlich gemäß
* dieser Lizenz nutzen.
* Eine Kopie der Lizenz finden Sie hier:
*
* https://joinup.ec.europa.eu/collection/eupl/eupl-text-eupl-12
*
* Sofern nicht durch anwendbare Rechtsvorschriften
* gefordert oder in schriftlicher Form vereinbart, wird
* die unter der Lizenz verbreitete Software "so wie sie
* ist", OHNE JEGLICHE GEWÄHRLEISTUNG ODER BEDINGUNGEN -
* ausdrücklich oder stillschweigend - verbreitet.
* Die sprachspezifischen Genehmigungen und Beschränkungen
* unter der Lizenz sind dem Lizenztext zu entnehmen.
*/
package de.ozgcloud.common.binaryfile;
import static org.assertj.core.api.Assertions.*;
import static org.mockito.Mockito.*;
import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiFunction;
import org.apache.commons.lang3.RandomUtils;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.Captor;
import org.mockito.Mock;
import org.springframework.test.util.ReflectionTestUtils;
import de.ozgcloud.common.binaryfile.BinaryFileTestFactory.TestRequestType;
import de.ozgcloud.common.binaryfile.BinaryFileTestFactory.TestResponseType;
import io.grpc.stub.CallStreamObserver;
import lombok.AccessLevel;
import lombok.Getter;
import lombok.SneakyThrows;
class StreamingFileSenderTest {
@Mock
private BiFunction<byte[], Integer, TestRequestType> chunkBuilder;
@Mock
private InputStream inputStream;
@Mock
private CallStreamObserver<TestRequestType> requestObserver;
private TestFileSender fileSender;
@BeforeEach
void init() {
fileSender = spy(new TestFileSender(chunkBuilder, inputStream, requestObserver));
}
@Nested
class TestCancelOnTimeout {
@Mock
private CompletableFuture<TestResponseType> resultFuture;
@BeforeEach
void init() {
setResultFutureInFileSender(resultFuture);
}
@Test
void shouldCancelResultFuture() {
fileSender.cancelOnTimeout();
verify(resultFuture).cancel(true);
}
@Test
void shouldCloseStreams() {
fileSender.cancelOnTimeout();
verify(fileSender).closeStreamReader();
}
}
@Nested
class TestCancelOnError {
@Mock
private CompletableFuture<TestResponseType> resultFuture;
private final Throwable error = new Throwable();
@BeforeEach
void init() {
setResultFutureInFileSender(resultFuture);
}
@Test
void shouldCancelResultFuture() {
fileSender.cancelOnError(error);
verify(resultFuture).cancel(true);
}
@Test
void shouldCloseStreams() {
fileSender.cancelOnError(error);
verify(fileSender).closeStreamReader();
}
}
@Nested
class TestSendNext {
@Mock
private CompletableFuture<TestResponseType> resultFuture;
@BeforeEach
void init() {
fileSender.send();
}
@Test
void shouldSendMetaData() {
fileSender.sendNext();
verify(fileSender).sendMetaData();
}
@Test
void shouldSendNextChunk() {
fileSender.sendNext();
verify(fileSender).sendNextChunk();
}
@Test
void shouldNotSendMetaDataIfDone() {
setDoneInFileSender(true);
fileSender.sendNext();
verify(fileSender, never()).sendMetaData();
}
@Test
void shouldNotSendMetaDataIfCancelled() {
fileSender.getResultFuture().cancel(true);
fileSender.sendNext();
verify(fileSender, never()).sendMetaData();
}
@Test
void shouldSendNextChunkUntilDone() {
lenient().when(requestObserver.isReady()).thenReturn(true);
doAnswer(invocation -> {
setDoneInFileSender(true);
return null;
}).when(fileSender).sendNextChunk();
fileSender.sendNext();
verify(fileSender, times(1)).sendNextChunk();
}
@Test
void shouldSendNextChunkUntilCancelled() {
lenient().when(requestObserver.isReady()).thenReturn(true);
doAnswer(invocation -> {
fileSender.getResultFuture().cancel(true);
return null;
}).when(fileSender).sendNextChunk();
fileSender.sendNext();
verify(fileSender, times(1)).sendNextChunk();
}
@Test
void closeStreamReaderIfCancelled() {
fileSender.getResultFuture().cancel(true);
fileSender.sendNext();
verify(fileSender).closeStreamReader();
}
}
@Nested
class TestSendNextChunk {
private final byte[] content = RandomUtils.insecure().randomBytes(StreamingFileSender.CHUNK_SIZE / 2);
private final ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(content);
@Captor
private ArgumentCaptor<byte[]> chunkCaptor;
@Nested
class TestOnDataAvailable {
@BeforeEach
void init() {
fileSender = spy(new TestFileSender(chunkBuilder, byteArrayInputStream, requestObserver));
}
@Test
void shouldSendChunk() {
fileSender.sendNextChunk();
verify(fileSender).sendChunk(chunkCaptor.capture(), eq(content.length));
assertThat(chunkCaptor.getValue()).contains(content);
}
}
@Nested
class TestOnNoBytesLeftToRead {
@BeforeEach
void init() {
var buffer = new byte[StreamingFileSender.CHUNK_SIZE];
byteArrayInputStream.read(buffer, 0, StreamingFileSender.CHUNK_SIZE);
fileSender = spy(new TestFileSender(chunkBuilder, byteArrayInputStream, requestObserver));
}
@Test
void shouldNotSendChunk() {
fileSender.sendNextChunk();
verify(fileSender, never()).sendChunk(any(), anyInt());
}
@Test
void shouldEndTransfer() {
fileSender.sendNextChunk();
verify(fileSender).endTransfer();
}
}
}
@Nested
class TestEndTransfer {
@Test
void shouldCommunicateEndOfTransfer() {
fileSender.endTransfer();
verify(fileSender).communicateEndOfTransfer();
}
@Test
void shouldSetDoneToTrue() {
fileSender.endTransfer();
fileSender.sendNext();
verify(fileSender, never()).sendNextChunk();
}
@Test
void shouldCloseStreamReader() {
fileSender.endTransfer();
verify(fileSender).closeStreamReader();
}
}
@Nested
class TestCloseStreamReader {
@Test
@SneakyThrows
void shouldCloseInputStream() {
fileSender.closeStreamReader();
verify(inputStream).close();
}
}
@Nested
class TestSendChunk {
private static final byte[] CHUNK_PART = "ChunkPartContent".getBytes();
@BeforeEach
void initObserver() {
fileSender.send();
}
@Test
void shouldApplyBuildChunk() {
fileSender.sendChunk(CHUNK_PART, 5);
verify(chunkBuilder).apply(CHUNK_PART, 5);
}
@Test
void shouldCallOnNext() {
fileSender.sendChunk(CHUNK_PART, 5);
verify(requestObserver).onNext(any());
}
}
@Nested
class TestSendMetaData {
@Mock
private TestRequestType metaData;
@BeforeEach
void initObserver() {
fileSender.send();
}
@Test
void shouldNotSendWithoutMetadata() {
fileSender.sendMetaData();
verify(requestObserver, never()).onNext(any());
}
@Test
void shouldSendMetadata() {
fileSender.withMetaData(metaData).sendMetaData();
verify(requestObserver).onNext(metaData);
}
@Test
void shouldSendMetadataOnlyOnce() {
fileSender.withMetaData(metaData).sendMetaData();
fileSender.sendMetaData();
verify(requestObserver).onNext(metaData);
}
}
private void setResultFutureInFileSender(CompletableFuture<TestResponseType> resultFuture) {
ReflectionTestUtils.setField(fileSender, "resultFuture", resultFuture);
}
private void setDoneInFileSender(boolean done) {
((AtomicBoolean) ReflectionTestUtils.getField(fileSender, null, "done")).set(done);
}
static class TestFileSender extends StreamingFileSender<TestRequestType, TestResponseType> {
@Getter(AccessLevel.PROTECTED)
private final CallStreamObserver<TestRequestType> requestObserver;
TestFileSender(BiFunction<byte[], Integer, TestRequestType> chunkBuilder, InputStream inputStream,
CallStreamObserver<TestRequestType> requestObserver) {
super(chunkBuilder, inputStream);
this.requestObserver = requestObserver;
}
@Override
public StreamingFileSender<TestRequestType, TestResponseType> send() {
return this;
}
@Override
void communicateEndOfTransfer() {
}
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment