Skip to content
Snippets Groups Projects

OZG-7573 allow multi file upload by not closing requestObserver on EOF

Merged Felix Reichenbach requested to merge OZG-7573-GrpcUpload-utils into main
3 files
+ 152
4
Compare changes
  • Side-by-side
  • Inline
Files
3
@@ -55,7 +55,12 @@ public class GrpcFileUploadUtils {
*/
public static <Q, S> FileSender<Q, S> createSender(BiFunction<byte[], Integer, Q> chunkBuilder, InputStream inputStream,
Function<StreamObserver<S>, CallStreamObserver<Q>> reqObserverBuilder) {
return new FileSender<>(chunkBuilder, reqObserverBuilder, inputStream);
return createSender(chunkBuilder, inputStream, reqObserverBuilder, true);
}
public static <Q, S> FileSender<Q, S> createSender(BiFunction<byte[], Integer, Q> chunkBuilder, InputStream inputStream,
Function<StreamObserver<S>, CallStreamObserver<Q>> reqObserverBuilder, boolean completeOnFileSent) {
return new FileSender<>(chunkBuilder, reqObserverBuilder, inputStream, completeOnFileSent);
}
public static class FileSender<Q, S> {
@@ -72,12 +77,19 @@ public class GrpcFileUploadUtils {
private final AtomicBoolean done = new AtomicBoolean(false);
private final StreamReader streamReader;
private final boolean completeOnFileSent;
FileSender(BiFunction<byte[], Integer, Q> chunkBuilder, Function<StreamObserver<S>, CallStreamObserver<Q>> reqObserverBuilder,
InputStream inputStream) {
this(chunkBuilder, reqObserverBuilder, inputStream, true);
}
FileSender(BiFunction<byte[], Integer, Q> chunkBuilder, Function<StreamObserver<S>, CallStreamObserver<Q>> reqObserverBuilder,
InputStream inputStream, boolean completeOnFileSent) {
this.chunkBuilder = chunkBuilder;
this.inputStream = inputStream;
this.reqObserverBuilder = reqObserverBuilder;
this.completeOnFileSent = completeOnFileSent;
this.streamReader = new StreamReader(this.inputStream);
}
@@ -152,16 +164,22 @@ public class GrpcFileUploadUtils {
}
private void endTransfer() {
requestObserver.onCompleted();
if (completeOnFileSent)
requestObserver.onCompleted();
else
sendEndOfFile();
done.set(true);
LOG.debug("File Transfer done.");
closeStreams();
}
private void closeStreams() {
private void sendEndOfFile() {
sendChunk(new byte[0], streamReader.getLastReadSize());
}
void closeStreams() {
LOG.debug("Closing streams");
IOUtils.closeQuietly(inputStream);
streamReader.close();
}
Loading