Skip to content
Snippets Groups Projects

OZG-7262 OZG-7566 Notify callObserver about error

Merged Krzysztof Witukiewicz requested to merge OZG-7262-downloader-error-handling into main

Files

@@ -23,22 +23,26 @@
*/
package de.ozgcloud.common.binaryfile;
import com.google.protobuf.ByteString;
import de.ozgcloud.common.errorhandling.TechnicalException;
import io.grpc.stub.CallStreamObserver;
import lombok.Builder;
import lombok.extern.log4j.Log4j2;
import org.apache.commons.io.IOUtils;
import org.springframework.core.task.TaskExecutor;
import java.io.IOException;
import java.io.OutputStream;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Function;
import org.apache.commons.io.IOUtils;
import org.springframework.core.task.TaskExecutor;
import com.google.protobuf.ByteString;
import de.ozgcloud.common.errorhandling.TechnicalException;
import io.grpc.stub.CallStreamObserver;
import lombok.Builder;
import lombok.extern.log4j.Log4j2;
@Log4j2
public class GrpcBinaryFileServerDownloader<T> {
@@ -49,10 +53,11 @@ public class GrpcBinaryFileServerDownloader<T> {
private final Consumer<OutputStream> downloadConsumer;
private final TaskExecutor taskExecutor;
private final byte[] buffer = new byte[GrpcBinaryFileServerDownloader.CHUNK_SIZE];
private final byte[] buffer = new byte[CHUNK_SIZE];
private final AtomicBoolean started = new AtomicBoolean(false);
private final AtomicBoolean downloadFinished = new AtomicBoolean(false);
private final AtomicBoolean requestFinished = new AtomicBoolean(false);
private final AtomicReference<TechnicalException> downloadError = new AtomicReference<>();
private PipedInputStream inputStream;
private PipedOutputStream outputStream;
@@ -78,19 +83,37 @@ public class GrpcBinaryFileServerDownloader<T> {
void doStart() {
LOG.debug("Starting download.");
handleSafety(this::setupStreams);
safelySetupStreams();
taskExecutor.execute(this::startDownload);
callObserver.setOnReadyHandler(this::sendChunks);
}
void safelySetupStreams() {
try {
setupStreams();
} catch (Exception e) {
closeOutputStream();
closeInputStream();
throw new TechnicalException("Error while setting up streams", e);
}
}
void setupStreams() throws IOException {
outputStream = new PipedOutputStream();
inputStream = new PipedInputStream(GrpcBinaryFileServerDownloader.CHUNK_SIZE);
inputStream = new PipedInputStream(CHUNK_SIZE);
outputStream.connect(inputStream);
}
void startDownload() {
handleSafety(this::doDownload);
try {
doDownload();
sendChunks();
} catch (Exception e) {
downloadError.set(new TechnicalException("Error while downloading file contents", e));
LOG.error(downloadError.get().getMessage(), downloadError.get());
} finally {
closeOutputStream();
}
}
void doDownload() {
@@ -98,12 +121,14 @@ public class GrpcBinaryFileServerDownloader<T> {
downloadConsumer.accept(outputStream);
LOG.debug("Download completed.");
downloadFinished.set(true);
closeOutputStream();
sendChunks();
}
synchronized void sendChunks() {
handleSafety(this::doSendChunks);
try {
doSendChunks();
} catch (Exception e) {
completeRequestWithError(new TechnicalException("Error while sending chunks", e));
}
}
void doSendChunks() throws IOException {
@@ -111,7 +136,7 @@ public class GrpcBinaryFileServerDownloader<T> {
return;
}
int bytesRead;
while (callObserver.isReady()) {
while (isReady()) {
if ((bytesRead = inputStream.read(buffer)) == -1) {
tryCompleteRequest();
break;
@@ -121,30 +146,33 @@ public class GrpcBinaryFileServerDownloader<T> {
}
}
private boolean isReady() {
return callObserver.isReady();
}
void tryCompleteRequest() {
if (shouldCompleteRequest()) {
completeRequest();
if (Objects.nonNull(downloadError.get())) {
throw downloadError.get();
} else if (downloadFinished.get()) {
completeRequestNormally();
}
}
boolean shouldCompleteRequest() {
return downloadFinished.get() && requestFinished.compareAndSet(false, true);
void completeRequestWithError(TechnicalException e) {
LOG.debug("Complete download request with error");
finishRequest();
throw e;
}
void completeRequest() {
void completeRequestNormally() {
LOG.debug("Complete download request");
closeInputStream();
finishRequest();
callObserver.onCompleted();
}
void handleSafety(ExceptionalRunnable runnable) {
try {
runnable.run();
} catch (Exception e) {
closeOutputStream();
closeInputStream();
throw new TechnicalException("Error occurred during downloading file content download.", e);
}
private void finishRequest() {
requestFinished.set(true);
closeInputStream();
}
void closeOutputStream() {
@@ -154,5 +182,4 @@ public class GrpcBinaryFileServerDownloader<T> {
void closeInputStream() {
IOUtils.closeQuietly(inputStream, e -> LOG.error("InputStream cannot be closed.", e));
}
}
\ No newline at end of file
Loading