Skip to content
Snippets Groups Projects
Commit b7ed2fbf authored by Felix Reichenbach's avatar Felix Reichenbach
Browse files

Merge branch 'OZG-7573-GrpcUpload-utils' into 'main'

OZG-7573 allow multi file upload by not closing requestObserver on EOF

See merge request !11
parents 1bdc397d 6f11b70f
Branches
Tags
1 merge request!11OZG-7573 allow multi file upload by not closing requestObserver on EOF
......@@ -10,3 +10,4 @@ target
.idea
*.iml
.vscode/settings.json
......@@ -55,7 +55,12 @@ public class GrpcFileUploadUtils {
*/
public static <Q, S> FileSender<Q, S> createSender(BiFunction<byte[], Integer, Q> chunkBuilder, InputStream inputStream,
Function<StreamObserver<S>, CallStreamObserver<Q>> reqObserverBuilder) {
return new FileSender<>(chunkBuilder, reqObserverBuilder, inputStream);
return createSender(chunkBuilder, inputStream, reqObserverBuilder, true);
}
public static <Q, S> FileSender<Q, S> createSender(BiFunction<byte[], Integer, Q> chunkBuilder, InputStream inputStream,
Function<StreamObserver<S>, CallStreamObserver<Q>> reqObserverBuilder, boolean completeOnFileSent) {
return new FileSender<>(chunkBuilder, reqObserverBuilder, inputStream, completeOnFileSent);
}
public static class FileSender<Q, S> {
......@@ -72,12 +77,19 @@ public class GrpcFileUploadUtils {
private final AtomicBoolean done = new AtomicBoolean(false);
private final StreamReader streamReader;
private final boolean completeOnFileSent;
FileSender(BiFunction<byte[], Integer, Q> chunkBuilder, Function<StreamObserver<S>, CallStreamObserver<Q>> reqObserverBuilder,
InputStream inputStream) {
this(chunkBuilder, reqObserverBuilder, inputStream, true);
}
FileSender(BiFunction<byte[], Integer, Q> chunkBuilder, Function<StreamObserver<S>, CallStreamObserver<Q>> reqObserverBuilder,
InputStream inputStream, boolean completeOnFileSent) {
this.chunkBuilder = chunkBuilder;
this.inputStream = inputStream;
this.reqObserverBuilder = reqObserverBuilder;
this.completeOnFileSent = completeOnFileSent;
this.streamReader = new StreamReader(this.inputStream);
}
......@@ -140,7 +152,7 @@ public class GrpcFileUploadUtils {
}
long sendNextChunk() {
void sendNextChunk() {
byte[] contentToSend = streamReader.getNextData();
if (streamReader.getLastReadSize() > 0) {
......@@ -148,20 +160,25 @@ public class GrpcFileUploadUtils {
} else {
endTransfer();
}
return contentToSend.length;
}
private void endTransfer() {
requestObserver.onCompleted();
if (completeOnFileSent)
requestObserver.onCompleted();
else
sendEndOfFile();
done.set(true);
LOG.debug("File Transfer done.");
closeStreams();
}
private void closeStreams() {
private void sendEndOfFile() {
sendChunk(new byte[0], streamReader.getLastReadSize());
}
void closeStreams() {
LOG.debug("Closing streams");
IOUtils.closeQuietly(inputStream);
streamReader.close();
}
......
......@@ -27,11 +27,13 @@ import static org.assertj.core.api.Assertions.*;
import static org.mockito.ArgumentMatchers.*;
import static org.mockito.Mockito.*;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.function.BiFunction;
import java.util.function.Function;
import org.apache.commons.lang3.RandomUtils;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Nested;
......@@ -47,6 +49,7 @@ import de.ozgcloud.common.binaryfile.GrpcFileUploadUtils.FileSender;
import de.ozgcloud.common.errorhandling.TechnicalException;
import io.grpc.stub.CallStreamObserver;
import io.grpc.stub.StreamObserver;
import lombok.SneakyThrows;
class GrpcFileUploadUtilsTest {
......@@ -122,6 +125,123 @@ class GrpcFileUploadUtilsTest {
}
@Nested
class TestSendNextChunk {
private final byte[] content = RandomUtils.insecure().randomBytes(GrpcFileUploadUtils.CHUNK_SIZE / 2);
private ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(content);
@Captor
private ArgumentCaptor<byte[]> chunkCaptor;
@Nested
class TestOnDataAvailable {
@BeforeEach
void initObserver() {
fileSender = spy(GrpcFileUploadUtils.createSender(chunkBuilder, byteArrayInputStream, reqObserverBuilder));
fileSender.send();
}
@Test
void shouldCallSendChunk() {
fileSender.sendNextChunk();
verify(fileSender).sendChunk(chunkCaptor.capture(), eq(content.length));
assertThat(chunkCaptor.getValue()).contains(content);
}
}
@Nested
class TestOnNoBytesLeftToRead {
@Nested
class TestOnCompleteOnFileSent {
private static final boolean COMPLETE_ON_FILE_SENT = true;
@BeforeEach
void initialize() {
var buffer = new byte[GrpcFileUploadUtils.CHUNK_SIZE];
byteArrayInputStream.read(buffer, 0, GrpcFileUploadUtils.CHUNK_SIZE);
fileSender = spy(GrpcFileUploadUtils.createSender(chunkBuilder, byteArrayInputStream, reqObserverBuilder, COMPLETE_ON_FILE_SENT));
fileSender.send();
}
@Test
void shouldCallOnCompleted() {
fileSender.sendNextChunk();
verify(requestObserver).onCompleted();
}
@Test
void shouldNotCallSendChunk() {
fileSender.sendNextChunk();
verify(fileSender, never()).sendChunk(any(), anyInt());
}
@Test
@SneakyThrows
void shouldCallCloseStreams() {
fileSender.sendNextChunk();
verify(fileSender).closeStreams();
}
}
@Nested
class TestOnNotCompleteOnFileSent {
private static final boolean COMPLETE_ON_FILE_SENT = false;
@BeforeEach
void initialize() {
var buffer = new byte[GrpcFileUploadUtils.CHUNK_SIZE];
byteArrayInputStream.read(buffer, 0, GrpcFileUploadUtils.CHUNK_SIZE);
fileSender = spy(GrpcFileUploadUtils.createSender(chunkBuilder, byteArrayInputStream, reqObserverBuilder, COMPLETE_ON_FILE_SENT));
fileSender.send();
}
@Test
void shouldNotCallOnCompleted() {
fileSender.sendNextChunk();
verify(requestObserver, never()).onCompleted();
}
@Test
void shouldCallSendChunk() {
fileSender.sendNextChunk();
verify(fileSender).sendChunk(chunkCaptor.capture(), eq(-1));
assertThat(chunkCaptor.getValue()).isEmpty();
}
@Test
@SneakyThrows
void shouldCallCloseStreams() {
fileSender.sendNextChunk();
verify(fileSender).closeStreams();
}
}
}
}
@Nested
class TestCloseStreams {
@Test
@SneakyThrows
void shouldCloseInputStream() {
fileSender.send();
fileSender.closeStreams();
verify(inputStream).close();
}
}
@Nested
class TestSendChunk {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment