From 4230f9cc2f3c72fcfc801d447dba4d96a43a259b Mon Sep 17 00:00:00 2001 From: OZGCloud <ozgcloud@mgm-tp.com> Date: Fri, 10 Nov 2023 10:45:48 +0100 Subject: [PATCH] fix try: save memory by reusing buffer array --- .../binaryfile/GrpcFileUploadUtils.java | 70 ++++++++++++++++--- 1 file changed, 59 insertions(+), 11 deletions(-) diff --git a/kop-common-lib/src/main/java/de/itvsh/kop/common/binaryfile/GrpcFileUploadUtils.java b/kop-common-lib/src/main/java/de/itvsh/kop/common/binaryfile/GrpcFileUploadUtils.java index 1666f3f..846b2a0 100644 --- a/kop-common-lib/src/main/java/de/itvsh/kop/common/binaryfile/GrpcFileUploadUtils.java +++ b/kop-common-lib/src/main/java/de/itvsh/kop/common/binaryfile/GrpcFileUploadUtils.java @@ -29,6 +29,7 @@ import java.util.Objects; import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.BiFunction; import java.util.function.Function; import org.apache.commons.io.IOUtils; @@ -37,8 +38,10 @@ import de.itvsh.kop.common.errorhandling.TechnicalException; import io.grpc.stub.CallStreamObserver; import io.grpc.stub.StreamObserver; import lombok.AccessLevel; +import lombok.Getter; import lombok.NoArgsConstructor; import lombok.NonNull; +import lombok.RequiredArgsConstructor; import lombok.extern.log4j.Log4j2; @Log4j2 @@ -50,13 +53,13 @@ public class GrpcFileUploadUtils { /* * Q = Request Type; S = Response Type */ - public static <Q, S> FileSender<Q, S> createSender(Function<byte[], Q> chunkBuilder, InputStream inputStream, + public static <Q, S> FileSender<Q, S> createSender(BiFunction<byte[], Integer, Q> chunkBuilder, InputStream inputStream, Function<StreamObserver<S>, CallStreamObserver<Q>> reqObserverBuilder) { return new FileSender<>(chunkBuilder, reqObserverBuilder, inputStream); } public static class FileSender<Q, S> { - private final Function<byte[], Q> chunkBuilder; + private final BiFunction<byte[], Integer, Q> chunkBuilder; private final InputStream inputStream; private final CompletableFuture<S> resultFuture = new CompletableFuture<>(); @@ -67,10 +70,15 @@ public class GrpcFileUploadUtils { private final AtomicBoolean metaDataSent = new AtomicBoolean(false); private final AtomicBoolean done = new AtomicBoolean(false); - FileSender(Function<byte[], Q> chunkBuilder, Function<StreamObserver<S>, CallStreamObserver<Q>> reqObserverBuilder, InputStream inputStream) { + private final StreamReader streamReader; + + FileSender(BiFunction<byte[], Integer, Q> chunkBuilder, Function<StreamObserver<S>, CallStreamObserver<Q>> reqObserverBuilder, + InputStream inputStream) { this.chunkBuilder = chunkBuilder; this.inputStream = IOUtils.buffer(inputStream, CHUNK_SIZE); this.reqObserverBuilder = reqObserverBuilder; + + this.streamReader = new StreamReader(this.inputStream); } public FileSender<Q, S> withMetaData(@NonNull Q metaData) { @@ -92,8 +100,9 @@ public class GrpcFileUploadUtils { sendMetaData(); do { LOG.debug("Sending next chunk."); - long sentSize = sendNextChunk(); - checkForEndOfStream(sentSize); +// long sentSize = + sendNextChunk(); +// checkForEndOfStream(sentSize); } while (!done.get() && isReady()); LOG.debug("Finished or waiting to become ready."); } @@ -119,17 +128,27 @@ public class GrpcFileUploadUtils { } long sendNextChunk() { - byte[] contentToSend = readFromStream(); + byte[] contentToSend = streamReader.getNextData(); - if (contentToSend.length > 0) { - sendChunk(contentToSend); + if (streamReader.getLastReadSize() > 0) { + sendChunk(contentToSend, streamReader.getLastReadSize()); + } else { + endTransfer(); } return contentToSend.length; } - void sendChunk(byte[] content) { - LOG.debug("Sending {} byte Data.", content.length); - var chunk = chunkBuilder.apply(content); + private void endTransfer() { + requestObserver.onCompleted(); + done.set(true); + LOG.debug("File Transfer done. Closing stream."); + IOUtils.closeQuietly(inputStream); + streamReader.close(); + } + + void sendChunk(byte[] content, int length) { + LOG.debug("Sending {} byte Data.", length); + var chunk = chunkBuilder.apply(content, length); requestObserver.onNext(chunk); } @@ -157,6 +176,35 @@ public class GrpcFileUploadUtils { IOUtils.closeQuietly(inputStream); requestObserver.onCompleted(); done.set(true); + } else { + LOG.debug("File Transfer not jet done - need to tranfer another chunk."); + } + } + + @RequiredArgsConstructor + private class StreamReader { + private final InputStream inStream; + private final byte[] buffer = new byte[CHUNK_SIZE]; + @Getter + private int lastReadSize = 0; + @Getter + private final AtomicBoolean done = new AtomicBoolean(false); + + byte[] getNextData() { + readNext(); + return buffer; + } + + void close() { + IOUtils.closeQuietly(inStream); + } + + void readNext() { + try { + lastReadSize = inStream.read(buffer, 0, CHUNK_SIZE); + } catch (IOException e) { + throw new TechnicalException("Error on sending a single chunk", e); + } } } } -- GitLab