    #!/usr/bin/env python3
    """A script that checks client sites for dead links.
    
    Gets lists of links to be checked from a DCAT catalog, checks the links
    to see if they're working or broken and writes the results in JSON Lines
    text file format
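
    Example invocation (the catalog URL and file name are only illustrations):

        python3 dcat_catalog_check.py --url https://example.org/catalog.ttl --results results.jsonl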
    """
    
    import argparse
    import bz2
    import gzip
    import hashlib
    import json
    import logging
    import lzma
    import magic
    import os
    import re
    import requests
    import sys
    import time
    from zipfile import ZipFile
    import importlib
    import tempfile
    from rdflib import Graph, Namespace
    from rdflib.namespace import RDF, DCAT, DCTERMS, FOAF
    from datetime import datetime
    
    HYDRA = Namespace("http://www.w3.org/ns/hydra/core#")
    SPDX = Namespace("http://spdx.org/rdf/terms#")
    
    
    class CouldNotGetResourceIDsError(Exception):
        """Raised if getting the resource IDs to check fails."""
    
        pass
    
    
    class CouldNotGetURLError(Exception):
        """Raised if getting the URL for a given ID from the client site fails."""
    
        pass
    
    
    class CheckOptions:
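        """Options that control which distributions are (re-)checked."""
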
        format_to_check = None
        force_check_format = None
        http_error = None
        http_error_5xx = False
    
    
    class DcatCatalogCheck:
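        """Check all distributions of a DCAT catalog and report the result for each as a JSON line."""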
    
        def __init__(
            self,
            client_site_url,
            log_file=None,
            verbose=False,
            debug=False,
        ):
            self.client_site_url = client_site_url
            self.log_file = log_file
            self.verbose = verbose
            self.debug = debug
            self.allowed_file_formats = self.read_allowed_file_formats()
            self.uri_replacements = self.load_uri_replacements()
            self.logger = self._get_logger()
            self.previous_results = {}
            self.check_options = CheckOptions()
            self.no_recheck = False
    
        def _get_logger(self):
            """Configure and return a logger with optional file and verbosity settings.
    
            :return: Configured logger.
            """
            logger = logging.getLogger("dcatcatalogcheck")
            level = (
                logging.DEBUG
                if self.debug
                else logging.INFO
                if self.verbose
                else logging.WARNING
            )
            logger.setLevel(level)
    
            ch = logging.StreamHandler()
            ch.setLevel(level)
            formatter = logging.Formatter(
                "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
            )
            ch.setFormatter(formatter)
            logger.addHandler(ch)
    
            if self.log_file:
                fh = logging.FileHandler(self.log_file)
                fh.setLevel(level)
                fh.setFormatter(formatter)
                logger.addHandler(fh)
    
            return logger
    
        def is_mime_type_compatible(self, expected_format, actual_format):
            """Check if the actual MIME type is compatible with the expected format.
    
            :param expected_format: The expected file format.
            :param actual_format: The actual MIME type of the file.
            :return: True if the MIME type is compatible, False otherwise.
            """
    
            return actual_format in self.allowed_file_formats.get(
                (expected_format or "").upper(), []
            )
    
        def read_allowed_file_formats(self):
            """Read the allowed file formats from a JSON file.
    
            :return: Dictionary of allowed file formats.
            """
            with open("resources/file_types.json") as f:
                return json.load(f)
    
        def load_uri_replacements(self):
            """Load URI replacements from a JSON file if it exists."""
            replacements_file = "uri_replacements.json"
            if os.path.exists(replacements_file):
                with open(replacements_file, "r") as f:
                    return json.load(f)
            return []
    
        def apply_uri_replacements(self, url):
            """Apply regex-based URI replacements to the given URL."""
            for replacement in self.uri_replacements:
                regex = replacement["regex"]
                replaced_by = replacement["replaced_by"]
                # Use regex substitution to replace URI parts in the URL
                url = re.sub(regex, replaced_by, url)
            return url
    
        def load_http_complete(self, url):
            """Load a complete file from the specified URL and return the response object."""
            return requests.get(url)
    
        def _clear_result(self, resource):
            """Clear the result so we can start a fresh check of the distribution"""
    
            for key in [
                "accessible",
                "checksum_ok",
                "duration",
                "error",
                "etag",
                "http_status",
    
                "last_check",
                "mimetype",
    
    Jesper Zedlitz's avatar
    Jesper Zedlitz committed
                "mimetype_mismatch",
                "valid",
            ]:
                if key in resource:
                    del resource[key]
    
        def check_resource(self, resource):
            """Check whether the given URL is dead or alive, with additional checks for file size and type."""
            url = self.apply_uri_replacements(resource["url"])
    
            self.logger.debug(f"Checking {url}...")
    
            response = None
            resource["last_check"] = str(datetime.now())
    
            # try to find a specialized check module based on the format
            # specified in the metadata of the distribution
            format_name = (resource.get("format") or "").lower()
            try:
                # dynamically import the corresponding module for the format
                format_check_module = importlib.import_module(f"formats.{format_name}_format")
            except ModuleNotFoundError:
                format_check_module = None
    
            # download the file
            try:
                start_time = time.perf_counter()
                response = self.load_http_complete(url)
                end_time = time.perf_counter()
    
                resource["http_status"] = response.status_code
                resource["duration"] = end_time - start_time
    
                response.raise_for_status()  # Raise for 4xx/5xx status codes
                resource["accessible"] = True
    
                if "etag" in response.headers:
                    resource["etag"] = response.headers["etag"]
    
    
                if "content-length" in response.headers:
                    resource["size"] = response.headers["content-length"]
    
    
    Jesper Zedlitz's avatar
    Jesper Zedlitz committed
            except (requests.exceptions.RequestException, AttributeError) as err:
                # Handle connection, timeout, or other request errors
                resource["accessible"] = False
                resource["error"] = str(err)

                # no more checks possible
                return
    
            # write the content of the HTTP response into a temporary file
            original_file_name = url.split("/")[-1]
            suffix = original_file_name.split(".")[-1] if "." in original_file_name else ""
            with tempfile.NamedTemporaryFile(
                delete=False, suffix="." + suffix
            ) as temp_file:
                temp_file.write(response.content)
                temp_file.flush()
    
            resource["mimetype"] = self._guess_mime_type(temp_file.name)
    
            if resource.get("checksum_algorithm") and resource.get("checksum_value"):
                self._check_checksum(resource, temp_file)
    
            # handle compressed files
            if self._is_compressed(resource["mimetype"]):
                resource["compress_format"] = resource["mimetype"]
                decompressors = {
                    "application/gzip": gzip,
                    "application/x-bzip2": bz2,
                    "application/x-xz": lzma,
                }
    
                decompressor = decompressors.get(resource["mimetype"])
                if not decompressor:
                    self.logger.warning(f"Unknown compression {resource['mimetype']}.")
                else:
                    with tempfile.NamedTemporaryFile(delete=False) as decompressed_file:
                        with decompressor.open(temp_file.name, "rb") as compressed_file:
                            decompressed_file.write(compressed_file.read())
                            decompressed_file.flush()
                    os.remove(temp_file.name)
                    temp_file = decompressed_file
                    resource["mimetype"] = self._guess_mime_type(temp_file.name)
    
    
    Jesper Zedlitz's avatar
    Jesper Zedlitz committed
            if self._is_container(resource["mimetype"], resource["format"]) and resource[
                "format"
            ] not in ["GTFS", "GEOTIFF", "SHP"]:
                self._check_container_file(resource, temp_file, format_check_module)
    
    Jesper Zedlitz's avatar
    Jesper Zedlitz committed
            else:
                self._check_single_file(resource, temp_file, format_check_module)
    
            # clean up
            os.remove(temp_file.name)
    
        def _check_container_file(self, resource, temp_file, format_check_module):
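            """Check a ZIP container file by validating each contained file that matches the expected format.

            Returns True if the archive contains at least one relevant file and all relevant files are valid.
            """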
            if "application/zip" == resource["mimetype"]:
                resource["package_format"] = "application/zip"
    
                validation_result = True
                contains_at_least_one_relevant_file = False
                filename_suffix = "." + resource["format"].lower()
    
                with ZipFile(temp_file.name) as zip_file:
                    file_list = zip_file.namelist()
                    relevant_files = [
                        file for file in file_list if file.endswith(filename_suffix)
                    ]
                    contains_at_least_one_relevant_file = len(relevant_files) > 0
    
                    for file_name in relevant_files:
                        with zip_file.open(file_name) as file:
                            with tempfile.NamedTemporaryFile(delete=True) as member_file:
                                member_file.write(file.read())
                                member_file.flush()

                                resource["mimetype"] = self._guess_mime_type(member_file.name)
                                validation_result = validation_result and self._check_single_file(
                                    resource, member_file, format_check_module
                                )
    
                if not contains_at_least_one_relevant_file:
                    resource["mimetype_mismatch"] = True
    
                return contains_at_least_one_relevant_file and validation_result
    
            else:
                self.logger.error(f"Unsupported container format {resource['mimetype']}")
                return False
    
        def _check_single_file(self, resource, temp_file, format_check_module):
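            """Validate a single file, either with a format-specific module or by comparing MIME types.

            Returns True if the file appears to be valid, False otherwise.
            """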
            if format_check_module:
                # call the function `is_valid` that is defined in every format module
                resource["valid"] = format_check_module.is_valid(resource, temp_file)
                return resource["valid"]
            else:
                # There is no specialized check for the specified format.
                # Does the returned MIME type match the promised format?
    
                if not self.is_mime_type_compatible(
                    resource["format"], resource["mimetype"]
                ):
                    resource["mimetype_mismatch"] = True
                    self.logger.warning(
                        f"MIME type mismatch: expected {resource['format']}, but got {resource['mimetype']}"
                    )
    
        def _check_checksum(self, resource, temp_file):
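            """Verify the downloaded file against the checksum from the metadata and store the result in `checksum_ok`."""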
            algo_name = resource["checksum_algorithm"]
            if algo_name == "http://spdx.org/rdf/terms#checksumAlgorithm_sha1":
                hash_algorithm = hashlib.sha1()
            elif algo_name == "http://spdx.org/rdf/terms#checksumAlgorithm_sha256":
                hash_algorithm = hashlib.sha256()
            elif (
                algo_name == "http://spdx.org/rdf/terms#checksumAlgorithm_md5"
                or algo_name == "http://dcat-ap.de/def/hashAlgorithms/md/5"
            ):
                hash_algorithm = hashlib.md5()
            else:
                self.logger.warning(f"Unknown checksum algorithm {algo_name}.")
                return
    
            with open(temp_file.name, "rb") as f:
                for chunk in iter(lambda: f.read(4096), b""):
                    hash_algorithm.update(chunk)
            checksum = hash_algorithm.hexdigest()
            resource["checksum_ok"] = checksum == resource["checksum_value"]
    
        def _guess_mime_type(self, file_name):
            """Guess the MIME type of a file using `magic` library."""
            mime = magic.Magic(mime=True)
            return mime.from_file(file_name)
    
        def _is_compressed(self, mime_type):
            """Does the specified MIME type indicate a compressed file?"""
            return mime_type in [
                "application/gzip",
                "application/x-bzip2",
                "application/x-xz",
            ]
    
        def _is_container(self, mime_type, expected_format):
            """Does the specified MIME type indicate a container file that
            contains other files. Examples for container files are ZIP or TAR."""
    
            if mime_type == "application/x-tar":
                return True
    
            if mime_type != "application/zip":
                return False
    
            # ZIP files are difficult. There are formats like SHP and GTFS
            # that are themselves ZIP files. However, these formats are
            # checked differently. Therefore, we have to take a look at the
            # expected format and check whether it is based on ZIP.
            return "application/zip" not in self.allowed_file_formats.get(
                (expected_format or "").upper(), []
            )
    
        def _needs_check(self, url):
            """Check if the specified URL has to be check or if it has been inspected recently."""
            if url not in self.previous_results:
                return True
            elif self.no_recheck:
                # The user requested not to re-check any distributions.
                # Therefore, a (re-)check of this distribution is NOT necessary.
                return False
            else:
                previous_result = self.previous_results[url]
    
                if self.check_options and self.check_options.force_check_format:
                    # the user requested to look only at the format of the distribution
                    return (
                        previous_result.get("format", "").lower()
                        == self.check_options.force_check_format.lower()
                    )
    
                if self.check_options and self.check_options.http_error_5xx:
                    # the user requested only checks of HTTP 5xx error
                    last_http_status = previous_result.get("http_status", 0)
                    recheck_necessary = 500 <= last_http_status <= 599
                else:
                    recheck_necessary = (
                        not previous_result.get("accessible", False)
                        or previous_result.get("mimetype_mismatch", False)
                        or not previous_result.get("valid", True)
                        or not previous_result.get("checksum_ok", True)
                        or not previous_result.get("schema_valid", True)
                    )
    
                if recheck_necessary and self.check_options:
                    # Did the user specify a limit set of checks?
                    if self.check_options.format_to_check:
                        recheck_necessary = (
                            previous_result.get("format", "").lower()
                            == self.check_options.format_to_check.lower()
                        )
    
                if not recheck_necessary:
                    self.logger.debug(
                        f"Skipping {url}. It has been checked at {previous_result.get('last_check')}."
                    )
                else:
                    self.logger.debug(
                        f"Rechecking {url}. Last check was at {previous_result.get('last_check')}."
                    )
                return recheck_necessary
    
        def _get_publisher(self, graph, dataset):
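            """Return the publisher's name for the given dataset, falling back to the publisher URI, or None."""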
            try:
                # Attempt to get the publisher URI
                publisher = graph.value(dataset, DCTERMS.publisher)
    
                if not publisher:
                    self.logger.warning(f"Publisher not found for dataset: {dataset}")
                    return None
    
                # Attempt to get the publisher's name
                publisher_name = graph.value(publisher, FOAF.name)
    
                if publisher_name:
                    return str(publisher_name)
                else:
                    # If the name is not available, fall back to using the publisher URI
                    return str(publisher)
    
            except Exception as e:
                # Log any unexpected errors
                self.logger.error(f"Error retrieving publisher for dataset {dataset}: {e}")
                return None
    
        def _process_datasets(self, datasets, g):
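            """Iterate over all datasets, check each distribution, and report one result per distribution."""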
            for dataset in datasets:
                publisher = self._get_publisher(g, dataset)
    
                # find the distributions for each dataset
                for distribution in g.objects(dataset, DCAT.distribution):
                    if (distribution, RDF.type, DCAT.Distribution) in g:
                        resource = {}
                        format_url = g.value(distribution, DCTERMS.format)
                        resource["id"] = str(distribution)
                        resource["dataset"] = str(dataset)
                        resource["format"] = (
                            format_url.split("/")[-1] if format_url else None
                        )
                        resource["publisher"] = publisher
                        resource["url"] = g.value(distribution, DCAT.downloadURL)
                        if not resource["url"]:
                            # if the distribution does not have a downloadURL try the accessURL
                            resource["url"] = g.value(distribution, DCAT.accessURL)
    
                        url = str(resource["url"])
                        if self._needs_check(url):
                            checksum_resource = g.value(distribution, SPDX.checksum)
                            if checksum_resource:
                                resource["checksum_algorithm"] = str(
                                    g.value(checksum_resource, SPDX.algorithm)
                                )
                                resource["checksum_value"] = str(
                                    g.value(checksum_resource, SPDX.checksumValue)
                                )
    
                            self.check_resource(resource)
                            self._report_result(resource)
                        else:
                            previous_result = self.previous_results[url]
                            self._report_result(previous_result)
    
        def _report_result(self, resource):
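            """Write a single check result to standard output as one JSON line."""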
            print(json.dumps(resource), flush=True)
    
        def read_previous_results(self, file_path):
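            """Load previous check results from a JSON Lines file into `self.previous_results`."""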
            if not os.path.exists(file_path):
                self.logger.warning(
                    f"File '{file_path}' does not exist. No previous results loaded."
                )
                return
    
            loaded_count = 0
            skipped_count = 0
    
            with open(file_path, "r", encoding="utf-8") as file:
                for line_number, line in enumerate(file, start=1):
                    line = line.strip()
                    if not line:
                        self.logger.debug(f"Skipped empty line {line_number}.")
                        skipped_count += 1
                        continue
    
                    try:
                        json_object = json.loads(line)
                        url = json_object.get("url")
                        if not url:
                            self.logger.warning(
                                f"Line {line_number} is missing 'url': {line}"
                            )
                            skipped_count += 1
                            continue
    
                        self.previous_results[url] = json_object
                        loaded_count += 1
    
                    except json.JSONDecodeError as e:
                        self.logger.error(f"Invalid JSON at line {line_number}: {e}")
                        skipped_count += 1
    
            self.logger.info(
                f"Loaded {loaded_count} results from '{file_path}', skipped {skipped_count} lines."
            )
    
        def read_dcat_catalog(self, url):
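            """Load the DCAT catalog from the given URL, following hydra:PagedCollection pagination, and check all distributions."""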
            while url:
                self.logger.info(f"Loading metadata from {url} ...")
                response = requests.get(url)
                response.raise_for_status()  # check if the HTTP request was successful
    
                # parse the retrieved data into an RDF graph object
                g = Graph()
    
                if response.headers["content-type"] == "application/rdf+xml":
                    g.parse(data=response.text, format="xml")
                else:
                    g.parse(data=response.text)
    
                # find all dcat:Dataset entries
            datasets = list(g.subjects(predicate=RDF.type, object=DCAT.Dataset))
    
            self._process_datasets(datasets, g)

            paged_collection = g.value(predicate=RDF.type, object=HYDRA.PagedCollection)
                next_page = g.value(paged_collection, HYDRA.nextPage)
                url = str(next_page) if next_page else None
    
        def recheck_only(self):
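            """Re-check the previously stored results without reading the catalog again."""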
            for url, resource in self.previous_results.items():
                if self._needs_check(url):
                    self.logger.info(f"{url} needs a re-check")
                    self._clear_result(resource)
                    self.check_resource(resource)
                    self._report_result(resource)
                else:
                    self._report_result(resource)
    
    
    if __name__ == "__main__":
        """Parse command-line arguments and run the link checker.
    
        :param args: List of command-line arguments. If None, uses sys.argv.
        """
    
        parser = argparse.ArgumentParser()
        parser.add_argument("--url", help="DCAT catalog URL")
        parser.add_argument("--log_file", help="Log file path")
    
        parser.add_argument("--results", help="File from which the results are loaded")
        parser.add_argument("--verbose", action="store_true", help="Enable verbose logging")
        parser.add_argument("--debug", action="store_true", help="Enable debug logging")
        parser.add_argument(
            "--recheck",
            action="store_true",
            help="Only use the previous results (specified by --results) as input for the next check",
        )
        parser.add_argument(
            "--no-recheck",
            action="store_true",
            help="Just check new entries from the catalog. Do not re-check existing results.",
        )
    
        parser.add_argument("--check-format", help="Only check the specified format")
        parser.add_argument(
            "--force-check-format",
            help="Check distributinons with the specified format regardless of previous results",
        )
        parser.add_argument(
            "--check-http-5xx", action="store_true", help="Only re-check HTTP 5xx errors"
        )
        args = parser.parse_args()
    
        checker = DcatCatalogCheck(
            client_site_url=args.url,
            log_file=args.log_file,
            verbose=args.verbose,
            debug=args.debug,
        )
    
        if args.check_http_5xx:
            checker.check_options.http_error_5xx = True
    
        if args.check_format:
            checker.check_options.format_to_check = args.check_format
    
        if args.force_check_format:
            checker.check_options.force_check_format = args.force_check_format
    
        checker.no_recheck = args.no_recheck
    
        if args.results:
            checker.read_previous_results(args.results)
    
        if args.recheck:
            if not args.results:
                print("Missing results option for recheck.")
                sys.exit(1)
            checker.recheck_only()
        else:
            checker.read_dcat_catalog(args.url)