Skip to content
Snippets Groups Projects
Commit b1441a74 authored by OZGCloud's avatar OZGCloud
Browse files

OZG-4461 OZG-4511 publish commandProcessedEvent on success

parent b7a66d0d
No related branches found
No related tags found
No related merge requests found
package de.ozgcloud.processor.processor;
import de.itvsh.ozg.pluto.command.Command;
import de.itvsh.ozg.pluto.command.CommandExecutedEvent;
class CommandProcessedEvent extends CommandExecutedEvent {
private static final long serialVersionUID = 1L;
public CommandProcessedEvent(Command command) {
super(command);
}
}
...@@ -62,9 +62,10 @@ class ProcessorEventListener { ...@@ -62,9 +62,10 @@ class ProcessorEventListener {
public void onCommandExecuteProcessor(CommandCreatedEvent event) { public void onCommandExecuteProcessor(CommandCreatedEvent event) {
var vorgang = getVorgang(event.getSource().getVorgangId()); var vorgang = getVorgang(event.getSource().getVorgangId());
processorService.processVorgang(vorgang, getProcessorNames(event.getSource())).forEach(processorResultMono -> processorResultMono processorService.processVorgang(vorgang, getProcessorNames(event.getSource())).forEach(processorResultMono -> processorResultMono
.doOnError(cause -> handleCommandError(cause, event.getSource().getId())).onErrorComplete() .doOnError(cause -> publishCommandFailedEvent(cause, event.getSource().getId())).onErrorComplete()
.map(result -> addVorgangId(result, event.getSource().getVorgangId())) .map(result -> addVorgangId(result, event.getSource().getVorgangId()))
.subscribe(resultService::processResult)); .subscribe(resultService::processResult));
publishCommandProcessedEvent(event.getSource());
} }
private Vorgang getVorgang(String vorgangId) { private Vorgang getVorgang(String vorgangId) {
...@@ -80,7 +81,11 @@ class ProcessorEventListener { ...@@ -80,7 +81,11 @@ class ProcessorEventListener {
return result.toBuilder().vorgangId(VorgangId.from(vorgangId)).build(); return result.toBuilder().vorgangId(VorgangId.from(vorgangId)).build();
} }
private void handleCommandError(Throwable e, String commandId) { private void publishCommandFailedEvent(Throwable e, String commandId) {
publisher.publishEvent(new CommandFailedEvent(commandId, e.getMessage())); publisher.publishEvent(new CommandFailedEvent(commandId, e.getMessage()));
} }
private void publishCommandProcessedEvent(Command command) {
publisher.publishEvent(new CommandProcessedEvent(command));
}
} }
\ No newline at end of file
...@@ -10,6 +10,7 @@ import java.util.Map; ...@@ -10,6 +10,7 @@ import java.util.Map;
import java.util.stream.Stream; import java.util.stream.Stream;
import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.DisplayName; import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Nested; import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
...@@ -126,6 +127,8 @@ class ProcessorEventListenerTest { ...@@ -126,6 +127,8 @@ class ProcessorEventListenerTest {
private Throwable exception; private Throwable exception;
@Captor @Captor
private ArgumentCaptor<CommandFailedEvent> commandFailedEventCaptor; private ArgumentCaptor<CommandFailedEvent> commandFailedEventCaptor;
@Captor
private ArgumentCaptor<CommandProcessedEvent> commandSuccessEventCaptor;
private final Vorgang vorgang = VorgangTestFactory.create(); private final Vorgang vorgang = VorgangTestFactory.create();
private final Collection<String> processorNames = List.of("dummy1", "dummy2"); private final Collection<String> processorNames = List.of("dummy1", "dummy2");
...@@ -153,7 +156,7 @@ class ProcessorEventListenerTest { ...@@ -153,7 +156,7 @@ class ProcessorEventListenerTest {
@Test @Test
@SneakyThrows @SneakyThrows
void shouldHandleCommandError() { void shouldPublishCommandFailedEvent() {
var errorMsg = "ErrorMsg"; var errorMsg = "ErrorMsg";
when(exception.getMessage()).thenReturn(errorMsg); when(exception.getMessage()).thenReturn(errorMsg);
when(processorService.processVorgang(any(), any())).thenReturn(Stream.of(Mono.error(exception))); when(processorService.processVorgang(any(), any())).thenReturn(Stream.of(Mono.error(exception)));
...@@ -165,5 +168,18 @@ class ProcessorEventListenerTest { ...@@ -165,5 +168,18 @@ class ProcessorEventListenerTest {
assertThat(commandFailedEventCaptor.getValue().getSource()).isEqualTo(CommandTestFactory.ID); assertThat(commandFailedEventCaptor.getValue().getSource()).isEqualTo(CommandTestFactory.ID);
assertThat(commandFailedEventCaptor.getValue().getErrorMessage()).isEqualTo(errorMsg); assertThat(commandFailedEventCaptor.getValue().getErrorMessage()).isEqualTo(errorMsg);
} }
@Disabled
@Test
@SneakyThrows
void shouldPublishCommandProcessedEvent() {
when(processorService.processVorgang(any(), any())).thenReturn(Stream.empty());
vorgangEventListener.onCommandExecuteProcessor(event);
verify(publisher).publishEvent(commandSuccessEventCaptor.capture());
assertThat(commandFailedEventCaptor.getValue().getClass()).isEqualTo(CommandProcessedEvent.class);
assertThat(commandFailedEventCaptor.getValue().getSource()).isEqualTo(command);
}
} }
} }
\ No newline at end of file
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment