Skip to content
Snippets Groups Projects
Commit 913f11af authored by Felix Reichenbach's avatar Felix Reichenbach
Browse files

OZG-7573 allow multi file upload by not closing requestObserver on EOF

parent 1bdc397d
Branches
Tags
1 merge request!11OZG-7573 allow multi file upload by not closing requestObserver on EOF
This commit is part of merge request !11. Comments created here will be created in the context of that merge request.
......@@ -10,3 +10,4 @@ target
.idea
*.iml
.vscode/settings.json
......@@ -55,7 +55,12 @@ public class GrpcFileUploadUtils {
*/
public static <Q, S> FileSender<Q, S> createSender(BiFunction<byte[], Integer, Q> chunkBuilder, InputStream inputStream,
Function<StreamObserver<S>, CallStreamObserver<Q>> reqObserverBuilder) {
return new FileSender<>(chunkBuilder, reqObserverBuilder, inputStream);
return createSender(chunkBuilder, inputStream, reqObserverBuilder, true);
}
public static <Q, S> FileSender<Q, S> createSender(BiFunction<byte[], Integer, Q> chunkBuilder, InputStream inputStream,
Function<StreamObserver<S>, CallStreamObserver<Q>> reqObserverBuilder, boolean completeOnFileSent) {
return new FileSender<>(chunkBuilder, reqObserverBuilder, inputStream, completeOnFileSent);
}
public static class FileSender<Q, S> {
......@@ -72,12 +77,19 @@ public class GrpcFileUploadUtils {
private final AtomicBoolean done = new AtomicBoolean(false);
private final StreamReader streamReader;
private final boolean completeOnFileSent;
FileSender(BiFunction<byte[], Integer, Q> chunkBuilder, Function<StreamObserver<S>, CallStreamObserver<Q>> reqObserverBuilder,
InputStream inputStream) {
this(chunkBuilder, reqObserverBuilder, inputStream, true);
}
FileSender(BiFunction<byte[], Integer, Q> chunkBuilder, Function<StreamObserver<S>, CallStreamObserver<Q>> reqObserverBuilder,
InputStream inputStream, boolean completeOnFileSent) {
this.chunkBuilder = chunkBuilder;
this.inputStream = inputStream;
this.reqObserverBuilder = reqObserverBuilder;
this.completeOnFileSent = completeOnFileSent;
this.streamReader = new StreamReader(this.inputStream);
}
......@@ -152,16 +164,22 @@ public class GrpcFileUploadUtils {
}
private void endTransfer() {
if (completeOnFileSent)
requestObserver.onCompleted();
else
sendEndOfFile();
done.set(true);
LOG.debug("File Transfer done.");
closeStreams();
}
private void closeStreams() {
private void sendEndOfFile() {
sendChunk(new byte[0], streamReader.getLastReadSize());
}
void closeStreams() {
LOG.debug("Closing streams");
IOUtils.closeQuietly(inputStream);
streamReader.close();
}
......
......@@ -27,11 +27,13 @@ import static org.assertj.core.api.Assertions.*;
import static org.mockito.ArgumentMatchers.*;
import static org.mockito.Mockito.*;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.function.BiFunction;
import java.util.function.Function;
import org.apache.commons.lang3.RandomUtils;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Nested;
......@@ -47,6 +49,7 @@ import de.ozgcloud.common.binaryfile.GrpcFileUploadUtils.FileSender;
import de.ozgcloud.common.errorhandling.TechnicalException;
import io.grpc.stub.CallStreamObserver;
import io.grpc.stub.StreamObserver;
import lombok.SneakyThrows;
class GrpcFileUploadUtilsTest {
......@@ -122,6 +125,132 @@ class GrpcFileUploadUtilsTest {
}
@Nested
class TestSendNextChunk {
private final byte[] content = RandomUtils.insecure().randomBytes(GrpcFileUploadUtils.CHUNK_SIZE / 2);
private ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(content);
@Captor
private ArgumentCaptor<byte[]> chunkCaptor;
@Nested
class TestOnDataAvailable {
@BeforeEach
void initObserver() {
fileSender = spy(GrpcFileUploadUtils.createSender(chunkBuilder, byteArrayInputStream, reqObserverBuilder));
fileSender.send();
}
@Test
void shouldCallSendChunk() {
fileSender.sendNextChunk();
verify(fileSender).sendChunk(chunkCaptor.capture(), eq(content.length));
assertThat(chunkCaptor.getValue()).contains(content);
}
@Test
void shouldReturnChunkSize() {
var cotnentLength = fileSender.sendNextChunk();
assertThat(cotnentLength).isEqualTo(GrpcFileUploadUtils.CHUNK_SIZE);
}
}
@Nested
class TestOnNoBytesLeftToRead {
@Nested
class TestOnCompleteOnFileSent {
private static final boolean COMPLETE_ON_FILE_SENT = true;
@BeforeEach
void initialize() {
var buffer = new byte[GrpcFileUploadUtils.CHUNK_SIZE];
byteArrayInputStream.read(buffer, 0, GrpcFileUploadUtils.CHUNK_SIZE);
fileSender = spy(GrpcFileUploadUtils.createSender(chunkBuilder, byteArrayInputStream, reqObserverBuilder, COMPLETE_ON_FILE_SENT));
fileSender.send();
}
@Test
void shouldCallOnCompleted() {
fileSender.sendNextChunk();
verify(requestObserver).onCompleted();
}
@Test
void shouldNotCallSendChunk() {
fileSender.sendNextChunk();
verify(fileSender, never()).sendChunk(any(), anyInt());
}
@Test
@SneakyThrows
void shouldCallCloseStreams() {
fileSender.sendNextChunk();
verify(fileSender).closeStreams();
}
}
@Nested
class TestOnNotCompleteOnFileSent {
private static final boolean COMPLETE_ON_FILE_SENT = false;
@BeforeEach
void initialize() {
var buffer = new byte[GrpcFileUploadUtils.CHUNK_SIZE];
byteArrayInputStream.read(buffer, 0, GrpcFileUploadUtils.CHUNK_SIZE);
fileSender = spy(GrpcFileUploadUtils.createSender(chunkBuilder, byteArrayInputStream, reqObserverBuilder, COMPLETE_ON_FILE_SENT));
fileSender.send();
}
@Test
void shouldNotCallOnCompleted() {
fileSender.sendNextChunk();
verify(requestObserver, never()).onCompleted();
}
@Test
void shouldCallSendChunk() {
fileSender.sendNextChunk();
verify(fileSender).sendChunk(chunkCaptor.capture(), eq(-1));
assertThat(chunkCaptor.getValue()).isEmpty();
}
@Test
@SneakyThrows
void shouldCallCloseStreams() {
fileSender.sendNextChunk();
verify(fileSender).closeStreams();
}
}
}
}
@Nested
class TestCloseStreams {
@Test
@SneakyThrows
void shouldCloseInputStream() {
fileSender.send();
fileSender.closeStreams();
verify(inputStream).close();
}
}
@Nested
class TestSendChunk {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment