Skip to content
Snippets Groups Projects
Commit 136822b7 authored by OZGCloud's avatar OZGCloud
Browse files

OZG-7143 add debug logs

parent b0458274
Branches
Tags
No related merge requests found
...@@ -23,6 +23,15 @@ ...@@ -23,6 +23,15 @@
*/ */
package de.ozgcloud.common.binaryfile; package de.ozgcloud.common.binaryfile;
import com.google.protobuf.ByteString;
import de.ozgcloud.common.errorhandling.TechnicalException;
import io.grpc.Context;
import io.grpc.stub.CallStreamObserver;
import lombok.Builder;
import lombok.extern.log4j.Log4j2;
import org.apache.commons.io.IOUtils;
import org.springframework.core.task.TaskExecutor;
import java.io.IOException; import java.io.IOException;
import java.io.OutputStream; import java.io.OutputStream;
import java.io.PipedInputStream; import java.io.PipedInputStream;
...@@ -31,17 +40,6 @@ import java.util.concurrent.atomic.AtomicBoolean; ...@@ -31,17 +40,6 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer; import java.util.function.Consumer;
import java.util.function.Function; import java.util.function.Function;
import org.apache.commons.io.IOUtils;
import org.springframework.core.task.TaskExecutor;
import com.google.protobuf.ByteString;
import de.ozgcloud.common.errorhandling.TechnicalException;
import io.grpc.Context;
import io.grpc.stub.CallStreamObserver;
import lombok.Builder;
import lombok.extern.log4j.Log4j2;
@Log4j2 @Log4j2
public class GrpcBinaryFileServerDownloader<T> { public class GrpcBinaryFileServerDownloader<T> {
...@@ -54,6 +52,7 @@ public class GrpcBinaryFileServerDownloader<T> { ...@@ -54,6 +52,7 @@ public class GrpcBinaryFileServerDownloader<T> {
private final byte[] buffer = new byte[GrpcBinaryFileServerDownloader.CHUNK_SIZE]; private final byte[] buffer = new byte[GrpcBinaryFileServerDownloader.CHUNK_SIZE];
private final AtomicBoolean started = new AtomicBoolean(false); private final AtomicBoolean started = new AtomicBoolean(false);
private final AtomicBoolean downloadInProgress = new AtomicBoolean(false);
private PipedInputStream inputStream; private PipedInputStream inputStream;
private PipedOutputStream outputStream; private PipedOutputStream outputStream;
...@@ -78,9 +77,10 @@ public class GrpcBinaryFileServerDownloader<T> { ...@@ -78,9 +77,10 @@ public class GrpcBinaryFileServerDownloader<T> {
} }
void doStart() { void doStart() {
LOG.debug("[{}] Starting download.", Thread.currentThread().getName());
handleSafety(this::setupStreams); handleSafety(this::setupStreams);
taskExecutor.execute(Context.current().wrap(this::startDownload)); taskExecutor.execute(Context.current().wrap(this::startDownload));
sendChunks(); callObserver.setOnReadyHandler(this::onReadyHandler);
} }
void setupStreams() throws IOException { void setupStreams() throws IOException {
...@@ -94,22 +94,37 @@ public class GrpcBinaryFileServerDownloader<T> { ...@@ -94,22 +94,37 @@ public class GrpcBinaryFileServerDownloader<T> {
} }
void doDownload() throws IOException { void doDownload() throws IOException {
downloadInProgress.set(true);
LOG.debug("[{}] Downloading file content.", Thread.currentThread().getName());
downloadConsumer.accept(outputStream); downloadConsumer.accept(outputStream);
LOG.debug("[{}] Downloading file content finished.", Thread.currentThread().getName());
downloadInProgress.set(false);
outputStream.close(); outputStream.close();
} }
synchronized void onReadyHandler() {
if (callObserver.isReady()) {
sendChunks();
}
}
void sendChunks() { void sendChunks() {
handleSafety(this::doSendChunks); handleSafety(this::doSendChunks);
} }
void doSendChunks() throws IOException { void doSendChunks() throws IOException {
int bytesRead; int bytesRead = 0;
while ((bytesRead = inputStream.read(buffer)) != -1) { while (callObserver.isReady() && (bytesRead = inputStream.read(buffer)) != -1) {
callObserver.onNext(chunkBuilder.apply(ByteString.copyFrom(buffer, 0, bytesRead))); callObserver.onNext(chunkBuilder.apply(ByteString.copyFrom(buffer, 0, bytesRead)));
} }
LOG.debug("[{}] Sending file content finished. isReady: {}, bytesRead: {}", Thread.currentThread().getName(), callObserver.isReady(),
bytesRead);
if (!downloadInProgress.get()) {
inputStream.close(); inputStream.close();
LOG.debug("[{}] Complete request.", Thread.currentThread().getName());
callObserver.onCompleted(); callObserver.onCompleted();
} }
}
void handleSafety(ExceptionalRunnable runnable) { void handleSafety(ExceptionalRunnable runnable) {
try { try {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment