    # This Python file uses the following encoding: utf-8
    
    import logging
    
    import unicodecsv as csv
    
    import re
    import urllib2
    import json
    from itertools import count
    from dateutil.parser import parse
    
    from pylons import config
    
    
    import ckan.plugins.toolkit as toolkit
    import ckan.model as model
    
    import ckan.logic as logic
    
    from ckan.lib.navl.dictization_functions import Missing
    
    
    from ckanext.odsh.helpers_tpsh import get_package_dict
    
    _ = toolkit._
    
    
    log = logging.getLogger(__name__)
    
    
    def _extract_value(data, field):
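        '''Return the value stored for the extras field 'field' in the flattened
        data dict, or None if the field is not present.'''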
        key = None
        for k in data.keys():
            if data[k] == field:
                key = k
                break
        if key is None:
            return None
        return data[(key[0], key[1], 'value')]
    
    
    ERROR_MSG_NO_GROUP = 'at least one group needed'
    
    
    def validate_extra_groups(data, requireAtLeastOne, errors):
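        '''Validate the groups of a dataset, given either as a comma-separated
        extras field 'groups' or directly as ('groups', i, ...) entries.
        Records an error if no group is given and requireAtLeastOne is set.'''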
    
        groups = _groups_from_data(data)
        if groups is not None:
            # groups are in extras
            if len(groups) == 0 and requireAtLeastOne:
                errors['groups'] = ERROR_MSG_NO_GROUP
            _clear_groups(data)
            _copy_groups_one_level_up(data, groups)
        else:
            # no extra-field 'groups'
            if (not _at_least_one_group_outside_extras(data)) and requireAtLeastOne:
                errors['groups'] = ERROR_MSG_NO_GROUP
    
    
    def _groups_from_data(data):
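        '''Parse the comma-separated extras field 'groups' into a list of group
        names, or return None if that field is missing.'''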
        groups_csv = _extract_value(data, 'groups')
        try:
            groups = list(csv.reader([groups_csv]))[0]
        except TypeError:
            groups = None
        return groups
    
    def _clear_groups(data):
        for k in data.keys():
            if len(k) == 3 and k[0] == 'groups':
                data[k] = '' # dead code [?]
    
    def _copy_groups_one_level_up(data, groups):
        if len(groups) == 0:
            data[('groups', 0, 'id')] = ''
        else:
    
            for num, group in zip(range(len(groups)), groups):
                data[('groups', num, 'id')] = group
    
    def _at_least_one_group_outside_extras(data):
        return (
            ('groups', 0, 'id') in data or
            ('groups', 0, 'name') in data
        )
    
    
    def validate_extras(key, data, errors, context):
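        '''Validator for the dataset extras: checks the groups, the temporal
        coverage and the date fields 'issued', 'temporal_start' and
        'temporal_end', and raises toolkit.Invalid with a dict of field errors
        if any of these checks fails.'''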
    
        extra_errors = {}
    
        isStaNord = ('id',) in data and data[('id',)][:7] == 'StaNord'
    
        is_optional_temporal_start = toolkit.asbool(
            config.get('ckanext.odsh.is_optional_temporal_start', False)
        )

        require_at_least_one_category = toolkit.asbool(
            config.get('ckanext.odsh.require_at_least_one_category', False)
        )
        validate_extra_groups(
            data=data, 
            requireAtLeastOne=require_at_least_one_category, 
            errors=extra_errors
        )
        
    
        is_date_start_before_date_end(data, extra_errors)
        
    
        validate_extra_date_new(
            key=key,
            field='issued',
            data=data,
            optional=isStaNord,
            errors=extra_errors
        )
        validate_extra_date_new(
            key=key,
            field='temporal_start',
            data=data,
            optional=is_optional_temporal_start,
            errors=extra_errors
        )
        validate_extra_date_new(
            key=key,
            field='temporal_end',
            data=data,
            optional=True,
            errors=extra_errors
        )
    
        if len(extra_errors.values()):
            raise toolkit.Invalid(extra_errors)
    
    
    def is_date_start_before_date_end(data, extra_errors):
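        '''Record an error for 'temporal_start' and 'temporal_end' if the start
        date lies after the end date.'''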
        start_date = _extract_value(data, 'temporal_start')
        end_date = _extract_value(data, 'temporal_end')
        if start_date and end_date:
            if start_date > end_date:
                extra_errors['temporal_start'] = extra_errors['temporal_end'] = 'Please enter a valid period of time.'
    
    def _set_value(data, field, value):
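        '''Overwrite the value stored for the extras field 'field' in the
        flattened data dict; do nothing if the field is not present.'''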
        key = None
        for k in data.keys():
            if data[k] == field:
                key = k
                break
        if key is None:
            return None
        data[(key[0], key[1], 'value')] = value
    
    
    def validate_extra_date_new(key, field, data, optional, errors):
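        '''Check that the extras field 'field' holds a date of the form
        YYYY-MM-DD and normalise it to ISO format; record an error if the value
        is missing (unless optional is True) or cannot be parsed.'''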
    
        value = _extract_value(data, field)
    
        if not value:
    
            if not optional:
                errors[field] = 'empty'
            return
    
        else:
            if re.match(r'\d\d\d\d-\d\d-\d\d', value):
                try:
    
                    dt = parse(value)
    
                    _set_value(data, field, dt.isoformat())
                    return
                except ValueError:
                    pass
    
            errors[field] = 'not a valid date'
    
    
    def validate_licenseAttributionByText(key, data, errors, context):
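        '''Require a non-empty extras field 'licenseAttributionByText' for
        licenses that demand attribution, if the corresponding option is set.'''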
    
        require_license_attribution = toolkit.asbool(
            config.get('ckanext.odsh.require_license_attribution', True)
        )
    
        isByLicense = _isByLicense(data)
        hasAttribution = _hasAttribution(data)
        if not hasAttribution:
            _add_empty_attribution(data)
    
        if isByLicense and (not hasAttribution) and require_license_attribution:
            raise toolkit.Invalid(
                'licenseAttributionByText: empty not allowed')
    
    
    def _isByLicense(data):
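        '''Return True if the chosen license requires attribution, i.e. its
        title contains 'Namensnennung'.'''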
    
        register = model.Package.get_license_register()
    
        isByLicense = False
    
        for k in data:
            if len(k) > 0 and k[0] == 'license_id' and data[k] and not isinstance(data[k], Missing) and \
                    'Namensnennung' in register[data[k]].title:
    
                isByLicense = True
                break
    
        return isByLicense
    
    
    def _hasAttribution(data):
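        '''Return True if the extras contain a non-empty
        'licenseAttributionByText' entry; entries without a value are removed.'''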
    
        hasAttribution = False
    
        for k in data:
            if data[k] == 'licenseAttributionByText':
    
                if isinstance(data[(k[0], k[1], 'value')], Missing) or (k[0], k[1], 'value') not in data:
    
                    del data[(k[0], k[1], 'value')]
                    del data[(k[0], k[1], 'key')]
                    break
                else:
                    value = data[(k[0], k[1], 'value')]
                    hasAttribution = value != ''
                    break
        return hasAttribution

    def _add_empty_attribution(data):
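        '''Append an empty 'licenseAttributionByText' entry to the extras.'''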
        current_indexes = [
            k[1] for k in data.keys()
            if len(k) > 1 and k[0] == 'extras'
        ]
        new_index = max(current_indexes) + 1 if current_indexes else 0
        data[('extras', new_index, 'key')] = 'licenseAttributionByText'
        data[('extras', new_index, 'value')] = ''
    
    
    def known_spatial_uri(key, data, errors, context):
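        '''Validator for 'spatial_uri': looks the URI up in the configured
        spatial mapping file and stores the matching 'spatial_text' and
        'spatial' polygon in the extras; raises toolkit.Invalid for unknown
        URIs or, if a URI is required, for missing ones.'''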
    
        if data.get(('__extras',)) and 'spatial_uri_temp' in data.get(('__extras',)):
            _copy_spatial_uri_temp_to_extras(data)
        if data.get(('__extras',)) and 'spatial_url_temp' in data.get(('__extras',)):
            _copy_spatial_uri_temp_to_extras(data)
    
        value = _extract_value(data, 'spatial_uri')
    
        require_spatial_uri = toolkit.asbool(
            config.get('ckanext.odsh.require_spatial_uri', False)
        )
        error_message_spatial_uri_empty = 'spatial_uri: empty not allowed'
    
        if not value:
    
            poly = None
    
    
            # some harvesters might import a polygon directly...
            poly = _extract_value(data, 'spatial')
    
            has_old_uri = False
            pkg = context.get('package', None)
            if pkg:
                old_uri = pkg.extras.get('spatial_uri', None)
                has_old_uri = old_uri != None and len(old_uri) > 0
                if not poly:
                    poly = pkg.extras.get('spatial', None)
    
            if (not poly) and require_spatial_uri:
                raise toolkit.Invalid(error_message_spatial_uri_empty)
            if has_old_uri and require_spatial_uri:
                raise toolkit.Invalid(error_message_spatial_uri_empty)
            else:
                if poly:
                    new_index = next_extra_index(data)
                    data[('extras', new_index+1, 'key')] = 'spatial'
                    data[('extras', new_index+1, 'value')] = poly
                return
    
    
        mapping_file = config.get('ckanext.odsh.spatial.mapping')
        try:
            mapping_file = urllib2.urlopen(mapping_file)
        except Exception:
            raise Exception("Could not load spatial mapping file!")
    
        not_found = True
        spatial_text = str()
        spatial = str()
    
        cr = csv.reader(mapping_file, delimiter="\t", encoding='utf-8')
    
        for row in cr:
    
            if row[0] == value:
    
                not_found = False
                spatial_text = row[1]
                loaded = json.loads(row[2])
                spatial = json.dumps(loaded['geometry'])
                break
        if not_found:
            raise toolkit.Invalid('spatial_uri: uri unknown')
    
        new_index = next_extra_index(data)
    
        data[('extras', new_index, 'key')] = 'spatial_text'
        data[('extras', new_index, 'value')] = spatial_text
        data[('extras', new_index+1, 'key')] = 'spatial'
        data[('extras', new_index+1, 'value')] = spatial
    
    
    def _copy_spatial_uri_temp_to_extras(data):
        '''
        Copy the fields spatial_uri_temp or spatial_url_temp
        originating from the user interface to extras.
        '''
        spatial_uri = data.get(('__extras',)).get('spatial_uri_temp')
    
        if spatial_uri is None:
            spatial_uri = data.get(('__extras',)).get('spatial_url_temp')
    
        is_spatial_uri_in_extras = _extract_value(data, 'spatial_uri') is not None
        if not is_spatial_uri_in_extras:
            next_index = next_extra_index(data)
            data[('extras', next_index, 'key')] = 'spatial_uri'
            data[('extras', next_index, 'value')] = spatial_uri
        else:
            _set_value(data, 'spatial_uri', spatial_uri)
        
    
    
    def next_extra_index(data):
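        '''Return the next free index for an ('extras', i, ...) entry in the
        flattened data dict.'''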
        current_indexes = [k[1] for k in data.keys()
                           if len(k) > 1 and k[0] == 'extras']
    
        return max(current_indexes) + 1 if current_indexes else 0
    
    
    
    def tag_name_validator(value, context):
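        '''Raise toolkit.Invalid if the tag contains characters outside the
        allowed set of alphanumerics, spaces and a few punctuation symbols.'''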
    
        tagname_match = re.compile(r'[\w \-.\:\(\)\´\`\§]*$', re.UNICODE)
    
        if not tagname_match.match(value):
            raise toolkit.Invalid(_('Tag "%s" must be alphanumeric '
                                    'characters or symbols: -_.:()') % (value))
        return value
    
    
    def tag_string_convert(key, data, errors, context):
        '''Takes a list of tags that is a comma-separated string (in data[key])
        and parses tag names. These are added to the data dict, enumerated. They
        are also validated.'''
        if isinstance(data[key], basestring):
            tags = [tag.strip()
                    for tag in data[key].split(',')
                    if tag.strip()]
        else:
            tags = data[key]
    
        current_index = max([int(k[1]) for k in data.keys()
                             if len(k) == 3 and k[0] == 'tags'] + [-1])
    
        for num, tag in zip(count(current_index+1), tags):
            data[('tags', num, 'name')] = tag
    
        for tag in tags:
            toolkit.get_validator('tag_length_validator')(tag, context)
            tag_name_validator(tag, context)
    
    
    
    def _convert_subjectID_to_subjectText(subject_id, flattened_data):
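        '''Look up subject_id in the configured subject mapping file and store
        the corresponding text as extras field 'subject_text'.'''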
    
        default_subject_mapping_file_path = '/usr/lib/ckan/default/src/ckanext-odsh/subject_mapping.json'
        subject_mapping_file_path = config.get(
            'ckanext.odsh.subject_mapping_file_path', default_subject_mapping_file_path)
    
        try:
            with open(subject_mapping_file_path) as mapping_json:
                subject_mapping = json.loads(mapping_json.read())
        except IOError:
            log.error(
                'Could not load subject mapping file from {}'
                .format(subject_mapping_file_path)
            )
            raise
        except ValueError:
            log.error(
                'Could not convert subject mapping file from json. \nSubject mapping file: {}'
                .format(subject_mapping_file_path)
            )
            raise

        try:
            subject_text = subject_mapping[subject_id]
        except KeyError:
            log.warning(
                'Subject_id "{}" not found in subject mapping dictionary.\nSubject mapping file: {}'
                .format(subject_id, subject_mapping_file_path)
            )
            raise toolkit.Invalid(_('Subject must be a known URI.'))

        new_index = next_extra_index(flattened_data)
        flattened_data[('extras', new_index, 'key')] = 'subject_text'
        flattened_data[('extras', new_index, 'value')] = subject_text
        return flattened_data
    
    def validate_subject(key, flattened_data, errors, context):
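        '''Validator for the subject URI: rejects an empty subject if one is
        required and stores the matching 'subject_text' in the extras.'''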
        subject_id = flattened_data[key]
    
        require_subject = toolkit.asbool(
            config.get('ckanext.odsh.require_subject', True)
        )
        if not require_subject:
            flattened_data = _convert_subjectID_to_subjectText(subject_id, flattened_data)
            return
        if not subject_id:
            raise toolkit.Invalid(_('Subject must not be empty.'))
    
        flattened_data = _convert_subjectID_to_subjectText(subject_id, flattened_data)
    
    def validate_relatedPackage(data):
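        '''Raise toolkit.Invalid if the referenced package cannot be found.'''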
        if data:
            try:
                get_package_dict(data)
            except logic.NotFound:
                raise toolkit.Invalid("relatedPackage: package '{}' not found".format(data))
    
    
    
    def get_validators():
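        '''Map validator names to the validator functions defined in this
        module.'''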
        return {
            'known_spatial_uri': known_spatial_uri,
            'odsh_tag_name_validator': tag_name_validator,
            'odsh_validate_extras': validate_extras,
            'validate_licenseAttributionByText': validate_licenseAttributionByText,
            'tpsh_validate_subject': validate_subject,
            'tpsh_validate_relatedPackage': validate_relatedPackage,
        }