Skip to content
Snippets Groups Projects

OZG-7573 allow multi file upload by not closing requestObserver on EOF

Merged Felix Reichenbach requested to merge OZG-7573-GrpcUpload-utils into main
3 files
+ 152
4
Compare changes
  • Side-by-side
  • Inline
Files
3
@@ -55,7 +55,12 @@ public class GrpcFileUploadUtils {
@@ -55,7 +55,12 @@ public class GrpcFileUploadUtils {
*/
*/
public static <Q, S> FileSender<Q, S> createSender(BiFunction<byte[], Integer, Q> chunkBuilder, InputStream inputStream,
public static <Q, S> FileSender<Q, S> createSender(BiFunction<byte[], Integer, Q> chunkBuilder, InputStream inputStream,
Function<StreamObserver<S>, CallStreamObserver<Q>> reqObserverBuilder) {
Function<StreamObserver<S>, CallStreamObserver<Q>> reqObserverBuilder) {
return new FileSender<>(chunkBuilder, reqObserverBuilder, inputStream);
return createSender(chunkBuilder, inputStream, reqObserverBuilder, true);
 
}
 
 
public static <Q, S> FileSender<Q, S> createSender(BiFunction<byte[], Integer, Q> chunkBuilder, InputStream inputStream,
 
Function<StreamObserver<S>, CallStreamObserver<Q>> reqObserverBuilder, boolean completeOnFileSent) {
 
return new FileSender<>(chunkBuilder, reqObserverBuilder, inputStream, completeOnFileSent);
}
}
public static class FileSender<Q, S> {
public static class FileSender<Q, S> {
@@ -72,12 +77,19 @@ public class GrpcFileUploadUtils {
@@ -72,12 +77,19 @@ public class GrpcFileUploadUtils {
private final AtomicBoolean done = new AtomicBoolean(false);
private final AtomicBoolean done = new AtomicBoolean(false);
private final StreamReader streamReader;
private final StreamReader streamReader;
 
private final boolean completeOnFileSent;
FileSender(BiFunction<byte[], Integer, Q> chunkBuilder, Function<StreamObserver<S>, CallStreamObserver<Q>> reqObserverBuilder,
FileSender(BiFunction<byte[], Integer, Q> chunkBuilder, Function<StreamObserver<S>, CallStreamObserver<Q>> reqObserverBuilder,
InputStream inputStream) {
InputStream inputStream) {
 
this(chunkBuilder, reqObserverBuilder, inputStream, true);
 
}
 
 
FileSender(BiFunction<byte[], Integer, Q> chunkBuilder, Function<StreamObserver<S>, CallStreamObserver<Q>> reqObserverBuilder,
 
InputStream inputStream, boolean completeOnFileSent) {
this.chunkBuilder = chunkBuilder;
this.chunkBuilder = chunkBuilder;
this.inputStream = inputStream;
this.inputStream = inputStream;
this.reqObserverBuilder = reqObserverBuilder;
this.reqObserverBuilder = reqObserverBuilder;
 
this.completeOnFileSent = completeOnFileSent;
this.streamReader = new StreamReader(this.inputStream);
this.streamReader = new StreamReader(this.inputStream);
}
}
@@ -152,16 +164,22 @@ public class GrpcFileUploadUtils {
@@ -152,16 +164,22 @@ public class GrpcFileUploadUtils {
}
}
private void endTransfer() {
private void endTransfer() {
requestObserver.onCompleted();
if (completeOnFileSent)
 
requestObserver.onCompleted();
 
else
 
sendEndOfFile();
done.set(true);
done.set(true);
LOG.debug("File Transfer done.");
LOG.debug("File Transfer done.");
closeStreams();
closeStreams();
}
}
private void closeStreams() {
private void sendEndOfFile() {
 
sendChunk(new byte[0], streamReader.getLastReadSize());
 
}
 
 
void closeStreams() {
LOG.debug("Closing streams");
LOG.debug("Closing streams");
IOUtils.closeQuietly(inputStream);
streamReader.close();
streamReader.close();
}
}
Loading