diff --git a/.coveragerc b/.coveragerc new file mode 100644 index 0000000000000000000000000000000000000000..49addf786e2886ced2e5889f3367aedfdae7b4de --- /dev/null +++ b/.coveragerc @@ -0,0 +1,3 @@ +[run] +omit = + tests/* \ No newline at end of file diff --git a/CHANGELOG.md b/CHANGELOG.md index 32cf7cc6cd30617047dbc698a36744c6b3173a25..a4915846c97ef11b5d55c0b6c5a3b121b10c1885 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,17 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## [Unreleased] + +### Added + +- Unit tests for URI replacements and resource clearing. + +### Changed + +- Updated coverage configuration to manage test file inclusion/exclusion. + - Test files are now omitted from coverage reports to focus on application code metrics. + ## [1.0.0] - 2024-12-20 ### Added diff --git a/dcat_catalog_check.py b/dcat_catalog_check.py index 071e9ba0bab17c6f6bffc6f23ebdafc611212d1e..ea0d3d1bbd68452652f95b231ec47491d0ea432b 100755 --- a/dcat_catalog_check.py +++ b/dcat_catalog_check.py @@ -154,7 +154,8 @@ class DcatCatalogCheck: "error", "etag", "http_status", - "last_check" "mimetype", + "last_check", + "mimetype", "mimetype_mismatch", "valid", ]: @@ -174,7 +175,8 @@ class DcatCatalogCheck: format = resource["format"].lower() try: # dynamically import the corresponding module for the format - format_check_module = importlib.import_module(f"formats.{format}_format") + format_check_module = importlib.import_module( + f"formats.{format}_format") except ModuleNotFoundError: format_check_module = None @@ -212,7 +214,8 @@ class DcatCatalogCheck: # write the content of the HTTP response into a temporary file original_file_name = url.split("/")[-1] - suffix = original_file_name.split(".")[-1] if "." in original_file_name else "" + suffix = original_file_name.split( + ".")[-1] if "." in original_file_name else "" with tempfile.NamedTemporaryFile( delete=False, suffix="." + suffix ) as temp_file: @@ -235,7 +238,8 @@ class DcatCatalogCheck: decompressor = decompressors.get(resource["mimetype"]) if not decompressor: - self.logger.warning(f"Unknown compression {resource['mimetype']}.") + self.logger.warning( + f"Unknown compression {resource['mimetype']}.") else: with tempfile.NamedTemporaryFile(delete=False) as decompressed_file: with decompressor.open(temp_file.name, "rb") as compressed_file: @@ -246,7 +250,8 @@ class DcatCatalogCheck: resource["mimetype"] = self._guess_mime_type(temp_file.name) if self._is_container(resource["mimetype"], resource["format"]): - self._check_container_file(resource, temp_file, format_check_module) + self._check_container_file( + resource, temp_file, format_check_module) else: self._check_single_file(resource, temp_file, format_check_module) @@ -274,7 +279,8 @@ class DcatCatalogCheck: temp_file.write(file.read()) temp_file.flush() - resource["mimetype"] = self._guess_mime_type(temp_file.name) + resource["mimetype"] = self._guess_mime_type( + temp_file.name) validation_result = ( validation_result and self._check_single_file( @@ -288,12 +294,14 @@ class DcatCatalogCheck: return contains_at_least_one_relevant_file and validation_result else: - self.logger.error(f"Unsupported container format {resource['mimetype']}") + self.logger.error( + f"Unsupported container format {resource['mimetype']}") def _check_single_file(self, resource, temp_file, format_check_module): if format_check_module: # call the function `process` that is defined in every modul - resource["valid"] = format_check_module.is_valid(resource, temp_file) + resource["valid"] = format_check_module.is_valid( + resource, temp_file) else: # There is no specialized check for the specified format. # Does the returned MIME type match the promised format? @@ -318,7 +326,8 @@ class DcatCatalogCheck: ): hash_algorithm = hashlib.md5() else: - print(f"WARNING: unknown checksum algorithm {algo_name}", file=sys.stderr) + print( + f"WARNING: unknown checksum algorithm {algo_name}", file=sys.stderr) return with open(temp_file.name, "rb") as f: @@ -413,7 +422,8 @@ class DcatCatalogCheck: publisher = graph.value(dataset, DCTERMS.publisher) if not publisher: - self.logger.warning(f"Publisher not found for dataset: {dataset}") + self.logger.warning( + f"Publisher not found for dataset: {dataset}") return None # Attempt to get the publisher's name @@ -427,7 +437,8 @@ class DcatCatalogCheck: except Exception as e: # Log any unexpected errors - self.logger.error(f"Error retrieving publisher for dataset {dataset}: {e}") + self.logger.error( + f"Error retrieving publisher for dataset {dataset}: {e}") return None def _process_datasets(self, datasets, g): @@ -452,7 +463,8 @@ class DcatCatalogCheck: url = str(resource["url"]) if self._needs_check(url): - checksum_resource = g.value(distribution, SPDX.checksum) + checksum_resource = g.value( + distribution, SPDX.checksum) if checksum_resource: resource["checksum_algorithm"] = str( g.value(checksum_resource, SPDX.algorithm) @@ -502,7 +514,8 @@ class DcatCatalogCheck: loaded_count += 1 except json.JSONDecodeError as e: - self.logger.error(f"Invalid JSON at line {line_number}: {e}") + self.logger.error( + f"Invalid JSON at line {line_number}: {e}") skipped_count += 1 self.logger.info( @@ -530,7 +543,8 @@ class DcatCatalogCheck: self._process_datasets(datasets, g) - paged_collection = g.value(predicate=RDF.type, object=HYDRA.PagedCollection) + paged_collection = g.value( + predicate=RDF.type, object=HYDRA.PagedCollection) next_page = g.value(paged_collection, HYDRA.nextPage) url = str(next_page) if next_page else None @@ -555,9 +569,12 @@ if __name__ == "__main__": parser = argparse.ArgumentParser() parser.add_argument("--url", help="DCAT catalog URL") parser.add_argument("--log_file", help="Log file path") - parser.add_argument("--results", help="File from which the results are loaded") - parser.add_argument("--verbose", action="store_true", help="Enable verbose logging") - parser.add_argument("--debug", action="store_true", help="Enable debug logging") + parser.add_argument( + "--results", help="File from which the results are loaded") + parser.add_argument("--verbose", action="store_true", + help="Enable verbose logging") + parser.add_argument("--debug", action="store_true", + help="Enable debug logging") parser.add_argument( "--recheck", action="store_true", @@ -568,7 +585,8 @@ if __name__ == "__main__": action="store_true", help="Just check new entries from the catalog. Do not re-check existing results.", ) - parser.add_argument("--check-format", help="Only check the specified format") + parser.add_argument( + "--check-format", help="Only check the specified format") parser.add_argument( "--force-check-format", help="Check distributinons with the specified format regardless of previous results", diff --git a/tests/test_format_fidelity_checker.py b/tests/test_format_fidelity_checker.py index f5798a751b3f808694ef6f8bb1f63736aaaaa775..3c67747b866e03c2a0752af5fdfbb6767c5bbec0 100644 --- a/tests/test_format_fidelity_checker.py +++ b/tests/test_format_fidelity_checker.py @@ -29,10 +29,13 @@ class TestDcatCatalogCheck(unittest.TestCase): "XML": ["application/xml"], } - self.assertTrue(self.dcc.is_mime_type_compatible("JSON", "application/json")) - self.assertFalse(self.dcc.is_mime_type_compatible("JSON", "application/xml")) + self.assertTrue(self.dcc.is_mime_type_compatible( + "JSON", "application/json")) + self.assertFalse(self.dcc.is_mime_type_compatible( + "JSON", "application/xml")) self.assertFalse( - self.dcc.is_mime_type_compatible("UnknownFormat", "application/json") + self.dcc.is_mime_type_compatible( + "UnknownFormat", "application/json") ) def test_read_allowed_file_formats(self): @@ -44,7 +47,8 @@ class TestDcatCatalogCheck(unittest.TestCase): ): formats = self.dcc.read_allowed_file_formats() self.assertEqual( - formats, {"JSON": ["application/json"], "XML": ["application/xml"]} + formats, {"JSON": ["application/json"], + "XML": ["application/xml"]} ) def test_load_uri_replacements(self): @@ -55,7 +59,8 @@ class TestDcatCatalogCheck(unittest.TestCase): ), ): replacements = self.dcc.load_uri_replacements() - self.assertEqual(replacements, [{"regex": "old", "replaced_by": "new"}]) + self.assertEqual( + replacements, [{"regex": "old", "replaced_by": "new"}]) # Simulate that the file does not exist @@ -375,6 +380,82 @@ class TestDcatCatalogCheck(unittest.TestCase): 'Line 1 is missing \'url\': {"status": "valid", "format": "JSON"}' ) + def test_apply_uri_replacements(self): + """Test the apply_uri_replacements method.""" + # Setup URI replacements + self.dcc.uri_replacements = [ + {"regex": r"example\.com", "replaced_by": "test.com"}, + {"regex": r"http://", "replaced_by": "https://"}, + ] + + # URL matching both replacements + url = "http://example.com/path" + result = self.dcc.apply_uri_replacements(url) + self.assertEqual(result, "https://test.com/path") + + # URL matching only one replacement + url = "http://other.com/path" + result = self.dcc.apply_uri_replacements(url) + self.assertEqual(result, "https://other.com/path") + + # URL with no matches + url = "https://unchanged.com/path" + result = self.dcc.apply_uri_replacements(url) + self.assertEqual(result, "https://unchanged.com/path") + + # Empty URL + url = "" + result = self.dcc.apply_uri_replacements(url) + self.assertEqual(result, "") + + # No URI replacements defined + self.dcc.uri_replacements = [] + url = "http://example.com/path" + result = self.dcc.apply_uri_replacements(url) + self.assertEqual(result, "http://example.com/path") + + def test_clear_result(self): + """Test the _clear_result method.""" + # Define a resource dictionary with keys to clear and some additional keys + resource = { + "accessible": True, + "checksum_ok": True, + "duration": 1.23, + "error": "Some error", + "etag": "some-etag", + "http_status": 200, + "last_check": "2024-12-27T12:34:56Z", + "mimetype": "application/json", + "mimetype_mismatch": False, + "valid": True, + "url": "http://example.com/data", # This key should remain untouched + "format": "JSON", # This key should remain untouched + } + + # Call the _clear_result method + self.dcc._clear_result(resource) + + # Check that all keys to clear have been removed + for key in [ + "accessible", + "checksum_ok", + "duration", + "error", + "etag", + "http_status", + "last_check", + "mimetype", + "mimetype_mismatch", + "valid", + ]: + self.assertNotIn(key, resource) + + # Check that unrelated keys remain + self.assertIn("url", resource) + self.assertIn("format", resource) + self.assertEqual(resource["url"], "http://example.com/data") + self.assertEqual(resource["format"], "JSON") + if __name__ == "__main__": unittest.main()