Newer
Older
package de.ozgcloud.xta.client;
import static java.util.Collections.*;
import java.util.List;
import java.util.Optional;
import java.util.function.Consumer;
import java.util.stream.Stream;

Jan Zickermann
committed
import jakarta.validation.Valid;
import jakarta.validation.constraints.NotNull;
import de.ozgcloud.xta.client.config.XtaClientConfig;
import de.ozgcloud.xta.client.core.XtaClientService;
import de.ozgcloud.xta.client.core.XtaExceptionHandler;
import de.ozgcloud.xta.client.exception.XtaClientException;
import de.ozgcloud.xta.client.exception.XtaClientInitializationException;
import de.ozgcloud.xta.client.exception.XtaClientRuntimeException;
import de.ozgcloud.xta.client.model.XtaIdentifier;

Jan Zickermann
committed
import de.ozgcloud.xta.client.model.XtaMessage;
import de.ozgcloud.xta.client.model.XtaMessageMetaData;
import de.ozgcloud.xta.client.model.XtaMessageMetaDataListing;
import de.ozgcloud.xta.client.model.XtaMessageStatus;

Jan Zickermann
committed
import de.ozgcloud.xta.client.model.XtaTransportReport;

Jan Zickermann
committed
import lombok.AccessLevel;
import lombok.Builder;
import lombok.RequiredArgsConstructor;
import lombok.extern.log4j.Log4j2;

Jan Zickermann
committed
/**
* A client for sending and receiving messages via the XTA protocol.
*
* <p>
* The client may be initialized with an {@link XtaClientConfig} configuration by {@link #from(XtaClientConfig)}.
* </p>
*
* Example:
* <pre>
* var client = XtaClient.from(config);
* var transportReport = client.sendMessage(message);
* var transportReports = client.fetchMessages(message -> {
* // process message
* });
* </pre>
*/
@RequiredArgsConstructor(access = AccessLevel.MODULE)
@Builder(access = AccessLevel.MODULE)
public class XtaClient {
private final XtaClientService service;

Jan Zickermann
committed
private final XtaClientConfig config;
private final XtaExceptionHandler exceptionHandler;
static final String NO_MESSAGE_CLOSED_WARNING = "No message has been closed although more are pending. Try increasing max list items.";
* Initialize a new {@link XtaClient} instance based on the given configuration.
*
* @param config The configuration for the client.
* @return The initialized client.
* @throws XtaClientInitializationException If the configuration is invalid or a configured keystore failed to initialize.
*/
public static XtaClient from(XtaClientConfig config) throws XtaClientInitializationException {
return XtaClientFactory.from(config).create();
}
/**
* Fetches messages for all client identifiers.
*
* <p>
* For each configured client identifier in {@link XtaClientConfig#getClientIdentifiers() clientIdentifiers}, checks if the client identifier is
* an active account, then lists its pending/unread messages. Next, uses the {@link XtaClientConfig#getIsMessageSupported() isMessageSupported}
* predicate to decide whether to fetch a listed message. Note that if no predicate is configured, all listed messages are fetched.
* </p>
* <p>
* For each fetched message, calls the given {@code processMessage}. If {@code processMessage} does not throw a runtime exception, closes the
* message, i.e., marks it as read. Then, fetches the transport report for the successfully or unsuccessfully processed message. Note that a
* transport report is always returned for each processed message, unless a technical problem prevents fetching the transport report.
* A listing contains a maximum of {@link XtaClientConfig#getMaxListItems() maxListItems} messages. However, listing is repeated, as long as there
* are more messages pending and some message is closed successfully during the latest listing.
* </p>
*
* @param processMessage The consumer to process each fetched message.
* @return The transport reports for all fetched/processed messages. A message which has not been closed has an {@link XtaMessageStatus#OPEN OPEN}
* status. If a message has been closed, the status is either {@link XtaMessageStatus#GREEN GREEN}, {@link XtaMessageStatus#YELLOW YELLOW} or
* {@link XtaMessageStatus#RED RED}.
* @throws XtaClientException If a technical problem occurs while fetching messages.
*/
public List<XtaTransportReport> fetchMessages(@NotNull Consumer<XtaMessage> processMessage) throws XtaClientException {
	// Run the unchecked variant; a runtime failure is handed to the exception handler,
	// which derives the XtaClientException thrown to the caller.
	List<XtaTransportReport> transportReports;
	try {
		transportReports = fetchMessagesRaw(processMessage);
	} catch (RuntimeException runtimeFailure) {
		throw exceptionHandler.deriveXtaClientException(runtimeFailure);
	}
	return transportReports;
}
/**
 * Fetches messages for all configured client identifiers without converting runtime exceptions.
 *
 * <p>
 * Client identifiers for which {@code service.checkAccountActive} returns {@code false} are skipped. For each remaining
 * identifier a fresh {@link FetchMessageParameter} is created and its messages are fetched.
 * </p>
 *
 * @param processMessage The consumer applied to each fetched message.
 * @return The transport reports collected over all active client identifiers, in encounter order.
 */
List<XtaTransportReport> fetchMessagesRaw(Consumer<XtaMessage> processMessage) {
	var activeClientIdentifiers = config.getClientIdentifiers().stream()
			.filter(clientIdentifier -> service.checkAccountActive(clientIdentifier));
	return activeClientIdentifiers
			.flatMap(clientIdentifier -> fetchMessagesForClientIdentifier(
					initializeFetchMessageParameter(clientIdentifier, processMessage)))
			.toList();
}

Jan Zickermann
committed
/**
 * Creates the initial fetch parameter for a client identifier.
 *
 * @param clientIdentifier The client identifier whose messages are to be fetched.
 * @param processMessage The consumer applied to each fetched message.
 * @return A parameter whose third component is an empty set (presumably the ids of already viewed messages; confirm
 * against {@code FetchMessageParameter}).
 */
FetchMessageParameter initializeFetchMessageParameter(XtaIdentifier clientIdentifier, Consumer<XtaMessage> processMessage) {
	var initialParameter = new FetchMessageParameter(clientIdentifier, processMessage, emptySet());
	return initialParameter;
}
/**
 * Fetches the listed messages of one client identifier and repeats the listing while more messages are available.
 *
 * <p>
 * If the service returns no status list, the result is empty. Otherwise the messages of the listing are fetched, and if
 * {@link #checkMoreMessagesAvailable} holds, the method recurses with a parameter extended by the messages of this
 * listing.
 * </p>
 *
 * @param parameter The client identifier, message consumer and already viewed messages.
 * @return The transport reports of this listing followed by those of any follow-up listings.
 */
Stream<XtaTransportReport> fetchMessagesForClientIdentifier(FetchMessageParameter parameter) {
	var statusList = service.getStatusList(parameter.clientIdentifier());
	if (statusList.isEmpty()) {
		return Stream.empty();
	}

	var listing = statusList.get();
	var transportReports = fetchMessagesForListing(listing.messages(), parameter);

	// Only list again when more messages are pending and at least one message was closed in this round.
	Stream<XtaTransportReport> followUpReports = Stream.empty();
	if (checkMoreMessagesAvailable(listing, transportReports)) {
		followUpReports = fetchMessagesForClientIdentifier(parameter.withViewedMessageIdsFrom(listing.messages()));
	}
	return Stream.concat(transportReports.stream(), followUpReports);
}

Jan Zickermann
committed
/**
 * Decides whether another listing should be requested.
 *
 * @param listing The latest listing.
 * @param transportReports The transport reports of the messages fetched from that listing.
 * @return {@code true} if extra pending messages exist and some message has been closed. The closed-check (and its
 * warning) is only evaluated when extra pending messages exist.
 */
boolean checkMoreMessagesAvailable(XtaMessageMetaDataListing listing, List<XtaTransportReport> transportReports) {
	if (!checkExtraPendingMessagesAvailable(listing)) {
		return false;
	}
	return checkSomeMessageHasBeenClosed(transportReports);
}

Jan Zickermann
committed
boolean checkExtraPendingMessagesAvailable(XtaMessageMetaDataListing listing) {
return listing.messages().size() < listing.pendingMessageCount().intValue();

Jan Zickermann
committed
}
boolean checkSomeMessageHasBeenClosed(List<XtaTransportReport> transportReports) {
var someTransportReportHasClosedStatus = transportReports.stream()
.anyMatch(t -> !t.status().equals(XtaMessageStatus.OPEN));
if (!someTransportReportHasClosedStatus) {
logWarnForNoMessageClosed();
}
return someTransportReportHasClosedStatus;

Jan Zickermann
committed
}
void logWarnForNoMessageClosed() {
log.warn(NO_MESSAGE_CLOSED_WARNING);
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
}
/**
 * Fetches and processes the messages of one listing.
 *
 * @param messageMetaDataItems The metadata of the listed messages.
 * @param parameter The client identifier, message consumer and already viewed messages.
 * @return The transport reports of all messages that should be fetched and for which a report could be obtained.
 */
List<XtaTransportReport> fetchMessagesForListing(
		List<XtaMessageMetaData> messageMetaDataItems,
		FetchMessageParameter parameter
) {
	var metaDataToFetch = messageMetaDataItems.stream()
			.filter(candidate -> checkMessageShouldBeFetched(candidate, parameter));
	// An empty Optional from fetchMessage contributes no transport report.
	return metaDataToFetch
			.flatMap(candidate -> fetchMessage(candidate, parameter).stream())
			.toList();
}
/**
 * Decides whether a listed message should be fetched.
 *
 * @param messageMetaData The metadata of the listed message.
 * @param parameter The parameter holding the already viewed messages.
 * @return {@code true} if the message has not been viewed yet and is supported. The support check is skipped for
 * already viewed messages.
 */
boolean checkMessageShouldBeFetched(XtaMessageMetaData messageMetaData, FetchMessageParameter parameter) {
	if (parameter.hasMessageAlreadyBeenViewed(messageMetaData)) {
		return false;
	}
	return isMessageSupported(messageMetaData);
}
/**
 * Applies the configured message-support predicate, if any.
 *
 * @param messageMetaData The metadata of the listed message.
 * @return The result of the configured predicate, or {@code true} if no predicate is configured.
 */
boolean isMessageSupported(XtaMessageMetaData messageMetaData) {
	var supportPredicate = config.getIsMessageSupported();
	if (supportPredicate == null) {
		// Without a predicate every listed message counts as supported.
		return true;
	}
	return supportPredicate.test(messageMetaData);
}
Optional<XtaTransportReport> fetchMessage(XtaMessageMetaData messageMetaData, FetchMessageParameter parameter) {
return service.getMessage(messageMetaData)
.flatMap(message -> processMesssageAndFetchTransportReport(message, parameter));
/**
 * Processes a fetched message (closing it on success) and then requests its transport report.
 *
 * @param message The fetched message.
 * @param parameter The parameter holding the message consumer.
 * @return The transport report returned by the service for the message metadata, if any.
 */
Optional<XtaTransportReport> processMesssageAndFetchTransportReport(XtaMessage message, FetchMessageParameter parameter) {
	var messageConsumer = parameter.processMessage();
	processMessageAndCloseIfNoException(message, messageConsumer);

	// The report is requested regardless of whether processing succeeded.
	var processedMetaData = message.metaData();
	return service.getTransportReport(processedMetaData);
}
/**
 * Applies the consumer to the message and closes the message if no runtime exception is thrown.
 *
 * <p>
 * A runtime exception thrown inside the try block (by the consumer, the debug log call or closing the message) is logged
 * and not rethrown.
 * </p>
 *
 * @param message The fetched message.
 * @param processMessage The consumer to apply to the message.
 */
void processMessageAndCloseIfNoException(XtaMessage message, Consumer<XtaMessage> processMessage) {
	// Resolved outside the try block, so a failure here is not swallowed by the catch below.
	var idOfMessage = message.metaData().messageId();
	try {
		processMessage.accept(message);
		log.debug("Processing of message '{}' succeeded! Closing message.", idOfMessage);
		service.closeMessage(message);
	} catch (RuntimeException processingFailure) {
		logErrorForMessageProcessingFailure(idOfMessage, processingFailure);
	}
}
/**
 * Logs at error level that processing of a message failed and that the message is not closed.
 *
 * @param messageId The id of the message whose processing failed.
 * @param exception The runtime exception to log as the cause.
 */
void logErrorForMessageProcessingFailure(String messageId, RuntimeException exception) {
	var errorText = String.format("Processing of message '%s' failed! Not closing message.", messageId);
	log.error(errorText, exception);
}
/**
* Sends the given message.
*
* <p>
* Sends the message to the XTA server such that it may be asynchronously fetched by the reader. Author and reader of the message are given by the
* message metadata.
* </p>
* <p>
* First, checks whether the author refers to an active account. Then, checks that the service used by the message is available for the given
* reader. If both checks pass, sends the message to the reader. Finally, returns the transport report for the sent message.
* </p>
* <p>
* <b>Note:</b> message size and id are set by the server, and thus may initially be null. This also applies to the size of message and attachment
* files.
* </p>
*
* @param messageWithoutMessageId The XTA message to send without id and size.
* @return The transport report for the sent message. As long as no critical error occurred, indicated by status {@link XtaMessageStatus#RED RED},
* the message status is {@link XtaMessageStatus#OPEN OPEN}, until the reader closes the message. Moreover, the report contains the message
* metadata, including id and size values set by the server.
* @throws XtaClientException If a check fails or a technical problem occurs while sending the message.
*/
public XtaTransportReport sendMessage(@NotNull @Valid XtaMessage messageWithoutMessageId) throws XtaClientException {
return sendMessageRaw(messageWithoutMessageId);
} catch (RuntimeException exception) {
throw exceptionHandler.deriveXtaClientException(exception);
}
}
/**
 * Sends the message without converting runtime exceptions.
 *
 * <p>
 * Before sending, verifies that the author account is active and then that the service is available; either check throws
 * if it fails.
 * </p>
 *
 * @param messageWithoutMessageId The message to send.
 * @return The transport report returned by the service for the sent message.
 */
XtaTransportReport sendMessageRaw(XtaMessage messageWithoutMessageId) {
	var messageMetaData = messageWithoutMessageId.metaData();
	var author = messageMetaData.authorIdentifier();

	throwExceptionIfAccountInactive(author);
	throwExceptionIfServiceNotAvailable(messageMetaData);

	return service.sendMessage(messageWithoutMessageId);
}
/**
 * Ensures that the service lookup for the given metadata succeeds.
 *
 * @param metaData The message metadata passed to the service lookup.
 * @throws XtaClientRuntimeException If {@code service.lookupService} returns {@code false}.
 */
void throwExceptionIfServiceNotAvailable(XtaMessageMetaData metaData) {
	if (service.lookupService(metaData)) {
		return;
	}
	throw new XtaClientRuntimeException(String.format("Service %s is not available!", metaData.service()));
}
void throwExceptionIfAccountInactive(XtaIdentifier clientIdentifier) {
if (!service.checkAccountActive(clientIdentifier)) {
throw new XtaClientRuntimeException("Account %s is not active!".formatted(clientIdentifier.value()));