Skip to content
Snippets Groups Projects
validation.py 10.4 KiB
Newer Older
  • Learn to ignore specific revisions
  • anonymous's avatar
    anonymous committed
    # This Python file uses the following encoding: utf-8
    
    anonymous's avatar
    anonymous committed
    import logging
    
    import csv
    
    anonymous's avatar
    anonymous committed
    import re
    import json
    
    anonymous's avatar
    anonymous committed
    import ckan.plugins.toolkit as toolkit
    import ckan.model as model
    
    import pkg_resources
    from dateutil.parser import parse
    
    anonymous's avatar
    anonymous committed
    from ckan.lib.navl.dictization_functions import Missing
    
    from ckanext.odsh.helpers import get_package_dict
    
    from ckanext.odsh.helpers import odsh_resource_formats
    
    anonymous's avatar
    anonymous committed
    
    
    anonymous's avatar
    anonymous committed
    _ = toolkit._
    
    
    anonymous's avatar
    anonymous committed
    log = logging.getLogger(__name__)
    
    
    anonymous's avatar
    anonymous committed
    
    
    anonymous's avatar
    anonymous committed
    def _extract_value(data, field):
        key = None
    
    Thorge Petersen's avatar
    Thorge Petersen committed
        for k in list(data.keys()):
    
    anonymous's avatar
    anonymous committed
            if data[k] == field:
                key = k
                break
        if key is None:
            return None
        return data[(key[0], key[1], 'value')]
    
    
    anonymous's avatar
    anonymous committed
    
    
    anonymous's avatar
    anonymous committed
    def validate_extra_groups(data, requireAtLeastOne, errors):
        """Copy the comma-separated 'groups' extra into ``('groups', i, 'id')``.

        :param data: flattened dataset dict; modified in place
        :param requireAtLeastOne: when true, a dataset without any group is
            reported in ``errors['groups']``
        :param errors: dict that receives the error message under 'groups'
        """
        log.debug("Validating extra_groups")
        value = _extract_value(data, 'groups')
        error_message_no_group = 'at least one group needed'

        if value is None:
            # No extra field 'groups': the dataset might come from a harvest
            # process, so only check the group entries that are already there.
            if requireAtLeastOne and \
               not data.get(('groups', 0, 'id'), False) and \
               not data.get(('groups', 0, 'name'), False):
                errors['groups'] = error_message_no_group
            return

        # The extra key 'groups' was found, so the dataset came from manual
        # editing via the web frontend.
        if not value:
            if requireAtLeastOne:
                errors['groups'] = error_message_no_group
            data[('groups', 0, 'id')] = ''
            return

        # Filter on each stripped item (not on the whole string) so inputs
        # like "a,,b" or "a, " do not produce empty group ids.
        groups = [g.strip() for g in value.split(',') if g.strip()]

        # Blank out every existing group entry before writing the new ids.
        for k in list(data.keys()):
            if len(k) == 3 and k[0] == 'groups':
                data[k] = ''

        if not groups:
            if requireAtLeastOne:
                errors['groups'] = error_message_no_group
            return

        for num, group in enumerate(groups):
            data[('groups', num, 'id')] = group
    
    anonymous's avatar
    anonymous committed
    
    
    anonymous's avatar
    anonymous committed
    
    def validate_extras(key, data, errors, context):
        """Run the group and date checks on the extras of a flattened dataset.

        Errors from the individual checks are collected in a local dict; if
        any were recorded, they are raised together as ``toolkit.Invalid``.

        :param key: passed through to ``validate_extra_date_new``
        :param data: flattened dataset dict; may be modified by the checks
        :param errors: not used by this function
        :param context: read for the 'ignore_auth' flag
        :raises toolkit.Invalid: with the dict of collected errors
        """
        log.debug("Validating extras")
        extra_errors = {}

        isStaNord = ('id',) in data and data[('id',)][:7] == 'StaNord'
        # 'ignore_auth' set to True in the context is treated as "harvesting".
        harvesting = ('ignore_auth' in context) and (
            context['ignore_auth'] == True)
        owner_org = data[('owner_org',)]
        # NOTE(review): the raw config value is used, so the membership test
        # below is presumably a substring match on a string; confirm.
        lenient_with = toolkit.config.get('ckanext.odsh.lenient_with', '')

        is_optional_temporal_start = toolkit.asbool(
            toolkit.config.get('ckanext.odsh.is_optional_temporal_start', False)
        ) or (harvesting and (owner_org in lenient_with))

        require_at_least_one_category = toolkit.asbool(
            toolkit.config.get('ckanext.odsh.require_at_least_one_category', False)
        )

        validate_extra_groups(
            data=data,
            requireAtLeastOne=require_at_least_one_category,
            errors=extra_errors
        )
        is_date_start_before_date_end(data, extra_errors)

        # 'issued' is optional only for datasets whose id starts with
        # 'StaNord'; 'temporal_end' is always optional.
        validate_extra_date_new(
            key=key,
            field='issued',
            data=data,
            optional=isStaNord,
            errors=extra_errors
        )
        validate_extra_date_new(
            key=key,
            field='temporal_start',
            data=data,
            optional=is_optional_temporal_start,
            errors=extra_errors
        )
        validate_extra_date_new(
            key=key,
            field='temporal_end',
            data=data,
            optional=True,
            errors=extra_errors
        )

        if extra_errors:
            raise toolkit.Invalid(extra_errors)
    
    anonymous's avatar
    anonymous committed
    
    
    root's avatar
    root committed
    def is_date_start_before_date_end(data, extra_errors):
        """Record an error on both temporal fields if start is after end.

        The 'temporal_start' and 'temporal_end' extras are compared with ``>``
        exactly as returned by ``_extract_value``; nothing is parsed here.
        Nothing is recorded when either value is missing or empty.

        :param data: flattened dataset dict
        :param extra_errors: dict that receives the error message under
            'temporal_start' and 'temporal_end'
        """
        period_start = _extract_value(data, 'temporal_start')
        period_end = _extract_value(data, 'temporal_end')
        if not (period_start and period_end):
            return
        if period_start > period_end:
            message = 'Please enter a valid period of time.'
            extra_errors['temporal_start'] = message
            extra_errors['temporal_end'] = message
    
    anonymous's avatar
    anonymous committed
    
    
    anonymous's avatar
    anonymous committed
    def _set_value(data, field, value):
        key = None
    
    Thorge Petersen's avatar
    Thorge Petersen committed
        for k in list(data.keys()):
    
    anonymous's avatar
    anonymous committed
            if data[k] == field:
                key = k
                break
        if key is None:
            return None
        data[(key[0], key[1], 'value')] = value
    
    
    anonymous's avatar
    anonymous committed
    
    
    anonymous's avatar
    anonymous committed
    def validate_extra_date_new(key, field, data, optional, errors):
        """Validate the date extra ``field`` and store it in ISO format.

        :param key: not used by this function
        :param field: name of the extra holding the date
        :param data: flattened dataset dict; the extra is rewritten with
            ``isoformat()`` of the parsed date on success
        :param optional: when false, a missing or empty value is reported as
            'empty'
        :param errors: dict that receives 'empty' or 'not a valid date' under
            ``field``
        """
        log.debug("Validating extra_date_new")
        value = _extract_value(data, field)

        if not value:
            if not optional:
                errors[field] = 'empty'
            return

        # The value must at least start with YYYY-MM-DD before it is parsed.
        if re.match(r'\d\d\d\d-\d\d-\d\d', value):
            try:
                dt = parse(value)
            except (ValueError, OverflowError):
                # dateutil documents OverflowError in addition to ValueError
                # for values it cannot represent; both mean "invalid".
                pass
            else:
                _set_value(data, field, dt.isoformat())
                return

        errors[field] = 'not a valid date'
    
    anonymous's avatar
    anonymous committed
    
    def validate_licenseAttributionByText(key, data, errors, context):
        """Require an attribution text for licences that demand attribution.

        A licence counts as requiring attribution when its title in the
        licence register contains 'Namensnennung'.

        :param key: not used by this function
        :param data: flattened dataset dict; modified in place
        :param errors: not used by this function
        :param context: not used by this function
        :raises toolkit.Invalid: when such a licence is selected and the
            'licenseAttributionByText' extra is absent or empty
        """
        log.debug("Validating licenseAttributionByText")
        register = model.Package.get_license_register()

        isByLicense = False
        for k in data:
            if len(k) > 0 and k[0] == 'license_id' and data[k] and \
                    not isinstance(data[k], Missing) and \
                    'Namensnennung' in register[data[k]].title:
                isByLicense = True
                break

        hasAttribution = False
        for k in data:
            if len(k) == 3 and k[2] == 'key' and \
                    data[k] == 'licenseAttributionByText':
                value_key = (k[0], k[1], 'value')
                # Test for presence *before* reading the value; the reverse
                # order raises KeyError when the value entry is absent.
                if value_key not in data or isinstance(data[value_key], Missing):
                    data.pop(value_key, None)
                    data.pop((k[0], k[1], 'key'), None)
                else:
                    hasAttribution = data[value_key] != ''
                # Leave the loop right away: the dict may have been modified.
                break

        # NOTE(review): the header of this block was lost in the available
        # source; 'if not hasAttribution' is reconstructed from the
        # surrounding logic. Confirm against version control.
        if not hasAttribution:
            new_index = next_extra_index(data)
            data[('extras', new_index, 'key')] = 'licenseAttributionByText'
            data[('extras', new_index, 'value')] = ''

        if isByLicense and not hasAttribution:
            raise toolkit.Invalid(
                'licenseAttributionByText: empty not allowed')
    
    anonymous's avatar
    anonymous committed
    
    def known_spatial_uri(key, data, errors, context):
        """Resolve the 'spatial_uri' extra to 'spatial_text' and 'spatial'.

        With a spatial URI, the tab-separated mapping file is searched for a
        row whose first column equals the URI; its second column becomes the
        'spatial_text' extra and the 'geometry' member of the JSON in its
        third column becomes the 'spatial' extra.

        Without a spatial URI, a polygon from the 'spatial' extra (or from the
        package in ``context``) is appended as a 'spatial' extra instead.

        :param key: not used by this function
        :param data: flattened dataset dict; modified in place
        :param errors: not used by this function
        :param context: read for the optional 'package' entry
        :raises toolkit.Invalid: when the URI is required but neither URI nor
            polygon is present, when the URI is not in the mapping file, or
            when the mapping file cannot be read or parsed
        """
        if data.get(('__extras',)) and 'spatial_uri_temp' in data.get(('__extras',)):
            _copy_spatial_uri_temp_to_extras(data)

        value = _extract_value(data, 'spatial_uri')
        require_spatial_uri = toolkit.asbool(
            toolkit.config.get('ckanext.odsh.require_spatial_uri', False)
        )
        error_message_spatial_uri_empty = 'spatial_uri: empty not allowed'

        if not value:
            # some harvesters might import a polygon directly...
            poly = _extract_value(data, 'spatial')

            pkg = context.get('package', None)
            if pkg and not poly:
                poly = pkg.extras.get('spatial', None)

            if not poly and require_spatial_uri:
                raise toolkit.Invalid(error_message_spatial_uri_empty)

            if poly:
                # NOTE(review): when poly was read from data, a 'spatial'
                # extra already exists and this appends a second one
                # (unchanged from the original); confirm this is intended.
                new_index = next_extra_index(data)
                # Use the free index itself; 'new_index + 1' left a gap.
                data[('extras', new_index, 'key')] = 'spatial'
                data[('extras', new_index, 'value')] = poly
            return

        extension_path = pkg_resources.resource_filename('ckanext.odsh', '')
        mapping_path = toolkit.config.get(
            'ckanext.odsh.spatial.mapping',
            extension_path + '/resources/schleswig-holstein_geojson.csv')

        spatial_text = str()
        spatial = str()

        try:
            with open(mapping_path, newline='') as mapping_file:
                for row in csv.reader(mapping_file, delimiter="\t"):
                    if len(row) > 2 and row[0] == value:
                        spatial_text = row[1]
                        loaded = json.loads(row[2])
                        spatial = json.dumps(loaded.get('geometry', {}))
                        break
                else:
                    # Loop finished without a matching row.
                    raise toolkit.Invalid('spatial_uri: uri unknown')
        except (IOError, json.decoder.JSONDecodeError, KeyError,
                AttributeError) as e:
            # AttributeError covers a row whose JSON is not an object.
            log.error("Error processing spatial mapping: %s", e)
            raise toolkit.Invalid("Error processing spatial mapping") from e

        new_index = next_extra_index(data)
        data[('extras', new_index, 'key')] = 'spatial_text'
        data[('extras', new_index, 'value')] = spatial_text
        data[('extras', new_index+1, 'key')] = 'spatial'
        data[('extras', new_index+1, 'value')] = spatial
    
    
    anonymous's avatar
    anonymous committed
    
    
    root's avatar
    root committed
    def _copy_spatial_uri_temp_to_extras(data):
        '''
        Copy the field spatial_uri_temp (or spatial_url_temp) originating
        from the user interface to extras.

        The value is read from ``data[('__extras',)]``. If a 'spatial_uri'
        extra already exists, its value is overwritten; otherwise a new
        'spatial_uri' extra is appended at the next free extras index.

        :param data: flattened dataset dict; modified in place
        '''
        extras_data = data.get(('__extras',))
        spatial_uri = extras_data.get(
            'spatial_uri_temp') or extras_data.get('spatial_url_temp')

        if _extract_value(data, 'spatial_uri') is None:
            next_index = next_extra_index(data)
            data[('extras', next_index, 'key')] = 'spatial_uri'
            data[('extras', next_index, 'value')] = spatial_uri
        else:
            _set_value(data, 'spatial_uri', spatial_uri)
    
    root's avatar
    root committed
    
    
    anonymous's avatar
    anonymous committed
    def next_extra_index(data):
        """Return the first unused index for a new ``('extras', i, ...)`` entry.

        :param data: flattened dict whose keys are tuples
        :returns: one more than the largest index found among keys that start
            with 'extras' and have more than one element, or 0 when there is
            no such key
        """
        return max(
            (k[1] for k in data if len(k) > 1 and k[0] == 'extras'),
            default=-1,
        ) + 1
    
    
    
    def validate_relatedPackage(data):
        """Check that the referenced related package can be looked up.

        An empty value is accepted without any lookup.

        :param data: value passed to ``get_package_dict`` (presumably a
            package id or name; confirm against that helper)
        :returns: ``data`` unchanged, so the value is preserved if this
            function is used as a one-argument validator
        :raises toolkit.Invalid: when the lookup raises
            ``toolkit.ObjectNotFound``
        """
        if data:
            try:
                get_package_dict(data)
            # 'logic' is not imported in the visible part of this module;
            # toolkit.ObjectNotFound is the toolkit's name for
            # ckan.logic.NotFound.
            except toolkit.ObjectNotFound:
                raise toolkit.Invalid(
                    "relatedPackage: package '{}' not found".format(data))
        return data
    
    root's avatar
    root committed
    
    
    def validate_formats(data, errors):
        """Check that a resource format is on the list of allowed formats.

        The upper-cased value is compared with the 'key' of each entry
        returned by ``odsh_resource_formats()``.

        :param data: the format value to check
        :param errors: not used by this function
        :returns: ``data`` unchanged, so the value is preserved if this
            function is used as a two-argument validator
        :raises toolkit.Invalid: when the value is empty or not on the list
        """
        if not data:
            raise toolkit.Invalid('Missing format.')

        if not any(data.upper() == obj['key'] for obj in odsh_resource_formats()):
            raise toolkit.Invalid(
                _('Only formats on the list of the EU Publications Office are allowed.'))

        return data
    
    def tag_name_validator(value, context):
        """Validate tag name to ensure it is non-empty and contains no line breaks.

        Any run of newline characters is replaced with a single space and the
        result is stripped before validation.

        :param value: the tag name to validate
        :param context: not used by this function
        :returns: the normalised tag name
        :raises toolkit.Invalid: when the normalised tag name is empty
        """
        # Replace all newlines (\n, \r) with spaces
        value = re.sub(r'[\r\n]+', ' ', value).strip()

        # Ensure the tag is non-empty
        if not value:
            raise toolkit.Invalid(_('Invalid tag: Tags cannot be empty.'))

        # Hand the cleaned value back; without this the caller gets None
        # instead of the normalised tag.
        return value
    
    
    anonymous's avatar
    anonymous committed
    def get_validators():
        """Return the validators defined in this module, keyed by name."""
        return {
            'known_spatial_uri': known_spatial_uri,
            'odsh_validate_extras': validate_extras,
            'validate_licenseAttributionByText': validate_licenseAttributionByText,
            'validate_relatedPackage': validate_relatedPackage,
            'odsh_validate_format': validate_formats,
            'tag_name_validator': tag_name_validator,
        }