# This Python file uses the following encoding: utf-8
import logging
import csv
import re
import json
import ckan.logic as logic
import ckan.plugins.toolkit as toolkit
import ckan.model as model
import pkg_resources
from dateutil.parser import parse
from ckan.lib.navl.dictization_functions import Missing
from ckanext.odsh.helpers import get_package_dict
from ckanext.odsh.helpers import odsh_resource_formats

_ = toolkit._
log = logging.getLogger(__name__)


def _extract_value(data, field):
    """Return the 'value' entry of the extras triple whose value is *field*.

    ``data`` is CKAN's flattened dataset dict, keyed by tuples such as
    ``('extras', n, 'key')``.  Returns None when no matching key exists.
    """
    for k in list(data.keys()):
        if data[k] == field:
            return data[(k[0], k[1], 'value')]
    return None


def validate_extra_groups(data, requireAtLeastOne, errors):
    """Validate (and normalise) the 'groups' extra of a dataset.

    When the extra key 'groups' is present, the dataset was edited via the
    web frontend: its comma-separated value is split and written back as
    ``('groups', n, 'id')`` entries.  Otherwise (e.g. harvested datasets)
    only the presence of a group id/name entry is checked.

    :param data: flattened dataset dict (mutated in place)
    :param requireAtLeastOne: if True, report an error when no group is given
    :param errors: dict collecting error messages under the key 'groups'
    """
    log.debug("Validating extra_groups")
    value = _extract_value(data, 'groups')
    error_message_no_group = 'at least one group needed'

    if value is None:
        # No extra field 'groups': the dataset might come from a harvest
        # process, where groups arrive as ('groups', 0, 'id'/'name').
        if requireAtLeastOne and not data.get(('groups', 0, 'id'), False) and \
                not data.get(('groups', 0, 'name'), False):
            errors['groups'] = error_message_no_group
        return

    # 'value is not None' means the extra key 'groups' was found,
    # so the dataset came from manual editing via the web-frontend.
    if not value:
        if requireAtLeastOne:
            errors['groups'] = error_message_no_group
        data[('groups', 0, 'id')] = ''
        return

    # BUG FIX: filter on each element (g) rather than the whole string
    # (value), so inputs like 'a,,b' no longer produce empty group ids.
    groups = [g.strip() for g in value.split(',') if g.strip()]

    # Blank out any pre-existing group entries before rewriting them.
    for k in list(data.keys()):
        if len(k) == 3 and k[0] == 'groups':
            data[k] = ''
            # del data[k]

    if not groups:
        if requireAtLeastOne:
            errors['groups'] = error_message_no_group
        return

    for num, group in enumerate(groups):
        data[('groups', num, 'id')] = group


def validate_extras(key, data, errors, context):
    """Validate the extras of a dataset (groups, issued, temporal_*).

    Collects all extras-related error messages in one dict and raises a
    single ``toolkit.Invalid`` containing them, so they surface together.
    """
    log.debug("Validating extras")
    extra_errors = {}
    isStaNord = ('id',) in data and data[('id',)][:7] == 'StaNord'
    # 'ignore_auth' is set during harvesting; lenient orgs get relaxed rules.
    harvesting = context.get('ignore_auth') is True
    owner_org = data[('owner_org',)]
    lenient_with = toolkit.config.get('ckanext.odsh.lenient_with', '')
    is_optional_temporal_start = toolkit.asbool(
        toolkit.config.get('ckanext.odsh.is_optional_temporal_start', False)
    ) or (harvesting and (owner_org in lenient_with))
    require_at_least_one_category = toolkit.asbool(
        toolkit.config.get('ckanext.odsh.require_at_least_one_category', False)
    )
    validate_extra_groups(
        data=data,
        requireAtLeastOne=require_at_least_one_category,
        errors=extra_errors
    )
    is_date_start_before_date_end(data, extra_errors)
    validate_extra_date_new(
        key=key, field='issued', data=data,
        optional=isStaNord, errors=extra_errors
    )
    validate_extra_date_new(
        key=key, field='temporal_start', data=data,
        optional=is_optional_temporal_start, errors=extra_errors
    )
    validate_extra_date_new(
        key=key, field='temporal_end', data=data,
        optional=True, errors=extra_errors
    )
    if extra_errors:
        raise toolkit.Invalid(extra_errors)


def is_date_start_before_date_end(data, extra_errors):
    """Report an error when temporal_start lies after temporal_end.

    Dates are compared as ISO-8601 strings, whose lexical order matches
    chronological order.
    """
    start_date = _extract_value(data, 'temporal_start')
    end_date = _extract_value(data, 'temporal_end')
    if start_date and end_date and start_date > end_date:
        extra_errors['temporal_start'] = extra_errors['temporal_end'] = \
            'Please enter a valid period of time.'
def _set_value(data, field, value):
    """Set the 'value' entry of the extras triple whose value is *field*.

    Counterpart to ``_extract_value``; silently does nothing (returning
    None) when no matching key exists in the flattened dict.
    """
    for k in list(data.keys()):
        if data[k] == field:
            data[(k[0], k[1], 'value')] = value
            return
    return None


def validate_extra_date_new(key, field, data, optional, errors):
    """Validate and normalise a date extra (e.g. 'issued', 'temporal_start').

    Accepts values starting with an ISO date (YYYY-MM-DD); a parseable
    value is written back in ISO format.  A missing value is an error
    unless *optional* is True.
    """
    log.debug("Validating extra_date_new")
    value = _extract_value(data, field)
    if not value:
        if not optional:
            errors[field] = 'empty'
        return
    if re.match(r'\d\d\d\d-\d\d-\d\d', value):
        try:
            _set_value(data, field, parse(value).isoformat())
            return
        except ValueError:
            pass
    errors[field] = 'not a valid date'


def validate_licenseAttributionByText(key, data, errors, context):
    """Require 'licenseAttributionByText' when the licence demands attribution.

    Licences whose title contains 'Namensnennung' (attribution) require a
    non-empty attribution text.  A missing or Missing-valued extra is
    removed and then re-created empty so the form can render the field.
    Raises ``toolkit.Invalid`` when attribution is required but empty.
    """
    log.debug("Validating licenseAttributionByText")
    register = model.Package.get_license_register()

    isByLicense = False
    for k in data:
        license_id = data[k]
        # ROBUSTNESS: the 'in register' guard prevents a KeyError for
        # license ids unknown to the register; such ids simply do not
        # trigger the attribution requirement.
        if len(k) > 0 and k[0] == 'license_id' and license_id and \
                not isinstance(license_id, Missing) and \
                license_id in register and \
                'Namensnennung' in register[license_id].title:
            isByLicense = True
            break

    hasAttribution = False
    for k in data:
        if data[k] == 'licenseAttributionByText':
            value_key = (k[0], k[1], 'value')
            # BUG FIX: test membership before subscripting.  The original
            # condition indexed data[value_key] first, raising KeyError in
            # exactly the case the 'not in data' operand was meant to catch.
            if value_key not in data or isinstance(data[value_key], Missing):
                data.pop(value_key, None)
                del data[(k[0], k[1], 'key')]
            else:
                hasAttribution = data[value_key] != ''
            break

    if not hasAttribution:
        # (Re-)create an empty extra so the frontend shows the field.
        # Uses the shared next_extra_index helper instead of re-deriving it.
        new_index = next_extra_index(data)
        data[('extras', new_index, 'key')] = 'licenseAttributionByText'
        data[('extras', new_index, 'value')] = ''

    if isByLicense and not hasAttribution:
        raise toolkit.Invalid(
            'licenseAttributionByText: empty not allowed')


def known_spatial_uri(key, data, errors, context):
    """Validate the 'spatial_uri' extra against the configured mapping CSV.

    A known URI adds 'spatial_text' and 'spatial' (GeoJSON geometry)
    extras.  An empty URI is accepted when a polygon is supplied directly
    (e.g. by a harvester) or when 'ckanext.odsh.require_spatial_uri' is
    false.  Raises ``toolkit.Invalid`` for unknown URIs, for missing URIs
    when one is required, or when the mapping file cannot be processed.
    """
    if data.get(('__extras',)) and 'spatial_uri_temp' in data.get(('__extras',)):
        _copy_spatial_uri_temp_to_extras(data)

    value = _extract_value(data, 'spatial_uri')
    require_spatial_uri = toolkit.asbool(
        toolkit.config.get('ckanext.odsh.require_spatial_uri', False)
    )
    error_message_spatial_uri_empty = 'spatial_uri: empty not allowed'

    if not value:
        # Some harvesters might import a polygon directly...
        poly = _extract_value(data, 'spatial')
        has_old_uri = False
        pkg = context.get('package', None)
        if pkg:
            old_uri = pkg.extras.get('spatial_uri', None)
            # NOTE(review): has_old_uri is only used by the disabled check
            # below; kept so that check can be re-enabled unchanged.
            has_old_uri = bool(old_uri)
            if not poly:
                poly = pkg.extras.get('spatial', None)
        if not poly and require_spatial_uri:
            raise toolkit.Invalid(error_message_spatial_uri_empty)
        # if has_old_uri and require_spatial_uri:
        #     raise toolkit.Invalid(error_message_spatial_uri_empty)
        else:
            if poly:
                # BUG FIX: write at the first free index; the original used
                # new_index+1, leaving an unused gap in the extras indexes.
                new_index = next_extra_index(data)
                data[('extras', new_index, 'key')] = 'spatial'
                data[('extras', new_index, 'value')] = poly
            return

    extension_path = pkg_resources.resource_filename('ckanext.odsh', '')
    mapping_path = toolkit.config.get(
        'ckanext.odsh.spatial.mapping',
        extension_path + '/resources/schleswig-holstein_geojson.csv'
    )
    not_found = True
    spatial_text = str()
    spatial = str()
    try:
        with open(mapping_path, newline='') as mapping_file:
            cr = csv.reader(mapping_file, delimiter="\t")
            for row in cr:
                if row and len(row) > 2 and row[0] == value:
                    not_found = False
                    spatial_text = row[1]
                    loaded = json.loads(row[2])
                    spatial = json.dumps(loaded.get('geometry', {}))
                    break
        if not_found:
            # toolkit.Invalid is not in the caught tuple below, so this
            # propagates to the caller as intended.
            raise toolkit.Invalid('spatial_uri: uri unknown')
    except (IOError, json.decoder.JSONDecodeError, KeyError) as e:
        log.error(f"Error processing spatial mapping: {e}")
        raise toolkit.Invalid("Error processing spatial mapping")

    new_index = next_extra_index(data)
    data[('extras', new_index, 'key')] = 'spatial_text'
    data[('extras', new_index, 'value')] = spatial_text
    data[('extras', new_index + 1, 'key')] = 'spatial'
    data[('extras', new_index + 1, 'value')] = spatial


def _copy_spatial_uri_temp_to_extras(data):
    '''
    Copy the field spatial_uri_temp or spatial_url_temp
    originating from the user interface to extras.
    '''
    extras_data = data.get(('__extras',))
    spatial_uri = extras_data.get(
        'spatial_uri_temp') or extras_data.get('spatial_url_temp')
    if _extract_value(data, 'spatial_uri') is None:
        next_index = next_extra_index(data)
        data[('extras', next_index, 'key')] = 'spatial_uri'
        data[('extras', next_index, 'value')] = spatial_uri
    else:
        _set_value(data, 'spatial_uri', spatial_uri)


def next_extra_index(data):
    """Return the first unused index in the flattened 'extras' entries."""
    current_indexes = [k[1] for k in list(data.keys())
                       if len(k) > 1 and k[0] == 'extras']
    return max(current_indexes) + 1 if current_indexes else 0


def validate_relatedPackage(data):
    """Raise ``toolkit.Invalid`` when *data* names a non-existent package.

    An empty value is accepted (the field is optional).
    """
    if data:
        try:
            get_package_dict(data)
        except logic.NotFound:
            raise toolkit.Invalid(
                "relatedPackage: package '{}' not found".format(data))


def validate_formats(data, errors):
    """Validate a resource format against the EU Publications Office list.

    Comparison is case-insensitive; raises ``toolkit.Invalid`` for a
    missing or unlisted format, otherwise returns *data* unchanged.
    """
    if not data:
        raise toolkit.Invalid('Missing format.')
    if not any(data.upper() == obj['key'] for obj in odsh_resource_formats()):
        raise toolkit.Invalid(
            _('Only formats on the list of the EU Publications Office are allowed.'))
    return data


def tag_name_validator(value, context):
    """Validate tag name to ensure it is non-empty and contains no
    line breaks.  Replaces any newlines with spaces before validation.
    """
    # Replace all newlines (\n, \r) with spaces, then trim.
    value = re.sub(r'[\r\n]+', ' ', value).strip()
    # Ensure the tag is non-empty.
    if not value:
        raise toolkit.Invalid(_('Invalid tag: Tags cannot be empty.'))
    return value


def get_validators():
    """Return the validator mapping registered by the odsh plugin."""
    return {
        'known_spatial_uri': known_spatial_uri,
        'odsh_validate_extras': validate_extras,
        'validate_licenseAttributionByText': validate_licenseAttributionByText,
        'validate_relatedPackage': validate_relatedPackage,
        'odsh_validate_format': validate_formats,
        'tag_name_validator': tag_name_validator,
    }