Skip to content
Snippets Groups Projects
Commit b7a66d0d authored by OZGCloud's avatar OZGCloud
Browse files

OZG-4461 OZG-4511 impl CommandEventListener

parent c19a5a29
No related branches found
No related tags found
No related merge requests found
...@@ -17,7 +17,7 @@ ...@@ -17,7 +17,7 @@
<description>OZG-Cloud Processor Manager</description> <description>OZG-Cloud Processor Manager</description>
<properties> <properties>
<pluto.version>1.14.0</pluto.version> <pluto.version>1.16.0</pluto.version>
<tyrus-standalone-client.version>2.1.3</tyrus-standalone-client.version> <tyrus-standalone-client.version>2.1.3</tyrus-standalone-client.version>
</properties> </properties>
<dependencies> <dependencies>
...@@ -72,6 +72,13 @@ ...@@ -72,6 +72,13 @@
<artifactId>mockwebserver</artifactId> <artifactId>mockwebserver</artifactId>
<scope>test</scope> <scope>test</scope>
</dependency> </dependency>
<!-- commons -->
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</dependency>
</dependencies> </dependencies>
<build> <build>
......
package de.ozgcloud.processor.processor; package de.ozgcloud.processor.processor;
import java.util.Collection;
import java.util.Collections;
import java.util.function.Predicate;
import org.apache.commons.collections.MapUtils;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.event.EventListener; import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import de.itvsh.ozg.pluto.command.Command;
import de.itvsh.ozg.pluto.command.CommandCreatedEvent;
import de.itvsh.ozg.pluto.command.CommandFailedEvent;
import de.itvsh.ozg.pluto.command.VorgangCreatedEvent; import de.itvsh.ozg.pluto.command.VorgangCreatedEvent;
import de.ozgcloud.processor.result.ResultService; import de.ozgcloud.processor.result.ResultService;
import de.ozgcloud.processor.vorgang.Vorgang; import de.ozgcloud.processor.vorgang.Vorgang;
...@@ -21,22 +30,57 @@ class ProcessorEventListener { ...@@ -21,22 +30,57 @@ class ProcessorEventListener {
private final VorgangService service; private final VorgangService service;
private final ProcessorService processorService; private final ProcessorService processorService;
private final ResultService resultService; private final ResultService resultService;
private final ApplicationEventPublisher publisher;
static final String EXECUTE_PROCESS_ORDER = "EXECUTE_PROCESSOR";
private static final String IS_EXECUTE_PROCESSOR = "{T(de.ozgcloud.processor.processor.ProcessorEventListener).IS_EXECUTE_PROCESSOR_EVENT.test(event.getSource())}";
public static final Predicate<Command> IS_EXECUTE_PROCESSOR_EVENT = command -> command.getOrder().equals(EXECUTE_PROCESS_ORDER);
static final String COMMAND_PROCESSOR_NAMES_KEY = "processorNames";
@EventListener @EventListener
public void triggerNewVorgangProcessors(VorgangCreatedEvent event) { public void triggerNewVorgangProcessors(VorgangCreatedEvent event) {
try { try {
var vorgang = service.getVorgang(VorgangId.from(event.getSource())); var vorgang = getVorgang(event.getSource());
processorService.processVorgang(vorgang).forEach(processorResultMono -> processorResultMono processorService.processVorgang(vorgang).forEach(processorResultMono -> processorResultMono
.doOnError(cause -> handleError(cause, event.getSource())).onErrorComplete() .doOnError(cause -> handleError(cause, event.getSource())).onErrorComplete()
.map(result -> result.toBuilder().vorgangId(VorgangId.from(event.getSource())).build()) .map(result -> addVorgangId(result, event.getSource()))
.subscribe(resultService::processResult)); .subscribe(resultService::processResult));
} catch (RuntimeException e) { } catch (RuntimeException e) {
handleError(e, event.getSource()); handleError(e, event.getSource());
} }
} }
private void handleError(Throwable cause, String vorgangId) { private void handleError(Throwable cause, String vorgangId) {
LOG.error("Error on procession Vorgang {} externally", vorgangId, cause); LOG.error("Error on procession Vorgang {} externally", vorgangId, cause);
resultService.processError(cause, vorgangId); resultService.processError(cause, vorgangId);
} }
@EventListener(condition = IS_EXECUTE_PROCESSOR)
public void onCommandExecuteProcessor(CommandCreatedEvent event) {
var vorgang = getVorgang(event.getSource().getVorgangId());
processorService.processVorgang(vorgang, getProcessorNames(event.getSource())).forEach(processorResultMono -> processorResultMono
.doOnError(cause -> handleCommandError(cause, event.getSource().getId())).onErrorComplete()
.map(result -> addVorgangId(result, event.getSource().getVorgangId()))
.subscribe(resultService::processResult));
}
private Vorgang getVorgang(String vorgangId) {
return service.getVorgang(VorgangId.from(vorgangId));
}
@SuppressWarnings("unchecked")
private Collection<String> getProcessorNames(Command command) {
return (Collection<String>) MapUtils.getObject(command.getBodyObject(), COMMAND_PROCESSOR_NAMES_KEY, Collections.emptyList());
}
private ProcessorResult addVorgangId(ProcessorResult result, String vorgangId) {
return result.toBuilder().vorgangId(VorgangId.from(vorgangId)).build();
}
private void handleCommandError(Throwable e, String commandId) {
publisher.publishEvent(new CommandFailedEvent(commandId, e.getMessage()));
}
} }
\ No newline at end of file
...@@ -68,4 +68,9 @@ public class ProcessorService { ...@@ -68,4 +68,9 @@ public class ProcessorService {
Mono<Throwable> buildRedirectError(ClientResponse clientResponse) { Mono<Throwable> buildRedirectError(ClientResponse clientResponse) {
return Mono.error(new TechnicalException("Resource was moved (%s) ".formatted(clientResponse.statusCode()))); return Mono.error(new TechnicalException("Resource was moved (%s) ".formatted(clientResponse.statusCode())));
} }
public Stream<Mono<ProcessorResult>> processVorgang(Vorgang vorgang, Collection<String> processorName) {
//TODO Implement OZG-4513
return null;
}
} }
package de.ozgcloud.processor.command;
import de.ozgcloud.processor.vorgang.VorgangTestFactory;
public class CommandTestFactory {
public static final String ID = "42";
public static TestCommand create() {
return createBuilder().build();
}
public static TestCommand.TestCommandBuilder createBuilder() {
return TestCommand.builder()
.id(ID)
.vorgangId(VorgangTestFactory.ID.toString());
}
}
\ No newline at end of file
package de.ozgcloud.processor.command;
import java.time.ZonedDateTime;
import java.util.Map;
import de.itvsh.ozg.pluto.command.CommandStatus;
import lombok.Builder;
import lombok.Getter;
@Getter
@Builder
public class TestCommand implements de.itvsh.ozg.pluto.command.Command {
private String id;
private String vorgangId;
private String relationId;
private Long relationVersion;
private String order;
private ZonedDateTime createdAt;
private ZonedDateTime finishedAt;
private String createdBy;
private String createdByName;
private CommandStatus status;
private Map<String, Object> bodyObject;
private Map<String, String> body;
private String errorMessage;
}
package de.ozgcloud.processor.processor; package de.ozgcloud.processor.processor;
import static org.mockito.ArgumentMatchers.*;
import static org.mockito.Mockito.*; import static org.mockito.Mockito.*;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.mock.mockito.MockBean; import org.springframework.boot.test.mock.mockito.MockBean;
import org.springframework.context.ApplicationEventPublisher; import org.springframework.context.ApplicationEventPublisher;
import de.itvsh.kop.common.test.ITCase; import de.itvsh.kop.common.test.ITCase;
import de.itvsh.ozg.pluto.command.CommandCreatedEvent;
import de.itvsh.ozg.pluto.command.VorgangCreatedEvent; import de.itvsh.ozg.pluto.command.VorgangCreatedEvent;
import de.ozgcloud.apilib.vorgang.OzgCloudVorgangTestFactory; import de.ozgcloud.apilib.vorgang.OzgCloudVorgangTestFactory;
import de.ozgcloud.processor.processor.ProcessorService; import de.ozgcloud.processor.command.CommandTestFactory;
import de.ozgcloud.processor.result.ResultService; import de.ozgcloud.processor.result.ResultService;
import de.ozgcloud.processor.vorgang.VorgangId; import de.ozgcloud.processor.vorgang.VorgangId;
import de.ozgcloud.processor.vorgang.VorgangService; import de.ozgcloud.processor.vorgang.VorgangService;
...@@ -19,9 +23,6 @@ import de.ozgcloud.processor.vorgang.VorgangTestFactory; ...@@ -19,9 +23,6 @@ import de.ozgcloud.processor.vorgang.VorgangTestFactory;
@ITCase @ITCase
class ProcessorEventListenerITCase { class ProcessorEventListenerITCase {
@Autowired
private ApplicationEventPublisher publisher;
@MockBean @MockBean
private ResultService resultService; private ResultService resultService;
@MockBean @MockBean
...@@ -29,6 +30,9 @@ class ProcessorEventListenerITCase { ...@@ -29,6 +30,9 @@ class ProcessorEventListenerITCase {
@MockBean @MockBean
private ProcessorService processorService; private ProcessorService processorService;
@Autowired
private ApplicationEventPublisher publisher;
@Test @Test
void shouldLoadVorgang() { void shouldLoadVorgang() {
when(service.getVorgang(any())).thenReturn(VorgangTestFactory.create()); when(service.getVorgang(any())).thenReturn(VorgangTestFactory.create());
...@@ -40,4 +44,21 @@ class ProcessorEventListenerITCase { ...@@ -40,4 +44,21 @@ class ProcessorEventListenerITCase {
} }
@DisplayName("On command listener")
@Nested
class TestOnCommandListener {
@Test
void shouldTriggerForExecuteProcessor() {
publishCommandCreatedEvent(ProcessorEventListener.EXECUTE_PROCESS_ORDER);
verify(service).getVorgang(VorgangId.from(OzgCloudVorgangTestFactory.ID.toString()));
}
}
private void publishCommandCreatedEvent(String order) {
var command = CommandTestFactory.createBuilder().order(order).build();
publisher.publishEvent(new CommandCreatedEvent(command));
}
} }
...@@ -4,30 +4,39 @@ import static org.assertj.core.api.Assertions.*; ...@@ -4,30 +4,39 @@ import static org.assertj.core.api.Assertions.*;
import static org.mockito.ArgumentMatchers.*; import static org.mockito.ArgumentMatchers.*;
import static org.mockito.Mockito.*; import static org.mockito.Mockito.*;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.stream.Stream; import java.util.stream.Stream;
import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor; import org.mockito.ArgumentCaptor;
import org.mockito.Captor; import org.mockito.Captor;
import org.mockito.InjectMocks; import org.mockito.InjectMocks;
import org.mockito.Mock; import org.mockito.Mock;
import org.mockito.Spy; import org.mockito.Spy;
import org.springframework.context.ApplicationEventPublisher;
import de.itvsh.ozg.pluto.command.Command;
import de.itvsh.ozg.pluto.command.CommandCreatedEvent;
import de.itvsh.ozg.pluto.command.CommandFailedEvent;
import de.itvsh.ozg.pluto.command.VorgangCreatedEvent; import de.itvsh.ozg.pluto.command.VorgangCreatedEvent;
import de.ozgcloud.apilib.vorgang.OzgCloudVorgangTestFactory; import de.ozgcloud.apilib.vorgang.OzgCloudVorgangTestFactory;
import de.ozgcloud.processor.command.CommandTestFactory;
import de.ozgcloud.processor.result.ProcessorTechnicalException; import de.ozgcloud.processor.result.ProcessorTechnicalException;
import de.ozgcloud.processor.result.ResultService; import de.ozgcloud.processor.result.ResultService;
import de.ozgcloud.processor.vorgang.Vorgang; import de.ozgcloud.processor.vorgang.Vorgang;
import de.ozgcloud.processor.vorgang.VorgangId; import de.ozgcloud.processor.vorgang.VorgangId;
import de.ozgcloud.processor.vorgang.VorgangService; import de.ozgcloud.processor.vorgang.VorgangService;
import de.ozgcloud.processor.vorgang.VorgangTestFactory; import de.ozgcloud.processor.vorgang.VorgangTestFactory;
import lombok.SneakyThrows;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;
class ProcessorEventListenerTest { class ProcessorEventListenerTest {
private static final VorgangCreatedEvent EVENT = new VorgangCreatedEvent(OzgCloudVorgangTestFactory.ID.toString());
@Spy @Spy
@InjectMocks @InjectMocks
private ProcessorEventListener vorgangEventListener; private ProcessorEventListener vorgangEventListener;
...@@ -38,12 +47,21 @@ class ProcessorEventListenerTest { ...@@ -38,12 +47,21 @@ class ProcessorEventListenerTest {
private ProcessorService processorService; private ProcessorService processorService;
@Mock @Mock
private ResultService resultService; private ResultService resultService;
@Mock
private ApplicationEventPublisher publisher;
@DisplayName("Trigger new vorgang processor")
@Nested
class TestTriggerNewVorgangProcessor {
@Mock @Mock
private ProcessorTechnicalException exception; private ProcessorTechnicalException exception;
@Captor @Captor
private ArgumentCaptor<ProcessorResult> resultCaptor; private ArgumentCaptor<ProcessorResult> resultCaptor;
private static final VorgangCreatedEvent EVENT = new VorgangCreatedEvent(OzgCloudVorgangTestFactory.ID.toString());
private Vorgang vorgang = VorgangTestFactory.create(); private Vorgang vorgang = VorgangTestFactory.create();
@BeforeEach @BeforeEach
...@@ -80,8 +98,7 @@ class ProcessorEventListenerTest { ...@@ -80,8 +98,7 @@ class ProcessorEventListenerTest {
@Test @Test
void shouldCallConsumeException() { void shouldCallConsumeException() {
var vorgang = VorgangTestFactory.create(); when(vorgangService.getVorgang(any())).thenReturn(VorgangTestFactory.create());
when(vorgangService.getVorgang(any())).thenReturn(vorgang);
when(processorService.processVorgang(any())).thenReturn(Stream.of(Mono.error(exception))); when(processorService.processVorgang(any())).thenReturn(Stream.of(Mono.error(exception)));
vorgangEventListener.triggerNewVorgangProcessors(EVENT); vorgangEventListener.triggerNewVorgangProcessors(EVENT);
...@@ -100,3 +117,53 @@ class ProcessorEventListenerTest { ...@@ -100,3 +117,53 @@ class ProcessorEventListenerTest {
verify(resultService).processError(any(Exception.class), eq(VorgangTestFactory.ID.toString())); verify(resultService).processError(any(Exception.class), eq(VorgangTestFactory.ID.toString()));
} }
} }
@DisplayName("On command executeProcessor")
@Nested
class TestOnCommandExecuteProcessor{
@Mock
private Throwable exception;
@Captor
private ArgumentCaptor<CommandFailedEvent> commandFailedEventCaptor;
private final Vorgang vorgang = VorgangTestFactory.create();
private final Collection<String> processorNames = List.of("dummy1", "dummy2");
private final Command command = CommandTestFactory.createBuilder().bodyObject(Map.of(ProcessorEventListener.COMMAND_PROCESSOR_NAMES_KEY, processorNames)).build();
private final CommandCreatedEvent event = new CommandCreatedEvent(command);
@BeforeEach
void init() {
when(vorgangService.getVorgang(any())).thenReturn(vorgang);
}
@Test
void shouldGetVorgang() {
vorgangEventListener.onCommandExecuteProcessor(event);
verify(vorgangService).getVorgang(VorgangTestFactory.ID);
}
@Test
void shouldCallService() {
vorgangEventListener.onCommandExecuteProcessor(event);
verify(processorService).processVorgang(vorgang, processorNames);
}
@Test
@SneakyThrows
void shouldHandleCommandError() {
var errorMsg = "ErrorMsg";
when(exception.getMessage()).thenReturn(errorMsg);
when(processorService.processVorgang(any(), any())).thenReturn(Stream.of(Mono.error(exception)));
vorgangEventListener.onCommandExecuteProcessor(event);
verify(publisher).publishEvent(commandFailedEventCaptor.capture());
assertThat(commandFailedEventCaptor.getValue().getClass()).isEqualTo(CommandFailedEvent.class);
assertThat(commandFailedEventCaptor.getValue().getSource()).isEqualTo(CommandTestFactory.ID);
assertThat(commandFailedEventCaptor.getValue().getErrorMessage()).isEqualTo(errorMsg);
}
}
}
\ No newline at end of file
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment