Skip to content
Snippets Groups Projects
Commit 4c263c57 authored by Krzysztof Witukiewicz's avatar Krzysztof Witukiewicz
Browse files

OZG-7573 OZG-7991 CR adjustments

parent 3269b04a
Branches
Tags
1 merge request!27OZG-7573 Dateien Weiterleiten
......@@ -80,11 +80,15 @@ class EingangForwarder {
public void forward(GrpcRouteForwarding grpcRouteForwarding, List<IncomingFileGroup> attachments, List<IncomingFile> representations) {
var future = performGrpcCall();
sendEingang(grpcRouteForwarding, attachments, representations);
requestObserver.onCompleted();
waitForCompletion(future);
}
private void sendEingang(GrpcRouteForwarding grpcRouteForwarding, List<IncomingFileGroup> attachments, List<IncomingFile> representations) {
sendRouteForwarding(grpcRouteForwarding);
sendAttachments(attachments);
sendRepresentations(representations);
requestObserver.onCompleted();
waitForCompletion(future);
}
Future<GrpcRouteForwardingResponse> performGrpcCall() {
......@@ -268,11 +272,11 @@ class EingangForwarder {
static class DelegatingOnReadyHandler implements Runnable {
private final ClientCallStreamObserver<GrpcRouteForwardingRequest> requestStream;
private final AtomicReference<Runnable> delegate = new AtomicReference<>();
private final AtomicReference<Runnable> delegateRef = new AtomicReference<>();
private final AtomicBoolean done = new AtomicBoolean(false);
public void setDelegate(Runnable onReadyHandler) {
this.delegate.set(onReadyHandler);
this.delegateRef.set(onReadyHandler);
}
public void stop() {
......@@ -285,7 +289,7 @@ class EingangForwarder {
if (Thread.currentThread().isInterrupted()) {
break;
}
var delegate = this.delegate.get();
var delegate = delegateRef.get();
if (delegate != null) {
delegate.run();
}
......
......@@ -183,6 +183,14 @@ class EingangForwarderTest {
verify(serviceStub).routeForwarding(getResponseObserverFromForwarder());
}
@Test
void shouldReturnFutureOfResponseObserver() {
var result = forwarder.performGrpcCall();
var expectedFuture = ReflectionTestUtils.getField(getResponseObserverFromForwarder(), "future", CompletableFuture.class);
assertThat(result).isSameAs(expectedFuture);
}
}
@Nested
......@@ -1194,7 +1202,7 @@ class EingangForwarderTest {
}
private Runnable getDelegateFromOnReadyHandler() {
return (Runnable) ReflectionTestUtils.getField(onReadyHandler, "delegate", AtomicReference.class).get();
return (Runnable) ReflectionTestUtils.getField(onReadyHandler, "delegateRef", AtomicReference.class).get();
}
}
......@@ -1209,8 +1217,4 @@ class EingangForwarderTest {
private void setRequestObserverInForwarder(ClientCallStreamObserver<GrpcRouteForwardingRequest> requestObserver) {
ReflectionTestUtils.setField(forwarder, "requestObserver", requestObserver);
}
private void setForwardFutureInForwarder(CompletableFuture<Void> future) {
ReflectionTestUtils.setField(forwarder, "forwardFuture", future);
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment