package de.ozgcloud.xta.client;

import static java.util.Collections.*;

import java.util.List;
import java.util.Optional;
import java.util.function.Consumer;
import java.util.stream.Stream;

import jakarta.validation.Valid;
import jakarta.validation.constraints.NotNull;

import de.ozgcloud.xta.client.config.XtaClientConfig;
import de.ozgcloud.xta.client.core.XtaClientService;
import de.ozgcloud.xta.client.core.XtaExceptionHandler;
import de.ozgcloud.xta.client.exception.ClientRuntimeException;
import de.ozgcloud.xta.client.exception.XtaClientException;
import de.ozgcloud.xta.client.exception.XtaClientInitializationException;
import de.ozgcloud.xta.client.model.XtaIdentifier;
import de.ozgcloud.xta.client.model.XtaMessage;
import de.ozgcloud.xta.client.model.XtaMessageMetaData;
import de.ozgcloud.xta.client.model.XtaMessageMetaDataListing;
import de.ozgcloud.xta.client.model.XtaMessageStatus;
import de.ozgcloud.xta.client.model.XtaTransportReport;
import lombok.AccessLevel;
import lombok.Builder;
import lombok.RequiredArgsConstructor;
import lombok.extern.log4j.Log4j2;

/**
 * A client for sending and receiving messages via the XTA protocol.
 *
 * <p>
 * The client may be initialized for a {@link XtaClientConfig} configuration by {@link #from(XtaClientConfig)}.
 * </p>
 *
 * Example:
 * <pre>{@code
 * var client = XtaClient.from(config);
 * var transportReport = client.sendMessage(message);
 * var transportReports = client.fetchMessages(message -> {
 *   // process message
 * });
 * }</pre>
 */
@RequiredArgsConstructor(access = AccessLevel.MODULE)
@Builder(access = AccessLevel.MODULE)
@Log4j2
public class XtaClient {

	private final XtaClientService service;
	private final XtaClientConfig config;
	private final XtaExceptionHandler exceptionHandler;

	/** Warning logged when a listing reports more pending messages but none of the processed messages has been closed. */
	static final String NO_MESSAGE_CLOSED_WARNING = "No message has been closed although more are pending. Try increasing max list items.";

	/**
	 * Initialize a new {@link XtaClient} instance from the given configuration.
	 *
	 * @param config The configuration for the client.
	 * @return The initialized client.
	 * @throws XtaClientInitializationException If the configuration is invalid or a configured keystore failed to initialize.
	 */
	public static XtaClient from(XtaClientConfig config) throws XtaClientInitializationException {
		return XtaClientFactory.from(config).create();
	}

	/**
	 * Fetches messages for all client identifiers.
	 *
	 * <p>
	 * For each configured client identifier in {@link XtaClientConfig#getClientIdentifiers() clientIdentifiers}, checks if the client identifier is
	 * an active account, then lists its pending/unread messages. Next, uses the {@link XtaClientConfig#getIsMessageSupported() isMessageSupported}
	 * predicate to decide whether to fetch a listed message. Note that if no predicate is configured, all listed messages are fetched.
	 * </p>
	 * <p>
	 * For each fetched message, calls the given {@code processMessage}. If {@code processMessage} does not throw a runtime exception, closes the
	 * message, i.e., marks it as read. Then, fetches the transport report for the successfully or unsuccessfully processed message. Note that a
	 * transport report is always returned for each processed message, unless a technical problem prevents fetching the transport report.
	 * </p>
	 * <p>
	 * A listing contains a maximum of {@link XtaClientConfig#getMaxListItems() maxListItems} messages. However, listing is repeated, as long as there
	 * are more messages pending and some message is closed successfully during the latest listing. A message counts as closed if its transport
	 * report has any status other than {@link XtaMessageStatus#OPEN OPEN}.
	 * </p>
	 *
	 * @param processMessage The consumer to process each fetched message.
	 * @return The transport reports for all fetched/processed messages. A message which has not been closed has an
	 * {@link XtaMessageStatus#OPEN OPEN} status. If a message has been closed, the status is either {@link XtaMessageStatus#GREEN GREEN},
	 * {@link XtaMessageStatus#YELLOW YELLOW} or {@link XtaMessageStatus#RED RED}.
	 * @throws XtaClientException If a technical problem occurs while fetching messages.
	 */
	public List<XtaTransportReport> fetchMessages(@NotNull Consumer<XtaMessage> processMessage) throws XtaClientException {
		try {
			return fetchMessagesRaw(processMessage);
		} catch (RuntimeException exception) {
			throw exceptionHandler.deriveXtaClientException(exception);
		}
	}

	/**
	 * Fetches messages for every configured client identifier that passes the active-account check, without translating runtime exceptions.
	 *
	 * @param processMessage The consumer to process each fetched message.
	 * @return The transport reports of all client identifiers, in the order they were obtained.
	 */
	List<XtaTransportReport> fetchMessagesRaw(Consumer<XtaMessage> processMessage) {
		return config.getClientIdentifiers().stream()
				.filter(service::checkAccountActive)
				.map(clientIdentifier -> initializeFetchMessageParameter(clientIdentifier, processMessage))
				.flatMap(this::fetchMessagesForClientIdentifier)
				.toList();
	}

	/**
	 * Creates the initial fetch parameter for a client identifier, starting with an empty set (presumably the ids of already viewed messages).
	 *
	 * @param clientIdentifier The client identifier to fetch messages for.
	 * @param processMessage   The consumer to process each fetched message.
	 * @return The initial parameter.
	 */
	FetchMessageParameter initializeFetchMessageParameter(XtaIdentifier clientIdentifier, Consumer<XtaMessage> processMessage) {
		return new FetchMessageParameter(clientIdentifier, processMessage, emptySet());
	}

	/**
	 * Lists and fetches the messages of one client identifier, repeating the listing recursively while
	 * {@link #checkMoreMessagesAvailable(XtaMessageMetaDataListing, List)} holds.
	 *
	 * @param parameter The fetch parameter holding the client identifier, the consumer and the viewed messages.
	 * @return The transport reports of this listing followed by those of any subsequent listings; empty if no listing is available.
	 */
	Stream<XtaTransportReport> fetchMessagesForClientIdentifier(FetchMessageParameter parameter) {
		return service.getStatusList(parameter.clientIdentifier())
				.map(listing -> {
					var transportReports = fetchMessagesForListing(listing.messages(), parameter);
					// Messages of this listing are recorded as viewed before listing again, so that they are not fetched a second time.
					return Stream.concat(transportReports.stream(),
							checkMoreMessagesAvailable(listing, transportReports)
									? fetchMessagesForClientIdentifier(parameter.withViewedMessageIdsFrom(listing.messages()))
									: Stream.empty());
				})
				.orElse(Stream.empty());
	}

	/**
	 * Decides whether another listing should be requested.
	 *
	 * @param listing          The latest listing.
	 * @param transportReports The transport reports of the messages processed from that listing.
	 * @return {@code true} if the listing reports more pending messages than it contains and at least one message has been closed.
	 */
	boolean checkMoreMessagesAvailable(XtaMessageMetaDataListing listing, List<XtaTransportReport> transportReports) {
		return checkExtraPendingMessagesAvailable(listing) && checkSomeMessageHasBeenClosed(transportReports);
	}

	/**
	 * Checks whether the listing contains fewer messages than its pending message count.
	 *
	 * @param listing The listing to inspect.
	 * @return {@code true} if more messages are pending than are contained in the listing.
	 */
	boolean checkExtraPendingMessagesAvailable(XtaMessageMetaDataListing listing) {
		return listing.messages().size() < listing.pendingMessageCount().intValue();
	}

	/**
	 * Checks whether at least one of the given transport reports belongs to a closed message and logs a warning otherwise.
	 *
	 * <p>
	 * A message is considered closed if its status is not {@link XtaMessageStatus#OPEN OPEN}, i.e., it is
	 * {@link XtaMessageStatus#GREEN GREEN}, {@link XtaMessageStatus#YELLOW YELLOW} or {@link XtaMessageStatus#RED RED}.
	 * </p>
	 *
	 * @param transportReports The transport reports of the latest listing.
	 * @return {@code true} if some transport report has a status other than {@code OPEN}.
	 */
	boolean checkSomeMessageHasBeenClosed(List<XtaTransportReport> transportReports) {
		// Closed messages may also report YELLOW or RED, so any non-OPEN status counts as closed (not only GREEN).
		var someMessageHasBeenClosed = transportReports.stream()
				.anyMatch(transportReport -> !transportReport.status().equals(XtaMessageStatus.OPEN));
		if (!someMessageHasBeenClosed) {
			logWarnForNoMessageClosed();
		}
		return someMessageHasBeenClosed;
	}

	/** Logs {@link #NO_MESSAGE_CLOSED_WARNING} at warn level. */
	void logWarnForNoMessageClosed() {
		log.warn(NO_MESSAGE_CLOSED_WARNING);
	}

	/**
	 * Fetches and processes every listed message that should be fetched.
	 *
	 * @param messageMetaDataItems The metadata of the listed messages.
	 * @param parameter            The fetch parameter holding the consumer and the viewed messages.
	 * @return The transport reports that could be obtained for the fetched messages.
	 */
	List<XtaTransportReport> fetchMessagesForListing(
			List<XtaMessageMetaData> messageMetaDataItems,
			FetchMessageParameter parameter
	) {
		return messageMetaDataItems.stream()
				.filter(metaData -> checkMessageShouldBeFetched(metaData, parameter))
				.map(metaData -> fetchMessage(metaData, parameter))
				.flatMap(Optional::stream)
				.toList();
	}

	/**
	 * Checks whether a listed message should be fetched.
	 *
	 * @param messageMetaData The metadata of the listed message.
	 * @param parameter       The fetch parameter holding the viewed messages.
	 * @return {@code true} if the message has not been viewed yet and is supported.
	 */
	boolean checkMessageShouldBeFetched(XtaMessageMetaData messageMetaData, FetchMessageParameter parameter) {
		return !parameter.hasMessageAlreadyBeenViewed(messageMetaData) && isMessageSupported(messageMetaData);
	}

	/**
	 * Applies the configured {@link XtaClientConfig#getIsMessageSupported() isMessageSupported} predicate.
	 *
	 * @param messageMetaData The metadata of the listed message.
	 * @return The predicate result, or {@code true} if no predicate is configured.
	 */
	boolean isMessageSupported(XtaMessageMetaData messageMetaData) {
		return Optional.ofNullable(config.getIsMessageSupported())
				.map(predicate -> predicate.test(messageMetaData))
				.orElse(true);
	}

	/**
	 * Fetches a single message, processes it and obtains its transport report.
	 *
	 * @param messageMetaData The metadata of the message to fetch.
	 * @param parameter       The fetch parameter holding the consumer.
	 * @return The transport report, or empty if the service returned no message or no transport report.
	 */
	Optional<XtaTransportReport> fetchMessage(XtaMessageMetaData messageMetaData, FetchMessageParameter parameter) {
		return service.getMessage(messageMetaData)
				.flatMap(message -> processMesssageAndFetchTransportReport(message, parameter));
	}

	/**
	 * Processes the message, closing it on success, and then requests its transport report regardless of the processing outcome.
	 *
	 * @param message   The fetched message.
	 * @param parameter The fetch parameter holding the consumer.
	 * @return The transport report as returned by the service.
	 */
	Optional<XtaTransportReport> processMesssageAndFetchTransportReport(XtaMessage message, FetchMessageParameter parameter) {
		processMessageAndCloseIfNoException(message, parameter.processMessage());
		return service.getTransportReport(message.metaData());
	}

	/**
	 * Passes the message to the consumer and closes it if no runtime exception is thrown; otherwise logs the failure and leaves the message open.
	 *
	 * @param message        The fetched message.
	 * @param processMessage The consumer to process the message.
	 */
	void processMessageAndCloseIfNoException(XtaMessage message, Consumer<XtaMessage> processMessage) {
		var messageId = message.metaData().messageId();
		try {
			processMessage.accept(message);
			log.debug("Processing of message '{}' succeeded! Closing message.", messageId);
			// NOTE(review): a RuntimeException thrown by closeMessage is caught below as well and is logged as a processing failure.
			service.closeMessage(message);
		} catch (RuntimeException exception) {
			logErrorForMessageProcessingFailure(messageId, exception);
		}
	}

	/**
	 * Logs a failed message processing at error level, including the exception.
	 *
	 * @param messageId The id of the message whose processing failed.
	 * @param exception The exception thrown during processing.
	 */
	void logErrorForMessageProcessingFailure(String messageId, RuntimeException exception) {
		// The trailing throwable is not bound to a placeholder; it is logged as the exception of the event.
		log.error("Processing of message '{}' failed! Not closing message.", messageId, exception);
	}

	/**
	 * Sends the given message.
	 *
	 * <p>
	 * Sends the message to the XTA server such that it may be asynchronously fetched by the reader. Author and reader of the message are given by the
	 * message metadata.
	 * </p>
	 * <p>
	 * First, checks whether the author refers to an active account. Then, checks that the service used by the message is available for the given
	 * reader. If both checks pass, sends the message to the reader. Finally, returns the transport report for the sent message.
	 * </p>
	 * <p>
	 * <b>Note:</b> message size and id are set by the server, and thus may initially be null.
	 * </p>
	 *
	 * @param messageWithoutMessageId The message to send without id and size.
	 * @return The transport report for the sent message. As long as the reader did not read the message the status is {@link XtaMessageStatus#OPEN}.
	 * Moreover, the report contains the message metadata, including id and size values set by the server.
	 * @throws XtaClientException If a check fails or a technical problem occurs while sending the message.
	 */
	public XtaTransportReport sendMessage(@NotNull @Valid XtaMessage messageWithoutMessageId) throws XtaClientException {
		try {
			return sendMessageRaw(messageWithoutMessageId);
		} catch (RuntimeException exception) {
			throw exceptionHandler.deriveXtaClientException(exception);
		}
	}

	/**
	 * Performs the account and service checks and sends the message, without translating runtime exceptions.
	 *
	 * @param messageWithoutMessageId The message to send without id and size.
	 * @return The transport report returned by the service for the sent message.
	 * @throws ClientRuntimeException If the author account is not active or the service is not available.
	 */
	XtaTransportReport sendMessageRaw(XtaMessage messageWithoutMessageId) {
		var metaData = messageWithoutMessageId.metaData();
		throwExceptionIfAccountInactive(metaData.authorIdentifier());
		throwExceptionIfServiceNotAvailable(metaData);
		return service.sendMessage(messageWithoutMessageId);
	}

	/**
	 * Ensures that the service lookup for the given metadata succeeds.
	 *
	 * @param metaData The metadata of the message to send.
	 * @throws ClientRuntimeException If the service lookup fails.
	 */
	void throwExceptionIfServiceNotAvailable(XtaMessageMetaData metaData) {
		if (!service.lookupService(metaData)) {
			throw new ClientRuntimeException("Service %s is not available!".formatted(metaData.service()));
		}
	}

	/**
	 * Ensures that the given client identifier refers to an active account.
	 *
	 * @param clientIdentifier The client identifier to check.
	 * @throws ClientRuntimeException If the account is not active.
	 */
	void throwExceptionIfAccountInactive(XtaIdentifier clientIdentifier) {
		if (!service.checkAccountActive(clientIdentifier)) {
			throw new ClientRuntimeException("Account %s is not active!".formatted(clientIdentifier.value()));
		}
	}

}