Compare revisions

Changes are shown as if the source revision was being merged into the target revision.

Project: opendata/dcat-catalog-check
Commits on Source (28)
Showing with 324 additions and 96 deletions
[run]
omit =
tests/*
\ No newline at end of file
@@ -20,6 +20,9 @@ ruff:
   image: python:3.10
   stage: lint
   before_script:
+    # Install libgdal-dev
+    - apt-get update
+    - apt-get install -y libgdal-dev
     # Install pipx
     - python3 -m pip install --user pipx
     - python3 -m pipx ensurepath
@@ -36,6 +39,9 @@ test:
   image: python:3.10
   stage: test
   before_script:
+    # Install libgdal-dev
+    - apt-get update
+    - apt-get install -y libgdal-dev
     # Install pipx
     - python3 -m pip install --user pipx
     - python3 -m pipx ensurepath
...
@@ -5,6 +5,27 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
## [1.1.0] - 2025-01-09
### Added
- **Unit Tests**:
- URI replacements and resource clearing functionality.
- Support for multiple formats: Atom, DOCX, GeoTIFF, GeoJSON, JPEG, ODT, ODS, PDF, RDF, TXT, WMTS, XLSX.
- Frictionless Data Resource validation.
- **Report Generation**:
- Added columns for **HTTP status** and **error message** in the generated reports.
- Implemented **filters** for table columns, allowing users to refine data views.
### Changed
- **Coverage Configuration**:
- Updated coverage settings to better manage test file inclusion/exclusion.
- Test files are now excluded from coverage reports to focus on measuring application code quality.
- **Dockerfile**: Switched base image to `python:3.10` and updated installation steps for dependencies, pipx, and Poetry.
## [1.0.0] - 2024-12-20
### Added
...
-FROM alpine
+FROM python:3.10

# Install necessary system dependencies
-RUN apk add --no-cache poetry proj-util gdal-dev gcc python3-dev musl-dev geos-dev proj-dev libmagic
+RUN apt-get update && \
+    apt-get install -y \
+    libgdal-dev \
+    libmagic-dev \
+    gcc \
+    python3-dev \
+    musl-dev \
+    libgeos-dev \
+    libproj-dev \
+    && python3 -m pip install --upgrade pip \
+    && python3 -m pip install pipx \
+    && python3 -m pipx ensurepath

-# Set the PATH for pipx
+# Ensure pipx is in the PATH
ENV PATH="/root/.local/bin:${PATH}"

+# Install poetry using pipx
+RUN pipx install poetry

# Set the working directory inside the container
WORKDIR /app
...
@@ -30,16 +30,26 @@ The following format checks are currently being carried out:
| Format | Check |
| --------- | ------- |
-| `GEOJSON` | Load the file using [`GeoPandas`](https://geopandas.org). |
-| `GML` | Load the file using [`GeoPandas`](https://geopandas.org). |
-| `JPEG` | Load the image. |
-| `JSON` | Is it syntactically correct JSON? If it is a *Frictionless Data Resource*, it is checked with the Frictionless Tools. |
-| `PNG` | Load the image. |
-| `PDF` | Load the document using [`pypdf`](https://pypi.org/project/pypdf/). |
-| `SHP` | Load the file using [`GeoPandas`](https://geopandas.org). |
-| `WFS` | Is it a valid well-formed `WFS_Capabilities` XML document? If the address does not contain the `request=GetCapabilities` parameter, a `GetCapabilities` request is performed. This response is then checked. |
-| `WMS` | Is it a valid well-formed `WMS_Capabilities` XML document? If the address does not contain the `request=GetCapabilities` parameter, a `GetCapabilities` request is performed. This response is then checked. |
-| `XML` | Is it well-formed XML? |
+| `ATOM` | Validates whether the file content is a valid ATOM feed by confirming the root element is `<feed>` in the Atom XML namespace. |
+| `DOCX` | Verifies that the file is a valid DOCX by ensuring the ZIP archive contains the necessary XML files (`document.xml` and `styles.xml`). |
+| `GEOJSON` | Loads and validates the file using [`GeoPandas`](https://geopandas.org). |
+| `GEOTIFF` | Verifies the file is a valid GeoTIFF by checking its GeoTransform information and supports both standalone and ZIP-compressed GeoTIFF formats. |
+| `GML` | Loads and validates the file using [`GeoPandas`](https://geopandas.org). |
+| `JPEG` | Loads and validates the image file. |
+| `JSON` | Verifies that the file is syntactically correct JSON and, if it is a *Frictionless Data Resource*, checks it using Frictionless Tools. |
+| `ODS` | Validates that the file is a valid ODS (OpenDocument Spreadsheet) by checking the ZIP structure, required files, and correct MIME type. |
+| `ODT` | Validates that the file is a valid ODT (OpenDocument Text) by confirming the ZIP structure, required files, and correct MIME type. |
+| `PARQUET` | Verifies that the file is a readable Apache Parquet file by loading it using [`pandas`](https://pandas.pydata.org/). |
+| `PDF` | Loads and validates the PDF document using [`pypdf`](https://pypi.org/project/pypdf/). |
+| `PNG` | Loads and validates the image file. |
+| `RDF` | Verifies the file is a valid RDF (Resource Description Framework) document and contains at least two statements. |
+| `SHP` | Loads and validates the file using [`GeoPandas`](https://geopandas.org). |
+| `WFS` | Validates if the file is a well-formed `WFS_Capabilities` XML document. If not, a `GetCapabilities` request is made and validated. |
+| `WMS` | Validates if the file is a well-formed `WMS_Capabilities` XML document. If not, a `GetCapabilities` request is made and validated. |
+| `WMTS` | Validates if the file contains a valid WMTS (Web Map Tile Service) capabilities XML response, either directly or by performing a `GetCapabilities` request. |
+| `XLSX` | Verifies that the file is a ZIP archive and contains the required files (`xl/workbook.xml` and `xl/styles.xml`) typical of a valid XLSX file. |
+| `XML` | Verifies if the file is well-formed XML. |
+| `ZIP` | Verifies if the file is a valid ZIP archive using Python's `zipfile.is_zipfile()` method. |
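
Each of these checks lives in a small module under `formats/` that exposes an `is_valid(resource, file)` function; the main script dynamically imports `formats.{format}_format` and stores the outcome on the resource dictionary (see the module diffs below). The snippet that follows is only a minimal sketch of that contract: the example URL, the GeoJSON payload, and the standalone invocation are illustrative assumptions, not code from the repository.

```python
# Minimal sketch of the format-check contract (illustrative, not repository code):
# every formats/<format>_format.py module exposes is_valid(resource, file) and
# records failure details in resource["error"].
import importlib
import tempfile

# illustrative resource dict; only "format" and "url" matter for this sketch
resource = {"format": "GEOJSON", "url": "https://example.com/data.geojson"}

with tempfile.NamedTemporaryFile(suffix=".geojson") as temp_file:
    temp_file.write(b'{"type": "FeatureCollection", "features": []}')
    temp_file.flush()

    try:
        # mirrors the dynamic import in the checker code
        module = importlib.import_module(f"formats.{resource['format'].lower()}_format")
        resource["valid"] = module.is_valid(resource, temp_file)
    except ModuleNotFoundError:
        # no specialized check exists for this format
        resource["valid"] = None
```
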
## Installation
@@ -208,8 +218,6 @@ In this example:
2. Define the desired replacements in the JSON array format described above.
3. Run the script as usual. If the file exists, replacements will be applied automatically.

-By using `uri_replacements.json`, you can streamline URL handling and ensure consistent preprocessing for your link-checking tasks.
## Docker

You can run the script in a Docker container. See the [Dockerfile](./Dockerfile) for more information.
@@ -225,7 +233,7 @@ You can run the script in a Docker container. See the [Dockerfile](./Dockerfile)
2. Run the Docker container:

```sh
docker run --rm dcat-catalog-check --url https://example.com
```

## Tests
...
@@ -154,7 +154,8 @@ class DcatCatalogCheck:
            "error",
            "etag",
            "http_status",
-           "last_check" "mimetype",
+           "last_check",
+           "mimetype",
            "mimetype_mismatch",
            "valid",
        ]:
@@ -174,8 +175,7 @@ class DcatCatalogCheck:
        format = resource["format"].lower()
        try:
            # dynamically import the corresponding module for the format
-           format_check_module = importlib.import_module(
-               f"formats.{format}_format")
+           format_check_module = importlib.import_module(f"formats.{format}_format")
        except ModuleNotFoundError:
            format_check_module = None
@@ -194,6 +194,9 @@ class DcatCatalogCheck:
            if "etag" in response.headers:
                resource["etag"] = response.headers["etag"]

+           if "content-length" in response.headers:
+               resource["size"] = response.headers["content-length"]

        except requests.exceptions.RequestException as err:
            # Handle connection, timeout, or other request errors
            resource["accessible"] = False
@@ -210,8 +213,7 @@ class DcatCatalogCheck:
        # write the content of the HTTP response into a temporary file
        original_file_name = url.split("/")[-1]
-       suffix = original_file_name.split(
-           ".")[-1] if "." in original_file_name else ""
+       suffix = original_file_name.split(".")[-1] if "." in original_file_name else ""
        with tempfile.NamedTemporaryFile(
            delete=False, suffix="." + suffix
        ) as temp_file:
@@ -234,8 +236,7 @@ class DcatCatalogCheck:
            decompressor = decompressors.get(resource["mimetype"])
            if not decompressor:
-               self.logger.warning(
-                   f"Unknown compression {resource['mimetype']}.")
+               self.logger.warning(f"Unknown compression {resource['mimetype']}.")
            else:
                with tempfile.NamedTemporaryFile(delete=False) as decompressed_file:
                    with decompressor.open(temp_file.name, "rb") as compressed_file:
@@ -245,9 +246,10 @@ class DcatCatalogCheck:
                temp_file = decompressed_file
                resource["mimetype"] = self._guess_mime_type(temp_file.name)

-       if self._is_container(resource["mimetype"], resource["format"]):
-           self._check_container_file(
-               resource, temp_file, format_check_module)
+       if self._is_container(resource["mimetype"], resource["format"]) and resource[
+           "format"
+       ] not in ["GTFS", "GEOTIFF", "SHP"]:
+           self._check_container_file(resource, temp_file, format_check_module)
        else:
            self._check_single_file(resource, temp_file, format_check_module)
@@ -275,8 +277,7 @@ class DcatCatalogCheck:
                    temp_file.write(file.read())
                    temp_file.flush()

-                   resource["mimetype"] = self._guess_mime_type(
-                       temp_file.name)
+                   resource["mimetype"] = self._guess_mime_type(temp_file.name)
                    validation_result = (
                        validation_result
                        and self._check_single_file(
@@ -290,14 +291,12 @@ class DcatCatalogCheck:
            return contains_at_least_one_relevant_file and validation_result
        else:
-           self.logger.error(
-               f"Unsupported container format {resource['mimetype']}")
+           self.logger.error(f"Unsupported container format {resource['mimetype']}")

    def _check_single_file(self, resource, temp_file, format_check_module):
        if format_check_module:
            # call the `is_valid` function that is defined in every format module
-           resource["valid"] = format_check_module.is_valid(
-               resource, temp_file)
+           resource["valid"] = format_check_module.is_valid(resource, temp_file)
        else:
            # There is no specialized check for the specified format.
            # Does the returned MIME type match the promised format?
@@ -322,8 +321,7 @@ class DcatCatalogCheck:
        ):
            hash_algorithm = hashlib.md5()
        else:
-           print(
-               f"WARNING: unknown checksum algorithm {algo_name}", file=sys.stderr)
+           print(f"WARNING: unknown checksum algorithm {algo_name}", file=sys.stderr)
            return

        with open(temp_file.name, "rb") as f:
@@ -418,8 +416,7 @@ class DcatCatalogCheck:
            publisher = graph.value(dataset, DCTERMS.publisher)
            if not publisher:
-               self.logger.warning(
-                   f"Publisher not found for dataset: {dataset}")
+               self.logger.warning(f"Publisher not found for dataset: {dataset}")
                return None

            # Attempt to get the publisher's name
@@ -433,8 +430,7 @@ class DcatCatalogCheck:
        except Exception as e:
            # Log any unexpected errors
-           self.logger.error(
-               f"Error retrieving publisher for dataset {dataset}: {e}")
+           self.logger.error(f"Error retrieving publisher for dataset {dataset}: {e}")
            return None

    def _process_datasets(self, datasets, g):
@@ -459,8 +455,7 @@ class DcatCatalogCheck:
                url = str(resource["url"])
                if self._needs_check(url):
-                   checksum_resource = g.value(
-                       distribution, SPDX.checksum)
+                   checksum_resource = g.value(distribution, SPDX.checksum)
                    if checksum_resource:
                        resource["checksum_algorithm"] = str(
                            g.value(checksum_resource, SPDX.algorithm)
@@ -481,7 +476,8 @@ class DcatCatalogCheck:
    def read_previous_results(self, file_path):
        if not os.path.exists(file_path):
            self.logger.warning(
-               f"File '{file_path}' does not exist. No previous results loaded.")
+               f"File '{file_path}' does not exist. No previous results loaded."
+           )
            return

        loaded_count = 0
@@ -500,7 +496,8 @@ class DcatCatalogCheck:
                    url = json_object.get("url")
                    if not url:
                        self.logger.warning(
-                           f"Line {line_number} is missing 'url': {line}")
+                           f"Line {line_number} is missing 'url': {line}"
+                       )
                        skipped_count += 1
                        continue
@@ -508,12 +505,12 @@ class DcatCatalogCheck:
                    loaded_count += 1
                except json.JSONDecodeError as e:
-                   self.logger.error(
-                       f"Invalid JSON at line {line_number}: {e}")
+                   self.logger.error(f"Invalid JSON at line {line_number}: {e}")
                    skipped_count += 1

        self.logger.info(
-           f"Loaded {loaded_count} results from '{file_path}', skipped {skipped_count} lines.")
+           f"Loaded {loaded_count} results from '{file_path}', skipped {skipped_count} lines."
+       )

    def read_dcat_catalog(self, url):
        while url:
@@ -536,8 +533,7 @@ class DcatCatalogCheck:
            self._process_datasets(datasets, g)

-           paged_collection = g.value(
-               predicate=RDF.type, object=HYDRA.PagedCollection)
+           paged_collection = g.value(predicate=RDF.type, object=HYDRA.PagedCollection)
            next_page = g.value(paged_collection, HYDRA.nextPage)
            url = str(next_page) if next_page else None
@@ -562,12 +558,9 @@ if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("--url", help="DCAT catalog URL")
    parser.add_argument("--log_file", help="Log file path")
-   parser.add_argument(
-       "--results", help="File from which the results are loaded")
-   parser.add_argument("--verbose", action="store_true",
-                       help="Enable verbose logging")
-   parser.add_argument("--debug", action="store_true",
-                       help="Enable debug logging")
+   parser.add_argument("--results", help="File from which the results are loaded")
+   parser.add_argument("--verbose", action="store_true", help="Enable verbose logging")
+   parser.add_argument("--debug", action="store_true", help="Enable debug logging")
    parser.add_argument(
        "--recheck",
        action="store_true",
@@ -578,8 +571,7 @@ if __name__ == "__main__":
        action="store_true",
        help="Just check new entries from the catalog. Do not re-check existing results.",
    )
-   parser.add_argument(
-       "--check-format", help="Only check the specified format")
+   parser.add_argument("--check-format", help="Only check the specified format")
    parser.add_argument(
        "--force-check-format",
        help="Check distributions with the specified format regardless of previous results",
...
version: '3.8'

services:
  lint:
    image: node
    command: >
      sh -c "
        npm install -g markdownlint markdownlint-cli &&
        markdownlint '**/*.md' --ignore node_modules | tee lint.log
      "
    volumes:
      - .:/app
      - /app/node_modules

  ruff:
    image: python:3.10
    command: >
      bash -c "
        apt-get update &&
        apt-get install -y libgdal-dev &&
        python3 -m pip install --user pipx &&
        python3 -m pipx ensurepath &&
        source ~/.bashrc &&
        pipx install poetry &&
        poetry install &&
        poetry run ruff check .
      "
    volumes:
      - .:/app
    working_dir: /app
import xml.etree.ElementTree as ET


def is_valid(resource, file):
    """Check if the HTTP response is an ATOM feed."""
    with open(file.name, "rb") as f:
        try:
            xml = ET.parse(f).getroot()
            if xml.tag == "{http://www.w3.org/2005/Atom}feed":
                return True
            else:
                resource["error"] = (
                    "Root element is not {http://www.w3.org/2005/Atom}feed"
                )
                return False
        except Exception as e:
            resource["error"] = str(e)
            return False
import zipfile


def is_valid(resource, file):
    """Check if the content is a DOCX file."""
    if not zipfile.is_zipfile(file.name):
        resource["error"] = "Not a ZIP file."
        return False

    with zipfile.ZipFile(file.name, "r") as zip_ref:
        zip_contents = zip_ref.namelist()
        required_files = ["word/document.xml", "word/styles.xml"]
        if not all(file in zip_contents for file in required_files):
            resource["error"] = "That does not look like a DOCX file."
            return False

    return True
-import geopandas
-from pyogrio.errors import DataSourceError
-from shapely.errors import GEOSException
+import geojson


def is_valid(resource, file):
@@ -8,9 +6,11 @@ def is_valid(resource, file):
    with open(file.name, "rb") as f:
        try:
-           geopandas.read_file(f)
-           return True
-       except DataSourceError:
-           return False
-       except GEOSException:
-           return False
+           geojson_data = geojson.load(f)
+           if isinstance(geojson_data, dict) and "type" in geojson_data:
+               return True
+           else:
+               resource["error"] = "JSON is not GeoJSON."
+               return False
+       except Exception as e:
+           resource["error"] = str(e)
from osgeo import gdal
import zipfile
import tempfile
import os


def is_geotiff(resource, file_name):
    dataset = gdal.Open(file_name)
    if not dataset:
        resource["error"] = f"could not read file {file_name}"
        return False

    geotransform = dataset.GetGeoTransform()
    default_transform = (0.0, 1.0, 0.0, 0.0, 0.0, 1.0)
    if geotransform == default_transform:
        resource["error"] = "missing transformation"
        return False

    return True


def is_valid(resource, file):
    """Check if the content is a GeoTIFF file."""

    # Some GeoTIFF files consist of two files in a ZIP file:
    # - the TIFF image itself
    # - a TFW world file with the transform information
    if zipfile.is_zipfile(file.name):
        with tempfile.TemporaryDirectory() as temp_dir:
            with zipfile.ZipFile(file.name, "r") as zip_ref:
                file_list = zip_ref.namelist()
                relevant_files = [
                    file
                    for file in file_list
                    if file.lower().endswith(".tiff") or file.lower().endswith(".tif")
                ]
                contains_at_least_one_relevant_file = len(relevant_files) > 0
                if contains_at_least_one_relevant_file:
                    zip_ref.extractall(temp_dir)
                    for tif_name in relevant_files:
                        tif_path = os.path.join(temp_dir, tif_name)
                        if is_geotiff(resource, tif_path):
                            # the ZIP file contains at least one valid GeoTIFF
                            return True
                else:
                    resource["error"] = "ZIP file contains no TIFF image"
                    return False
    else:
        return is_geotiff(resource, file.name)
import geopandas
-from pyogrio.errors import DataSourceError
-from shapely.errors import GEOSException


def is_valid(resource, file):
@@ -10,12 +8,6 @@ def is_valid(resource, file):
        try:
            geopandas.read_file(f)
            return True
-       except DataSourceError as e:
-           resource["error"] = str(e)
-           return False
-       except GEOSException as e:
-           resource["error"] = str(e)
-           return False
        except Exception as e:
            resource["error"] = str(e)
            return False
@@ -23,9 +23,6 @@ def is_valid(resource, file):
                return resource["schema_valid"]

            return True
-       except json.JSONDecodeError as e:
-           resource["error"] = str(e)
-           return False
-       except UnicodeDecodeError as e:
+       except Exception as e:
            resource["error"] = str(e)
            return False
import zipfile


def is_valid(resource, file):
    """Check if the content is an ODS file."""
    if not zipfile.is_zipfile(file.name):
        resource["error"] = "Not a ZIP file."
        return False

    with zipfile.ZipFile(file.name, "r") as zip_ref:
        zip_contents = zip_ref.namelist()
        required_files = ["mimetype", "content.xml", "meta.xml", "styles.xml"]
        if not all(file in zip_contents for file in required_files):
            resource["error"] = "That does not look like an ODS file."
            return False

        with zip_ref.open("mimetype") as mimetype_file:
            mimetype_content = mimetype_file.read().decode("utf-8").strip()
            if mimetype_content != "application/vnd.oasis.opendocument.spreadsheet":
                resource["error"] = f"Incorrect MIME type: {mimetype_content}"
                return False

    return True
import zipfile


def is_valid(resource, file):
    """Check if the content is an ODT file."""
    if not zipfile.is_zipfile(file.name):
        resource["error"] = "Not a ZIP file."
        return False

    with zipfile.ZipFile(file.name, "r") as zip_ref:
        zip_contents = zip_ref.namelist()
        required_files = ["mimetype", "content.xml", "meta.xml", "styles.xml"]
        if not all(file in zip_contents for file in required_files):
            resource["error"] = "That does not look like an ODT file."
            return False

        with zip_ref.open("mimetype") as mimetype_file:
            mimetype_content = mimetype_file.read().decode("utf-8").strip()
            if mimetype_content != "application/vnd.oasis.opendocument.text":
                resource["error"] = f"Incorrect MIME type: {mimetype_content}"
                return False

    return True
from pypdf import PdfReader
-from pypdf.errors import PyPdfError


def is_valid(resource, file):
@@ -9,5 +8,6 @@ def is_valid(resource, file):
        try:
            PdfReader(f)
            return True
-       except PyPdfError:
+       except Exception as e:
+           resource["error"] = str(e)
            return False
-from PIL import Image, UnidentifiedImageError
+from PIL import Image


def is_valid(resource, file):
@@ -7,5 +7,6 @@ def is_valid(resource, file):
        try:
            with Image.open(file.name, formats=["PNG"]):
                return True
-       except UnidentifiedImageError:
+       except Exception as e:
+           resource["error"] = str(e)
            return False
from rdflib import Graph


def is_valid(resource, file):
    """Check if file is a valid RDF document."""
    try:
        graph = Graph()
        graph.parse(file.name)

        # even an empty RDF document contains two statements
        if len(graph) > 2:
            return True
        else:
            resource["error"] = "RDF document does not contain any statements."
            return False
    except Exception as e:
        resource["error"] = str(e)
        return False
import geopandas
-from pyogrio.errors import DataSourceError
-from shapely.errors import GEOSException
import zipfile
@@ -24,10 +22,7 @@ def is_valid(resource, file):
        with open(file.name, "rb") as f:
            try:
                geopandas.read_file(f)
-           except DataSourceError as e:
-               resource["error"] = str(e)
-               return False
-           except GEOSException as e:
+           except Exception as e:
                resource["error"] = str(e)
                return False
            return True
@@ -37,10 +32,7 @@ def is_valid(resource, file):
            with z.open(shp) as f:
                try:
                    geopandas.read_file(f"zip://{file.name}!{shp}")
-               except DataSourceError as e:
-                   resource["error"] = str(e)
-                   return False
-               except GEOSException as e:
+               except Exception as e:
                    resource["error"] = str(e)
                    return False
                return True
@@ -12,21 +12,26 @@ def _load_into_file(url):
    return temp_file


-def _is_capabilites_response(file):
+def _is_capabilites_response(resource, file):
    with open(file.name, "rb") as f:
        try:
            xml = ET.parse(f).getroot()
-           return (
+           if (
                xml.tag == "{http://www.opengis.net/wfs/2.0}WFS_Capabilities"
                or xml.tag == "{http://www.opengis.net/wfs}WFS_Capabilities"
-           )
-       except ET.ParseError:
+           ):
+               return True
+           else:
+               resource["error"] = "Root element is not WFS_Capabilities"
+               return False
+       except Exception as e:
+           resource["error"] = str(e)
            return False


def is_valid(resource, file):
-   if _is_capabilites_response(file):
+   if _is_capabilites_response(resource, file):
        return True

    # The response is not a capabilities XML file. That is allowed.
@@ -38,7 +43,12 @@ def is_valid(resource, file):
            url = url + "?"

        url = url + "service=WFS&request=GetCapabilities"
-       return _is_capabilites_response(_load_into_file(url))
+
+       try:
+           return _is_capabilites_response(resource, _load_into_file(url))
+       except Exception as e:
+           resource["error"] = str(e)
+           return False
    else:
        # The URL already contains a GetCapabilities request but the result was not a correct answer.
        return False