Skip to content
Snippets Groups Projects
Verified Commit a05d781e authored by Jesper Zedlitz's avatar Jesper Zedlitz
Browse files

Merge branch 'dev' of code.schleswig-holstein.de:opendata/dcat-catalog-check into dev

parents cf12f920 eb7799e7
No related branches found
No related tags found
1 merge request!1Update Formats, Dependencies, and Dockerfile Configuration
Pipeline #1399 passed
[run]
omit =
tests/*
\ No newline at end of file
......@@ -5,6 +5,17 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
## [Unreleased]
### Added
- Unit tests for URI replacements and resource clearing.
### Changed
- Updated coverage configuration to manage test file inclusion/exclusion.
- Test files are now omitted from coverage reports to focus on application code metrics.
## [1.0.0] - 2024-12-20
### Added
......
......@@ -154,7 +154,8 @@ class DcatCatalogCheck:
"error",
"etag",
"http_status",
"last_check" "mimetype",
"last_check",
"mimetype",
"mimetype_mismatch",
"valid",
]:
......@@ -174,7 +175,8 @@ class DcatCatalogCheck:
format = resource["format"].lower()
try:
# dynamically import the corresponding module for the format
format_check_module = importlib.import_module(f"formats.{format}_format")
format_check_module = importlib.import_module(
f"formats.{format}_format")
except ModuleNotFoundError:
format_check_module = None
......@@ -212,7 +214,8 @@ class DcatCatalogCheck:
# write the content of the HTTP response into a temporary file
original_file_name = url.split("/")[-1]
suffix = original_file_name.split(".")[-1] if "." in original_file_name else ""
suffix = original_file_name.split(
".")[-1] if "." in original_file_name else ""
with tempfile.NamedTemporaryFile(
delete=False, suffix="." + suffix
) as temp_file:
......@@ -235,7 +238,8 @@ class DcatCatalogCheck:
decompressor = decompressors.get(resource["mimetype"])
if not decompressor:
self.logger.warning(f"Unknown compression {resource['mimetype']}.")
self.logger.warning(
f"Unknown compression {resource['mimetype']}.")
else:
with tempfile.NamedTemporaryFile(delete=False) as decompressed_file:
with decompressor.open(temp_file.name, "rb") as compressed_file:
......@@ -246,7 +250,8 @@ class DcatCatalogCheck:
resource["mimetype"] = self._guess_mime_type(temp_file.name)
if self._is_container(resource["mimetype"], resource["format"]):
self._check_container_file(resource, temp_file, format_check_module)
self._check_container_file(
resource, temp_file, format_check_module)
else:
self._check_single_file(resource, temp_file, format_check_module)
......@@ -274,7 +279,8 @@ class DcatCatalogCheck:
temp_file.write(file.read())
temp_file.flush()
resource["mimetype"] = self._guess_mime_type(temp_file.name)
resource["mimetype"] = self._guess_mime_type(
temp_file.name)
validation_result = (
validation_result
and self._check_single_file(
......@@ -288,12 +294,14 @@ class DcatCatalogCheck:
return contains_at_least_one_relevant_file and validation_result
else:
self.logger.error(f"Unsupported container format {resource['mimetype']}")
self.logger.error(
f"Unsupported container format {resource['mimetype']}")
def _check_single_file(self, resource, temp_file, format_check_module):
if format_check_module:
# call the function `process` that is defined in every module
resource["valid"] = format_check_module.is_valid(resource, temp_file)
resource["valid"] = format_check_module.is_valid(
resource, temp_file)
else:
# There is no specialized check for the specified format.
# Does the returned MIME type match the promised format?
......@@ -318,7 +326,8 @@ class DcatCatalogCheck:
):
hash_algorithm = hashlib.md5()
else:
print(f"WARNING: unknown checksum algorithm {algo_name}", file=sys.stderr)
print(
f"WARNING: unknown checksum algorithm {algo_name}", file=sys.stderr)
return
with open(temp_file.name, "rb") as f:
......@@ -413,7 +422,8 @@ class DcatCatalogCheck:
publisher = graph.value(dataset, DCTERMS.publisher)
if not publisher:
self.logger.warning(f"Publisher not found for dataset: {dataset}")
self.logger.warning(
f"Publisher not found for dataset: {dataset}")
return None
# Attempt to get the publisher's name
......@@ -427,7 +437,8 @@ class DcatCatalogCheck:
except Exception as e:
# Log any unexpected errors
self.logger.error(f"Error retrieving publisher for dataset {dataset}: {e}")
self.logger.error(
f"Error retrieving publisher for dataset {dataset}: {e}")
return None
def _process_datasets(self, datasets, g):
......@@ -452,7 +463,8 @@ class DcatCatalogCheck:
url = str(resource["url"])
if self._needs_check(url):
checksum_resource = g.value(distribution, SPDX.checksum)
checksum_resource = g.value(
distribution, SPDX.checksum)
if checksum_resource:
resource["checksum_algorithm"] = str(
g.value(checksum_resource, SPDX.algorithm)
......@@ -502,7 +514,8 @@ class DcatCatalogCheck:
loaded_count += 1
except json.JSONDecodeError as e:
self.logger.error(f"Invalid JSON at line {line_number}: {e}")
self.logger.error(
f"Invalid JSON at line {line_number}: {e}")
skipped_count += 1
self.logger.info(
......@@ -530,7 +543,8 @@ class DcatCatalogCheck:
self._process_datasets(datasets, g)
paged_collection = g.value(predicate=RDF.type, object=HYDRA.PagedCollection)
paged_collection = g.value(
predicate=RDF.type, object=HYDRA.PagedCollection)
next_page = g.value(paged_collection, HYDRA.nextPage)
url = str(next_page) if next_page else None
......@@ -555,9 +569,12 @@ if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("--url", help="DCAT catalog URL")
parser.add_argument("--log_file", help="Log file path")
parser.add_argument("--results", help="File from which the results are loaded")
parser.add_argument("--verbose", action="store_true", help="Enable verbose logging")
parser.add_argument("--debug", action="store_true", help="Enable debug logging")
parser.add_argument(
"--results", help="File from which the results are loaded")
parser.add_argument("--verbose", action="store_true",
help="Enable verbose logging")
parser.add_argument("--debug", action="store_true",
help="Enable debug logging")
parser.add_argument(
"--recheck",
action="store_true",
......@@ -568,7 +585,8 @@ if __name__ == "__main__":
action="store_true",
help="Just check new entries from the catalog. Do not re-check existing results.",
)
parser.add_argument("--check-format", help="Only check the specified format")
parser.add_argument(
"--check-format", help="Only check the specified format")
parser.add_argument(
"--force-check-format",
help="Check distributinons with the specified format regardless of previous results",
......
......@@ -29,10 +29,13 @@ class TestDcatCatalogCheck(unittest.TestCase):
"XML": ["application/xml"],
}
self.assertTrue(self.dcc.is_mime_type_compatible("JSON", "application/json"))
self.assertFalse(self.dcc.is_mime_type_compatible("JSON", "application/xml"))
self.assertTrue(self.dcc.is_mime_type_compatible(
"JSON", "application/json"))
self.assertFalse(self.dcc.is_mime_type_compatible(
"JSON", "application/xml"))
self.assertFalse(
self.dcc.is_mime_type_compatible("UnknownFormat", "application/json")
self.dcc.is_mime_type_compatible(
"UnknownFormat", "application/json")
)
def test_read_allowed_file_formats(self):
......@@ -44,7 +47,8 @@ class TestDcatCatalogCheck(unittest.TestCase):
):
formats = self.dcc.read_allowed_file_formats()
self.assertEqual(
formats, {"JSON": ["application/json"], "XML": ["application/xml"]}
formats, {"JSON": ["application/json"],
"XML": ["application/xml"]}
)
def test_load_uri_replacements(self):
......@@ -55,7 +59,8 @@ class TestDcatCatalogCheck(unittest.TestCase):
),
):
replacements = self.dcc.load_uri_replacements()
self.assertEqual(replacements, [{"regex": "old", "replaced_by": "new"}])
self.assertEqual(
replacements, [{"regex": "old", "replaced_by": "new"}])
# Simulate that the file does not exist
......@@ -375,6 +380,82 @@ class TestDcatCatalogCheck(unittest.TestCase):
'Line 1 is missing \'url\': {"status": "valid", "format": "JSON"}'
)
def test_apply_uri_replacements(self):
    """Test the apply_uri_replacements method."""
    # Two rules: first rewrite the host, then upgrade the scheme.
    self.dcc.uri_replacements = [
        {"regex": r"example\.com", "replaced_by": "test.com"},
        {"regex": r"http://", "replaced_by": "https://"},
    ]
    cases = [
        # both rules apply
        ("http://example.com/path", "https://test.com/path"),
        # only the scheme rule applies
        ("http://other.com/path", "https://other.com/path"),
        # nothing matches: URL passes through unchanged
        ("https://unchanged.com/path", "https://unchanged.com/path"),
        # empty input stays empty
        ("", ""),
    ]
    for url, expected in cases:
        self.assertEqual(self.dcc.apply_uri_replacements(url), expected)

    # With no rules configured the URL must come back untouched.
    self.dcc.uri_replacements = []
    self.assertEqual(
        self.dcc.apply_uri_replacements("http://example.com/path"),
        "http://example.com/path",
    )
def test_clear_result(self):
    """Test the _clear_result method."""
    # Keys that _clear_result is expected to strip from a resource.
    volatile_keys = [
        "accessible",
        "checksum_ok",
        "duration",
        "error",
        "etag",
        "http_status",
        "last_check",
        "mimetype",
        "mimetype_mismatch",
        "valid",
    ]
    # A resource carrying every volatile key plus two identifying
    # fields that must survive the reset.
    resource = {
        "accessible": True,
        "checksum_ok": True,
        "duration": 1.23,
        "error": "Some error",
        "etag": "some-etag",
        "http_status": 200,
        "last_check": "2024-12-27T12:34:56Z",
        "mimetype": "application/json",
        "mimetype_mismatch": False,
        "valid": True,
        "url": "http://example.com/data",  # This key should remain untouched
        "format": "JSON",  # This key should remain untouched
    }

    self.dcc._clear_result(resource)

    # Every volatile key must be gone ...
    for key in volatile_keys:
        self.assertNotIn(key, resource)

    # ... while the identifying fields are untouched.
    self.assertIn("url", resource)
    self.assertIn("format", resource)
    self.assertEqual(resource["url"], "http://example.com/data")
    self.assertEqual(resource["format"], "JSON")
if __name__ == "__main__":
    # Run the test suite when this file is executed directly.
    unittest.main()
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment