package de.ozgcloud.xta.client; import static java.util.Collections.*; import java.util.List; import java.util.Optional; import java.util.function.Consumer; import java.util.stream.Stream; import jakarta.validation.Valid; import jakarta.validation.constraints.NotNull; import de.ozgcloud.xta.client.config.XtaClientConfig; import de.ozgcloud.xta.client.core.XtaClientService; import de.ozgcloud.xta.client.core.XtaExceptionHandler; import de.ozgcloud.xta.client.exception.XtaClientRuntimeException; import de.ozgcloud.xta.client.exception.XtaClientException; import de.ozgcloud.xta.client.exception.XtaClientInitializationException; import de.ozgcloud.xta.client.model.XtaIdentifier; import de.ozgcloud.xta.client.model.XtaMessage; import de.ozgcloud.xta.client.model.XtaMessageMetaData; import de.ozgcloud.xta.client.model.XtaMessageMetaDataListing; import de.ozgcloud.xta.client.model.XtaMessageStatus; import de.ozgcloud.xta.client.model.XtaTransportReport; import lombok.AccessLevel; import lombok.Builder; import lombok.RequiredArgsConstructor; import lombok.extern.log4j.Log4j2; @RequiredArgsConstructor(access = AccessLevel.MODULE) @Builder(access = AccessLevel.MODULE) @Log4j2 public class XtaClient { private final XtaClientService service; private final XtaClientConfig config; private final XtaExceptionHandler exceptionHandler; static final String NO_MESSAGE_CLOSED_WARNING = "No message has been closed although more are pending. Try increasing max list items."; public static XtaClient from(XtaClientConfig config) throws XtaClientInitializationException { return XtaClientFactory.from(config).create(); } public List<XtaTransportReport> fetchMessages(@NotNull Consumer<XtaMessage> processMessage) throws XtaClientException { try { return fetchMessagesRaw(processMessage); } catch (RuntimeException exception) { throw exceptionHandler.deriveXtaClientException(exception); } } List<XtaTransportReport> fetchMessagesRaw(Consumer<XtaMessage> processMessage) { return config.getClientIdentifiers().stream() .filter(service::checkAccountActive) .map(clientIdentifier -> initializeFetchMessageParameter(clientIdentifier, processMessage)) .flatMap(this::fetchMessagesForClientIdentifier) .toList(); } FetchMessageParameter initializeFetchMessageParameter(XtaIdentifier clientIdentifier, Consumer<XtaMessage> processMessage) { return new FetchMessageParameter(clientIdentifier, processMessage, emptySet()); } Stream<XtaTransportReport> fetchMessagesForClientIdentifier(FetchMessageParameter parameter) { return service.getStatusList(parameter.clientIdentifier()) .map(listing -> { var transportReports = fetchMessagesForListing(listing.messages(), parameter); return Stream.concat(transportReports.stream(), checkMoreMessagesAvailable(listing, transportReports) ? fetchMessagesForClientIdentifier(parameter.withViewedMessageIdsFrom(listing.messages())) : Stream.empty()); }) .orElse(Stream.empty()); } boolean checkMoreMessagesAvailable(XtaMessageMetaDataListing listing, List<XtaTransportReport> transportReports) { return checkExtraPendingMessagesAvailable(listing) && checkSomeMessageHasBeenClosed(transportReports); } boolean checkExtraPendingMessagesAvailable(XtaMessageMetaDataListing listing) { return listing.messages().size() < listing.pendingMessageCount().intValue(); } boolean checkSomeMessageHasBeenClosed(List<XtaTransportReport> transportReports) { var someTransportReportHasClosedStatus = transportReports.stream() .anyMatch(t -> !t.status().equals(XtaMessageStatus.OPEN)); if (!someTransportReportHasClosedStatus) { logWarnForNoMessageClosed(); } return someTransportReportHasClosedStatus; } void logWarnForNoMessageClosed() { log.warn(NO_MESSAGE_CLOSED_WARNING); } List<XtaTransportReport> fetchMessagesForListing( List<XtaMessageMetaData> messageMetaDataItems, FetchMessageParameter parameter ) { return messageMetaDataItems.stream() .filter(metaData -> checkMessageShouldBeFetched(metaData, parameter)) .map(metaData -> fetchMessage(metaData, parameter)) .flatMap(Optional::stream) .toList(); } boolean checkMessageShouldBeFetched(XtaMessageMetaData messageMetaData, FetchMessageParameter parameter) { return !parameter.hasMessageAlreadyBeenViewed(messageMetaData) && isMessageSupported(messageMetaData); } boolean isMessageSupported(XtaMessageMetaData messageMetaData) { return Optional.ofNullable(config.getIsMessageSupported()) .map(predicate -> predicate.test(messageMetaData)) .orElse(true); } Optional<XtaTransportReport> fetchMessage(XtaMessageMetaData messageMetaData, FetchMessageParameter parameter) { return service.getMessage(messageMetaData) .flatMap(message -> processMesssageAndFetchTransportReport(message, parameter)); } Optional<XtaTransportReport> processMesssageAndFetchTransportReport(XtaMessage message, FetchMessageParameter parameter) { processMessageAndCloseIfNoException(message, parameter.processMessage()); return service.getTransportReport(message.metaData()); } void processMessageAndCloseIfNoException(XtaMessage message, Consumer<XtaMessage> processMessage) { var messageId = message.metaData().messageId(); try { processMessage.accept(message); log.debug("Processing of message '{}' succeeded! Closing message.", messageId); service.closeMessage(message); } catch (RuntimeException exception) { logErrorForMessageProcessingFailure(messageId, exception); } } void logErrorForMessageProcessingFailure(String messageId, RuntimeException exception) { log.error("Processing of message '%s' failed! Not closing message.".formatted(messageId), exception); } public XtaTransportReport sendMessage(@Valid XtaMessage message) throws XtaClientException { try { return sendMessageRaw(message); } catch (RuntimeException exception) { throw exceptionHandler.deriveXtaClientException(exception); } } XtaTransportReport sendMessageRaw(@Valid XtaMessage messageWithoutMessageId) { var metaData = messageWithoutMessageId.metaData(); throwExceptionIfAccountInactive(metaData.authorIdentifier()); throwExceptionIfServiceNotAvailable(metaData); return service.sendMessage(messageWithoutMessageId); } void throwExceptionIfServiceNotAvailable(XtaMessageMetaData metaData) { if (!service.lookupService(metaData)) { throw new XtaClientRuntimeException("Service %s is not available!".formatted(metaData.service())); } } void throwExceptionIfAccountInactive(XtaIdentifier clientIdentifier) { if (!service.checkAccountActive(clientIdentifier)) { throw new XtaClientRuntimeException("Account %s is not active!".formatted(clientIdentifier.value())); } } }