Skip to content
Snippets Groups Projects
validation.py 10.5 KiB
Newer Older
  • Learn to ignore specific revisions
  • anonymous's avatar
    anonymous committed
    # This Python file uses the following encoding: utf-8
    
    anonymous's avatar
    anonymous committed
    import logging
    
    import csv
    
    anonymous's avatar
    anonymous committed
    import re
    
    Thorge Petersen's avatar
    Thorge Petersen committed
    import urllib.request
    import urllib.error
    import urllib.parse
    
    anonymous's avatar
    anonymous committed
    import json
    
    anonymous's avatar
    anonymous committed
    from itertools import count
    from dateutil.parser import parse
    
    import ckan.plugins.toolkit as toolkit
    import ckan.model as model
    from ckan.lib.navl.dictization_functions import Missing
    
    
    from ckanext.odsh.helpers import get_package_dict
    
    from ckanext.odsh.helpers import odsh_resource_formats
    
    Thorge Petersen's avatar
    Thorge Petersen committed
    import ckan.plugins.toolkit as tk
    
    import pkg_resources
    
    anonymous's avatar
    anonymous committed
    
    
    anonymous's avatar
    anonymous committed
    # Shorthand for the toolkit's `_` callable; used below to wrap user-facing
    # validation messages (presumably gettext-style translation -- confirm).
    _ = toolkit._
    
    
    anonymous's avatar
    anonymous committed
    # Module-level logger, named after this module via __name__.
    log = logging.getLogger(__name__)
    
    
    anonymous's avatar
    anonymous committed
    
    
    anonymous's avatar
    anonymous committed
    def _extract_value(data, field):
        key = None
    
    Thorge Petersen's avatar
    Thorge Petersen committed
        for k in list(data.keys()):
    
    anonymous's avatar
    anonymous committed
            if data[k] == field:
                key = k
                break
        if key is None:
            return None
        return data[(key[0], key[1], 'value')]
    
    
    anonymous's avatar
    anonymous committed
    
    
    anonymous's avatar
    anonymous committed
    def validate_extra_groups(data, requireAtLeastOne, errors):
        """Turn the comma-separated 'groups' extra into ``('groups', n, 'id')`` items.

        Args:
            data: flattened dataset dict with tuple keys; modified in place.
            requireAtLeastOne: when true, an empty group list submitted through
                the 'groups' extra is reported as an error.
            errors: dict that receives the message under the key ``'groups'``.

        When the 'groups' extra is present, every existing 3-tuple
        ``('groups', ...)`` item is blanked and the stripped, non-empty names
        from the extra are written as ``('groups', n, 'id')``.
        When it is absent, the dataset must already carry an id or a name for
        group 0, otherwise the error is recorded.
        """
        log.debug("Validating extra_groups")

        value = _extract_value(data, 'groups')
        error_message_no_group = 'at least one group needed'

        if value is None:
            # No extra named 'groups': the dataset might come from a harvest
            # process.
            # NOTE(review): this branch reports the error regardless of
            # requireAtLeastOne -- confirm that this is intended.
            if not data.get(('groups', 0, 'id'), False) and \
               not data.get(('groups', 0, 'name'), False):
                errors['groups'] = error_message_no_group
            return

        # The extra key 'groups' was found, so the dataset came from manual
        # editing via the web-frontend.
        if not value:
            if requireAtLeastOne:
                errors['groups'] = error_message_no_group
            data[('groups', 0, 'id')] = ''
            return

        # Filter on each stripped piece (not on the whole value) so that input
        # such as "a,,b" or "," cannot produce empty group ids.
        groups = [g.strip() for g in value.split(',') if g.strip()]

        # Existing group items are blanked rather than deleted; only values
        # change here, so iterating the dict directly is safe.
        for k in data:
            if len(k) == 3 and k[0] == 'groups':
                data[k] = ''

        if not groups:
            if requireAtLeastOne:
                errors['groups'] = error_message_no_group
            return

        for num, group in enumerate(groups):
            data[('groups', num, 'id')] = group
    
    anonymous's avatar
    anonymous committed
    
    
    anonymous's avatar
    anonymous committed
    
    def validate_extras(key, data, errors, context):
        """Validate the group and date extras of a flattened dataset dict.

        Runs the group check, the start/end ordering check and the per-field
        date checks for ``issued``, ``temporal_start`` and ``temporal_end``,
        collecting all messages in one dict.

        ``issued`` is optional for datasets whose id starts with ``StaNord``.
        ``temporal_start`` is optional when
        ``ckanext.odsh.is_optional_temporal_start`` is set, or when the call
        runs with ``context['ignore_auth'] == True`` (treated here as
        harvesting) and the owner organisation occurs in
        ``ckanext.odsh.lenient_with``. ``temporal_end`` is always optional.

        Args:
            key: passed through to the date checks.
            data: flattened dataset dict with tuple keys; modified in place.
            errors: not used by this function.
            context: dict; only ``'ignore_auth'`` is read.

        Raises:
            toolkit.Invalid: carrying the dict of collected messages when at
                least one check failed.
        """
        log.debug("Validating extras")
        extra_errors = {}

        isStaNord = ('id',) in data and data[('id',)][:7] == 'StaNord'
        harvesting = ('ignore_auth' in context) and (
            context['ignore_auth'] == True)

        # A missing, empty or non-string owner_org must never count as
        # "lenient": '' is contained in every string, and a non-string operand
        # would raise TypeError in the membership test.
        owner_org = data.get(('owner_org',))
        lenient_with = tk.config.get('ckanext.odsh.lenient_with', '')
        # NOTE(review): for a string setting this is a substring test, as in
        # the original -- confirm the expected format of the option.
        is_lenient_org = (
            isinstance(owner_org, str)
            and bool(owner_org)
            and owner_org in lenient_with
        )

        is_optional_temporal_start = toolkit.asbool(
            tk.config.get('ckanext.odsh.is_optional_temporal_start', False)
        ) or (harvesting and is_lenient_org)

        require_at_least_one_category = toolkit.asbool(
            tk.config.get('ckanext.odsh.require_at_least_one_category', False)
        )
        validate_extra_groups(
            data=data,
            requireAtLeastOne=require_at_least_one_category,
            errors=extra_errors
        )

        is_date_start_before_date_end(data, extra_errors)

        validate_extra_date_new(
            key=key,
            field='issued',
            data=data,
            optional=isStaNord,
            errors=extra_errors
        )
        validate_extra_date_new(
            key=key,
            field='temporal_start',
            data=data,
            optional=is_optional_temporal_start,
            errors=extra_errors
        )
        validate_extra_date_new(
            key=key,
            field='temporal_end',
            data=data,
            optional=True,
            errors=extra_errors
        )

        if extra_errors:
            raise toolkit.Invalid(extra_errors)
    
    anonymous's avatar
    anonymous committed
    
    
    root's avatar
    root committed
    def is_date_start_before_date_end(data, extra_errors):
        """Record an error on both temporal fields if the start sorts after the end.

        The 'temporal_start' and 'temporal_end' extras are compared as they are
        stored; nothing happens unless both are set. The function returns
        nothing: the outcome is reported only through ``extra_errors``.
        """
        temporal_start = _extract_value(data, 'temporal_start')
        temporal_end = _extract_value(data, 'temporal_end')

        if not (temporal_start and temporal_end):
            return

        if temporal_start > temporal_end:
            message = 'Please enter a valid period of time.'
            extra_errors['temporal_start'] = message
            extra_errors['temporal_end'] = message
    
    anonymous's avatar
    anonymous committed
    
    
    anonymous's avatar
    anonymous committed
    def _set_value(data, field, value):
        key = None
    
    Thorge Petersen's avatar
    Thorge Petersen committed
        for k in list(data.keys()):
    
    anonymous's avatar
    anonymous committed
            if data[k] == field:
                key = k
                break
        if key is None:
            return None
        data[(key[0], key[1], 'value')] = value
    
    
    anonymous's avatar
    anonymous committed
    
    
    anonymous's avatar
    anonymous committed
    def validate_extra_date_new(key, field, data, optional, errors):
        """Check the date extra *field* and rewrite it in ISO format.

        Args:
            key: not used by this function.
            field: name of the extra to check.
            data: flattened dataset dict with tuple keys; the extra's value is
                replaced by ``isoformat()`` of the parsed date on success.
            optional: when false, a missing or empty value is reported.
            errors: dict that receives ``'empty'`` or ``'not a valid date'``
                under the key *field*.
        """
        log.debug("Validating extra_date_new")

        value = _extract_value(data, field)

        if not value:
            if not optional:
                errors[field] = 'empty'
            return

        # The value has to start with YYYY-MM-DD before it is handed to the
        # parser; the pattern is not anchored at the end.
        if re.match(r'\d\d\d\d-\d\d-\d\d', value):
            try:
                dt = parse(value)
            except (ValueError, OverflowError):
                # dateutil documents both exception types for unparseable or
                # out-of-range input; either one means "not a valid date".
                pass
            else:
                _set_value(data, field, dt.isoformat())
                return

        errors[field] = 'not a valid date'
    
    anonymous's avatar
    anonymous committed
    
    def validate_licenseAttributionByText(key, data, errors, context):
        """Require an attribution text when the chosen licence demands one.

        A licence counts as requiring attribution when its title in the
        licence register contains 'Namensnennung'. The attribution text is
        read from the 'licenseAttributionByText' extra.

        Args:
            key, errors, context: not used by this function.
            data: flattened dataset dict with tuple keys; modified in place.
                An attribution entry without a usable value is removed, and an
                empty 'licenseAttributionByText' extra is appended whenever no
                non-empty attribution was found.

        Raises:
            toolkit.Invalid: if the licence requires attribution and none was
                given.
        """
        log.debug("Validating licenseAttributionByText")

        register = model.Package.get_license_register()

        isByLicense = False
        for k in data:
            if len(k) == 0 or k[0] != 'license_id':
                continue
            license_id = data[k]
            if license_id and not isinstance(license_id, Missing) \
                    and 'Namensnennung' in register[license_id].title:
                isByLicense = True
                break

        hasAttribution = False
        for k in data:
            # Only ('...', index, 'key') items name an entry; other items that
            # happen to hold the same text must not be indexed.
            if len(k) == 3 and k[2] == 'key' \
                    and data[k] == 'licenseAttributionByText':
                value_key = (k[0], k[1], 'value')
                # Test membership before reading, so a missing value item does
                # not raise KeyError.
                if value_key not in data or isinstance(data[value_key], Missing):
                    data.pop(value_key, None)
                    data.pop((k[0], k[1], 'key'), None)
                else:
                    hasAttribution = data[value_key] != ''
                # The dict may have been resized above: stop iterating now.
                break

        # NOTE(review): this guard and the `raise` below are inferred from the
        # indentation of the surrounding statements -- confirm against the
        # upstream file.
        if not hasAttribution:
            new_index = next_extra_index(data)
            data[('extras', new_index, 'key')] = 'licenseAttributionByText'
            data[('extras', new_index, 'value')] = ''

        if isByLicense and not hasAttribution:
            raise toolkit.Invalid(
                'licenseAttributionByText: empty not allowed')
    
    anonymous's avatar
    anonymous committed
    
    def known_spatial_uri(key, data, errors, context):
        """Resolve the 'spatial_uri' extra and add the matching spatial extras.

        With a spatial_uri, the URI is looked up in a tab-separated mapping
        file (column 0: URI, column 1: display text, column 2: JSON document
        whose 'geometry' member is used). The extras 'spatial_text' and
        'spatial' are then appended to ``data``.

        Without a spatial_uri, a polygon from the 'spatial' extra, or else
        from the stored package in ``context['package']``, is appended as the
        'spatial' extra instead.

        Args:
            key, errors: not used by this function.
            data: flattened dataset dict with tuple keys; modified in place.
            context: dict; only ``'package'`` is read.

        Raises:
            toolkit.Invalid: if no spatial_uri and no polygon are available
                while ``ckanext.odsh.require_spatial_uri`` is set, if the URI
                is not in the mapping file, or if the mapping file cannot be
                read or parsed.
        """
        if data.get(('__extras',)) and 'spatial_uri_temp' in data.get(('__extras',)):
            _copy_spatial_uri_temp_to_extras(data)

        value = _extract_value(data, 'spatial_uri')
        require_spatial_uri = toolkit.asbool(
            tk.config.get('ckanext.odsh.require_spatial_uri', False)
        )
        error_message_spatial_uri_empty = 'spatial_uri: empty not allowed'

        if not value:
            # some harvesters might import a polygon directly...
            poly = _extract_value(data, 'spatial')

            pkg = context.get('package', None)
            if pkg and not poly:
                poly = pkg.extras.get('spatial', None)

            if (not poly) and require_spatial_uri:
                raise toolkit.Invalid(error_message_spatial_uri_empty)

            if poly:
                new_index = next_extra_index(data)
                # NOTE(review): next_extra_index() already returns a free
                # index; the extra +1 is kept from the original -- confirm
                # whether the gap is intended.
                data[('extras', new_index+1, 'key')] = 'spatial'
                data[('extras', new_index+1, 'value')] = poly
            return

        extension_path = pkg_resources.resource_filename('ckanext.odsh', '')
        mapping_path = tk.config.get('ckanext.odsh.spatial.mapping',
                                     extension_path + '/resources/schleswig-holstein_geojson.csv')

        match = None
        try:
            with open(mapping_path, newline='') as mapping_file:
                for row in csv.reader(mapping_file, delimiter="\t"):
                    if row and len(row) > 2 and row[0] == value:
                        geometry = json.loads(row[2]).get('geometry', {})
                        match = (row[1], json.dumps(geometry))
                        break
        except (IOError, json.decoder.JSONDecodeError, KeyError) as e:
            log.error("Error processing spatial mapping: %s", e)
            raise toolkit.Invalid("Error processing spatial mapping") from e

        if match is None:
            raise toolkit.Invalid('spatial_uri: uri unknown')
        spatial_text, spatial = match

        new_index = next_extra_index(data)

        data[('extras', new_index, 'key')] = 'spatial_text'
        data[('extras', new_index, 'value')] = spatial_text
        data[('extras', new_index+1, 'key')] = 'spatial'
        data[('extras', new_index+1, 'value')] = spatial
    
    
    anonymous's avatar
    anonymous committed
    
    
    root's avatar
    root committed
    def _copy_spatial_uri_temp_to_extras(data):
        '''
        Copy the spatial URI submitted by the user interface into the extras.

        The URI is read from the field spatial_uri_temp of data[('__extras',)],
        falling back to spatial_url_temp when the former is None. It replaces
        the value of an existing 'spatial_uri' extra, or is appended as a new
        'spatial_uri' extra when none has a value yet.
        '''
        ui_fields = data.get(('__extras',))
        uri = ui_fields.get('spatial_uri_temp')
        if uri is None:
            uri = data.get(('__extras',)).get('spatial_url_temp')

        if _extract_value(data, 'spatial_uri') is None:
            slot = next_extra_index(data)
            data[('extras', slot, 'key')] = 'spatial_uri'
            data[('extras', slot, 'value')] = uri
            return

        _set_value(data, 'spatial_uri', uri)
    
    root's avatar
    root committed
    
    
    anonymous's avatar
    anonymous committed
    def next_extra_index(data):
        """Return the first unused index for a new ``('extras', index, ...)`` item.

        Args:
            data: flattened dict with tuple keys.

        Returns:
            One more than the highest index found among keys that have more
            than one element and start with ``'extras'``; ``0`` when there is
            no such key.
        """
        used = (k[1] for k in data if len(k) > 1 and k[0] == 'extras')
        return max(used, default=-1) + 1
    
    
    
    def validate_relatedPackage(data):
        """Check that the referenced package exists.

        Args:
            data: identifier handed to ``get_package_dict``; a falsy value
                skips the lookup.

        Returns:
            ``data`` unchanged, so the caller can keep using the checked
            value.

        Raises:
            toolkit.Invalid: if the lookup reports that the package was not
                found.
        """
        if data:
            try:
                get_package_dict(data)
            # ``logic`` is not imported in this module; the toolkit exposes
            # the not-found exception as ObjectNotFound.
            except toolkit.ObjectNotFound:
                raise toolkit.Invalid(
                    "relatedPackage: package '{}' not found".format(data))
        return data
    
    root's avatar
    root committed
    
    
    def validate_formats(data, errors):
        """Check a resource format against the list of allowed formats.

        The comparison is made on the upper-cased value against the ``'key'``
        of every item returned by ``odsh_resource_formats()``.

        Args:
            data: the format value to check.
            errors: not used by this function.

        Returns:
            ``data`` unchanged, so the caller can keep using the checked
            value.

        Raises:
            toolkit.Invalid: if the value is empty or not on the list.
        """
        if not data:
            raise toolkit.Invalid('Missing format.')

        wanted = data.upper()
        if not any(wanted == obj['key'] for obj in odsh_resource_formats()):
            raise toolkit.Invalid(
                _('Only formats on the list of the EU Publications Office are allowed.'))

        return data
    
    def tag_name_validator(value, context):
        """Allow a tag name to contain any characters except line breaks.

        Args:
            value: the tag name to check.
            context: not used by this function.

        Returns:
            ``value`` unchanged, so the caller can keep using the checked
            value.

        Raises:
            toolkit.Invalid: if the tag is empty or contains a newline.
        """
        # fullmatch instead of a '$'-terminated pattern: '$' also matches just
        # before a trailing newline, which let "tag\n" through.
        if not re.fullmatch(r'[^\n]+', value):
            raise toolkit.Invalid(
                _('Invalid tag: "%s". Tags cannot contain line breaks.') % (value))

        return value
    
    anonymous's avatar
    anonymous committed
    def get_validators():
        """Return the validators of this module, keyed by their registered name."""
        registered = (
            ('known_spatial_uri', known_spatial_uri),
            ('odsh_validate_extras', validate_extras),
            ('validate_licenseAttributionByText', validate_licenseAttributionByText),
            ('validate_relatedPackage', validate_relatedPackage),
            ('odsh_validate_format', validate_formats),
            ('tag_name_validator', tag_name_validator),
        )
        return dict(registered)