#!/usr/bin/env python3
"""A script that checks client sites for dead links.
Gets the list of links to be checked from a DCAT catalog, checks whether
each link is working or broken, and writes the results in JSON Lines
text file format.
"""
import argparse
import bz2
import gzip
import hashlib
import importlib
import json
import logging
import lzma
import os
import re
import sys
import tempfile
import time
from datetime import datetime
from zipfile import ZipFile

import magic
import requests
from rdflib import Graph, Namespace
from rdflib.namespace import RDF, DCAT, DCTERMS, FOAF
HYDRA = Namespace("http://www.w3.org/ns/hydra/core#")
SPDX = Namespace("http://spdx.org/rdf/terms#")
class CouldNotGetResourceIDsError(Exception):
"""Raised if getting the resource IDs to check fails."""
pass
class CouldNotGetURLError(Exception):
"""Raised if getting the URL for a given ID from the client site fails."""
pass
class CheckOptions:
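    """User-selected options that narrow which distributions are (re-)checked."""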
format_to_check = None
force_check_format = None
http_error = None
http_error_5xx = False
def __init__(self):
pass
class DcatCatalogCheck:
check_options = CheckOptions()
no_recheck = False
def __init__(
self,
client_site_url,
log_file=None,
verbose=False,
debug=False,
):
self.client_site_url = client_site_url
self.log_file = log_file
self.verbose = verbose
self.debug = debug
self.allowed_file_formats = self.read_allowed_file_formats()
self.uri_replacements = self.load_uri_replacements()
self.logger = self._get_logger()
self.previous_results = {}
def _get_logger(self):
"""Configure and return a logger with optional file and verbosity settings.
:return: Configured logger.
"""
logger = logging.getLogger("dcatcatalogcheck")
level = (
logging.DEBUG
if self.debug
else logging.INFO
if self.verbose
else logging.WARNING
)
logger.setLevel(level)
ch = logging.StreamHandler()
ch.setLevel(level)
formatter = logging.Formatter(
"%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
ch.setFormatter(formatter)
logger.addHandler(ch)
if self.log_file:
fh = logging.FileHandler(self.log_file)
fh.setLevel(level)
fh.setFormatter(formatter)
logger.addHandler(fh)
return logger
def is_mime_type_compatible(self, expected_format, actual_format):
"""Check if the actual MIME type is compatible with the expected format.
:param expected_format: The expected file format.
:param actual_format: The actual MIME type of the file.
:return: True if the MIME type is compatible, False otherwise.
"""
return actual_format in self.allowed_file_formats.get(
expected_format.upper(), []
)
def read_allowed_file_formats(self):
"""Read the allowed file formats from a JSON file.
:return: Dictionary of allowed file formats.
"""
with open("resources/file_types.json") as f:
return json.load(f)
def load_uri_replacements(self):
"""Load URI replacements from a JSON file if it exists."""
replacements_file = "uri_replacements.json"
if os.path.exists(replacements_file):
with open(replacements_file, "r") as f:
return json.load(f)
return []
def apply_uri_replacements(self, url):
"""Apply regex-based URI replacements to the given URL."""
for replacement in self.uri_replacements:
regex = replacement["regex"]
replaced_by = replacement["replaced_by"]
# Use regex substitution to replace URI parts in the URL
url = re.sub(regex, replaced_by, url)
return url
def load_http_complete(self, url):
"""Load a complete file from the specified URL and return the response object."""
return requests.get(url)
def _clear_result(self, resource):
"""Clear the result so we can start a fresh check of the distribution"""
for key in [
"accessible",
"checksum_ok",
"duration",
"error",
"etag",
"http_status",
"last_check",
"mimetype",
"mimetype_mismatch",
"valid",
]:
if key in resource:
del resource[key]
def check_resource(self, resource):
"""Check whether the given URL is dead or alive, with additional checks for file size and type."""
url = self.apply_uri_replacements(resource["url"])
self.logger.debug(f"Checking {url}...")
response = None
resource["last_check"] = str(datetime.now())
        # try to find a specialized check module based on the format specified
        # in the metadata of the distribution
format = resource["format"].lower()
try:
# dynamically import the corresponding module for the format
format_check_module = importlib.import_module(f"formats.{format}_format")
except ModuleNotFoundError:
format_check_module = None
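        # A format module (e.g. formats/csv_format.py; the exact set of modules is
        # project-specific) is expected to provide is_valid(resource, temp_file)
        # returning a boolean; see _check_single_file() below.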
# download the file
try:
start_time = time.perf_counter()
response = self.load_http_complete(url)
end_time = time.perf_counter()
resource["http_status"] = response.status_code
resource["duration"] = end_time - start_time
response.raise_for_status() # Raise for 4xx/5xx status codes
resource["accessible"] = True
if "etag" in response.headers:
resource["etag"] = response.headers["etag"]
if "content-length" in response.headers:
resource["size"] = response.headers["content-length"]
except requests.exceptions.RequestException as err:
# Handle connection, timeout, or other request errors
resource["accessible"] = False
resource["error"] = str(err)
# no more checks possible
return
except AttributeError as err:
resource["accessible"] = False
resource["error"] = str(err)
# no more checks possible
return
# write the content of the HTTP response into a temporary file
original_file_name = url.split("/")[-1]
suffix = original_file_name.split(".")[-1] if "." in original_file_name else ""
with tempfile.NamedTemporaryFile(
delete=False, suffix="." + suffix
) as temp_file:
temp_file.write(response.content)
temp_file.flush()
resource["mimetype"] = self._guess_mime_type(temp_file.name)
if resource.get("checksum_algorithm") and resource.get("checksum_value"):
self._check_checksum(resource, temp_file)
# handle compressed files
if self._is_compressed(resource["mimetype"]):
resource["compress_format"] = resource["mimetype"]
decompressors = {
"application/gzip": gzip,
"application/x-bzip2": bz2,
"application/x-xz": lzma,
}
decompressor = decompressors.get(resource["mimetype"])
if not decompressor:
self.logger.warning(f"Unknown compression {resource['mimetype']}.")
else:
with tempfile.NamedTemporaryFile(delete=False) as decompressed_file:
with decompressor.open(temp_file.name, "rb") as compressed_file:
decompressed_file.write(compressed_file.read())
decompressed_file.flush()
os.remove(temp_file.name)
temp_file = decompressed_file
resource["mimetype"] = self._guess_mime_type(temp_file.name)
if self._is_container(resource["mimetype"], resource["format"]) and resource[
"format"
] not in ["GTFS", "GEOTIFF", "SHP"]:
self._check_container_file(resource, temp_file, format_check_module)
else:
self._check_single_file(resource, temp_file, format_check_module)
# clean up
os.remove(temp_file.name)
def _check_container_file(self, resource, temp_file, format_check_module):
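        # A ZIP container counts as valid if it contains at least one member with the
        # expected format suffix and every such member passes the single-file check.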
if "application/zip" == resource["mimetype"]:
resource["package_format"] = "application/zip"
validation_result = True
contains_at_least_one_relevant_file = False
filename_suffix = "." + resource["format"].lower()
with ZipFile(temp_file.name) as zip_file:
file_list = zip_file.namelist()
relevant_files = [
file for file in file_list if file.endswith(filename_suffix)
]
contains_at_least_one_relevant_file = len(relevant_files) > 0
for file_name in relevant_files:
with zip_file.open(file_name) as file:
with tempfile.NamedTemporaryFile(delete=True) as temp_file:
temp_file.write(file.read())
temp_file.flush()
resource["mimetype"] = self._guess_mime_type(temp_file.name)
validation_result = (
validation_result
and self._check_single_file(
resource, temp_file, format_check_module
)
)
if not contains_at_least_one_relevant_file:
resource["mimetype_mismatch"] = True
return contains_at_least_one_relevant_file and validation_result
else:
self.logger.error(f"Unsupported container format {resource['mimetype']}")
def _check_single_file(self, resource, temp_file, format_check_module):
if format_check_module:
            # call the function `is_valid` that is defined in every format module
resource["valid"] = format_check_module.is_valid(resource, temp_file)
else:
# There is no specialized check for the specified format.
# Does the returned MIME type match the promised format?
if not self.is_mime_type_compatible(
resource["format"], resource["mimetype"]
):
resource["mimetype_mismatch"] = True
self.logger.warning(
f"MIME type mismatch: expected {resource['format']}, but got {resource['mimetype']}"
)
def _check_checksum(self, resource, temp_file):
algo_name = resource["checksum_algorithm"]
if algo_name == "http://spdx.org/rdf/terms#checksumAlgorithm_sha1":
hash_algorithm = hashlib.sha1()
elif algo_name == "http://spdx.org/rdf/terms#checksumAlgorithm_sha256":
hash_algorithm = hashlib.sha256()
elif (
algo_name == "http://spdx.org/rdf/terms#checksumAlgorithm_md5"
or algo_name == "http://dcat-ap.de/def/hashAlgorithms/md/5"
):
hash_algorithm = hashlib.md5()
else:
print(f"WARNING: unknown checksum algorithm {algo_name}", file=sys.stderr)
return
with open(temp_file.name, "rb") as f:
for chunk in iter(lambda: f.read(4096), b""):
hash_algorithm.update(chunk)
checksum = hash_algorithm.hexdigest()
resource["checksum_ok"] = checksum == resource["checksum_value"]
def _guess_mime_type(self, file_name):
"""Guess the MIME type of a file using `magic` library."""
mime = magic.Magic(mime=True)
return mime.from_file(file_name)
def _is_compressed(self, mime_type):
"""Does the specified MIME type indicate a compressed file?"""
return mime_type in [
"application/gzip",
"application/x-bzip2",
"application/x-xz",
]
def _is_container(self, mime_type, expected_format):
"""Does the specified MIME type indicate a container file that
contains other files. Examples for container files are ZIP or TAR."""
if mime_type == "application/x-tar":
return True
if mime_type != "application/zip":
return False
        # ZIP files are difficult. There are formats such as SHP and GTFS
        # that are themselves ZIP files. However, these formats are
        # checked differently. Therefore, we have to take a look at the
        # expected format and check whether that format is based on ZIP.
return "application/zip" not in self.allowed_file_formats.get(
expected_format.upper(), []
)
def _needs_check(self, url):
"""Check if the specified URL has to be check or if it has been inspected recently."""
if url not in self.previous_results:
return True
elif self.no_recheck:
# The user requested not to re-check any distributions.
# Therefore, a (re-)check of this distribution is NOT necessary.
return False
else:
previous_result = self.previous_results[url]
if self.check_options and self.check_options.force_check_format:
# the user requested to look only at the format of the distribution
return (
previous_result.get("format", "").lower()
== self.check_options.force_check_format.lower()
)
if self.check_options and self.check_options.http_error_5xx:
                # the user requested to re-check only HTTP 5xx errors
last_http_status = previous_result.get("http_status", 0)
recheck_necessary = last_http_status >= 500 and last_http_status <= 599
else:
recheck_necessary = (
not previous_result.get("accessible", False)
or previous_result.get("mimetype_mismatch", False)
or not previous_result.get("valid", True)
or not previous_result.get("checksum_ok", True)
or not previous_result.get("schema_valid", True)
)
if recheck_necessary and self.check_options:
# Did the user specify a limit set of checks?
if self.check_options.format_to_check:
recheck_necessary = (
previous_result.get("format", "").lower()
== self.check_options.format_to_check.lower()
)
if not recheck_necessary:
self.logger.debug(
f"Skipping {url}. It has been checked at {previous_result.get('last_check')}."
)
else:
self.logger.debug(
f"Rechecking {url}. Last check was at {previous_result.get('last_check')}."
)
return recheck_necessary
def _get_publisher(self, graph, dataset):
try:
# Attempt to get the publisher URI
publisher = graph.value(dataset, DCTERMS.publisher)
if not publisher:
self.logger.warning(f"Publisher not found for dataset: {dataset}")
return None
# Attempt to get the publisher's name
publisher_name = graph.value(publisher, FOAF.name)
if publisher_name:
return str(publisher_name)
else:
# If the name is not available, fallback to using the publisher URI
return str(publisher)
except Exception as e:
# Log any unexpected errors
self.logger.error(f"Error retrieving publisher for dataset {dataset}: {e}")
return None
def _process_datasets(self, datasets, g):
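        # Walk every dcat:Distribution of every dataset and either check it now or
        # re-use the previous result if no (re-)check is required.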
for dataset in datasets:
publisher = self._get_publisher(g, dataset)
# find the distributions for each dataset
for distribution in g.objects(dataset, DCAT.distribution):
if (distribution, RDF.type, DCAT.Distribution) in g:
resource = {}
format_url = g.value(distribution, DCTERMS.format)
resource["id"] = str(distribution)
resource["dataset"] = str(dataset)
resource["format"] = (
format_url.split("/")[-1] if format_url else None
)
resource["publisher"] = publisher
resource["url"] = g.value(distribution, DCAT.downloadURL)
if not resource["url"]:
# if the distribution does not have a downloadURL try the accessURL
resource["url"] = g.value(distribution, DCAT.accessURL)
url = str(resource["url"])
if self._needs_check(url):
checksum_resource = g.value(distribution, SPDX.checksum)
if checksum_resource:
resource["checksum_algorithm"] = str(
g.value(checksum_resource, SPDX.algorithm)
)
resource["checksum_value"] = str(
g.value(checksum_resource, SPDX.checksumValue)
)
self.check_resource(resource)
self._report_result(resource)
else:
previous_result = self.previous_results[url]
self._report_result(previous_result)
def _report_result(self, resource):
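        # One JSON object per line (JSON Lines); a later run can reload this output
        # via --results and skip distributions that do not need a re-check.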
print(json.dumps(resource), flush=True)
def read_previous_results(self, file_path):
if not os.path.exists(file_path):
self.logger.warning(
f"File '{file_path}' does not exist. No previous results loaded."
)
return
loaded_count = 0
skipped_count = 0
with open(file_path, "r", encoding="utf-8") as file:
for line_number, line in enumerate(file, start=1):
line = line.strip()
if not line:
self.logger.debug(f"Skipped empty line {line_number}.")
skipped_count += 1
continue
try:
json_object = json.loads(line)
url = json_object.get("url")
if not url:
self.logger.warning(
f"Line {line_number} is missing 'url': {line}"
)
skipped_count += 1
continue
self.previous_results[url] = json_object
loaded_count += 1
                except json.JSONDecodeError as e:
                    self.logger.error(f"Invalid JSON at line {line_number}: {e}")
                    skipped_count += 1
        self.logger.info(
            f"Loaded {loaded_count} results from '{file_path}', skipped {skipped_count} lines."
        )
def read_dcat_catalog(self, url):
while url:
self.logger.info(f"Loading metadata from {url} ...")
response = requests.get(url)
response.raise_for_status() # check if the HTTP request was successful
# parse the retrieved data into an RDF graph object
g = Graph()
if response.headers["content-type"] == "application/rdf+xml":
g.parse(data=response.text, format="xml")
else:
g.parse(data=response.text)
# find all dcat:Dataset entries
datasets = []
for s in g.subjects(predicate=RDF.type, object=DCAT.Dataset):
datasets.append(s)
self._process_datasets(datasets, g)
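            # Large catalogs may be split into pages using the Hydra vocabulary;
            # follow hydra:nextPage until there are no further pages.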
paged_collection = g.value(predicate=RDF.type, object=HYDRA.PagedCollection)
next_page = g.value(paged_collection, HYDRA.nextPage)
url = str(next_page) if next_page else None
def recheck_only(self):
for url in self.previous_results:
resource = self.previous_results[url]
if self._needs_check(url):
self.logger.info(f"{url} needs a re-check")
self._clear_result(resource)
self.check_resource(resource)
self._report_result(resource)
else:
self._report_result(resource)
if __name__ == "__main__":
"""Parse command-line arguments and run the link checker.
:param args: List of command-line arguments. If None, uses sys.argv.
"""
parser = argparse.ArgumentParser()
parser.add_argument("--url", help="DCAT catalog URL")
parser.add_argument("--log_file", help="Log file path")
parser.add_argument("--results", help="File from which the results are loaded")
parser.add_argument("--verbose", action="store_true", help="Enable verbose logging")
parser.add_argument("--debug", action="store_true", help="Enable debug logging")
parser.add_argument(
"--recheck",
action="store_true",
help="Only use the previous results (specified by --results) as input for the next check",
)
parser.add_argument(
"--no-recheck",
action="store_true",
help="Just check new entries from the catalog. Do not re-check existing results.",
)
parser.add_argument("--check-format", help="Only check the specified format")
parser.add_argument(
"--force-check-format",
help="Check distributinons with the specified format regardless of previous results",
)
parser.add_argument(
"--check-http-5xx", action="store_true", help="Only re-check HTTP 5xx errors"
)
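    # Typical invocations (script and file names are illustrative):
    #   python dcat_catalog_check.py --url https://example.org/catalog.xml > results.jsonl
    #   python dcat_catalog_check.py --results results.jsonl --recheck > rechecked.jsonl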
args = parser.parse_args()
checker = DcatCatalogCheck(
client_site_url=args.url,
log_file=args.log_file,
verbose=args.verbose,
debug=args.debug,
)
if args.check_http_5xx:
checker.check_options.http_error_5xx = True
if args.check_format:
checker.check_options.format_to_check = args.check_format
if args.force_check_format:
checker.check_options.force_check_format = args.force_check_format
checker.no_recheck = args.no_recheck
if args.results:
checker.read_previous_results(args.results)
if args.recheck:
if not args.results:
print("Missing results option for recheck.")
sys.exit(1)
checker.recheck_only()
else:
checker.read_dcat_catalog(args.url)