Skip to content
Snippets Groups Projects
Commit d60bfd10 authored by OZGCloud's avatar OZGCloud
Browse files

Merge pull request 'OZG-4461-ManuellerButtonZumSendenAnProzessor' (#6) from...

Merge pull request 'OZG-4461-ManuellerButtonZumSendenAnProzessor' (#6) from OZG-4461-ManuellerButtonZumSendenAnProzessor into master

Reviewed-on: https://git.ozg-sh.de/mgm/ozgcloud-processor-manager/pulls/6
parents c19a5a29 e2dc6a9d
No related branches found
No related tags found
No related merge requests found
......@@ -17,7 +17,7 @@
<description>OZG-Cloud Processor Manager</description>
<properties>
<pluto.version>1.14.0</pluto.version>
<pluto.version>1.16.0</pluto.version>
<tyrus-standalone-client.version>2.1.3</tyrus-standalone-client.version>
</properties>
<dependencies>
......@@ -27,6 +27,13 @@
<artifactId>pluto-utils</artifactId>
<version>${pluto.version}</version>
</dependency>
<dependency>
<groupId>de.itvsh.ozg.pluto</groupId>
<artifactId>pluto-command</artifactId>
<version>${pluto.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>de.itvsh.ozg.pluto</groupId>
<artifactId>pluto-command</artifactId>
......@@ -72,6 +79,13 @@
<artifactId>mockwebserver</artifactId>
<scope>test</scope>
</dependency>
<!-- commons -->
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</dependency>
</dependencies>
<build>
......
package de.ozgcloud.processor.processor;
import java.util.Collection;
import java.util.Collections;
import java.util.function.Predicate;
import org.apache.commons.collections.MapUtils;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Component;
import de.itvsh.ozg.pluto.command.Command;
import de.itvsh.ozg.pluto.command.CommandCreatedEvent;
import de.itvsh.ozg.pluto.command.CommandFailedEvent;
import de.itvsh.ozg.pluto.command.VorgangCreatedEvent;
import de.ozgcloud.processor.result.ResultService;
import de.ozgcloud.processor.vorgang.Vorgang;
......@@ -11,6 +20,7 @@ import de.ozgcloud.processor.vorgang.VorgangId;
import de.ozgcloud.processor.vorgang.VorgangService;
import lombok.RequiredArgsConstructor;
import lombok.extern.log4j.Log4j2;
import reactor.core.publisher.Mono;
@Component("processorVorgangEventListener")
@ConditionalOnProperty(prefix = "ozgcloud.processors[0]", name = "address")
......@@ -21,22 +31,68 @@ class ProcessorEventListener {
private final VorgangService service;
private final ProcessorService processorService;
private final ResultService resultService;
private final ApplicationEventPublisher publisher;
static final String PROCESS_VORGANG_ORDER = "PROCESS_VORGANG";
private static final String IS_PROCESS_VORGANG = "{T(de.ozgcloud.processor.processor.ProcessorEventListener).IS_PROCESS_VORGANG_EVENT.test(event.getSource())}";
public static final Predicate<Command> IS_PROCESS_VORGANG_EVENT = command -> command.getOrder().equals(PROCESS_VORGANG_ORDER);
static final String COMMAND_PROCESSOR_NAMES_KEY = "processorNames";
@EventListener
public void triggerNewVorgangProcessors(VorgangCreatedEvent event) {
try {
var vorgang = service.getVorgang(VorgangId.from(event.getSource()));
var vorgang = getVorgang(event.getSource());
processorService.processVorgang(vorgang).forEach(processorResultMono -> processorResultMono
.doOnError(cause -> handleError(cause, event.getSource())).onErrorComplete()
.map(result -> result.toBuilder().vorgangId(VorgangId.from(event.getSource())).build())
.map(result -> addVorgangId(result, event.getSource()))
.subscribe(resultService::processResult));
} catch (RuntimeException e) {
handleError(e, event.getSource());
}
}
@EventListener(condition = IS_PROCESS_VORGANG)
public void onCommandProcessVorgang(CommandCreatedEvent event) {
try {
var vorgang = getVorgang(event.getSource().getVorgangId());
processorService.processVorgang(vorgang, getProcessorNames(event.getSource()))
.forEach(processorResultMono -> processResult(processorResultMono, event.getSource().getVorgangId()));
publishCommandProcessedEvent(event.getSource());
} catch(Exception e) {
handleError(e, event.getSource().getVorgangId());
publishCommandFailedEvent(e.getCause(), event.getSource().getId());
}
}
void processResult(Mono<ProcessorResult> processorResultMono, String vorgangId) {
var blockedResult = processorResultMono.map(result -> addVorgangId(result, vorgangId)).block();
resultService.processResult(blockedResult);
}
private Vorgang getVorgang(String vorgangId) {
return service.getVorgang(VorgangId.from(vorgangId));
}
@SuppressWarnings("unchecked")
private Collection<String> getProcessorNames(Command command) {
return (Collection<String>) MapUtils.getObject(command.getBodyObject(), COMMAND_PROCESSOR_NAMES_KEY, Collections.emptyList());
}
private ProcessorResult addVorgangId(ProcessorResult result, String vorgangId) {
return result.toBuilder().vorgangId(VorgangId.from(vorgangId)).build();
}
private void handleError(Throwable cause, String vorgangId) {
LOG.error("Error on procession Vorgang {} externally", vorgangId, cause);
resultService.processError(cause, vorgangId);
}
void publishCommandProcessedEvent(Command command) {
publisher.publishEvent(new VorgangProcessedEvent(command));
}
void publishCommandFailedEvent(Throwable e, String commandId) {
publisher.publishEvent(new CommandFailedEvent(commandId, e.getMessage()));
}
}
\ No newline at end of file
......@@ -46,6 +46,14 @@ public class ProcessorService {
.anyMatch(form -> StringUtils.equals(form.getFormId(), vorgang.getFormId()));
}
public Stream<Mono<ProcessorResult>> processVorgang(Vorgang vorgang, Collection<String> processorNames) {
return getProcessors(processorNames).map(processor -> callProcessor(processor, vorgang));
}
Stream<Processor> getProcessors(Collection<String> processorNames) {
return properties.getProcessors().stream().filter(processor -> processorNames.contains(processor.getName()));
}
Mono<ProcessorResult> callProcessor(Processor processor, Vorgang vorgang) {
LOG.info("Sending Vorgang {} to processors.", vorgang.getId());
return webClient.post()
......
package de.ozgcloud.processor.processor;
import de.itvsh.ozg.pluto.command.Command;
import de.itvsh.ozg.pluto.command.CommandExecutedEvent;
class VorgangProcessedEvent extends CommandExecutedEvent {
private static final long serialVersionUID = 1L;
public VorgangProcessedEvent(Command command) {
super(command);
}
}
package de.ozgcloud.processor.processor;
import static org.mockito.ArgumentMatchers.*;
import static org.mockito.Mockito.*;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.mock.mockito.MockBean;
import org.springframework.context.ApplicationEventPublisher;
import de.itvsh.kop.common.test.ITCase;
import de.itvsh.ozg.pluto.command.CommandCreatedEvent;
import de.itvsh.ozg.pluto.command.CommandTestFactory;
import de.itvsh.ozg.pluto.command.VorgangCreatedEvent;
import de.ozgcloud.apilib.vorgang.OzgCloudVorgangTestFactory;
import de.ozgcloud.processor.processor.ProcessorService;
import de.ozgcloud.processor.result.ResultService;
import de.ozgcloud.processor.vorgang.VorgangId;
import de.ozgcloud.processor.vorgang.VorgangService;
......@@ -19,9 +23,6 @@ import de.ozgcloud.processor.vorgang.VorgangTestFactory;
@ITCase
class ProcessorEventListenerITCase {
@Autowired
private ApplicationEventPublisher publisher;
@MockBean
private ResultService resultService;
@MockBean
......@@ -29,6 +30,9 @@ class ProcessorEventListenerITCase {
@MockBean
private ProcessorService processorService;
@Autowired
private ApplicationEventPublisher publisher;
@Test
void shouldLoadVorgang() {
when(service.getVorgang(any())).thenReturn(VorgangTestFactory.create());
......@@ -40,4 +44,17 @@ class ProcessorEventListenerITCase {
}
@DisplayName("On command listener")
@Nested
class TestOnCommandListener {
@Test
void shouldTriggerForExecuteProcessor() {
var command = CommandTestFactory.createBuilder().order(ProcessorEventListener.PROCESS_VORGANG_ORDER).build();
publisher.publishEvent(new CommandCreatedEvent(command));
verify(service).getVorgang(VorgangId.from(CommandTestFactory.VORGANG_ID));
}
}
}
......@@ -4,16 +4,27 @@ import static org.assertj.core.api.Assertions.*;
import static org.mockito.ArgumentMatchers.*;
import static org.mockito.Mockito.*;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Stream;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.Captor;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.Spy;
import org.springframework.context.ApplicationEventPublisher;
import de.itvsh.ozg.pluto.command.Command;
import de.itvsh.ozg.pluto.command.CommandCreatedEvent;
import de.itvsh.ozg.pluto.command.CommandFailedEvent;
import de.itvsh.ozg.pluto.command.CommandTestFactory;
import de.itvsh.ozg.pluto.command.VorgangCreatedEvent;
import de.ozgcloud.apilib.vorgang.OzgCloudVorgangTestFactory;
import de.ozgcloud.processor.result.ProcessorTechnicalException;
......@@ -22,12 +33,11 @@ import de.ozgcloud.processor.vorgang.Vorgang;
import de.ozgcloud.processor.vorgang.VorgangId;
import de.ozgcloud.processor.vorgang.VorgangService;
import de.ozgcloud.processor.vorgang.VorgangTestFactory;
import jakarta.validation.ConstraintViolationException;
import reactor.core.publisher.Mono;
class ProcessorEventListenerTest {
private static final VorgangCreatedEvent EVENT = new VorgangCreatedEvent(OzgCloudVorgangTestFactory.ID.toString());
@Spy
@InjectMocks
private ProcessorEventListener vorgangEventListener;
......@@ -38,12 +48,21 @@ class ProcessorEventListenerTest {
private ProcessorService processorService;
@Mock
private ResultService resultService;
@Mock
private ApplicationEventPublisher publisher;
@DisplayName("Trigger new vorgang processor")
@Nested
class TestTriggerNewVorgangProcessor {
@Mock
private ProcessorTechnicalException exception;
@Captor
private ArgumentCaptor<ProcessorResult> resultCaptor;
private static final VorgangCreatedEvent EVENT = new VorgangCreatedEvent(OzgCloudVorgangTestFactory.ID.toString());
private Vorgang vorgang = VorgangTestFactory.create();
@BeforeEach
......@@ -80,8 +99,7 @@ class ProcessorEventListenerTest {
@Test
void shouldCallConsumeException() {
var vorgang = VorgangTestFactory.create();
when(vorgangService.getVorgang(any())).thenReturn(vorgang);
when(vorgangService.getVorgang(any())).thenReturn(VorgangTestFactory.create());
when(processorService.processVorgang(any())).thenReturn(Stream.of(Mono.error(exception)));
vorgangEventListener.triggerNewVorgangProcessors(EVENT);
......@@ -91,8 +109,7 @@ class ProcessorEventListenerTest {
@Test
void shouldCallConsumeUnexpectedExceptionType() {
var vorgang = VorgangTestFactory.create();
when(vorgangService.getVorgang(any())).thenReturn(vorgang);
when(vorgangService.getVorgang(any())).thenReturn(VorgangTestFactory.create());
when(processorService.processVorgang(any())).thenReturn(Stream.of(Mono.error(new Exception())));
vorgangEventListener.triggerNewVorgangProcessors(EVENT);
......@@ -100,3 +117,188 @@ class ProcessorEventListenerTest {
verify(resultService).processError(any(Exception.class), eq(VorgangTestFactory.ID.toString()));
}
}
@DisplayName("On command processVorgang")
@Nested
class TestOnCommandProcessVorgang {
@Mock
private Throwable exception;
@Captor
private ArgumentCaptor<CommandFailedEvent> commandFailedEventCaptor;
@Captor
private ArgumentCaptor<VorgangProcessedEvent> commandSuccessEventCaptor;
private final Vorgang vorgang = VorgangTestFactory.create();
private final Collection<String> processorNames = List.of("dummy1", "dummy2");
private final Command command = CommandTestFactory.createBuilder().bodyObject(Map.of(ProcessorEventListener.COMMAND_PROCESSOR_NAMES_KEY, processorNames)).build();
private final CommandCreatedEvent event = new CommandCreatedEvent(command);
@DisplayName("process flow")
@Nested
class TestProcessFlow {
private final Mono<ProcessorResult> processorResults = Mono.just(ProcessorResultTestFactory.create());
private final Stream<Mono<ProcessorResult>> monoStream = Stream.of(processorResults);
@BeforeEach
void init() {
when(vorgangService.getVorgang(any())).thenReturn(vorgang);
when(processorService.processVorgang(any(), any())).thenReturn(monoStream);
}
@Test
void shouldGetVorgang() {
vorgangEventListener.onCommandProcessVorgang(event);
verify(vorgangService).getVorgang(VorgangId.from(CommandTestFactory.VORGANG_ID));
}
@Test
void shouldCallService() {
vorgangEventListener.onCommandProcessVorgang(event);
verify(processorService).processVorgang(vorgang, processorNames);
}
@Test
void shouldProcessResult() {
vorgangEventListener.onCommandProcessVorgang(event);
verify(vorgangEventListener).processResult(any(), eq(CommandTestFactory.VORGANG_ID));
}
@Test
void shouldPublishCommandProcessedEvent() {
vorgangEventListener.onCommandProcessVorgang(event);
verify(vorgangEventListener).publishCommandProcessedEvent(event.getSource());
}
}
@DisplayName("publish command processed event")
@Nested
class TestPublishCommandProcessedEvent {
@BeforeEach
void init(){
when(processorService.processVorgang(any(), any())).thenReturn(Stream.empty());
}
@Test
void shouldPublishEventBeClassOf() {
vorgangEventListener.onCommandProcessVorgang(event);
verify(publisher).publishEvent(commandSuccessEventCaptor.capture());
assertThat(commandSuccessEventCaptor.getValue().getClass()).isEqualTo(VorgangProcessedEvent.class);
}
@Test
void shouldPublishEventHasCommanIdAsSource() {
vorgangEventListener.onCommandProcessVorgang(event);
verify(publisher).publishEvent(commandSuccessEventCaptor.capture());
assertThat(commandSuccessEventCaptor.getValue().getSource()).isEqualTo(CommandTestFactory.ID);
}
@Test
void shouldPublishEventHasCommand() {
vorgangEventListener.onCommandProcessVorgang(event);
verify(publisher).publishEvent(commandSuccessEventCaptor.capture());
assertThat(commandSuccessEventCaptor.getValue().getCommand()).isEqualTo(command);
}
}
@DisplayName("On ViolationConstraint exception")
@Nested
class TestOnViolationConstraintException {
private final Mono<ProcessorResult> processorResults = Mono.just(ProcessorResultTestFactory.create());
private final Stream<Mono<ProcessorResult>> monoStream = Stream.of(processorResults);
private final ConstraintViolationException exception = new ConstraintViolationException("", Collections.emptySet());
@BeforeEach
void init(){
when(vorgangService.getVorgang(any())).thenReturn(vorgang);
when(processorService.processVorgang(any(), any())).thenReturn(monoStream);
doThrow(exception).when(vorgangEventListener).processResult(any(), any());
doNothing().when(vorgangEventListener).publishCommandFailedEvent(any(), any());
}
@Test
void shouldProcessError() {
vorgangEventListener.onCommandProcessVorgang(event);
verify(resultService).processError(any(), eq(CommandTestFactory.VORGANG_ID));
}
@Test
void shouldPublishEventOnConstraintException() {
vorgangEventListener.onCommandProcessVorgang(event);
verify(vorgangEventListener).publishCommandFailedEvent(any(), any());
}
}
@DisplayName("on process vorgang exception")
@Nested
class TestOnProcessVorgangException {
@BeforeEach
void init(){
when(processorService.processVorgang(any(), any())).thenReturn(Stream.of(Mono.error(exception)));
}
@Test
void shouldProcessError() {
vorgangEventListener.onCommandProcessVorgang(event);
verify(vorgangEventListener).publishCommandFailedEvent(any(), any());
}
@Test
void shouldPublishCommandFailedEvent() {
vorgangEventListener.onCommandProcessVorgang(event);
verify(resultService).processError(any(), eq(CommandTestFactory.VORGANG_ID));
}
}
@DisplayName("publish command failed event")
@Nested
class TestPublishCommandFailedEvent {
private final static String ERROR_MSG = "ErrorMsg";
@BeforeEach
void init(){
when(exception.getMessage()).thenReturn(ERROR_MSG);
}
@Test
void shouldPublishEventBeClassOf() {
vorgangEventListener.publishCommandFailedEvent(exception, CommandTestFactory.ID);
verify(publisher).publishEvent(commandFailedEventCaptor.capture());
assertThat(commandFailedEventCaptor.getValue().getClass()).isEqualTo(CommandFailedEvent.class);
}
@Test
void shouldPublishEventHasCommanIdAsSource() {
vorgangEventListener.publishCommandFailedEvent(exception, CommandTestFactory.ID);
verify(publisher).publishEvent(commandFailedEventCaptor.capture());
assertThat(commandFailedEventCaptor.getValue().getSource()).isEqualTo(CommandTestFactory.ID);
}
@Test
void shouldPublishEventHasErrorMessage() {
vorgangEventListener.publishCommandFailedEvent(exception, CommandTestFactory.ID);
verify(publisher).publishEvent(commandFailedEventCaptor.capture());
assertThat(commandFailedEventCaptor.getValue().getErrorMessage()).isEqualTo(ERROR_MSG);
}
}
}
}
\ No newline at end of file
......@@ -4,6 +4,7 @@ import static org.assertj.core.api.Assertions.*;
import static org.junit.jupiter.api.Assertions.*;
import java.time.Duration;
import java.util.Collections;
import java.util.concurrent.TimeUnit;
import org.junit.jupiter.api.AfterEach;
......@@ -54,10 +55,13 @@ class ProcessorServiceITCase {
@BeforeEach
void setup() {
processor = ProcessorTestFactory.createBuilder().address(mockWebServer.url("/").toString()).build();
MockResponse response = new MockResponse()
mockWebServer.enqueue(createMockResponse());
}
private MockResponse createMockResponse() {
return new MockResponse()
.addHeader("Content-Type", "application/json")
.setBody("{\"%s\": \"%s\"}".formatted(ProcessorResultTestFactory.KEY_RESULT_ACTION, ProcessorResultTestFactory.RESULT_ACTION));
mockWebServer.enqueue(response);
}
@AfterEach
......@@ -203,6 +207,40 @@ class ProcessorServiceITCase {
assertThrows(ProcessorTechnicalException.class, () -> results.forEach(Mono::block));
}
}
@DisplayName("Process vorgang")
@Nested
class TestProcessVorgang {
private MockWebServer mockWebServer = new MockWebServer();
private Vorgang vorgang = VorgangTestFactory.create();
private Processor processor = ProcessorTestFactory.createBuilder().address(mockWebServer.url("/").toString()).build();
private ProcessorProperties properties = ProcessorProperties.builder().processor(processor).build();
@BeforeEach
void setup() {
ReflectionTestUtils.setField(service, "properties", properties);
}
@SneakyThrows
@AfterEach
void cleanup() {
mockWebServer.shutdown();
}
@Test
void shouldReturnProcessor() {
var result = service.processVorgang(vorgang, Collections.singleton(ProcessorTestFactory.PROCESSOR_NAME));
assertThat(result).isNotEmpty();
}
@Test
void shouldReturnEmptyStream() {
var result = service.processVorgang(vorgang, Collections.singleton("notMatchingProcessorName"));
assertThat(result).isEmpty();
}
}
}
\ No newline at end of file
package de.ozgcloud.processor.processor;
import static org.assertj.core.api.Assertions.*;
import static org.mockito.ArgumentMatchers.*;
import static org.mockito.Mockito.*;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.stream.Stream;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.DisplayName;
......@@ -14,11 +19,12 @@ import org.mockito.Mockito;
import org.mockito.Spy;
import de.ozgcloud.processor.processor.ProcessorProperties.Processor;
import de.ozgcloud.processor.vorgang.Vorgang;
import de.ozgcloud.processor.vorgang.VorgangTestFactory;
import reactor.core.publisher.Mono;
class ProcessorServiceTest {
@Spy
@InjectMocks
private ProcessorService service;
......@@ -28,7 +34,6 @@ class ProcessorServiceTest {
@Mock
private ProcessorConfiguration processorConfiguration;
@Nested
class TestProcessVorgang {
......@@ -71,4 +76,78 @@ class ProcessorServiceTest {
}
}
@DisplayName("Process vorgang by given processorNames")
@Nested
class TestProcessVorgangByGivenProcessorNames {
private final Vorgang vorgang = VorgangTestFactory.create();
private final Collection<String> processorNames = Collections.singleton(ProcessorTestFactory.PROCESSOR_NAME);
private final Processor processor = ProcessorTestFactory.create();
private final Mono<ProcessorResult> processorMonoResponse = Mono.just(ProcessorResultTestFactory.create());
@Test
void shouldGetProcessorByNames() {
doReturn(Stream.of(processor)).when(service).getProcessors(anyCollection());
service.processVorgang(vorgang, processorNames);
verify(service).getProcessors(processorNames);
}
@Test
void shouldCallProcessor() {
doReturn(processorMonoResponse).when(service).callProcessor(any(), any());
doReturn(Stream.of(processor)).when(service).getProcessors(anyCollection());
service.processVorgang(vorgang, processorNames).toList();
verify(service).callProcessor(any(), any());
}
@Test
void shouldReturnProcessorResult() {
doReturn(processorMonoResponse).when(service).callProcessor(any(), any());
doReturn(Stream.of(processor)).when(service).getProcessors(anyCollection());
var result = service.processVorgang(vorgang, processorNames).toList();
assertThat(result).containsExactly(processorMonoResponse);
}
@DisplayName("get processors")
@Nested
class TestGetProcessors {
private final Processor processor = ProcessorTestFactory.create();
@Test
void shouldGetConfiguredProcessor() {
when(properties.getProcessors()).thenReturn(List.of(processor));
service.getProcessors(processorNames);
verify(properties).getProcessors();
}
@DisplayName("should return processors matching processor name")
@Test
void shouldReturnProcessor() {
when(properties.getProcessors()).thenReturn(List.of(processor));
var processorResult = service.getProcessors(processorNames).toList();
assertThat(processorResult).containsExactly(processor);
}
@DisplayName("should return empty stream on non matching processor name")
@Test
void shouldReturnEmpty() {
when(properties.getProcessors()).thenReturn(List.of(ProcessorTestFactory.createBuilder().name("notMatchin").build()));
var processorResult = service.getProcessors(processorNames).toList();
assertThat(processorResult).isEmpty();
}
}
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment