Verified commit 9abe4fad authored by Jesper Zedlitz

tests for GeoTIFF

parent 7b23ae7b
Merge request !1 "Update Formats, Dependencies, and Dockerfile Configuration"
Pipeline #1401 failed
@@ -175,8 +175,7 @@ class DcatCatalogCheck:
         format = resource["format"].lower()
         try:
             # dynamically import the corresponding module for the format
-            format_check_module = importlib.import_module(
-                f"formats.{format}_format")
+            format_check_module = importlib.import_module(f"formats.{format}_format")
         except ModuleNotFoundError:
             format_check_module = None
@@ -214,8 +213,7 @@ class DcatCatalogCheck:
         # write the content of the HTTP response into a temporary file
         original_file_name = url.split("/")[-1]
-        suffix = original_file_name.split(
-            ".")[-1] if "." in original_file_name else ""
+        suffix = original_file_name.split(".")[-1] if "." in original_file_name else ""
         with tempfile.NamedTemporaryFile(
             delete=False, suffix="." + suffix
         ) as temp_file:
@@ -238,8 +236,7 @@ class DcatCatalogCheck:
             decompressor = decompressors.get(resource["mimetype"])
             if not decompressor:
-                self.logger.warning(
-                    f"Unknown compression {resource['mimetype']}.")
+                self.logger.warning(f"Unknown compression {resource['mimetype']}.")
             else:
                 with tempfile.NamedTemporaryFile(delete=False) as decompressed_file:
                     with decompressor.open(temp_file.name, "rb") as compressed_file:
@@ -249,9 +246,10 @@ class DcatCatalogCheck:
                     temp_file = decompressed_file
                     resource["mimetype"] = self._guess_mime_type(temp_file.name)

-        if self._is_container(resource["mimetype"], resource["format"]):
-            self._check_container_file(
-                resource, temp_file, format_check_module)
+        if self._is_container(resource["mimetype"], resource["format"]) and resource[
+            "format"
+        ] not in ["GTFS", "GEOTIFF", "SHP"]:
+            self._check_container_file(resource, temp_file, format_check_module)
         else:
             self._check_single_file(resource, temp_file, format_check_module)
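Note: GTFS, GeoTIFF, and Shapefile distributions are typically ZIP archives themselves, so this change routes them past the generic container handling and into their format modules, which unpack the archive on their own (as formats/geotiff_format.py does below).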
@@ -279,8 +277,7 @@ class DcatCatalogCheck:
                         temp_file.write(file.read())
                         temp_file.flush()
-                        resource["mimetype"] = self._guess_mime_type(
-                            temp_file.name)
+                        resource["mimetype"] = self._guess_mime_type(temp_file.name)
                         validation_result = (
                             validation_result
                             and self._check_single_file(
@@ -294,14 +291,12 @@ class DcatCatalogCheck:
             return contains_at_least_one_relevant_file and validation_result
         else:
-            self.logger.error(
-                f"Unsupported container format {resource['mimetype']}")
+            self.logger.error(f"Unsupported container format {resource['mimetype']}")

     def _check_single_file(self, resource, temp_file, format_check_module):
         if format_check_module:
             # call the function `is_valid` that is defined in every format module
-            resource["valid"] = format_check_module.is_valid(
-                resource, temp_file)
+            resource["valid"] = format_check_module.is_valid(resource, temp_file)
         else:
             # There is no specialized check for the specified format.
             # Does the returned MIME type match the promised format?
@@ -326,8 +321,7 @@ class DcatCatalogCheck:
         ):
             hash_algorithm = hashlib.md5()
         else:
-            print(
-                f"WARNING: unknown checksum algorithm {algo_name}", file=sys.stderr)
+            print(f"WARNING: unknown checksum algorithm {algo_name}", file=sys.stderr)
             return

         with open(temp_file.name, "rb") as f:
@@ -422,8 +416,7 @@ class DcatCatalogCheck:
         publisher = graph.value(dataset, DCTERMS.publisher)
         if not publisher:
-            self.logger.warning(
-                f"Publisher not found for dataset: {dataset}")
+            self.logger.warning(f"Publisher not found for dataset: {dataset}")
             return None

         # Attempt to get the publisher's name
@@ -437,8 +430,7 @@ class DcatCatalogCheck:
         except Exception as e:
             # Log any unexpected errors
-            self.logger.error(
-                f"Error retrieving publisher for dataset {dataset}: {e}")
+            self.logger.error(f"Error retrieving publisher for dataset {dataset}: {e}")
             return None

     def _process_datasets(self, datasets, g):
@@ -463,8 +455,7 @@ class DcatCatalogCheck:
                 url = str(resource["url"])
                 if self._needs_check(url):
-                    checksum_resource = g.value(
-                        distribution, SPDX.checksum)
+                    checksum_resource = g.value(distribution, SPDX.checksum)
                     if checksum_resource:
                         resource["checksum_algorithm"] = str(
                             g.value(checksum_resource, SPDX.algorithm)
@@ -514,8 +505,7 @@ class DcatCatalogCheck:
                     loaded_count += 1
                 except json.JSONDecodeError as e:
-                    self.logger.error(
-                        f"Invalid JSON at line {line_number}: {e}")
+                    self.logger.error(f"Invalid JSON at line {line_number}: {e}")
                     skipped_count += 1

         self.logger.info(
@@ -543,8 +533,7 @@ class DcatCatalogCheck:
             self._process_datasets(datasets, g)

-            paged_collection = g.value(
-                predicate=RDF.type, object=HYDRA.PagedCollection)
+            paged_collection = g.value(predicate=RDF.type, object=HYDRA.PagedCollection)
             next_page = g.value(paged_collection, HYDRA.nextPage)
             url = str(next_page) if next_page else None
@@ -569,12 +558,9 @@ if __name__ == "__main__":
     parser = argparse.ArgumentParser()
     parser.add_argument("--url", help="DCAT catalog URL")
     parser.add_argument("--log_file", help="Log file path")
-    parser.add_argument(
-        "--results", help="File from which the results are loaded")
-    parser.add_argument("--verbose", action="store_true",
-                        help="Enable verbose logging")
-    parser.add_argument("--debug", action="store_true",
-                        help="Enable debug logging")
+    parser.add_argument("--results", help="File from which the results are loaded")
+    parser.add_argument("--verbose", action="store_true", help="Enable verbose logging")
+    parser.add_argument("--debug", action="store_true", help="Enable debug logging")
     parser.add_argument(
         "--recheck",
         action="store_true",
@@ -585,8 +571,7 @@ if __name__ == "__main__":
         action="store_true",
         help="Just check new entries from the catalog. Do not re-check existing results.",
     )
-    parser.add_argument(
-        "--check-format", help="Only check the specified format")
+    parser.add_argument("--check-format", help="Only check the specified format")
     parser.add_argument(
         "--force-check-format",
         help="Check distributions with the specified format regardless of previous results",
formats/geotiff_format.py (new file)
from osgeo import gdal
import zipfile
import tempfile
import os


def is_geotiff(resource, file_name):
    dataset = gdal.Open(file_name)
    if dataset is None:
        # GDAL cannot read the file at all
        resource["error"] = "not a readable raster file"
        return False
    geotransform = dataset.GetGeoTransform()
    default_transform = (0.0, 1.0, 0.0, 0.0, 0.0, 1.0)
    if geotransform == default_transform:
        resource["error"] = "missing transformation"
        return False
    return True


def is_valid(resource, file):
    """Check if the content is a GeoTIFF file."""
    # Some GeoTIFF files consist of two files in a ZIP file:
    # - the TIFF image itself
    # - a TFW world file with the transform information
    if zipfile.is_zipfile(file.name):
        with tempfile.TemporaryDirectory() as temp_dir:
            with zipfile.ZipFile(file.name, "r") as zip_ref:
                file_list = zip_ref.namelist()
                relevant_files = [
                    name
                    for name in file_list
                    if name.lower().endswith(".tiff") or name.lower().endswith(".tif")
                ]
                if relevant_files:
                    # extract everything so that GDAL can pick up a TFW
                    # world file lying next to the TIFF image
                    zip_ref.extractall(temp_dir)
                    for tif_name in relevant_files:
                        tif_path = os.path.join(temp_dir, tif_name)
                        if is_geotiff(resource, tif_path):
                            # the ZIP file contains at least one valid GeoTIFF
                            return True
                    # none of the TIFF images is a valid GeoTIFF;
                    # resource["error"] was set by is_geotiff()
                    return False
                else:
                    resource["error"] = "ZIP file contains no TIFF image"
                    return False
    else:
        return is_geotiff(resource, file.name)
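For reference, the geotransform that is_geotiff() inspects is GDAL's six-element affine transform; GDAL also reads a sidecar .tfw world file automatically when it sits next to the .tif, which is why the ZIP is extracted as a whole. A minimal sketch (the path is hypothetical):

```python
from osgeo import gdal

dataset = gdal.Open("/tmp/example.tif")  # hypothetical path
if dataset is not None:
    # (origin_x, pixel_width, row_rotation, origin_y, column_rotation, pixel_height)
    origin_x, pixel_w, _, origin_y, _, pixel_h = dataset.GetGeoTransform()
    print(origin_x, origin_y, pixel_w, pixel_h)
```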
poetry.lock
-# This file is automatically @generated by Poetry 1.8.2 and should not be changed by hand.
+# This file is automatically @generated by Poetry 1.8.3 and should not be changed by hand.

 [[package]]
 name = "annotated-types"
@@ -402,6 +402,19 @@ visidata = ["visidata (>=2.10)"]
 wkt = ["tatsu (>=5.8.3)"]
 zenodo = ["pyzenodo3 (>=1.0)"]

+[[package]]
+name = "gdal"
+version = "3.6.2"
+description = "GDAL: Geospatial Data Abstraction Library"
+optional = false
+python-versions = ">=3.6.0"
+files = [
+    {file = "GDAL-3.6.2.tar.gz", hash = "sha256:a167cde1813707d91a938dad1a22f280f5ad28c45980d42e948fb8c59f890f5a"},
+]
+
+[package.extras]
+numpy = ["numpy (>1.0.0)"]
+
 [[package]]
 name = "geojson"
 version = "3.2.0"
@@ -1795,4 +1808,4 @@ crypto-eth-addresses = ["eth-hash[pycryptodome] (>=0.7.0)"]
 [metadata]
 lock-version = "2.0"
 python-versions = "^3.10"
-content-hash = "4ba78fa3eb7d714a01d408c49df85bc74c63fb4bdb627dba2a21c0dfa8e126a6"
+content-hash = "28cf723fd433a2a9f0c2d0c5dfd8bbfb6876d8de8b9e7faf93d0e0aeda5c458b"
pyproject.toml
@@ -22,6 +22,7 @@ pillow = "^11.0.0"
 fiona = "^1.10.1"
 pyarrow = "^18.1.0"
 geojson = "^3.2.0"
+gdal = "3.6.2"

 [tool.poetry.group.dev.dependencies]
 coverage = "^7.6.1"
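Note that gdal is pinned to exactly 3.6.2 rather than a caret range: the GDAL Python bindings must match the GDAL library installed on the system (presumably the version shipped in the project's Docker image; that pairing is an assumption here). A quick sanity check:

```python
# Verify that the installed Python bindings report the expected GDAL version.
from osgeo import gdal

print(gdal.__version__)  # expected: 3.6.2
```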
tests/test_geotiff_format.py (new file)
import unittest

from formats.geotiff_format import is_valid


class TestGeotiffFormat(unittest.TestCase):
    def test_is_valid__valid(self):
        resource = {}
        with open("tests/data/valid_geotiff.tif", "rb") as file:
            self.assertTrue(is_valid(resource, file))

    def test_is_valid__zip(self):
        """The ZIP file contains the TIFF image and a TFW world file."""
        resource = {}
        with open("tests/data/geotiff.zip", "rb") as file:
            self.assertTrue(is_valid(resource, file))

    def test_is_valid__invalid(self):
        """A plain TIFF without a geotransform is not a GeoTIFF."""
        resource = {}
        with open("tests/data/valid.tif", "rb") as file:
            self.assertFalse(is_valid(resource, file))
        self.assertIsNotNone(resource.get("error"))
        self.assertEqual("missing transformation", resource["error"])


if __name__ == "__main__":
    unittest.main()
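The fixture files under tests/data/ are not part of this diff. A minimal sketch of how such fixtures could be generated with GDAL (file names taken from the tests; the raster size and transform values are arbitrary assumptions):

```python
from osgeo import gdal

# 10x10 single-band byte raster with an explicit geotransform
driver = gdal.GetDriverByName("GTiff")
dataset = driver.Create("tests/data/valid_geotiff.tif", 10, 10, 1, gdal.GDT_Byte)
dataset.SetGeoTransform((500000.0, 1.0, 0.0, 6000000.0, 0.0, -1.0))
dataset = None  # closing the dataset flushes it to disk

# "valid.tif" is a plain TIFF without a geotransform, so is_geotiff()
# sees GDAL's default transform (0, 1, 0, 0, 0, 1) and reports
# "missing transformation"
dataset = driver.Create("tests/data/valid.tif", 10, 10, 1, gdal.GDT_Byte)
dataset = None
```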