Skip to content
Snippets Groups Projects
Commit b0458274 authored by OZGCloud's avatar OZGCloud
Browse files

OZG-7143 Remove OnReadyHandler

parent 5325fb2d
No related branches found
No related tags found
No related merge requests found
......@@ -23,15 +23,6 @@
*/
package de.ozgcloud.common.binaryfile;
import com.google.protobuf.ByteString;
import de.ozgcloud.common.errorhandling.TechnicalException;
import io.grpc.Context;
import io.grpc.stub.CallStreamObserver;
import lombok.Builder;
import lombok.extern.log4j.Log4j2;
import org.apache.commons.io.IOUtils;
import org.springframework.core.task.TaskExecutor;
import java.io.IOException;
import java.io.OutputStream;
import java.io.PipedInputStream;
......@@ -40,6 +31,17 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.Function;
import org.apache.commons.io.IOUtils;
import org.springframework.core.task.TaskExecutor;
import com.google.protobuf.ByteString;
import de.ozgcloud.common.errorhandling.TechnicalException;
import io.grpc.Context;
import io.grpc.stub.CallStreamObserver;
import lombok.Builder;
import lombok.extern.log4j.Log4j2;
@Log4j2
public class GrpcBinaryFileServerDownloader<T> {
......@@ -52,7 +54,6 @@ public class GrpcBinaryFileServerDownloader<T> {
private final byte[] buffer = new byte[GrpcBinaryFileServerDownloader.CHUNK_SIZE];
private final AtomicBoolean started = new AtomicBoolean(false);
private final AtomicBoolean downloadInProgress = new AtomicBoolean(false);
private PipedInputStream inputStream;
private PipedOutputStream outputStream;
......@@ -79,7 +80,7 @@ public class GrpcBinaryFileServerDownloader<T> {
void doStart() {
handleSafety(this::setupStreams);
taskExecutor.execute(Context.current().wrap(this::startDownload));
callObserver.setOnReadyHandler(this::onReadyHandler);
sendChunks();
}
void setupStreams() throws IOException {
......@@ -93,32 +94,22 @@ public class GrpcBinaryFileServerDownloader<T> {
}
void doDownload() throws IOException {
downloadInProgress.set(true);
downloadConsumer.accept(outputStream);
downloadInProgress.set(false);
outputStream.close();
}
synchronized void onReadyHandler() {
if (callObserver.isReady()) {
sendChunks();
}
}
void sendChunks() {
handleSafety(this::doSendChunks);
}
void doSendChunks() throws IOException {
int bytesRead;
while (callObserver.isReady() && (bytesRead = inputStream.read(buffer)) != -1) {
while ((bytesRead = inputStream.read(buffer)) != -1) {
callObserver.onNext(chunkBuilder.apply(ByteString.copyFrom(buffer, 0, bytesRead)));
}
if (!downloadInProgress.get()) {
inputStream.close();
callObserver.onCompleted();
}
}
void handleSafety(ExceptionalRunnable runnable) {
try {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment