package de.ozgcloud.xta.client; import static java.util.Collections.*; import java.util.List; import java.util.Optional; import java.util.function.Consumer; import java.util.stream.Stream; import jakarta.validation.Valid; import jakarta.validation.constraints.NotNull; import de.ozgcloud.xta.client.config.XtaClientConfig; import de.ozgcloud.xta.client.core.XtaClientService; import de.ozgcloud.xta.client.core.XtaExceptionHandler; import de.ozgcloud.xta.client.exception.ClientRuntimeException; import de.ozgcloud.xta.client.exception.XtaClientException; import de.ozgcloud.xta.client.exception.XtaClientInitializationException; import de.ozgcloud.xta.client.model.XtaIdentifier; import de.ozgcloud.xta.client.model.XtaMessage; import de.ozgcloud.xta.client.model.XtaMessageMetaData; import de.ozgcloud.xta.client.model.XtaMessageMetaDataListing; import de.ozgcloud.xta.client.model.XtaMessageStatus; import de.ozgcloud.xta.client.model.XtaTransportReport; import lombok.AccessLevel; import lombok.Builder; import lombok.RequiredArgsConstructor; import lombok.extern.log4j.Log4j2; /** * A client for sending and receiving messages via the XTA protocol. * * <p> * The client may be initialized for a {@link XtaClientConfig} configuration by {@link #from(XtaClientConfig)}. * </p> * * Example: * <pre> * var client = XtaClient.from(config); * var transportReport = client.sendMessage(message); * var transportReports = client.fetchMessages(message -> { * // process message * }); * </pre> */ @RequiredArgsConstructor(access = AccessLevel.MODULE) @Builder(access = AccessLevel.MODULE) @Log4j2 public class XtaClient { private final XtaClientService service; private final XtaClientConfig config; private final XtaExceptionHandler exceptionHandler; static final String NO_MESSAGE_CLOSED_WARNING = "No message has been closed although more are pending. Try increasing max list items."; /** * Initialize a new {@link XtaClient} instance from the given configuration. * * @param config The configuration for the client. * @return The initialized client. * @throws XtaClientInitializationException If the configuration is invalid or a configured keystore failed to initialize. */ public static XtaClient from(XtaClientConfig config) throws XtaClientInitializationException { return XtaClientFactory.from(config).create(); } /** * Fetches messages for all client identifiers. * * <p> * For each configured client identifier in {@link XtaClientConfig#getClientIdentifiers() clientIdentifiers}, checks if the client identifier is * an active account, then lists its pending/unread messages. Next, uses the {@link XtaClientConfig#getIsMessageSupported() isMessageSupported} * predicate to decide whether to fetch a listed message. Note that if no predicate is configured, all listed messages are fetched. * </p> * <p> * For each fetched message, calls the given {@code processMessage}. If {@code processMessage} does not throw a runtime exception, closes the * message, i.e., marks it as read. Then, fetches the transport report for the successfully or unsuccessfully processed message. Note that a * transport is always returned for each processed message, unless a technical problem prevents fetching the transport report. * </p> * <p> * A listing contains a maximum of {@link XtaClientConfig#getMaxListItems() maxListItems} messages. However, listing is repeated, as long as there * are more messages pending and some message is closed successfully during the latest listing. * </p> * * @param processMessage The consumer to process each fetched message. * @return The transport reports for all fetched/processed messages. A message which has not been closed has an * {@link XtaMessageStatus#OPEN OPEN} status. If a message has been closed, the status is either {@link XtaMessageStatus#GREEN GREEN}, * {@link XtaMessageStatus#YELLOW YELLOW} or {@link XtaMessageStatus#RED RED}. * @throws XtaClientException If a technical problem occurs while fetching messages. */ public List<XtaTransportReport> fetchMessages(@NotNull Consumer<XtaMessage> processMessage) throws XtaClientException { try { return fetchMessagesRaw(processMessage); } catch (RuntimeException exception) { throw exceptionHandler.deriveXtaClientException(exception); } } List<XtaTransportReport> fetchMessagesRaw(Consumer<XtaMessage> processMessage) { return config.getClientIdentifiers().stream() .filter(service::checkAccountActive) .map(clientIdentifier -> initializeFetchMessageParameter(clientIdentifier, processMessage)) .flatMap(this::fetchMessagesForClientIdentifier) .toList(); } FetchMessageParameter initializeFetchMessageParameter(XtaIdentifier clientIdentifier, Consumer<XtaMessage> processMessage) { return new FetchMessageParameter(clientIdentifier, processMessage, emptySet()); } Stream<XtaTransportReport> fetchMessagesForClientIdentifier(FetchMessageParameter parameter) { return service.getStatusList(parameter.clientIdentifier()) .map(listing -> { var transportReports = fetchMessagesForListing(listing.messages(), parameter); return Stream.concat(transportReports.stream(), checkMoreMessagesAvailable(listing, transportReports) ? fetchMessagesForClientIdentifier(parameter.withViewedMessageIdsFrom(listing.messages())) : Stream.empty()); }) .orElse(Stream.empty()); } boolean checkMoreMessagesAvailable(XtaMessageMetaDataListing listing, List<XtaTransportReport> transportReports) { return checkExtraPendingMessagesAvailable(listing) && checkSomeMessageHasBeenClosed(transportReports); } boolean checkExtraPendingMessagesAvailable(XtaMessageMetaDataListing listing) { return listing.messages().size() < listing.pendingMessageCount().intValue(); } boolean checkSomeMessageHasBeenClosed(List<XtaTransportReport> transportReports) { var someTransportReportHasGreenStatus = transportReports.stream() .anyMatch(t -> t.status().equals(XtaMessageStatus.GREEN)); if (!someTransportReportHasGreenStatus) { logWarnForNoMessageClosed(); } return someTransportReportHasGreenStatus; } void logWarnForNoMessageClosed() { log.warn(NO_MESSAGE_CLOSED_WARNING); } List<XtaTransportReport> fetchMessagesForListing( List<XtaMessageMetaData> messageMetaDataItems, FetchMessageParameter parameter ) { return messageMetaDataItems.stream() .filter(metaData -> checkMessageShouldBeFetched(metaData, parameter)) .map(metaData -> fetchMessage(metaData, parameter)) .flatMap(Optional::stream) .toList(); } boolean checkMessageShouldBeFetched(XtaMessageMetaData messageMetaData, FetchMessageParameter parameter) { return !parameter.hasMessageAlreadyBeenViewed(messageMetaData) && isMessageSupported(messageMetaData); } boolean isMessageSupported(XtaMessageMetaData messageMetaData) { return Optional.ofNullable(config.getIsMessageSupported()) .map(predicate -> predicate.test(messageMetaData)) .orElse(true); } Optional<XtaTransportReport> fetchMessage(XtaMessageMetaData messageMetaData, FetchMessageParameter parameter) { return service.getMessage(messageMetaData) .flatMap(message -> processMesssageAndFetchTransportReport(message, parameter)); } Optional<XtaTransportReport> processMesssageAndFetchTransportReport(XtaMessage message, FetchMessageParameter parameter) { processMessageAndCloseIfNoException(message, parameter.processMessage()); return service.getTransportReport(message.metaData()); } void processMessageAndCloseIfNoException(XtaMessage message, Consumer<XtaMessage> processMessage) { var messageId = message.metaData().messageId(); try { processMessage.accept(message); log.debug("Processing of message '{}' succeeded! Closing message.", messageId); service.closeMessage(message); } catch (RuntimeException exception) { logErrorForMessageProcessingFailure(messageId, exception); } } void logErrorForMessageProcessingFailure(String messageId, RuntimeException exception) { log.error("Processing of message '%s' failed! Not closing message.".formatted(messageId), exception); } /** * Sends the given message. * * <p> * Sends the message to the XTA server such that it may be asynchronously fetched by the reader. Author and reader of the message are given by the * message metadata. * </p> * <p> * First, checks whether the author refers to an active account. Then, checks that the service used by the message is available for the given * reader. If both checks pass, sends the message to the reader. Finally, returns the transport report for the sent message. * </p> * <p> * <b>Note:</b> message size and id are set by the server, and thus may initially be null. * </p> * * @param messageWithoutMessageId The message to send without id and size. * @return The transport report for the sent message. As long as the reader did not read the message the status is {@link XtaMessageStatus#OPEN}. * Moreover, the report contains the message metadata, including id and size values set by the server. * @throws XtaClientException If a check fails or a technical problem occurs while sending the message. */ public XtaTransportReport sendMessage(@NotNull @Valid XtaMessage messageWithoutMessageId) throws XtaClientException { try { return sendMessageRaw(messageWithoutMessageId); } catch (RuntimeException exception) { throw exceptionHandler.deriveXtaClientException(exception); } } XtaTransportReport sendMessageRaw(XtaMessage messageWithoutMessageId) { var metaData = messageWithoutMessageId.metaData(); throwExceptionIfAccountInactive(metaData.authorIdentifier()); throwExceptionIfServiceNotAvailable(metaData); return service.sendMessage(messageWithoutMessageId); } void throwExceptionIfServiceNotAvailable(XtaMessageMetaData metaData) { if (!service.lookupService(metaData)) { throw new ClientRuntimeException("Service %s is not available!".formatted(metaData.service())); } } void throwExceptionIfAccountInactive(XtaIdentifier clientIdentifier) { if (!service.checkAccountActive(clientIdentifier)) { throw new ClientRuntimeException("Account %s is not active!".formatted(clientIdentifier.value())); } } }