Skip to content
Snippets Groups Projects
Commit 4230f9cc authored by OZGCloud's avatar OZGCloud
Browse files

fix try: save memory by reusing buffer array

parent bf726842
Branches
Tags
No related merge requests found
...@@ -29,6 +29,7 @@ import java.util.Objects; ...@@ -29,6 +29,7 @@ import java.util.Objects;
import java.util.Optional; import java.util.Optional;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiFunction;
import java.util.function.Function; import java.util.function.Function;
import org.apache.commons.io.IOUtils; import org.apache.commons.io.IOUtils;
...@@ -37,8 +38,10 @@ import de.itvsh.kop.common.errorhandling.TechnicalException; ...@@ -37,8 +38,10 @@ import de.itvsh.kop.common.errorhandling.TechnicalException;
import io.grpc.stub.CallStreamObserver; import io.grpc.stub.CallStreamObserver;
import io.grpc.stub.StreamObserver; import io.grpc.stub.StreamObserver;
import lombok.AccessLevel; import lombok.AccessLevel;
import lombok.Getter;
import lombok.NoArgsConstructor; import lombok.NoArgsConstructor;
import lombok.NonNull; import lombok.NonNull;
import lombok.RequiredArgsConstructor;
import lombok.extern.log4j.Log4j2; import lombok.extern.log4j.Log4j2;
@Log4j2 @Log4j2
...@@ -50,13 +53,13 @@ public class GrpcFileUploadUtils { ...@@ -50,13 +53,13 @@ public class GrpcFileUploadUtils {
/* /*
* Q = Request Type; S = Response Type * Q = Request Type; S = Response Type
*/ */
public static <Q, S> FileSender<Q, S> createSender(Function<byte[], Q> chunkBuilder, InputStream inputStream, public static <Q, S> FileSender<Q, S> createSender(BiFunction<byte[], Integer, Q> chunkBuilder, InputStream inputStream,
Function<StreamObserver<S>, CallStreamObserver<Q>> reqObserverBuilder) { Function<StreamObserver<S>, CallStreamObserver<Q>> reqObserverBuilder) {
return new FileSender<>(chunkBuilder, reqObserverBuilder, inputStream); return new FileSender<>(chunkBuilder, reqObserverBuilder, inputStream);
} }
public static class FileSender<Q, S> { public static class FileSender<Q, S> {
private final Function<byte[], Q> chunkBuilder; private final BiFunction<byte[], Integer, Q> chunkBuilder;
private final InputStream inputStream; private final InputStream inputStream;
private final CompletableFuture<S> resultFuture = new CompletableFuture<>(); private final CompletableFuture<S> resultFuture = new CompletableFuture<>();
...@@ -67,10 +70,15 @@ public class GrpcFileUploadUtils { ...@@ -67,10 +70,15 @@ public class GrpcFileUploadUtils {
private final AtomicBoolean metaDataSent = new AtomicBoolean(false); private final AtomicBoolean metaDataSent = new AtomicBoolean(false);
private final AtomicBoolean done = new AtomicBoolean(false); private final AtomicBoolean done = new AtomicBoolean(false);
FileSender(Function<byte[], Q> chunkBuilder, Function<StreamObserver<S>, CallStreamObserver<Q>> reqObserverBuilder, InputStream inputStream) { private final StreamReader streamReader;
FileSender(BiFunction<byte[], Integer, Q> chunkBuilder, Function<StreamObserver<S>, CallStreamObserver<Q>> reqObserverBuilder,
InputStream inputStream) {
this.chunkBuilder = chunkBuilder; this.chunkBuilder = chunkBuilder;
this.inputStream = IOUtils.buffer(inputStream, CHUNK_SIZE); this.inputStream = IOUtils.buffer(inputStream, CHUNK_SIZE);
this.reqObserverBuilder = reqObserverBuilder; this.reqObserverBuilder = reqObserverBuilder;
this.streamReader = new StreamReader(this.inputStream);
} }
public FileSender<Q, S> withMetaData(@NonNull Q metaData) { public FileSender<Q, S> withMetaData(@NonNull Q metaData) {
...@@ -92,8 +100,9 @@ public class GrpcFileUploadUtils { ...@@ -92,8 +100,9 @@ public class GrpcFileUploadUtils {
sendMetaData(); sendMetaData();
do { do {
LOG.debug("Sending next chunk."); LOG.debug("Sending next chunk.");
long sentSize = sendNextChunk(); // long sentSize =
checkForEndOfStream(sentSize); sendNextChunk();
// checkForEndOfStream(sentSize);
} while (!done.get() && isReady()); } while (!done.get() && isReady());
LOG.debug("Finished or waiting to become ready."); LOG.debug("Finished or waiting to become ready.");
} }
...@@ -119,17 +128,27 @@ public class GrpcFileUploadUtils { ...@@ -119,17 +128,27 @@ public class GrpcFileUploadUtils {
} }
long sendNextChunk() { long sendNextChunk() {
byte[] contentToSend = readFromStream(); byte[] contentToSend = streamReader.getNextData();
if (contentToSend.length > 0) { if (streamReader.getLastReadSize() > 0) {
sendChunk(contentToSend); sendChunk(contentToSend, streamReader.getLastReadSize());
} else {
endTransfer();
} }
return contentToSend.length; return contentToSend.length;
} }
void sendChunk(byte[] content) { private void endTransfer() {
LOG.debug("Sending {} byte Data.", content.length); requestObserver.onCompleted();
var chunk = chunkBuilder.apply(content); done.set(true);
LOG.debug("File Transfer done. Closing stream.");
IOUtils.closeQuietly(inputStream);
streamReader.close();
}
void sendChunk(byte[] content, int length) {
LOG.debug("Sending {} byte Data.", length);
var chunk = chunkBuilder.apply(content, length);
requestObserver.onNext(chunk); requestObserver.onNext(chunk);
} }
...@@ -157,6 +176,35 @@ public class GrpcFileUploadUtils { ...@@ -157,6 +176,35 @@ public class GrpcFileUploadUtils {
IOUtils.closeQuietly(inputStream); IOUtils.closeQuietly(inputStream);
requestObserver.onCompleted(); requestObserver.onCompleted();
done.set(true); done.set(true);
} else {
LOG.debug("File Transfer not jet done - need to tranfer another chunk.");
}
}
@RequiredArgsConstructor
private class StreamReader {
private final InputStream inStream;
private final byte[] buffer = new byte[CHUNK_SIZE];
@Getter
private int lastReadSize = 0;
@Getter
private final AtomicBoolean done = new AtomicBoolean(false);
byte[] getNextData() {
readNext();
return buffer;
}
void close() {
IOUtils.closeQuietly(inStream);
}
void readNext() {
try {
lastReadSize = inStream.read(buffer, 0, CHUNK_SIZE);
} catch (IOException e) {
throw new TechnicalException("Error on sending a single chunk", e);
}
} }
} }
} }
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment