From b6f50afd1d5edbe2beb1295fce866c0e540eac8f Mon Sep 17 00:00:00 2001 From: Krzysztof <krzysztof.witukiewicz@mgm-tp.com> Date: Thu, 3 Apr 2025 17:12:18 +0200 Subject: [PATCH] OZG-7573 OZG-7991 Do not exit run() when delegate is replaced --- .../vorgang/redirect/EingangForwarder.java | 24 ++++++++-- .../redirect/EingangForwarderTest.java | 48 +++++++++++++++++-- 2 files changed, 64 insertions(+), 8 deletions(-) diff --git a/vorgang-manager-server/src/main/java/de/ozgcloud/vorgang/vorgang/redirect/EingangForwarder.java b/vorgang-manager-server/src/main/java/de/ozgcloud/vorgang/vorgang/redirect/EingangForwarder.java index f34f7e0ba..7222987de 100644 --- a/vorgang-manager-server/src/main/java/de/ozgcloud/vorgang/vorgang/redirect/EingangForwarder.java +++ b/vorgang-manager-server/src/main/java/de/ozgcloud/vorgang/vorgang/redirect/EingangForwarder.java @@ -29,8 +29,10 @@ import io.grpc.stub.ClientResponseObserver; import lombok.AccessLevel; import lombok.Getter; import lombok.RequiredArgsConstructor; +import lombok.extern.log4j.Log4j2; @RequiredArgsConstructor(access = AccessLevel.PRIVATE) +@Log4j2 class EingangForwarder { private final RouteForwardingServiceGrpc.RouteForwardingServiceStub serviceStub; @@ -241,11 +243,11 @@ class EingangForwarder { static class DelegatingOnReadyHandler implements Runnable { private final ClientCallStreamObserver<GrpcRouteForwardingRequest> requestStream; - private final AtomicReference<Runnable> onReadyHandler = new AtomicReference<>(); + private final AtomicReference<Runnable> delegate = new AtomicReference<>(); private final AtomicBoolean done = new AtomicBoolean(false); public void setDelegate(Runnable onReadyHandler) { - this.onReadyHandler.set(onReadyHandler); + this.delegate.set(onReadyHandler); } public void stop() { @@ -254,9 +256,21 @@ class EingangForwarder { @Override public void run() { - var delegate = onReadyHandler.get(); - if (delegate != null && !done.get() && requestStream.isReady()) { - delegate.run(); + while (!done.get() && requestStream.isReady()) { + if (Thread.currentThread().isInterrupted()) { + break; + } + var delegate = this.delegate.get(); + if (delegate != null) { + delegate.run(); + } else { + try { + wait(100); + } catch (InterruptedException e) { + LOG.debug("Interrupted while waiting for delegate to be set"); + Thread.currentThread().interrupt(); + } + } } } } diff --git a/vorgang-manager-server/src/test/java/de/ozgcloud/vorgang/vorgang/redirect/EingangForwarderTest.java b/vorgang-manager-server/src/test/java/de/ozgcloud/vorgang/vorgang/redirect/EingangForwarderTest.java index 0361376b0..25f551826 100644 --- a/vorgang-manager-server/src/test/java/de/ozgcloud/vorgang/vorgang/redirect/EingangForwarderTest.java +++ b/vorgang-manager-server/src/test/java/de/ozgcloud/vorgang/vorgang/redirect/EingangForwarderTest.java @@ -1,16 +1,21 @@ package de.ozgcloud.vorgang.vorgang.redirect; import static org.assertj.core.api.Assertions.*; +import static org.awaitility.Awaitility.*; import static org.mockito.ArgumentMatchers.*; import static org.mockito.Mockito.*; import static org.mockito.Mockito.anyInt; import static org.mockito.Mockito.eq; import java.io.InputStream; +import java.time.Duration; import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiFunction; import java.util.function.Consumer; @@ -967,6 +972,20 @@ class EingangForwarderTest { assertThat(getDoneFromOnReadyHandler()).isFalse(); } + @Nested + class TestSetDelegate { + + @Mock + private Runnable delegate; + + @Test + void shouldSetDelegate() { + onReadyHandler.setDelegate(delegate); + + assertThat(getDelegateFromOnReadyHandler()).isSameAs(delegate); + } + } + @Nested class TestStop { @@ -1010,17 +1029,40 @@ class EingangForwarderTest { @Test void shouldRunDelegateIfNotDoneAndReady() { - when(requestStream.isReady()).thenReturn(true); + when(requestStream.isReady()).thenReturn(true).thenReturn(false); + runWithOnReadyHandlerInAnotherThread(() -> { + await().atMost(Duration.ofMillis(500)).untilAsserted(() -> verify(delegate, atLeastOnce()).run()); + }); + } - onReadyHandler.run(); + @Test + void shouldContinueAfterDelegateWasReplaced() { + when(requestStream.isReady()).thenReturn(true); + runWithOnReadyHandlerInAnotherThread(() -> { + await().atMost(Duration.ofMillis(500)).untilAsserted(() -> verify(delegate, atLeastOnce()).run()); + var delegate2 = mock(Runnable.class); + onReadyHandler.setDelegate(delegate2); + await().atMost(Duration.ofMillis(500)).untilAsserted(() -> verify(delegate2, atLeastOnce()).run()); + }); + } - verify(delegate).run(); + private void runWithOnReadyHandlerInAnotherThread(Runnable runnable) { + try (ExecutorService executor = Executors.newSingleThreadExecutor()) { + var future = executor.submit(onReadyHandler); + runnable.run(); + future.cancel(true); + executor.shutdown(); + } } } private boolean getDoneFromOnReadyHandler() { return ReflectionTestUtils.getField(onReadyHandler, "done", AtomicBoolean.class).get(); } + + private Runnable getDelegateFromOnReadyHandler() { + return (Runnable) ReflectionTestUtils.getField(onReadyHandler, "delegate", AtomicReference.class).get(); + } } private void setResponseObserverInForwarder(ForwardingResponseObserver responseObserver) { -- GitLab