Skip to content
Snippets Groups Projects
Select Git revision
  • 381f48bf487b4240a37909c30a6ef9034b0630de
  • master default protected
  • add-frequency-to-form
  • dev protected
  • ckan-2.11.0
  • add-package-custom-fields
  • fix-adding-datasets-for-users-and-editors
  • add-auth-subroute
  • 71-migrate-custom-fields-to-ckanext-scheming
  • add-author-maintainer-information
  • fix-inline-flex-btns
  • fix-known-spatial-uri-validation
  • py3
  • 47-aktuelle-resource-einer-collection-wird-nicht-mehr-gefunden
  • 10-eingabe-der-dct-accrualperiodicity-in-weboberflache
  • v1.3
  • 2.5.3
  • 2.5.2
  • 2.5.1
  • 2.5.0
  • 2.4.7
  • 2.4.6
  • 2.4.5
  • 2.4.4
  • 2.4.3
  • 2.4.2
  • 2.4.1
  • 2.4.0
  • 2.3.1
  • 2.3.0
  • 2.2.0
  • 2.1.0
  • 2.0.0
  • 1.4.3
  • 1.4.2
  • 1.4.1
36 results

validation.py

Blame
  • validation.py 12.23 KiB
    # This Python file uses the following encoding: utf-8
    import logging
    import unicodecsv as csv
    import re
    import urllib2
    import json
    from itertools import count
    from dateutil.parser import parse
    
    import ckan.plugins.toolkit as toolkit
    import ckan.model as model
    from ckan.lib.navl.dictization_functions import Missing
    
    from pylons import config
    
    import pdb
    
    # Short alias for the toolkit's `_` helper; it wraps the user-facing
    # messages below (presumably gettext-style translation -- confirm in CKAN).
    _ = toolkit._
    
    # Module-level logger named after this module.
    log = logging.getLogger(__name__)
    
    
    def _extract_value(data, field):
        key = None
        for k in data.keys():
            if data[k] == field:
                key = k
                break
        if key is None:
            return None
        return data[(key[0], key[1], 'value')]
    
    
    # Message stored under errors['groups'] when a group is required but none is given.
    ERROR_MSG_NO_GROUP = 'at least one group needed'
    
    def validate_extra_groups(data, requireAtLeastOne, errors):
        '''
        Check the groups of a flattened dataset dict and normalise them.

        If the groups were submitted as an extra named 'groups', the existing
        group entries are blanked and the submitted ones are written to
        ('groups', n, 'id'). When ``requireAtLeastOne`` is set and no group is
        found, ERROR_MSG_NO_GROUP is stored in ``errors['groups']``.
        '''
        extra_groups = _groups_from_data(data)
        if extra_groups is None:
            # There is no extra named 'groups'; fall back to the regular fields.
            if requireAtLeastOne and not _at_least_one_group_outside_extras(data):
                errors['groups'] = ERROR_MSG_NO_GROUP
            return
        # The groups arrived inside the extras.
        if requireAtLeastOne and len(extra_groups) == 0:
            errors['groups'] = ERROR_MSG_NO_GROUP
        _clear_groups(data)
        _copy_groups_one_level_up(data, extra_groups)
    
    def _groups_from_data(data):
        '''
        Parse the extra named 'groups' as one CSV line.

        Returns the list of cells of that line, or None when reading the
        value raises a TypeError (the case when no such extra is present).
        '''
        raw_groups = _extract_value(data, 'groups')
        try:
            parsed_rows = list(csv.reader([raw_groups]))
        except TypeError:
            return None
        return parsed_rows[0]
    
    def _clear_groups(data):
        for k in data.keys():
            if len(k) == 3 and k[0] == 'groups':
                data[k] = '' # dead code [?]
    
    def _copy_groups_one_level_up(data, groups):
        if len(groups) == 0:
            data[('groups', 0, 'id')] = ''
        else:
            for num, group in zip(range(len(groups)), groups):
                data[('groups', num, 'id')] = group
    
    def _at_least_one_group_outside_extras(data):
        return (
            ('groups', 0, 'id') in data or
            ('groups', 0, 'name') in data
        )
    
    
    def validate_extras(key, data, errors, context):
        '''
        Run all checks on the extras of a flattened dataset dict.

        Validates the groups, the order of the temporal start and end, and the
        date fields 'issued', 'temporal_start' and 'temporal_end'. Which
        fields may be empty depends on the configuration and on whether the
        dataset id starts with 'StaNord'.

        Raises toolkit.Invalid carrying a dict of all collected errors if any
        check failed.
        '''
        collected_errors = {}
        is_sta_nord = ('id',) in data and data[('id',)][:7] == 'StaNord'
        temporal_start_optional = toolkit.asbool(
            config.get('ckanext.odsh.is_optional_temporal_start', False)
        ) or is_sta_nord
        category_required = toolkit.asbool(
            config.get('ckanext.odsh.require_at_least_one_category', False)
        )
        validate_extra_groups(
            data=data,
            requireAtLeastOne=category_required,
            errors=collected_errors
        )
        is_date_start_before_date_end(data, collected_errors)
        # (field name, may the field be empty?) in the order they are checked.
        date_checks = (
            ('issued', is_sta_nord),
            ('temporal_start', temporal_start_optional),
            ('temporal_end', True),
        )
        for date_field, may_be_empty in date_checks:
            validate_extra_date_new(
                key=key,
                field=date_field,
                data=data,
                optional=may_be_empty,
                errors=collected_errors
            )
        if collected_errors:
            raise toolkit.Invalid(collected_errors)
    
    def is_date_start_before_date_end(data, extra_errors):
        '''
        Flag both temporal fields in ``extra_errors`` when the extra
        'temporal_start' compares greater than the extra 'temporal_end'.

        Nothing is checked unless both values are present and non-empty.
        '''
        period_begin = _extract_value(data, 'temporal_start')
        period_end = _extract_value(data, 'temporal_end')
        if not (period_begin and period_end):
            return
        if period_begin > period_end:
            message = 'Please enter a valid period of time.'
            extra_errors['temporal_start'] = message
            extra_errors['temporal_end'] = message
    
    def _set_value(data, field, value):
        key = None
        for k in data.keys():
            if data[k] == field:
                key = k
                break
        if key is None:
            return None
        data[(key[0], key[1], 'value')] = value
    
    
    def validate_extra_date_new(key, field, data, optional, errors):
        '''
        Validate the date stored in the extra named ``field``.

        An empty value is reported as 'empty' unless ``optional`` is set. A
        value that starts with a YYYY-MM-DD pattern and can be parsed is
        replaced in ``data`` by its ISO representation; anything else is
        reported as 'not a valid date'. Errors go to ``errors[field]``.
        ``key`` is not used.
        '''
        raw_date = _extract_value(data, field)
        if not raw_date:
            if not optional:
                errors[field] = 'empty'
            return
        if re.match(r'\d\d\d\d-\d\d-\d\d', raw_date):
            try:
                _set_value(data, field, parse(raw_date).isoformat())
            except ValueError:
                # Fall through to the error below.
                pass
            else:
                return
        errors[field] = 'not a valid date'
    
    
    def validate_licenseAttributionByText(key, data, errors, context):
        '''
        Check that an attribution text is given exactly when the license
        asks for one.

        If no attribution text is present, an empty 'licenseAttributionByText'
        extra is added to ``data``. Raises toolkit.Invalid when an attribution
        license has no text (only if the configuration requires it) or when a
        text is given for a license without attribution.
        '''
        attribution_required = toolkit.asbool(
            config.get('ckanext.odsh.require_license_attribution', True)
        )
        license_needs_attribution = _isByLicense(data)
        attribution_given = _hasAttribution(data)
        if not attribution_given:
            _add_empty_attribution(data)
            if license_needs_attribution and attribution_required:
                raise toolkit.Invalid(
                    'licenseAttributionByText: empty not allowed')
        elif not license_needs_attribution:
            raise toolkit.Invalid(
                'licenseAttributionByText: text not allowed for this license')
    
    
    def _isByLicense(data):
        '''
        Tell whether ``data`` holds a license id whose title in the license
        register contains 'Namensnennung'.

        Entries with an empty or Missing license id are skipped.
        '''
        license_register = model.Package.get_license_register()
        for k in data:
            if len(k) == 0 or k[0] != 'license_id':
                continue
            license_id = data[k]
            if not license_id or isinstance(license_id, Missing):
                continue
            if 'Namensnennung' in license_register[license_id].title:
                return True
        return False
    
    
    def _hasAttribution(data):
        hasAttribution = False
        for k in data:
            if data[k] == 'licenseAttributionByText':
                if isinstance(data[(k[0], k[1], 'value')], Missing) or (k[0], k[1], 'value') not in data:
                    del data[(k[0], k[1], 'value')]
                    del data[(k[0], k[1], 'key')]
                    break
                else:
                    value = data[(k[0], k[1], 'value')]
                    hasAttribution = value != ''
                    break
        return hasAttribution
    
    
    def _add_empty_attribution(data):
        current_indexes = [
            k[1] for k in data.keys()
            if len(k) > 1 and k[0] == 'extras'
        ]
        new_index = max(current_indexes) + 1 if current_indexes else 0
        data[('extras', new_index, 'key')] = 'licenseAttributionByText'
        data[('extras', new_index, 'value')] = ''
    
    
    def known_spatial_uri(key, data, errors, context):
        '''
        Validate the 'spatial_uri' extra and derive the spatial extras from it.

        A 'spatial_uri_temp' / 'spatial_url_temp' field found under
        ('__extras',) is first copied into the extras. If no spatial_uri is
        given, a polygon from the 'spatial' extra (of ``data`` or of the
        package in ``context``) is kept instead. Otherwise the uri is looked
        up in the tab-separated mapping file configured as
        'ckanext.odsh.spatial.mapping', and the extras 'spatial_text' and
        'spatial' are added to ``data``.

        Raises toolkit.Invalid when the uri is empty although required, or
        when it is not listed in the mapping file. Raises Exception when the
        mapping file cannot be opened. ``key`` and ``errors`` are not used.
        '''
        form_extras = data.get(('__extras',))
        if form_extras and 'spatial_uri_temp' in form_extras:
            _copy_spatial_uri_temp_to_extras(data)
        if form_extras and 'spatial_url_temp' in form_extras:
            _copy_spatial_uri_temp_to_extras(data)
        value = _extract_value(data, 'spatial_uri')
        require_spatial_uri = toolkit.asbool(
            config.get('ckanext.odsh.require_spatial_uri', False)
        )
        error_message_spatial_uri_empty = 'spatial_uri: empty not allowed'

        if not value:
            # some harvesters might import a polygon directly...
            poly = _extract_value(data, 'spatial')

            has_old_uri = False
            pkg = context.get('package', None)
            if pkg:
                old_uri = pkg.extras.get('spatial_uri', None)
                has_old_uri = old_uri is not None and len(old_uri) > 0
                if not poly:
                    poly = pkg.extras.get('spatial', None)
            if require_spatial_uri and ((not poly) or has_old_uri):
                raise toolkit.Invalid(error_message_spatial_uri_empty)
            if poly:
                new_index = next_extra_index(data)
                # NOTE(review): index new_index itself is skipped here, as in
                # the previous implementation -- confirm whether intended.
                data[('extras', new_index+1, 'key')] = 'spatial'
                data[('extras', new_index+1, 'value')] = poly
            return

        mapping_location = config.get('ckanext.odsh.spatial.mapping')
        try:
            mapping_file = urllib2.urlopen(mapping_location)
        except Exception:
            raise Exception("Could not load spatial mapping file!")

        found = False
        spatial_text = str()
        spatial = str()
        try:
            cr = csv.reader(mapping_file, delimiter="\t", encoding='utf-8')
            for row in cr:
                # Blank or truncated lines cannot describe a mapping entry.
                if len(row) < 3 or row[0] != value:
                    continue
                found = True
                spatial_text = row[1]
                loaded = json.loads(row[2])
                spatial = json.dumps(loaded['geometry'])
                break
        finally:
            # Release the connection even if parsing a row fails.
            mapping_file.close()
        if not found:
            raise toolkit.Invalid(
                'spatial_uri: uri unknown')

        new_index = next_extra_index(data)

        data[('extras', new_index, 'key')] = 'spatial_text'
        data[('extras', new_index, 'value')] = spatial_text
        data[('extras', new_index+1, 'key')] = 'spatial'
        data[('extras', new_index+1, 'value')] = spatial
    
    
    def _copy_spatial_uri_temp_to_extras(data):
        '''
        Transfer the user interface field spatial_uri_temp (or, if that is
        None, spatial_url_temp) from ('__extras',) into the extra
        'spatial_uri'. An existing 'spatial_uri' extra is overwritten;
        otherwise a new extra is appended.
        '''
        uri_from_form = data.get(('__extras',)).get('spatial_uri_temp')
        if uri_from_form is None:
            uri_from_form = data.get(('__extras',)).get('spatial_url_temp')
        if _extract_value(data, 'spatial_uri') is not None:
            _set_value(data, 'spatial_uri', uri_from_form)
            return
        free_index = next_extra_index(data)
        data[('extras', free_index, 'key')] = 'spatial_uri'
        data[('extras', free_index, 'value')] = uri_from_form
        
    
    def next_extra_index(data):
        '''
        Return the index following the highest index used by an 'extras'
        entry of ``data``, or 0 if ``data`` has no such entry.
        '''
        used_indexes = []
        for k in data.keys():
            if len(k) > 1 and k[0] == 'extras':
                used_indexes.append(k[1])
        if not used_indexes:
            return 0
        return max(used_indexes) + 1
    
    
    def tag_name_validator(value, context):
        '''
        Check that the tag name ``value`` consists only of word characters,
        spaces and the symbols accepted by the pattern below.

        Returns ``value`` unchanged when it is valid and raises
        toolkit.Invalid otherwise. ``context`` is not used.
        '''
        # A raw string keeps the backslashes for the regex engine; the pattern
        # is the same as before, without invalid string escape sequences.
        tagname_match = re.compile(r'[\w \-.\:\(\)\´\`]*$', re.UNICODE)
        if not tagname_match.match(value):
            raise toolkit.Invalid(_('Tag "%s" must be alphanumeric '
                                    'characters or symbols: -_.:()') % (value))
        return value
    
    
    def tag_string_convert(key, data, errors, context):
        '''
        Split the comma-separated tag string in data[key] into tag names,
        append them to ``data`` as ('tags', n, 'name') entries after the tags
        already present, and validate each of them. A value that is not a
        string is used as the collection of tags as it is.
        '''
        if isinstance(data[key], basestring):
            tags = []
            for piece in data[key].split(','):
                if piece.strip():
                    tags.append(piece.strip())
        else:
            tags = data[key]

        taken_indexes = [
            int(k[1]) for k in data.keys()
            if len(k) == 3 and k[0] == 'tags'
        ]
        # -1 makes numbering start at 0 when there are no tags yet.
        first_free = max(taken_indexes + [-1]) + 1

        for position, tag in enumerate(tags, first_free):
            data[('tags', position, 'name')] = tag

        for tag in tags:
            toolkit.get_validator('tag_length_validator')(tag, context)
            tag_name_validator(tag, context)
    
    
    def _convert_subjectID_to_subjectText(subject_id, flattened_data):
        '''
        Look up ``subject_id`` in the subject mapping file and append the
        resulting text to ``flattened_data`` as the extra 'subject_text'.

        The mapping is a JSON file whose path is read from the setting
        'ckanext.odsh.subject_mapping_file_path'.

        Returns ``flattened_data``. Raises toolkit.Invalid when the id is not
        part of the mapping; IOError or ValueError from reading or decoding
        the mapping file are logged and re-raised.
        '''
        default_subject_mapping_file_path = '/usr/lib/ckan/default/src/ckanext-odsh/subject_mapping.json'
        subject_mapping_file_path = config.get(
            'ckanext.odsh.subject_mapping_file_path', default_subject_mapping_file_path)

        try:
            with open(subject_mapping_file_path) as mapping_json:
                subject_mapping = json.loads(mapping_json.read())
        except IOError:
            log.error(
                'Could not load subject mapping file from {}'
                .format(subject_mapping_file_path)
            )
            raise
        except ValueError:
            log.error(
                'Could not convert subject mapping file from json. \nSubject mapping file: {}'
                .format(subject_mapping_file_path)
            )
            raise

        try:
            subject_text = subject_mapping[subject_id]
        except (LookupError, TypeError):
            # Log before raising; a statement after the raise would never run.
            log.warning(
                'Subject_id "{}" not found in subject mapping dictionary.\nSubject mapping file: {}'
                .format(subject_id, subject_mapping_file_path)
            )
            raise toolkit.Invalid(_('Subject must be a known URI.'))

        new_index = next_extra_index(flattened_data)
        flattened_data[('extras', new_index, 'key')] = 'subject_text'
        flattened_data[('extras', new_index, 'value')] = subject_text
        return flattened_data
    
    
    def validate_subject(key, flattened_data, errors, context):
        '''
        Validate the subject stored in flattened_data[key] and add its text
        to ``flattened_data`` as the extra 'subject_text'.

        An empty subject raises toolkit.Invalid when the setting
        'ckanext.odsh.require_subject' is true (the default) and is accepted
        without further action when it is false. A non-empty subject must be
        part of the subject mapping, otherwise the lookup raises
        toolkit.Invalid. ``errors`` and ``context`` are not used.
        '''
        subject_id = flattened_data[key]
        require_subject = toolkit.asbool(
            config.get('ckanext.odsh.require_subject', True)
        )
        if not subject_id:
            if require_subject:
                raise toolkit.Invalid(_('Subject must not be empty.'))
            # Optional and not given: there is nothing to look up.
            return
        # The helper changes flattened_data in place.
        _convert_subjectID_to_subjectText(subject_id, flattened_data)
    
    
    def get_validators():
        '''
        Return the validators of this module, keyed by the name under which
        each one is made available.
        '''
        named_validators = [
            ('known_spatial_uri', known_spatial_uri),
            ('odsh_tag_name_validator', tag_name_validator),
            ('odsh_validate_extras', validate_extras),
            ('validate_licenseAttributionByText', validate_licenseAttributionByText),
            ('tpsh_validate_subject', validate_subject),
        ]
        return dict(named_validators)