Verified commit b072c066, authored by Jesper Zedlitz

Initial commit
{
  "name": "ufo",
  "type": "table",
  "path": "http://localhost:8000/ufo.csv",
  "format": "csv",
  "mediatype": "text/csv",
  "encoding": "utf-8",
  "profile": "tabular-data-resource",
  "schema": {
    "fields": [
      {
        "name": "datum",
        "type": "date",
        "title": "Datum"
      },
      {
        "name": "ufo_landungen",
        "type": "integer",
        "title": "Anzahl UFO-Landungen"
      },
      {
        "name": "ufo_starts",
        "type": "integer",
        "title": "Anzahl UFO-Starts"
      }
    ]
  }
}
{
  "fields": [
    {
      "name": "datum",
      "title": "Datum",
      "type": "date"
    },
    {
      "name": "ufo_landungen",
      "title": "Anzahl UFO-Landungen",
      "type": "integer"
    },
    {
      "name": "ufo_starts",
      "title": "Anzahl UFO-Starts",
      "type": "integer"
    }
  ]
}
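The two JSON documents above are a Frictionless Tabular Data Resource descriptor (ufo-resource.json) and the bare table schema it embeds; both describe the ufo.csv file that follows. A minimal validation sketch, not part of this commit, assuming the frictionless Python package is installed and the web server named in the descriptor's "path" is reachable:

# Sketch: validate ufo.csv against the resource descriptor above.
# Assumes `pip install frictionless` and that http://localhost:8000/
# (the "path" in the descriptor) is being served.
from frictionless import validate

report = validate("ufo-resource.json")
print(report.valid)  # True if the CSV matches the declared schema
if not report.valid:
    print(report.flatten(["type", "note"]))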
datum,ufo_landungen,ufo_starts
2024-06-10,0,0
2024-06-11,0,0
2024-06-12,0,0
2024-06-13,0,0
2024-06-14,0,0
2024-06-15,0,0
2024-06-16,0,0
2024-06-17,0,0
@prefix dcat: <http://www.w3.org/ns/dcat#> .
@prefix dcatde: <http://dcat-ap.de/def/dcatde/> .
@prefix dcterms: <http://purl.org/dc/terms/> .
@prefix spdx: <http://spdx.org/rdf/terms#> .
@prefix xsd: <http://www.w3.org/2001/XMLSchema#> .

<https://example.org/dataset/87e42608-769f-4ca8-8593-7546a027b2b8> a dcat:Dataset ;
    dcterms:accessRights <http://publications.europa.eu/resource/authority/access-right/PUBLIC> ;
    dcterms:description "Anzahl täglicher Landungen und Starts unbekannter Flugobjekte (UFOs) in Schleswig-Holstein. 🛸👽\n## Methodik\nGezählt werden nur die Landungen und Starts von UFOs, die gemeldet und zusätzlich offiziell bestätigt wurden. Sichtungen, die zu keinem Bodenkontakt führen, werden nicht gezählt.\n## Attribute\n- `datum` - Datum\n- `ufo_landungen` - Anzahl UFO-Landungen\n- `ufo_starts` - Anzahl UFO-Starts\n" ;
    dcterms:identifier "87e42608-769f-4ca8-8593-7546a027b2b8" ;
    dcterms:issued "2024-06-18T07:20:05.693344"^^xsd:dateTime ;
    dcterms:modified "2024-06-18T07:20:05.693344"^^xsd:dateTime ;
    dcterms:license <http://dcat-ap.de/def/licenses/cc-zero> ;
    dcterms:publisher <https://example.org/organization/ufo-kontrolle> ;
    dcterms:spatial <http://dcat-ap.de/def/politicalGeocoding/stateKey/01> ;
    dcterms:temporal [ a dcterms:PeriodOfTime ;
            dcat:endDate "2024-06-17"^^xsd:date ;
            dcat:startDate "2024-06-10"^^xsd:date ] ;
    dcterms:title "Bestätigte UFO-Landungen und -Starts" ;
    dcat:distribution [ a dcat:Distribution ;
            dcterms:format <http://publications.europa.eu/resource/authority/file-type/CSV> ;
            dcterms:issued "2024-06-18T05:20:07.232559"^^xsd:dateTime ;
            dcterms:license <http://dcat-ap.de/def/licenses/cc-zero> ;
            dcterms:modified "2024-06-18T05:20:07.191976"^^xsd:dateTime ;
            dcterms:rights <http://dcat-ap.de/def/licenses/cc-zero> ;
            dcterms:title "ufo.csv" ;
            spdx:checksum [ a spdx:Checksum ;
                    spdx:algorithm spdx:checksumAlgorithm_sha1 ;
                    spdx:checksumValue "3ffba0a43d3497a7918b376a335c31fbecc9325b"^^xsd:hexBinary ] ;
            dcat:accessURL <http://localhost:8000/ufo.csv> ;
            dcat:byteSize 151 ;
            dcat:downloadURL <http://localhost:8000/ufo.csv> ;
            dcat:mediaType <https://www.iana.org/assignments/media-types/text/csv> ] ;
    dcat:distribution [ a dcat:Distribution ;
            dcterms:format <http://publications.europa.eu/resource/authority/file-type/JSON> ;
            dcterms:issued "2024-06-18T05:20:07.232559"^^xsd:dateTime ;
            dcterms:license <http://dcat-ap.de/def/licenses/cc-zero> ;
            dcterms:modified "2024-06-18T05:20:07.191976"^^xsd:dateTime ;
            dcterms:rights <http://dcat-ap.de/def/licenses/cc-zero> ;
            dcterms:title "Frictionless Data Resource" ;
            spdx:checksum [ a spdx:Checksum ;
                    spdx:algorithm spdx:checksumAlgorithm_md5 ;
                    spdx:checksumValue "8dca8b179bbe0d46c5004da5112f6c4c"^^xsd:hexBinary ] ;
            dcat:accessURL <http://localhost:8000/ufo-resource.json> ;
            dcat:byteSize 487 ;
            dcat:downloadURL <http://localhost:8000/ufo-resource.json> ;
            dcat:mediaType <https://www.iana.org/assignments/media-types/application/json> ] ;
    dcat:keyword "UFO", "Landung", "Start", "Raumschiff", "Weltall", "Testdaten" ;
    dcat:theme <http://publications.europa.eu/resource/authority/data-theme/INTL> .
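A short sketch, not part of this commit, of how such a DCAT-AP graph can be inspected with rdflib (the same library the tests below use); the file name catalog.ttl is assumed for illustration:

# Sketch: list every distribution's title, download URL and declared
# format from the DCAT-AP graph above using rdflib and SPARQL.
from rdflib import Graph

g = Graph()
g.parse("catalog.ttl", format="turtle")  # file name assumed

QUERY = """
PREFIX dcat: <http://www.w3.org/ns/dcat#>
PREFIX dcterms: <http://purl.org/dc/terms/>
SELECT ?title ?url ?format WHERE {
    ?dataset a dcat:Dataset ;
             dcat:distribution ?dist .
    ?dist dcterms:title ?title ;
          dcat:downloadURL ?url ;
          dcterms:format ?format .
}
"""
for row in g.query(QUERY):
    print(row.title, row.url, row.format)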
import os
import unittest
import json
from unittest.mock import patch, mock_open, MagicMock

from dcat_catalog_check import DcatCatalogCheck
from rdflib import Graph
from rdflib.namespace import RDF, DCAT


class TestDcatCatalogCheck(unittest.TestCase):
    def setUp(self):
        self.dcc = DcatCatalogCheck("http://localhost:8000/", "my_api_key")
        # Mock the logger to capture log messages
        self.logger_patch = patch.object(self.dcc, 'logger', MagicMock())
        self.mock_logger = self.logger_patch.start()

    def tearDown(self):
        if os.path.exists("my_api_key"):
            os.remove("my_api_key")
        if os.path.exists("previous_results.json"):
            os.remove("previous_results.json")
        self.logger_patch.stop()
    def test_is_mime_type_compatible(self):
        self.dcc.allowed_file_formats = {
            "JSON": ["application/json"],
            "XML": ["application/xml"],
        }
        self.assertTrue(self.dcc.is_mime_type_compatible("JSON", "application/json"))
        self.assertFalse(self.dcc.is_mime_type_compatible("JSON", "application/xml"))
        self.assertFalse(
            self.dcc.is_mime_type_compatible("UnknownFormat", "application/json")
        )

    def test_read_allowed_file_formats(self):
        with patch(
            "builtins.open",
            mock_open(
                read_data='{"JSON": ["application/json"], "XML": ["application/xml"]}'
            ),
        ):
            formats = self.dcc.read_allowed_file_formats()
            self.assertEqual(
                formats, {"JSON": ["application/json"], "XML": ["application/xml"]}
            )

    def test_load_uri_replacements(self):
        with patch("os.path.exists", return_value=True), patch(
            "builtins.open",
            mock_open(read_data='[{"regex": "old", "replaced_by": "new"}]'),
        ):
            replacements = self.dcc.load_uri_replacements()
            self.assertEqual(replacements, [{"regex": "old", "replaced_by": "new"}])

    # Simulate that the file does not exist
    @patch("os.path.exists", return_value=False)
    def test_load_uri_replacements_file_not_exist(self, mock_exists):
        # Call the method to test
        replacements = self.dcc.load_uri_replacements()
        # Assert that it returns an empty list
        self.assertEqual(replacements, [])
@patch("dcat_catalog_check.requests.get")
def test_load_http_complete(self, mock_get):
mock_response = MagicMock()
mock_response.content = b"content"
mock_get.return_value = mock_response
response = self.dcc.load_http_complete("http://example.com")
self.assertEqual(response.content, b"content")
def test_get_publisher(self):
g = Graph()
g.parse(
data='@prefix dcat: <http://www.w3.org/ns/dcat#> .\n@prefix dct: <http://purl.org/dc/terms/> .\n@prefix foaf: <http://xmlns.com/foaf/0.1/> .\n<http://example.org/DS> a dcat:Dataset; dct:publisher [ a foaf:Organization ; foaf:name "The publisher" ] .',
format="ttl",
)
for dataset in g.subjects(predicate=RDF.type, object=DCAT.Dataset):
result = self.dcc._get_publisher(g, dataset)
self.assertEqual("The publisher", result)
def test_get_publisher_url(self):
g = Graph()
g.parse(
data="@prefix dcat: <http://www.w3.org/ns/dcat#> .\n@prefix dct: <http://purl.org/dc/terms/> .\n<http://example.org/DS> a dcat:Dataset; dct:publisher <http://example.org/publisher> .",
format="ttl",
)
for dataset in g.subjects(predicate=RDF.type, object=DCAT.Dataset):
result = self.dcc._get_publisher(g, dataset)
self.assertEqual("http://example.org/publisher", result)
    def test_check_resource__json_valid(self):
        mock_response = MagicMock()
        mock_response.status_code = 200
        mock_response.reason = "OK"
        with open("tests/data/correct.json", "rb") as file:
            mock_response.content = file.read()
        self.dcc.load_http_complete = MagicMock(return_value=mock_response)
        resource = {}
        resource["url"] = "http://localhost/data"
        resource["format"] = "JSON"
        self.dcc.check_resource(resource)
        self.assertEqual(resource["accessible"], True)
        self.assertEqual(resource["valid"], True)
        self.assertEqual(resource["http_status"], 200)
        self.assertEqual(resource["mimetype"], "application/json")

    def test_check_resource__json_gz_valid(self):
        mock_response = MagicMock()
        mock_response.status_code = 200
        mock_response.reason = "OK"
        with open("tests/data/correct.json.gz", "rb") as file:
            mock_response.content = file.read()
        self.dcc.load_http_complete = MagicMock(return_value=mock_response)
        resource = {}
        resource["url"] = "http://localhost/data"
        resource["format"] = "JSON"
        self.dcc.check_resource(resource)
        self.assertEqual(resource["accessible"], True)
        self.assertEqual(resource["valid"], True)
        self.assertEqual(resource["http_status"], 200)
        self.assertEqual(resource["mimetype"], "application/json")
        self.assertEqual(resource["compress_format"], "application/gzip")

    def test_check_resource__json_bz2_valid(self):
        mock_response = MagicMock()
        mock_response.status_code = 200
        mock_response.reason = "OK"
        with open("tests/data/correct.json.bz2", "rb") as file:
            mock_response.content = file.read()
        self.dcc.load_http_complete = MagicMock(return_value=mock_response)
        resource = {}
        resource["url"] = "http://localhost/data"
        resource["format"] = "JSON"
        self.dcc.check_resource(resource)
        self.assertEqual(resource["accessible"], True)
        self.assertEqual(resource["valid"], True)
        self.assertEqual(resource["http_status"], 200)
        self.assertEqual(resource["mimetype"], "application/json")
        self.assertEqual(resource["compress_format"], "application/x-bzip2")

    def test_check_resource__json_xz_valid(self):
        mock_response = MagicMock()
        mock_response.status_code = 200
        mock_response.reason = "OK"
        with open("tests/data/correct.json.xz", "rb") as file:
            mock_response.content = file.read()
        self.dcc.load_http_complete = MagicMock(return_value=mock_response)
        resource = {}
        resource["url"] = "http://localhost/data"
        resource["format"] = "JSON"
        self.dcc.check_resource(resource)
        self.assertEqual(resource["accessible"], True)
        self.assertEqual(resource["valid"], True)
        self.assertEqual(resource["http_status"], 200)
        self.assertEqual(resource["mimetype"], "application/json")
        self.assertEqual(resource["compress_format"], "application/x-xz")

    def test_check_resource__json_invalid(self):
        mock_response = MagicMock()
        mock_response.status_code = 200
        mock_response.reason = "OK"
        with open("tests/data/incorrect.json", "rb") as file:
            mock_response.content = file.read()
        self.dcc.load_http_complete = MagicMock(return_value=mock_response)
        resource = {}
        resource["url"] = "http://localhost/data"
        resource["format"] = "JSON"
        self.dcc.check_resource(resource)
        self.assertEqual(resource["accessible"], True)
        self.assertEqual(resource["valid"], False)
        self.assertEqual(resource["http_status"], 200)

    def test_check_resource__xml_valid(self):
        mock_response = MagicMock()
        mock_response.status_code = 200
        mock_response.reason = "OK"
        with open("tests/data/correct.xml", "rb") as file:
            mock_response.content = file.read()
        self.dcc.load_http_complete = MagicMock(return_value=mock_response)
        resource = {}
        resource["url"] = "http://localhost/data"
        resource["format"] = "XML"
        self.dcc.check_resource(resource)
        self.assertEqual(resource["accessible"], True)
        self.assertEqual(resource["valid"], True)
        self.assertEqual(resource["http_status"], 200)

    def test_check_resource__png_valid(self):
        mock_response = MagicMock()
        mock_response.status_code = 200
        mock_response.reason = "OK"
        with open("tests/data/image.png", "rb") as file:
            mock_response.content = file.read()
        self.dcc.load_http_complete = MagicMock(return_value=mock_response)
        resource = {}
        resource["url"] = "http://localhost/data"
        resource["format"] = "PNG"
        resource["checksum_algorithm"] = (
            "http://spdx.org/rdf/terms#checksumAlgorithm_sha1"
        )
        resource["checksum_value"] = "a8643241029f9779302874db5c18b0f0bacbdd25"
        self.dcc.check_resource(resource)
        self.assertEqual(resource["accessible"], True)
        self.assertEqual(resource["valid"], True)
        self.assertEqual(resource["http_status"], 200)
        self.assertEqual(resource["mimetype"], "image/png")
        self.assertEqual(resource["checksum_ok"], True)

    def test_check_checksum(self):
        """The checksum check also works with the old DCAT-AP.de algorithm specifications."""
        resource = {}
        resource["checksum_algorithm"] = "http://dcat-ap.de/def/hashAlgorithms/md/5"
        resource["checksum_value"] = "7e2fb748950d6d07ab3f75ac87f6f5da"
        with open("tests/data/image.png", "rb") as file:
            self.dcc._check_checksum(resource, file)
        self.assertEqual(resource["checksum_ok"], True)
    def test_check_resource__one_json_in_zip_valid(self):
        """This ZIP file contains just one valid JSON file."""
        mock_response = MagicMock()
        mock_response.status_code = 200
        mock_response.reason = "OK"
        with open("tests/data/json-in-zip.zip", "rb") as file:
            mock_response.content = file.read()
        self.dcc.load_http_complete = MagicMock(return_value=mock_response)
        resource = {}
        resource["url"] = "http://localhost/data"
        resource["format"] = "JSON"
        self.dcc.check_resource(resource)
        self.assertEqual(resource.get("accessible"), True)
        self.assertEqual(resource.get("valid"), True)
        self.assertEqual(resource.get("http_status"), 200)
        self.assertEqual(resource.get("mimetype"), "application/json")
        self.assertEqual(resource.get("package_format"), "application/zip")

    def test_check_resource__multiple_json_files_in_zip_valid(self):
        """This ZIP file contains several valid JSON files and one image."""
        mock_response = MagicMock()
        mock_response.status_code = 200
        mock_response.reason = "OK"
        with open("tests/data/jsons-in-zip.zip", "rb") as file:
            mock_response.content = file.read()
        self.dcc.load_http_complete = MagicMock(return_value=mock_response)
        resource = {}
        resource["url"] = "http://localhost/data"
        resource["format"] = "JSON"
        self.dcc.check_resource(resource)
        self.assertEqual(resource.get("accessible"), True)
        self.assertEqual(resource.get("valid"), True)
        self.assertEqual(resource.get("http_status"), 200)
        self.assertEqual(resource.get("mimetype"), "application/json")
        self.assertEqual(resource.get("package_format"), "application/zip")

    def test_check_resource__no_json_in_zip_valid(self):
        """This ZIP file does not contain any JSON files, only other files."""
        mock_response = MagicMock()
        mock_response.status_code = 200
        mock_response.reason = "OK"
        with open("tests/data/png-in-zip.zip", "rb") as file:
            mock_response.content = file.read()
        self.dcc.load_http_complete = MagicMock(return_value=mock_response)
        resource = {}
        resource["url"] = "http://localhost/data"
        resource["format"] = "JSON"
        self.dcc.check_resource(resource)
        self.assertEqual(resource.get("accessible"), True)
        self.assertEqual(resource.get("http_status"), 200)
        self.assertEqual(resource.get("mimetype_mismatch"), True)
        self.assertEqual(resource.get("package_format"), "application/zip")

    def test_is_container(self):
        self.dcc.read_allowed_file_formats()
        self.assertFalse(self.dcc._is_container("image/png", "PNG"))
        self.assertTrue(self.dcc._is_container("application/x-tar", "PNG"))
        self.assertTrue(self.dcc._is_container("application/zip", "PNG"))
        self.assertFalse(self.dcc._is_container("application/zip", "SHP"))
        self.assertFalse(self.dcc._is_container("application/zip", "GTFS"))
        self.assertFalse(self.dcc._is_container("application/zip", "ZIP"))

    def test_check_resource__shp_with_multiple_layers(self):
        """This shapefile archive contains multiple layers."""
        mock_response = MagicMock()
        mock_response.status_code = 200
        mock_response.reason = "OK"
        with open("tests/data/zos116.zip", "rb") as file:
            mock_response.content = file.read()
        self.dcc.load_http_complete = MagicMock(return_value=mock_response)
        resource = {}
        resource["url"] = "http://localhost/zos116.zip"
        resource["format"] = "SHP"
        self.dcc.check_resource(resource)
        self.assertIsNone(resource.get("error"))
        self.assertEqual(resource.get("accessible"), True)
        self.assertEqual(resource.get("http_status"), 200)
        self.assertEqual(resource.get("valid"), True)
    def test_read_previous_results(self):
        # Test data to simulate the contents of previous_results.json
        test_data = [
            {"url": "http://example.com", "status": "valid", "format": "JSON"},
            {"url": "http://example.org", "status": "invalid", "format": "XML"},
        ]

        # Write test data to a file 'previous_results.json'
        with open("previous_results.json", "w", encoding="utf-8") as f:
            for entry in test_data:
                f.write(json.dumps(entry) + "\n")

        # Call the method to test
        self.dcc.read_previous_results("previous_results.json")

        # Assertions: check if the data was loaded correctly into previous_results
        self.assertEqual(len(self.dcc.previous_results), 2)  # Expect 2 entries
        self.assertIn("http://example.com", self.dcc.previous_results)
        self.assertIn("http://example.org", self.dcc.previous_results)
        self.assertEqual(
            self.dcc.previous_results["http://example.com"]["status"], "valid")
        self.assertEqual(
            self.dcc.previous_results["http://example.org"]["status"], "invalid")

    @patch("os.path.exists", return_value=False)
    def test_read_previous_results_file_not_exist(self, mock_exists):
        """Test when the file does not exist."""
        self.dcc.read_previous_results("non_existent_file.json")

        # Check that the warning log was triggered
        self.mock_logger.warning.assert_called_with(
            "File 'non_existent_file.json' does not exist. No previous results loaded."
        )

    @patch("builtins.open", mock_open(read_data="invalid_json"))
    @patch("os.path.exists", return_value=True)
    def test_read_previous_results_invalid_json(self, mock_exists):
        """Test when the file contains invalid JSON."""
        self.dcc.read_previous_results("invalid_json_file.json")

        # Check if the error log was triggered for invalid JSON
        self.mock_logger.error.assert_called_with(
            "Invalid JSON at line 1: Expecting value: line 1 column 1 (char 0)"
        )

    @patch("builtins.open", mock_open(read_data='{"status": "valid", "format": "JSON"}\n{"url": "http://example.com", "status": "valid", "format": "JSON"}'))
    @patch("os.path.exists", return_value=True)
    def test_read_previous_results_missing_url(self, mock_exists):
        """Test when the file has a line with missing 'url'."""
        self.dcc.read_previous_results("missing_url_file.json")

        # Check if the warning log was triggered for the missing 'url'
        self.mock_logger.warning.assert_called_with(
            'Line 1 is missing \'url\': {"status": "valid", "format": "JSON"}'
        )


if __name__ == "__main__":
    unittest.main()
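The tests above exercise _check_checksum with both an SPDX algorithm URI and the legacy DCAT-AP.de form (http://dcat-ap.de/def/hashAlgorithms/md/5). The implementation itself is not among the files shown here; a plausible sketch using only hashlib, with the suffix-to-algorithm mapping as an assumption:

# Sketch of a _check_checksum-style helper; not the project's actual code.
import hashlib

ALGORITHM_SUFFIXES = {
    "checksumAlgorithm_sha1": "sha1",
    "checksumAlgorithm_sha256": "sha256",
    "md/5": "md5",  # legacy http://dcat-ap.de/def/hashAlgorithms/md/5
}

def check_checksum(resource, file):
    uri = resource["checksum_algorithm"]
    name = next(
        (algo for suffix, algo in ALGORITHM_SUFFIXES.items() if uri.endswith(suffix)),
        None,
    )
    if name is None:
        return  # unknown algorithm: leave "checksum_ok" unset
    digest = hashlib.new(name)
    for chunk in iter(lambda: file.read(8192), b""):
        digest.update(chunk)
    resource["checksum_ok"] = digest.hexdigest() == resource["checksum_value"]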
import unittest

from formats.geojson_format import is_valid


class TestGeojsonFormat(unittest.TestCase):
    def test_is_valid__valid(self):
        resource = {}
        with open("tests/data/bermuda.geojson", "r") as file:
            self.assertTrue(is_valid(resource, file))

    def test_is_valid__invalid(self):
        resource = {}
        with open("tests/data/correct.json", "r") as file:
            self.assertFalse(is_valid(resource, file))


if __name__ == "__main__":
    unittest.main()
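formats/geojson_format.py itself is not shown in this commit view. A minimal sketch of a checker with the behaviour these tests pin down (plain JSON such as tests/data/correct.json must be rejected):

# Sketch: a GeoJSON document must be valid JSON *and* carry one of the
# nine GeoJSON "type" values from RFC 7946. Illustrative only.
import json

GEOJSON_TYPES = {
    "FeatureCollection", "Feature", "Point", "MultiPoint", "LineString",
    "MultiLineString", "Polygon", "MultiPolygon", "GeometryCollection",
}

def is_valid(resource, file):
    try:
        data = json.load(file)
    except (json.JSONDecodeError, UnicodeDecodeError) as err:
        resource["error"] = str(err)
        return False
    return isinstance(data, dict) and data.get("type") in GEOJSON_TYPES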
import unittest

from formats.gml_format import is_valid


class TestGmlFormat(unittest.TestCase):
    def test_is_valid__valid(self):
        resource = {}
        with open("tests/data/bermuda.gml", "r") as file:
            self.assertTrue(is_valid(resource, file))

    def test_is_valid__invalid(self):
        resource = {}
        with open("tests/data/correct.xml", "r") as file:
            self.assertFalse(is_valid(resource, file))


if __name__ == "__main__":
    unittest.main()
import unittest

from formats.json_format import is_valid


class TestJsonFormat(unittest.TestCase):
    def test_is_valid__valid(self):
        resource = {}
        with open("tests/data/correct.json", "r") as file:
            self.assertTrue(is_valid(resource, file))
        self.assertIsNone(resource.get("error"))

    def test_is_valid__invalid(self):
        resource = {}
        with open("tests/data/incorrect.json", "r") as file:
            self.assertFalse(is_valid(resource, file))
        self.assertIsNotNone(resource.get("error"))


if __name__ == "__main__":
    unittest.main()
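The contract these tests pin down: is_valid() returns a boolean and records the parse error under resource["error"]. A minimal sketch (illustrative; formats/json_format.py is not shown here):

import json

def is_valid(resource, file):
    # Any parseable JSON document counts as valid; the parse error
    # message is recorded on the resource for the report.
    try:
        json.load(file)
        return True
    except (json.JSONDecodeError, UnicodeDecodeError) as err:
        resource["error"] = str(err)
        return False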
import unittest

from formats.parquet_format import is_valid


class TestParquetFormat(unittest.TestCase):
    def test_is_valid__valid(self):
        resource = {}
        with open("tests/data/valid.parquet", "rb") as file:
            self.assertTrue(is_valid(resource, file))
        self.assertIsNone(resource.get("error"))

    def test_is_valid__broken(self):
        resource = {}
        with open("tests/data/broken.parquet", "rb") as file:
            self.assertFalse(is_valid(resource, file))
        self.assertIsNotNone(resource.get("error"))


if __name__ == "__main__":
    unittest.main()
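A sketch of a Parquet checker with this behaviour, assuming pyarrow as the reader (the project's actual implementation is not shown here):

import pyarrow.parquet as pq

def is_valid(resource, file):
    # Reading the footer and metadata is enough to distinguish a valid
    # Parquet file from a truncated or corrupted one.
    try:
        pq.ParquetFile(file)
        return True
    except Exception as err:
        resource["error"] = str(err)
        return False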
import unittest

from formats.png_format import is_valid


class TestPngFormat(unittest.TestCase):
    def test_is_valid__valid(self):
        resource = {}
        with open("tests/data/image.png", "rb") as file:
            self.assertTrue(is_valid(resource, file))

    def test_is_valid__invalid(self):
        resource = {}
        with open("tests/data/correct.json", "rb") as file:
            self.assertFalse(is_valid(resource, file))


if __name__ == "__main__":
    unittest.main()
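A minimal sketch of such a PNG check (illustrative; formats/png_format.py is not shown here): the cheapest test is the fixed 8-byte file signature, which is why tests/data/correct.json is rejected immediately.

# Every PNG file starts with these eight bytes.
PNG_SIGNATURE = b"\x89PNG\r\n\x1a\n"

def is_valid(resource, file):
    return file.read(8) == PNG_SIGNATURE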
import unittest

from formats.shp_format import is_valid


class TestShpFormat(unittest.TestCase):
    def test_is_valid__valid(self):
        resource = {}
        with open("tests/data/bermuda.zip", "rb") as file:
            self.assertTrue(is_valid(resource, file))

    def test_is_valid__multi_layer(self):
        resource = {}
        with open("tests/data/zos116.zip", "rb") as file:
            self.assertTrue(is_valid(resource, file))

    def test_is_valid__sub_directory(self):
        """The ZIP file contains a subdirectory for the only layer."""
        resource = {}
        with open("tests/data/bermuda-with-subdir.zip", "rb") as file:
            self.assertTrue(is_valid(resource, file))

    def test_is_valid__invalid(self):
        resource = {}
        with open("tests/data/json-in-zip.zip", "rb") as file:
            self.assertFalse(is_valid(resource, file))


if __name__ == "__main__":
    unittest.main()
#!/usr/bin/env python3
import json
from collections import Counter

logfile_path = "result.jsonl"


def write_diagram(id, title, counter, counter_publisher):
    print("<div>")
    print(f"<h2>{title}</h2>")
    print(f"<div id='vis{id}' style='max-width: 400px;'></div>")
    print('<script type="text/javascript">')
    print(f"new ApexCharts(document.querySelector('#vis{id}'),")
    print("{ chart: { type: 'donut' },")
    print(f"series: [{counter[True]}, {counter[False]}, {counter[None]}],")
    print("labels: ['correct', 'incorrect', 'not checked'],")
    print('colors: ["#1eae9c", "#d4004b", "#a4adb6"]')
    print("}).render();")
    print("</script>")
    print("<h3>Publishers affected</h3>")
    print("<table>")
    for p in counter_publisher:
        print(f"<tr><td>{p}</td><td>{counter_publisher[p]}</td></tr>")
    print("</table>")
    print("<hr/>")
    print("</div>")
counter_valid = Counter()
counter_404 = Counter()
counter_accessible = Counter()
counter_mimetype_mismatch = Counter()
counter_checksum_ok = Counter()
counter_schema_valid = Counter()
counter_publisher_accessible = Counter()
counter_publisher_checksum = Counter()
counter_publisher_valid = Counter()
counter_publisher_schema_valid = Counter()
counter_publisher_mimetype_mismatch = Counter()

distributions_with_problems = {}

with open(logfile_path, "r") as file:
    for line in file:
        entry = json.loads(line.strip())
        publisher = entry.get("publisher")
        id = entry.get("id")

        valid = entry.get("valid", None)
        counter_valid[valid] += 1
        if valid is False:
            counter_publisher_valid[publisher] += 1
            distributions_with_problems[id] = entry

        accessible = entry.get("accessible", None)
        counter_accessible[accessible] += 1
        if accessible is False:
            counter_publisher_accessible[publisher] += 1
            distributions_with_problems[id] = entry

        checksum_ok = entry.get("checksum_ok", None)
        counter_checksum_ok[checksum_ok] += 1
        if checksum_ok is False:
            counter_publisher_checksum[publisher] += 1
            distributions_with_problems[id] = entry

        schema_valid = entry.get("schema_valid", None)
        counter_schema_valid[schema_valid] += 1
        if schema_valid is False:
            counter_publisher_schema_valid[publisher] += 1
            distributions_with_problems[id] = entry

        mimetype_correct = not entry.get("mimetype_mismatch", False)
        counter_mimetype_mismatch[mimetype_correct] += 1
        if mimetype_correct is False:
            counter_publisher_mimetype_mismatch[publisher] += 1
            distributions_with_problems[id] = entry
print("<!doctype html>")
print("<html>")
print(" <head>")
print(" <title>DCAT Catalog Check</title>")
print(' <script src="https://cdn.jsdelivr.net/npm/apexcharts"></script>')
print(' <link rel="stylesheet" href="https://cdn.datatables.net/2.1.8/css/dataTables.dataTables.css" />')
print(' <script src="https://code.jquery.com/jquery-3.7.1.min.js"></script>')
print(' <script src="https://cdn.datatables.net/2.1.8/js/dataTables.js"></script>')
print(" </head>")
print(" <body style='background: #f2f4f7;'>")
print(" <h1>Results of the DCAT Catalog Check</h1>")
write_diagram("1", "Availability", counter_accessible, counter_publisher_accessible)
write_diagram("2", "File content", counter_valid, counter_publisher_valid)
write_diagram(
"3",
"MIME type",
counter_mimetype_mismatch,
counter_publisher_mimetype_mismatch,
)
write_diagram("4", "Checksum", counter_checksum_ok, counter_publisher_checksum)
write_diagram(
"5", "Frictionless Schema", counter_schema_valid, counter_publisher_schema_valid
)
print("<div>")
print("<h2>Distributionen with errors</h2>")
print('<table class="table" id="distributions">')
print("<thead><tr><th>Publisher</th><th>Format</th><th>available</th><th>content correct</th><th>MIME type wrong</th><th>MIME type</th><th>checksum correct</th><th>schema valid</th><th>URL</th></tr></thead>")
print("<tbody>")
for dist in distributions_with_problems:
entry = distributions_with_problems[dist]
print(f"<tr><td>{entry.get('publisher')}</td><td>{entry.get('format')}</td><td>{entry.get('http_status','')}</td><td>{entry.get('valid','')}</td><td>{entry.get('mimetype_mismatch','')}</td><td>{entry.get('mimetype','')}</td><td>{entry.get('checksum_ok','')}</td><td>{entry.get('schema_valid','')}</td><td>{entry.get('url')}</td></tr>")
print("</tbody></table>")
print("</div>")
print("<script>let table = new DataTable('#distributions');</script>")
print("</body></html>")
#!/usr/bin/env python3
import json
import sqlite3
from collections import defaultdict

logfile_path = "result.jsonl"
database_path = "result.db"

# Converts a file in JSON Lines format into an SQLite database.
# The resulting table has a column for each property that appears
# in the logfile. The script tries to preserve the data types of the
# properties; missing properties become NULL values in the table.

# 1st pass: collect properties and create table schema
properties = defaultdict(set)  # the set stores the types of each property

# Open file and determine types of properties
with open(logfile_path, "r") as file:
    for line in file:
        entry = json.loads(line.strip())
        for key, value in entry.items():
            if isinstance(value, bool):
                properties[key].add("BOOLEAN")
            elif isinstance(value, int):
                properties[key].add("INTEGER")
            elif isinstance(value, float):
                properties[key].add("REAL")
            else:
                properties[key].add("TEXT")

# define SQL types
columns = {}
for key, types in properties.items():
    if "TEXT" in types:
        columns[key] = "TEXT"
    elif "REAL" in types:
        columns[key] = "REAL"
    elif "INTEGER" in types:
        columns[key] = "INTEGER"
    elif "BOOLEAN" in types:
        columns[key] = "BOOLEAN"
    else:
        columns[key] = "TEXT"

create_table_query = f"CREATE TABLE IF NOT EXISTS log ({', '.join([f'{col} {dtype}' for col, dtype in columns.items()])});"

# Create database connection and table
conn = sqlite3.connect(database_path)
cur = conn.cursor()
cur.execute(create_table_query)
conn.commit()
# 2nd pass: insert data into database
with open(logfile_path, "r") as file:
    for line in file:
        entry = json.loads(line.strip())

        # prepare SQL statement to insert the data
        columns_str = ", ".join(columns.keys())
        placeholders = ", ".join([f":{col}" for col in columns.keys()])
        insert_query = f"INSERT INTO log ({columns_str}) VALUES ({placeholders})"

        # prepare data with NULL for missing values
        data = {col: entry.get(col) for col in columns.keys()}

        # insert data into database
        cur.execute(insert_query, data)

# save changes and close the database connection
conn.commit()
conn.close()

print("Data successfully written to the SQLite database.")