Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • ozg-cloud/lib/common-lib
1 result
Show changes
Commits on Source (13)
Showing
with 1071 additions and 430 deletions
......@@ -23,225 +23,69 @@
*/
package de.ozgcloud.common.binaryfile;
import java.io.IOException;
import java.io.InputStream;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
import org.apache.commons.io.IOUtils;
import de.ozgcloud.common.errorhandling.TechnicalException;
import io.grpc.stub.CallStreamObserver;
import io.grpc.stub.StreamObserver;
import lombok.AccessLevel;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.NonNull;
import lombok.RequiredArgsConstructor;
import lombok.extern.log4j.Log4j2;
@Log4j2
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public class GrpcFileUploadUtils {
static final int CHUNK_SIZE = 4 * 1024;
/*
* Q = Request Type; S = Response Type
/**
* @param <Q> Request Type
* @param <S> Response Type
* @deprecated use {@link #createStreamExclusiveFileSender(BiFunction, InputStream, Function)} instead
*/
@Deprecated(since = "4.13.0")
public static <Q, S> FileSender<Q, S> createSender(BiFunction<byte[], Integer, Q> chunkBuilder, InputStream inputStream,
Function<StreamObserver<S>, CallStreamObserver<Q>> reqObserverBuilder) {
return createSender(chunkBuilder, inputStream, reqObserverBuilder, true);
return new FileSender<>(chunkBuilder, inputStream, reqObserverBuilder);
}
public static <Q, S> FileSender<Q, S> createSender(BiFunction<byte[], Integer, Q> chunkBuilder, InputStream inputStream,
Function<StreamObserver<S>, CallStreamObserver<Q>> reqObserverBuilder, boolean completeOnFileSent) {
return new FileSender<>(chunkBuilder, reqObserverBuilder, inputStream, completeOnFileSent);
/**
* @param <Q> Request Type
* @param <S> Response Type
*/
public static <Q, S> StreamingFileSender<Q, S> createStreamExclusiveFileSender(BiFunction<byte[], Integer, Q> chunkBuilder, InputStream inputStream,
Function<StreamObserver<S>, CallStreamObserver<Q>> reqObserverBuilder) {
return new StreamExclusiveFileSender<>(chunkBuilder, inputStream, reqObserverBuilder);
}
public static class FileSender<Q, S> {
private final BiFunction<byte[], Integer, Q> chunkBuilder;
private final InputStream inputStream;
@Getter
private final CompletableFuture<S> resultFuture = new CompletableFuture<>();
private final Function<StreamObserver<S>, CallStreamObserver<Q>> reqObserverBuilder;
private CallStreamObserver<Q> requestObserver;
private Optional<Q> metaData = Optional.empty();
private final AtomicBoolean metaDataSent = new AtomicBoolean(false);
private final AtomicBoolean done = new AtomicBoolean(false);
private final StreamReader streamReader;
private final boolean completeOnFileSent;
FileSender(BiFunction<byte[], Integer, Q> chunkBuilder, Function<StreamObserver<S>, CallStreamObserver<Q>> reqObserverBuilder,
InputStream inputStream) {
this(chunkBuilder, reqObserverBuilder, inputStream, true);
}
/**
* @param <Q> Request Type
* @param <S> Response Type
*/
public static <Q, S> StreamingFileSender<Q, S> createStreamSharingSender(BiFunction<byte[], Integer, Q> chunkBuilder, InputStream inputStream,
CallStreamObserver<Q> requestObserver, Consumer<Runnable> onReadyHandlerRegistrar) {
return new StreamSharingFileSender<>(chunkBuilder, inputStream, requestObserver, onReadyHandlerRegistrar);
}
FileSender(BiFunction<byte[], Integer, Q> chunkBuilder, Function<StreamObserver<S>, CallStreamObserver<Q>> reqObserverBuilder,
InputStream inputStream, boolean completeOnFileSent) {
this.chunkBuilder = chunkBuilder;
this.inputStream = inputStream;
this.reqObserverBuilder = reqObserverBuilder;
this.completeOnFileSent = completeOnFileSent;
@Deprecated(since = "4.13.0")
public static class FileSender<Q, S> extends StreamExclusiveFileSender<Q, S> {
this.streamReader = new StreamReader(this.inputStream);
}
public FileSender<Q, S> withMetaData(@NonNull Q metaData) {
this.metaData = Optional.of(metaData);
return this;
FileSender(BiFunction<byte[], Integer, Q> chunkBuilder, InputStream inputStream,
Function<StreamObserver<S>, CallStreamObserver<Q>> reqObserverBuilder) {
super(chunkBuilder, inputStream, reqObserverBuilder);
}
@Override
public FileSender<Q, S> send() {
LOG.debug("Start sending File.");
var responseObserver = BinaryFileUploadStreamObserver.create(resultFuture, this::sendNext);
requestObserver = reqObserverBuilder.apply(responseObserver);
super.send();
return this;
}
public void cancelOnTimeout() {
LOG.warn("File transfer canceled on timeout");
resultFuture.cancel(true);
requestObserver.onError(new TechnicalException("Timeout on waiting for upload."));
closeStreams();
}
public void cancelOnError(Throwable t) {
LOG.error("File tranfer canceled on error.", t);
resultFuture.cancel(true);
requestObserver.onError(t);
closeStreams();
}
void sendNext() {
if (!done.get()) {
waitForOberver();
sendMetaData();
do {
LOG.debug("Sending next chunk.");
sendNextChunk();
} while (!done.get() && isReady());
LOG.debug("Finished or waiting to become ready.");
}
}
private boolean isReady() {
return requestObserver.isReady();
}
private void waitForOberver() {
synchronized (this) {
while (Objects.isNull(requestObserver)) {
try {
LOG.debug("wait for observer");
wait(300);
} catch (InterruptedException e) {
LOG.error("Error on waiting for request Observer.", e);
Thread.currentThread().interrupt();
}
}
}
}
void sendNextChunk() {
byte[] contentToSend = streamReader.getNextData();
if (streamReader.getLastReadSize() > 0) {
sendChunk(contentToSend, streamReader.getLastReadSize());
} else {
endTransfer();
}
}
private void endTransfer() {
if (completeOnFileSent)
requestObserver.onCompleted();
else
sendEndOfFile();
done.set(true);
LOG.debug("File Transfer done.");
closeStreams();
}
private void sendEndOfFile() {
sendChunk(new byte[0], streamReader.getLastReadSize());
}
void closeStreams() {
LOG.debug("Closing streams");
streamReader.close();
}
void sendChunk(byte[] content, int length) {
LOG.debug("Sending {} byte Data.", length);
var chunk = chunkBuilder.apply(content, length);
requestObserver.onNext(chunk);
}
byte[] readFromStream() {
try {
return inputStream.readNBytes(CHUNK_SIZE);
} catch (IOException e) {
throw new TechnicalException("Error on sending a single chunk", e);
}
}
void sendMetaData() {
metaData.filter(md -> !metaDataSent.get()).ifPresent(this::doSendMetaData);
}
private void doSendMetaData(Q metadata) {
LOG.debug("Sending Metadata.");
requestObserver.onNext(metadata);
metaDataSent.set(true);
}
void checkForEndOfStream(long sentSize) {
if (sentSize < CHUNK_SIZE) {
LOG.debug("File Transfer done. Closing stream.");
IOUtils.closeQuietly(inputStream);
requestObserver.onCompleted();
done.set(true);
} else {
LOG.debug("File Transfer not jet done - need to tranfer another chunk.");
}
}
@RequiredArgsConstructor
private class StreamReader {
private final InputStream inStream;
private final byte[] buffer = new byte[CHUNK_SIZE];
@Getter
private int lastReadSize = 0;
@Getter
private final AtomicBoolean done = new AtomicBoolean(false);
byte[] getNextData() {
readNext();
return buffer;
}
void close() {
IOUtils.closeQuietly(inStream);
}
void readNext() {
try {
lastReadSize = inStream.read(buffer, 0, CHUNK_SIZE);
} catch (IOException e) {
throw new TechnicalException("Error on reading a single chunk", e);
}
}
@Override
public FileSender<Q, S> withMetaData(@NonNull Q metaData) {
super.withMetaData(metaData);
return this;
}
}
......
/*
* Copyright (C) 2025 Das Land Schleswig-Holstein vertreten durch den
* Ministerpräsidenten des Landes Schleswig-Holstein
* Staatskanzlei
* Abteilung Digitalisierung und zentrales IT-Management der Landesregierung
*
* Lizenziert unter der EUPL, Version 1.2 oder - sobald
* diese von der Europäischen Kommission genehmigt wurden -
* Folgeversionen der EUPL ("Lizenz");
* Sie dürfen dieses Werk ausschließlich gemäß
* dieser Lizenz nutzen.
* Eine Kopie der Lizenz finden Sie hier:
*
* https://joinup.ec.europa.eu/collection/eupl/eupl-text-eupl-12
*
* Sofern nicht durch anwendbare Rechtsvorschriften
* gefordert oder in schriftlicher Form vereinbart, wird
* die unter der Lizenz verbreitete Software "so wie sie
* ist", OHNE JEGLICHE GEWÄHRLEISTUNG ODER BEDINGUNGEN -
* ausdrücklich oder stillschweigend - verbreitet.
* Die sprachspezifischen Genehmigungen und Beschränkungen
* unter der Lizenz sind dem Lizenztext zu entnehmen.
*/
package de.ozgcloud.common.binaryfile;
import java.io.InputStream;
import java.util.function.BiFunction;
import java.util.function.Function;
import de.ozgcloud.common.errorhandling.TechnicalException;
import io.grpc.stub.CallStreamObserver;
import io.grpc.stub.StreamObserver;
import lombok.extern.log4j.Log4j2;
@Log4j2
class StreamExclusiveFileSender<Q, S> extends StreamingFileSender<Q, S> {
private final Function<StreamObserver<S>, CallStreamObserver<Q>> reqObserverBuilder;
private CallStreamObserver<Q> requestObserver;
StreamExclusiveFileSender(BiFunction<byte[], Integer, Q> chunkBuilder, InputStream inputStream, Function<StreamObserver<S>, CallStreamObserver<Q>> reqObserverBuilder) {
super(chunkBuilder, inputStream);
this.reqObserverBuilder = reqObserverBuilder;
}
@Override
public StreamExclusiveFileSender<Q, S> send() {
LOG.debug("Start sending File.");
var responseObserver = BinaryFileUploadStreamObserver.create(getResultFuture(), this::sendNext);
requestObserver = reqObserverBuilder.apply(responseObserver);
return this;
}
@Override
void communicateEndOfTransfer() {
requestObserver.onCompleted();
}
@Override
protected CallStreamObserver<Q> getRequestObserver() {
return requestObserver;
}
@Override
public void cancelOnTimeout() {
super.cancelOnTimeout();
requestObserver.onError(new TechnicalException("Timeout on waiting for upload."));
}
@Override
public void cancelOnError(Throwable t) {
super.cancelOnError(t);
requestObserver.onError(t);
}
}
/*
* Copyright (C) 2025 Das Land Schleswig-Holstein vertreten durch den
* Ministerpräsidenten des Landes Schleswig-Holstein
* Staatskanzlei
* Abteilung Digitalisierung und zentrales IT-Management der Landesregierung
*
* Lizenziert unter der EUPL, Version 1.2 oder - sobald
* diese von der Europäischen Kommission genehmigt wurden -
* Folgeversionen der EUPL ("Lizenz");
* Sie dürfen dieses Werk ausschließlich gemäß
* dieser Lizenz nutzen.
* Eine Kopie der Lizenz finden Sie hier:
*
* https://joinup.ec.europa.eu/collection/eupl/eupl-text-eupl-12
*
* Sofern nicht durch anwendbare Rechtsvorschriften
* gefordert oder in schriftlicher Form vereinbart, wird
* die unter der Lizenz verbreitete Software "so wie sie
* ist", OHNE JEGLICHE GEWÄHRLEISTUNG ODER BEDINGUNGEN -
* ausdrücklich oder stillschweigend - verbreitet.
* Die sprachspezifischen Genehmigungen und Beschränkungen
* unter der Lizenz sind dem Lizenztext zu entnehmen.
*/
package de.ozgcloud.common.binaryfile;
import java.io.InputStream;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import io.grpc.stub.CallStreamObserver;
import lombok.extern.log4j.Log4j2;
@Log4j2
class StreamSharingFileSender<Q, S> extends StreamingFileSender<Q, S> {
private final CallStreamObserver<Q> requestObserver;
private final Consumer<Runnable> onReadyHandlerRegistrar;
StreamSharingFileSender(BiFunction<byte[], Integer, Q> chunkBuilder, InputStream inputStream, CallStreamObserver<Q> requestObserver, Consumer<Runnable> onReadyHandlerRegistrar) {
super(chunkBuilder, inputStream);
this.requestObserver = requestObserver;
this.onReadyHandlerRegistrar = onReadyHandlerRegistrar;
}
@Override
public StreamSharingFileSender<Q, S> send() {
LOG.debug("Register onReadyHandler and start sending File.");
onReadyHandlerRegistrar.accept(this::sendNext);
return this;
}
@Override
void communicateEndOfTransfer() {
sendEndOfFile();
getResultFuture().complete(null);
}
private void sendEndOfFile() {
sendChunk(new byte[0], -1);
}
@Override
protected CallStreamObserver<Q> getRequestObserver() {
return requestObserver;
}
}
/*
* Copyright (C) 2025 Das Land Schleswig-Holstein vertreten durch den
* Ministerpräsidenten des Landes Schleswig-Holstein
* Staatskanzlei
* Abteilung Digitalisierung und zentrales IT-Management der Landesregierung
*
* Lizenziert unter der EUPL, Version 1.2 oder - sobald
* diese von der Europäischen Kommission genehmigt wurden -
* Folgeversionen der EUPL ("Lizenz");
* Sie dürfen dieses Werk ausschließlich gemäß
* dieser Lizenz nutzen.
* Eine Kopie der Lizenz finden Sie hier:
*
* https://joinup.ec.europa.eu/collection/eupl/eupl-text-eupl-12
*
* Sofern nicht durch anwendbare Rechtsvorschriften
* gefordert oder in schriftlicher Form vereinbart, wird
* die unter der Lizenz verbreitete Software "so wie sie
* ist", OHNE JEGLICHE GEWÄHRLEISTUNG ODER BEDINGUNGEN -
* ausdrücklich oder stillschweigend - verbreitet.
* Die sprachspezifischen Genehmigungen und Beschränkungen
* unter der Lizenz sind dem Lizenztext zu entnehmen.
*/
package de.ozgcloud.common.binaryfile;
import java.io.IOException;
import java.io.InputStream;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiFunction;
import org.apache.commons.io.IOUtils;
import de.ozgcloud.common.errorhandling.TechnicalException;
import io.grpc.stub.CallStreamObserver;
import lombok.AccessLevel;
import lombok.Getter;
import lombok.NonNull;
import lombok.RequiredArgsConstructor;
import lombok.extern.log4j.Log4j2;
/*
* Q = Request Type; S = Response Type
*/
@Log4j2
public abstract class StreamingFileSender<Q, S> {
static final int CHUNK_SIZE = 4 * 1024;
private final BiFunction<byte[], Integer, Q> chunkBuilder;
@Getter
private final CompletableFuture<S> resultFuture = new CompletableFuture<>();
private Q metaData;
private final AtomicBoolean metaDataSent = new AtomicBoolean(false);
private final AtomicBoolean done = new AtomicBoolean(false);
@Getter(AccessLevel.PROTECTED)
private final StreamReader streamReader;
StreamingFileSender(BiFunction<byte[], Integer, Q> chunkBuilder, InputStream inputStream) {
this.chunkBuilder = chunkBuilder;
this.streamReader = new StreamReader(inputStream);
}
public StreamingFileSender<Q, S> withMetaData(@NonNull Q metaData) {
this.metaData = metaData;
return this;
}
public abstract StreamingFileSender<Q, S> send();
public void cancelOnTimeout() {
LOG.warn("File transfer canceled on timeout");
resultFuture.cancel(true);
closeStreamReader();
}
public void cancelOnError(Throwable t) {
LOG.error("File transfer canceled on error.", t);
resultFuture.cancel(true);
closeStreamReader();
}
void sendNext() {
if (notFinished()) {
waitForObserver();
sendMetaData();
do {
LOG.debug("Sending next chunk.");
sendNextChunk();
} while (notFinished() && isReady());
LOG.debug("Finished or waiting to become ready.");
}
checkIfFinishedForcefully();
}
private boolean notFinished() {
return !done.get() && !resultFuture.isCancelled();
}
private void checkIfFinishedForcefully() {
if (resultFuture.isCancelled()) {
LOG.warn("File transfer was cancelled");
closeStreamReader();
}
}
private boolean isReady() {
return getRequestObserver().isReady();
}
private void waitForObserver() {
synchronized (this) {
while (Objects.isNull(getRequestObserver())) {
try {
LOG.debug("wait for observer");
wait(300);
} catch (InterruptedException e) {
LOG.error("Error on waiting for request Observer.", e);
Thread.currentThread().interrupt();
}
}
}
}
void sendNextChunk() {
byte[] contentToSend = streamReader.getNextData();
if (streamReader.getLastReadSize() > 0) {
sendChunk(contentToSend, streamReader.getLastReadSize());
} else {
endTransfer();
}
}
void endTransfer() {
communicateEndOfTransfer();
done.set(true);
LOG.debug("File Transfer done.");
closeStreamReader();
}
abstract void communicateEndOfTransfer();
void closeStreamReader() {
LOG.debug("Closing streams");
streamReader.close();
}
void sendChunk(byte[] content, int length) {
LOG.debug("Sending {} byte Data.", length);
var chunk = chunkBuilder.apply(content, length);
getRequestObserver().onNext(chunk);
}
void sendMetaData() {
if (metaData != null && !metaDataSent.get()) {
doSendMetaData(metaData);
}
}
private void doSendMetaData(Q metadata) {
LOG.debug("Sending Metadata.");
getRequestObserver().onNext(metadata);
metaDataSent.set(true);
}
protected abstract CallStreamObserver<Q> getRequestObserver();
@RequiredArgsConstructor
protected static class StreamReader {
private final InputStream inStream;
private final byte[] buffer = new byte[CHUNK_SIZE];
@Getter
private int lastReadSize = 0;
@Getter
private final AtomicBoolean done = new AtomicBoolean(false);
byte[] getNextData() {
readNext();
return buffer;
}
void close() {
IOUtils.closeQuietly(inStream);
}
void readNext() {
try {
lastReadSize = inStream.read(buffer, 0, CHUNK_SIZE);
} catch (IOException e) {
throw new TechnicalException("Error on reading a single chunk", e);
}
}
}
}
......@@ -24,289 +24,64 @@
package de.ozgcloud.common.binaryfile;
import static org.assertj.core.api.Assertions.*;
import static org.mockito.ArgumentMatchers.*;
import static org.mockito.Mockito.*;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
import org.apache.commons.lang3.RandomUtils;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.Captor;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import de.ozgcloud.common.binaryfile.BinaryFileTestFactory.TestRequestType;
import de.ozgcloud.common.binaryfile.BinaryFileTestFactory.TestResponseType;
import de.ozgcloud.common.binaryfile.GrpcFileUploadUtils.FileSender;
import de.ozgcloud.common.errorhandling.TechnicalException;
import io.grpc.stub.CallStreamObserver;
import io.grpc.stub.StreamObserver;
import lombok.SneakyThrows;
class GrpcFileUploadUtilsTest {
@InjectMocks
private GrpcFileUploadUtils service;
@Mock
private BiFunction<byte[], Integer, TestRequestType> chunkBuilder;
@Mock
private InputStream inputStream;
@Mock
private Function<StreamObserver<TestResponseType>, CallStreamObserver<TestRequestType>> reqObserverBuilder;
@Mock
private CallStreamObserver<TestRequestType> requestObserver;
@Mock
private InputStream inputStream;
private FileSender<TestRequestType, TestResponseType> fileSender;
@Mock
private TestRequestType metaData;
@BeforeEach
void init() {
when(reqObserverBuilder.apply(any())).thenReturn(requestObserver);
fileSender = spy(GrpcFileUploadUtils.createSender(chunkBuilder, inputStream, reqObserverBuilder));
}
private Consumer<Runnable> onReadyHandlerRegistrar;
@Nested
class TestCreateFileSender {
class TestCreateSender {
@Test
void shouldCreateRequestObserver() {
GrpcFileUploadUtils.createSender(chunkBuilder, inputStream, reqObserverBuilder).send();
void shouldReturnInstanceOfStreamExclusiveFileSender() {
var createdSender = GrpcFileUploadUtils.createSender(chunkBuilder, inputStream, reqObserverBuilder);
verify(reqObserverBuilder, atLeastOnce()).apply(notNull());
assertThat(createdSender).isInstanceOf(StreamExclusiveFileSender.class);
}
}
@Nested
class TestSendBinaryFile {
@Captor
private ArgumentCaptor<Runnable> runnableCaptor;
class TestCreateStreamExclusiveFileSender {
@Test
void shouldReturnSenderWithFuture() {
var result = fileSender.send();
void shouldReturnInstanceOfStreamExclusiveFileSender() {
var createdSender = GrpcFileUploadUtils.createStreamExclusiveFileSender(chunkBuilder, inputStream, reqObserverBuilder);
assertThat(result).isNotNull().extracting(FileSender::getResultFuture).isNotNull();
assertThat(createdSender).isInstanceOf(StreamExclusiveFileSender.class);
}
}
@Nested
class TestSendNext {
@BeforeEach
void initObserver() {
fileSender.send();
}
class TestCreateStreamSharingSender {
@Test
void shouldCallSendMetaData() {
fileSender.sendNext();
verify(fileSender).sendMetaData();
}
@Test
void shouldSendNextChunk() {
fileSender.sendNext();
verify(fileSender).sendNextChunk();
}
}
@Nested
class TestSendNextChunk {
private final byte[] content = RandomUtils.insecure().randomBytes(GrpcFileUploadUtils.CHUNK_SIZE / 2);
private ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(content);
void shouldReturnInstanceOfStreamSharingSender() {
var createdSender = GrpcFileUploadUtils.createStreamSharingSender(chunkBuilder, inputStream, requestObserver, onReadyHandlerRegistrar);
@Captor
private ArgumentCaptor<byte[]> chunkCaptor;
@Nested
class TestOnDataAvailable {
@BeforeEach
void initObserver() {
fileSender = spy(GrpcFileUploadUtils.createSender(chunkBuilder, byteArrayInputStream, reqObserverBuilder));
fileSender.send();
}
@Test
void shouldCallSendChunk() {
fileSender.sendNextChunk();
verify(fileSender).sendChunk(chunkCaptor.capture(), eq(content.length));
assertThat(chunkCaptor.getValue()).contains(content);
}
assertThat(createdSender).isInstanceOf(StreamSharingFileSender.class);
}
@Nested
class TestOnNoBytesLeftToRead {
@Nested
class TestOnCompleteOnFileSent {
private static final boolean COMPLETE_ON_FILE_SENT = true;
@BeforeEach
void initialize() {
var buffer = new byte[GrpcFileUploadUtils.CHUNK_SIZE];
byteArrayInputStream.read(buffer, 0, GrpcFileUploadUtils.CHUNK_SIZE);
fileSender = spy(GrpcFileUploadUtils.createSender(chunkBuilder, byteArrayInputStream, reqObserverBuilder, COMPLETE_ON_FILE_SENT));
fileSender.send();
}
@Test
void shouldCallOnCompleted() {
fileSender.sendNextChunk();
verify(requestObserver).onCompleted();
}
@Test
void shouldNotCallSendChunk() {
fileSender.sendNextChunk();
verify(fileSender, never()).sendChunk(any(), anyInt());
}
@Test
@SneakyThrows
void shouldCallCloseStreams() {
fileSender.sendNextChunk();
verify(fileSender).closeStreams();
}
}
@Nested
class TestOnNotCompleteOnFileSent {
private static final boolean COMPLETE_ON_FILE_SENT = false;
@BeforeEach
void initialize() {
var buffer = new byte[GrpcFileUploadUtils.CHUNK_SIZE];
byteArrayInputStream.read(buffer, 0, GrpcFileUploadUtils.CHUNK_SIZE);
fileSender = spy(GrpcFileUploadUtils.createSender(chunkBuilder, byteArrayInputStream, reqObserverBuilder, COMPLETE_ON_FILE_SENT));
fileSender.send();
}
@Test
void shouldNotCallOnCompleted() {
fileSender.sendNextChunk();
verify(requestObserver, never()).onCompleted();
}
@Test
void shouldCallSendChunk() {
fileSender.sendNextChunk();
verify(fileSender).sendChunk(chunkCaptor.capture(), eq(-1));
assertThat(chunkCaptor.getValue()).isEmpty();
}
@Test
@SneakyThrows
void shouldCallCloseStreams() {
fileSender.sendNextChunk();
verify(fileSender).closeStreams();
}
}
}
}
@Nested
class TestCloseStreams {
@Test
@SneakyThrows
void shouldCloseInputStream() {
fileSender.send();
fileSender.closeStreams();
verify(inputStream).close();
}
}
@Nested
class TestSendChunk {
private static final byte[] CHUNK_PART = "ChunkPartContent".getBytes();
@BeforeEach
void initObserver() {
fileSender.send();
}
@Test
void shouldApplyBuildChunk() throws IOException {
fileSender.sendChunk(CHUNK_PART, 5);
verify(chunkBuilder).apply(CHUNK_PART, 5);
}
@Test
void shouldCallOnNext() throws IOException {
fileSender.sendChunk(CHUNK_PART, 5);
verify(requestObserver).onNext(any());
}
}
@Nested
class TestSendMetaData {
@BeforeEach
void initObserver() {
fileSender.send();
}
@Test
void shouldNotSendWithoutMetadata() {
fileSender.sendMetaData();
verify(requestObserver, never()).onNext(any());
}
@Test
void shouldSendMetadata() {
fileSender.withMetaData(metaData).sendMetaData();
verify(requestObserver).onNext(metaData);
}
@Test
void shouldSendMetadataOnlyOnce() {
fileSender.withMetaData(metaData).sendMetaData();
fileSender.sendMetaData();
verify(requestObserver).onNext(metaData);
}
}
@Disabled("unused")
@Nested
class TestReadFromStream {
@Test
void shouldThrowException() throws IOException {
doThrow(IOException.class).when(inputStream).read(any(), anyInt(), anyInt());
assertThatThrownBy(() -> fileSender.readFromStream()).isInstanceOf(TechnicalException.class);
}
}
}
\ No newline at end of file
/*
* Copyright (C) 2025 Das Land Schleswig-Holstein vertreten durch den
* Ministerpräsidenten des Landes Schleswig-Holstein
* Staatskanzlei
* Abteilung Digitalisierung und zentrales IT-Management der Landesregierung
*
* Lizenziert unter der EUPL, Version 1.2 oder - sobald
* diese von der Europäischen Kommission genehmigt wurden -
* Folgeversionen der EUPL ("Lizenz");
* Sie dürfen dieses Werk ausschließlich gemäß
* dieser Lizenz nutzen.
* Eine Kopie der Lizenz finden Sie hier:
*
* https://joinup.ec.europa.eu/collection/eupl/eupl-text-eupl-12
*
* Sofern nicht durch anwendbare Rechtsvorschriften
* gefordert oder in schriftlicher Form vereinbart, wird
* die unter der Lizenz verbreitete Software "so wie sie
* ist", OHNE JEGLICHE GEWÄHRLEISTUNG ODER BEDINGUNGEN -
* ausdrücklich oder stillschweigend - verbreitet.
* Die sprachspezifischen Genehmigungen und Beschränkungen
* unter der Lizenz sind dem Lizenztext zu entnehmen.
*/
package de.ozgcloud.common.binaryfile;
import static org.mockito.Mockito.*;
import java.io.InputStream;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiFunction;
import java.util.function.Function;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.Captor;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.MockedStatic;
import org.mockito.Spy;
import org.springframework.test.util.ReflectionTestUtils;
import de.ozgcloud.common.binaryfile.BinaryFileTestFactory.TestRequestType;
import de.ozgcloud.common.binaryfile.BinaryFileTestFactory.TestResponseType;
import de.ozgcloud.common.errorhandling.TechnicalException;
import io.grpc.stub.CallStreamObserver;
import io.grpc.stub.StreamObserver;
class StreamExclusiveFileSenderTest {
@Mock
private BiFunction<byte[], Integer, TestRequestType> chunkBuilder;
@Mock
private InputStream inputStream;
@Mock
private Function<StreamObserver<TestResponseType>, CallStreamObserver<TestRequestType>> reqObserverBuilder;
@Spy
@InjectMocks
private StreamExclusiveFileSender<TestRequestType, TestResponseType> fileSender;
@Nested
class TestSend {
private final CompletableFuture<TestResponseType> resultFuture = CompletableFuture.completedFuture(new TestResponseType());
@Captor
private ArgumentCaptor<Runnable> onReadyHandlerCaptor;
@Mock
private BinaryFileUploadStreamObserver<TestRequestType, TestResponseType> responseObserver;
@BeforeEach
void init() {
doReturn(resultFuture).when(fileSender).getResultFuture();
}
@SuppressWarnings("rawtypes")
@Test
void shouldCreateResponseObserver() {
try (MockedStatic<BinaryFileUploadStreamObserver> mocked = mockStatic(BinaryFileUploadStreamObserver.class)) {
doNothing().when(fileSender).sendNext();
fileSender.send();
mocked.verify(() -> BinaryFileUploadStreamObserver.create(same(resultFuture), onReadyHandlerCaptor.capture()));
verifyCallsSendNext(onReadyHandlerCaptor.getValue());
}
}
@SuppressWarnings("rawtypes")
@Test
void shouldBuildRequestObserver() {
try (MockedStatic<BinaryFileUploadStreamObserver> mocked = mockStatic(BinaryFileUploadStreamObserver.class)) {
mocked.when(() -> BinaryFileUploadStreamObserver.create(any(), any())).thenReturn(responseObserver);
fileSender.send();
verify(reqObserverBuilder).apply(responseObserver);
}
}
private void verifyCallsSendNext(Runnable runnable) {
runnable.run();
verify(fileSender).sendNext();
}
}
@Nested
class TestCommunicateEndOfTransfer {
@Mock
private CallStreamObserver<TestRequestType> requestObserver;
@Test
void shouldCallOnCompleted() {
ReflectionTestUtils.setField(fileSender, "requestObserver", requestObserver);
fileSender.communicateEndOfTransfer();
verify(requestObserver).onCompleted();
}
}
@Nested
class TestCancelOnTimeout {
@Mock
private CompletableFuture<TestResponseType> resultFuture;
@Mock
private CallStreamObserver<TestRequestType> requestObserver;
@BeforeEach
void init() {
ReflectionTestUtils.setField(fileSender, "resultFuture", resultFuture);
ReflectionTestUtils.setField(fileSender, "requestObserver", requestObserver);
}
@Test
void shouldCancelResultFuture() {
fileSender.cancelOnTimeout();
verify(resultFuture).cancel(true);
}
@Test
void shouldCallOnError() {
fileSender.cancelOnTimeout();
verify(requestObserver).onError(any(TechnicalException.class));
}
@Test
void shouldCloseStreamReader() {
fileSender.cancelOnTimeout();
verify(fileSender).closeStreamReader();
}
}
@Nested
class TestCancelOnError {
@Mock
private CompletableFuture<TestResponseType> resultFuture;
@Mock
private CallStreamObserver<TestRequestType> requestObserver;
private final Throwable error = new Throwable();
@BeforeEach
void init() {
ReflectionTestUtils.setField(fileSender, "resultFuture", resultFuture);
ReflectionTestUtils.setField(fileSender, "requestObserver", requestObserver);
}
@Test
void shouldCancelResultFuture() {
fileSender.cancelOnError(error);
verify(resultFuture).cancel(true);
}
@Test
void shouldCallOnError() {
fileSender.cancelOnError(error);
verify(requestObserver).onError(error);
}
@Test
void shouldCloseStreamReader() {
fileSender.cancelOnError(error);
verify(fileSender).closeStreamReader();
}
}
}
/*
* Copyright (C) 2025 Das Land Schleswig-Holstein vertreten durch den
* Ministerpräsidenten des Landes Schleswig-Holstein
* Staatskanzlei
* Abteilung Digitalisierung und zentrales IT-Management der Landesregierung
*
* Lizenziert unter der EUPL, Version 1.2 oder - sobald
* diese von der Europäischen Kommission genehmigt wurden -
* Folgeversionen der EUPL ("Lizenz");
* Sie dürfen dieses Werk ausschließlich gemäß
* dieser Lizenz nutzen.
* Eine Kopie der Lizenz finden Sie hier:
*
* https://joinup.ec.europa.eu/collection/eupl/eupl-text-eupl-12
*
* Sofern nicht durch anwendbare Rechtsvorschriften
* gefordert oder in schriftlicher Form vereinbart, wird
* die unter der Lizenz verbreitete Software "so wie sie
* ist", OHNE JEGLICHE GEWÄHRLEISTUNG ODER BEDINGUNGEN -
* ausdrücklich oder stillschweigend - verbreitet.
* Die sprachspezifischen Genehmigungen und Beschränkungen
* unter der Lizenz sind dem Lizenztext zu entnehmen.
*/
package de.ozgcloud.common.binaryfile;
import static org.assertj.core.api.Assertions.*;
import static org.mockito.Mockito.*;
import java.io.InputStream;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.Captor;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.Spy;
import de.ozgcloud.common.binaryfile.BinaryFileTestFactory.TestRequestType;
import de.ozgcloud.common.binaryfile.BinaryFileTestFactory.TestResponseType;
import io.grpc.stub.CallStreamObserver;
class StreamSharingFileSenderTest {
@Mock
private BiFunction<byte[], Integer, TestRequestType> chunkBuilder;
@Mock
private InputStream inputStream;
@Mock
private CallStreamObserver<TestRequestType> requestObserver;
@Mock
private Consumer<Runnable> onReadyHandlerRegistrar;
@Spy
@InjectMocks
private StreamSharingFileSender<TestRequestType, TestResponseType> fileSender;
@Nested
class TestSend {
@Captor
private ArgumentCaptor<Runnable> onReadyHandlerCaptor;
@Test
void shouldRegisterOnReadyHandler() {
doNothing().when(fileSender).sendNext();
fileSender.send();
verify(onReadyHandlerRegistrar).accept(onReadyHandlerCaptor.capture());
verifyCallsSendNext(onReadyHandlerCaptor.getValue());
}
@Test
void shouldReturnThis() {
var obj = fileSender.send();
assertThat(obj).isSameAs(fileSender);
}
private void verifyCallsSendNext(Runnable runnable) {
runnable.run();
verify(fileSender).sendNext();
}
}
@Nested
class TestCommunicateEndOfTransfer {
@Test
void shouldSendEmptyChunk() {
fileSender.communicateEndOfTransfer();
verify(fileSender).sendChunk(new byte[0], -1);
}
@Test
void shouldCompleteResultFuture() {
fileSender.communicateEndOfTransfer();
assertThat(fileSender.getResultFuture().isDone()).isTrue();
}
}
}
/*
* Copyright (C) 2025 Das Land Schleswig-Holstein vertreten durch den
* Ministerpräsidenten des Landes Schleswig-Holstein
* Staatskanzlei
* Abteilung Digitalisierung und zentrales IT-Management der Landesregierung
*
* Lizenziert unter der EUPL, Version 1.2 oder - sobald
* diese von der Europäischen Kommission genehmigt wurden -
* Folgeversionen der EUPL ("Lizenz");
* Sie dürfen dieses Werk ausschließlich gemäß
* dieser Lizenz nutzen.
* Eine Kopie der Lizenz finden Sie hier:
*
* https://joinup.ec.europa.eu/collection/eupl/eupl-text-eupl-12
*
* Sofern nicht durch anwendbare Rechtsvorschriften
* gefordert oder in schriftlicher Form vereinbart, wird
* die unter der Lizenz verbreitete Software "so wie sie
* ist", OHNE JEGLICHE GEWÄHRLEISTUNG ODER BEDINGUNGEN -
* ausdrücklich oder stillschweigend - verbreitet.
* Die sprachspezifischen Genehmigungen und Beschränkungen
* unter der Lizenz sind dem Lizenztext zu entnehmen.
*/
package de.ozgcloud.common.binaryfile;
import static org.assertj.core.api.Assertions.*;
import static org.mockito.Mockito.*;
import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiFunction;
import org.apache.commons.lang3.RandomUtils;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.Captor;
import org.mockito.Mock;
import org.springframework.test.util.ReflectionTestUtils;
import de.ozgcloud.common.binaryfile.BinaryFileTestFactory.TestRequestType;
import de.ozgcloud.common.binaryfile.BinaryFileTestFactory.TestResponseType;
import io.grpc.stub.CallStreamObserver;
import lombok.AccessLevel;
import lombok.Getter;
import lombok.SneakyThrows;
class StreamingFileSenderTest {
@Mock
private BiFunction<byte[], Integer, TestRequestType> chunkBuilder;
@Mock
private InputStream inputStream;
@Mock
private CallStreamObserver<TestRequestType> requestObserver;
private TestFileSender fileSender;
@BeforeEach
void init() {
fileSender = spy(new TestFileSender(chunkBuilder, inputStream, requestObserver));
}
@Nested
class TestCancelOnTimeout {
@Mock
private CompletableFuture<TestResponseType> resultFuture;
@BeforeEach
void init() {
setResultFutureInFileSender(resultFuture);
}
@Test
void shouldCancelResultFuture() {
fileSender.cancelOnTimeout();
verify(resultFuture).cancel(true);
}
@Test
void shouldCloseStreams() {
fileSender.cancelOnTimeout();
verify(fileSender).closeStreamReader();
}
}
@Nested
class TestCancelOnError {
@Mock
private CompletableFuture<TestResponseType> resultFuture;
private final Throwable error = new Throwable();
@BeforeEach
void init() {
setResultFutureInFileSender(resultFuture);
}
@Test
void shouldCancelResultFuture() {
fileSender.cancelOnError(error);
verify(resultFuture).cancel(true);
}
@Test
void shouldCloseStreams() {
fileSender.cancelOnError(error);
verify(fileSender).closeStreamReader();
}
}
@Nested
class TestSendNext {
@Mock
private CompletableFuture<TestResponseType> resultFuture;
@BeforeEach
void init() {
fileSender.send();
}
@Test
void shouldSendMetaData() {
fileSender.sendNext();
verify(fileSender).sendMetaData();
}
@Test
void shouldSendNextChunk() {
fileSender.sendNext();
verify(fileSender).sendNextChunk();
}
@Test
void shouldNotSendMetaDataIfDone() {
setDoneInFileSender(true);
fileSender.sendNext();
verify(fileSender, never()).sendMetaData();
}
@Test
void shouldNotSendMetaDataIfCancelled() {
fileSender.getResultFuture().cancel(true);
fileSender.sendNext();
verify(fileSender, never()).sendMetaData();
}
@Test
void shouldSendNextChunkUntilDone() {
lenient().when(requestObserver.isReady()).thenReturn(true);
doAnswer(invocation -> {
setDoneInFileSender(true);
return null;
}).when(fileSender).sendNextChunk();
fileSender.sendNext();
verify(fileSender, times(1)).sendNextChunk();
}
@Test
void shouldSendNextChunkUntilCancelled() {
lenient().when(requestObserver.isReady()).thenReturn(true);
doAnswer(invocation -> {
fileSender.getResultFuture().cancel(true);
return null;
}).when(fileSender).sendNextChunk();
fileSender.sendNext();
verify(fileSender, times(1)).sendNextChunk();
}
@Test
void closeStreamReaderIfCancelled() {
fileSender.getResultFuture().cancel(true);
fileSender.sendNext();
verify(fileSender).closeStreamReader();
}
}
@Nested
class TestSendNextChunk {
private final byte[] content = RandomUtils.insecure().randomBytes(StreamingFileSender.CHUNK_SIZE / 2);
private final ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(content);
@Captor
private ArgumentCaptor<byte[]> chunkCaptor;
@Nested
class TestOnDataAvailable {
@BeforeEach
void init() {
fileSender = spy(new TestFileSender(chunkBuilder, byteArrayInputStream, requestObserver));
}
@Test
void shouldSendChunk() {
fileSender.sendNextChunk();
verify(fileSender).sendChunk(chunkCaptor.capture(), eq(content.length));
assertThat(chunkCaptor.getValue()).contains(content);
}
}
@Nested
class TestOnNoBytesLeftToRead {
@BeforeEach
void init() {
var buffer = new byte[StreamingFileSender.CHUNK_SIZE];
byteArrayInputStream.read(buffer, 0, StreamingFileSender.CHUNK_SIZE);
fileSender = spy(new TestFileSender(chunkBuilder, byteArrayInputStream, requestObserver));
}
@Test
void shouldNotSendChunk() {
fileSender.sendNextChunk();
verify(fileSender, never()).sendChunk(any(), anyInt());
}
@Test
void shouldEndTransfer() {
fileSender.sendNextChunk();
verify(fileSender).endTransfer();
}
}
}
@Nested
class TestEndTransfer {
@Test
void shouldCommunicateEndOfTransfer() {
fileSender.endTransfer();
verify(fileSender).communicateEndOfTransfer();
}
@Test
void shouldSetDoneToTrue() {
fileSender.endTransfer();
fileSender.sendNext();
verify(fileSender, never()).sendNextChunk();
}
@Test
void shouldCloseStreamReader() {
fileSender.endTransfer();
verify(fileSender).closeStreamReader();
}
}
@Nested
class TestCloseStreamReader {
@Test
@SneakyThrows
void shouldCloseInputStream() {
fileSender.closeStreamReader();
verify(inputStream).close();
}
}
@Nested
class TestSendChunk {
private static final byte[] CHUNK_PART = "ChunkPartContent".getBytes();
@BeforeEach
void initObserver() {
fileSender.send();
}
@Test
void shouldApplyBuildChunk() {
fileSender.sendChunk(CHUNK_PART, 5);
verify(chunkBuilder).apply(CHUNK_PART, 5);
}
@Test
void shouldCallOnNext() {
fileSender.sendChunk(CHUNK_PART, 5);
verify(requestObserver).onNext(any());
}
}
@Nested
class TestSendMetaData {
@Mock
private TestRequestType metaData;
@BeforeEach
void initObserver() {
fileSender.send();
}
@Test
void shouldNotSendWithoutMetadata() {
fileSender.sendMetaData();
verify(requestObserver, never()).onNext(any());
}
@Test
void shouldSendMetadata() {
fileSender.withMetaData(metaData).sendMetaData();
verify(requestObserver).onNext(metaData);
}
@Test
void shouldSendMetadataOnlyOnce() {
fileSender.withMetaData(metaData).sendMetaData();
fileSender.sendMetaData();
verify(requestObserver).onNext(metaData);
}
}
private void setResultFutureInFileSender(CompletableFuture<TestResponseType> resultFuture) {
ReflectionTestUtils.setField(fileSender, "resultFuture", resultFuture);
}
private void setDoneInFileSender(boolean done) {
((AtomicBoolean) ReflectionTestUtils.getField(fileSender, null, "done")).set(done);
}
static class TestFileSender extends StreamingFileSender<TestRequestType, TestResponseType> {
@Getter(AccessLevel.PROTECTED)
private final CallStreamObserver<TestRequestType> requestObserver;
TestFileSender(BiFunction<byte[], Integer, TestRequestType> chunkBuilder, InputStream inputStream,
CallStreamObserver<TestRequestType> requestObserver) {
super(chunkBuilder, inputStream);
this.requestObserver = requestObserver;
}
@Override
public StreamingFileSender<TestRequestType, TestResponseType> send() {
return this;
}
@Override
void communicateEndOfTransfer() {
}
}
}