Skip to content
Snippets Groups Projects
validation.py 13.6 KiB
Newer Older
  • Learn to ignore specific revisions
  • anonymous's avatar
    anonymous committed
    # This Python file uses the following encoding: utf-8
    
    anonymous's avatar
    anonymous committed
    import logging
    
    root's avatar
    root committed
    import unicodecsv as csv
    
    anonymous's avatar
    anonymous committed
    import re
    import urllib2
    import json
    from itertools import count
    from dateutil.parser import parse
    
    from pylons import config
    
    anonymous's avatar
    anonymous committed
    
    import ckan.plugins.toolkit as toolkit
    import ckan.model as model
    from ckan.lib.navl.dictization_functions import Missing
    
    
    from ckanext.odsh.helpers_tpsh import get_package_dict
    
    anonymous's avatar
    anonymous committed
    
    
    anonymous's avatar
    anonymous committed
    _ = toolkit._
    
    
    anonymous's avatar
    anonymous committed
    log = logging.getLogger(__name__)
    
    
    anonymous's avatar
    anonymous committed
    
    
    anonymous's avatar
    anonymous committed
    def _extract_value(data, field):
        key = None
        for k in data.keys():
            if data[k] == field:
                key = k
                break
        if key is None:
            return None
        return data[(key[0], key[1], 'value')]
    
    
    anonymous's avatar
    anonymous committed
    
    
    anonymous's avatar
    anonymous committed
    def validate_extra_groups(data, requireAtLeastOne, errors):
        '''Copy the comma-separated extra ``groups`` into the ``groups`` entries.

        data: flattened dict keyed by tuples; modified in place.
        requireAtLeastOne: when true, an empty ``groups`` extra is reported as
            an error.
        errors: dict that receives ``errors['groups']`` on failure.

        If the extra ``groups`` is absent, nothing is rewritten. An error is
        recorded when neither ``('groups', 0, 'id')`` nor
        ``('groups', 0, 'name')`` holds a truthy value.
        '''
        value = _extract_value(data, 'groups')
        error_message_no_group = 'at least one group needed'

        if value is None:
            # no extra-field 'groups':
            # dataset might come from a harvest process
            if not data.get(('groups', 0, 'id'), False) and \
               not data.get(('groups', 0, 'name'), False):
                errors['groups'] = error_message_no_group
            return

        # The extra key 'groups' was found,
        # so the dataset came from manual editing via the web-frontend.
        if not value:
            if requireAtLeastOne:
                errors['groups'] = error_message_no_group
            data[('groups', 0, 'id')] = ''
            return

        # Filter on each stripped item (not on the whole string) so that
        # inputs such as 'a,,b' or ' , ' do not produce empty group names.
        groups = [g.strip() for g in value.split(',') if g.strip()]

        # Blank out every existing three-part 'groups' entry before rewriting.
        for k in list(data.keys()):
            if len(k) == 3 and k[0] == 'groups':
                data[k] = ''

        if not groups:
            if requireAtLeastOne:
                errors['groups'] = error_message_no_group
            return

        for num, group in enumerate(groups):
            data[('groups', num, 'id')] = group
    
    anonymous's avatar
    anonymous committed
    
    
    anonymous's avatar
    anonymous committed
    
    def validate_extras(key, data, errors, context):
        '''Run the group, period and date checks on the extras of a dataset.

        Errors from the individual checks are collected in one dict. If any
        were recorded, ``toolkit.Invalid`` is raised with that dict.

        ``issued`` is optional when the dataset id starts with ``'StaNord'``.
        ``temporal_start`` is optional in that case too, or when the config
        option ``ckanext.odsh.is_optional_temporal_start`` is true.
        ``temporal_end`` is always optional.
        '''
        extra_errors = {}

        isStaNord = ('id',) in data and data[('id',)][:7] == 'StaNord'
        is_optional_temporal_start = toolkit.asbool(
            config.get('ckanext.odsh.is_optional_temporal_start', False)
        ) or isStaNord

        require_at_least_one_category = toolkit.asbool(
            config.get('ckanext.odsh.require_at_least_one_category', False)
        )
        validate_extra_groups(
            data=data,
            requireAtLeastOne=require_at_least_one_category,
            errors=extra_errors
        )

        is_date_start_before_date_end(data, extra_errors)

        validate_extra_date_new(
            key=key,
            field='issued',
            data=data,
            optional=isStaNord,
            errors=extra_errors
        )
        validate_extra_date_new(
            key=key,
            field='temporal_start',
            data=data,
            optional=is_optional_temporal_start,
            errors=extra_errors
        )
        validate_extra_date_new(
            key=key,
            field='temporal_end',
            data=data,
            optional=True,
            errors=extra_errors
        )

        if extra_errors:
            raise toolkit.Invalid(extra_errors)
    
    anonymous's avatar
    anonymous committed
    
    
    root's avatar
    root committed
    def is_date_start_before_date_end(data, extra_errors):
        '''Record an error on both temporal fields if start sorts after end.

        The extras ``temporal_start`` and ``temporal_end`` are compared with
        ``>`` exactly as stored. Nothing happens unless both are truthy.
        '''
        period_start = _extract_value(data, 'temporal_start')
        period_end = _extract_value(data, 'temporal_end')
        if not (period_start and period_end):
            return
        if period_start > period_end:
            message = 'Please enter a valid period of time.'
            extra_errors['temporal_start'] = message
            extra_errors['temporal_end'] = message
    
    anonymous's avatar
    anonymous committed
    
    
    anonymous's avatar
    anonymous committed
    def _set_value(data, field, value):
        key = None
        for k in data.keys():
            if data[k] == field:
                key = k
                break
        if key is None:
            return None
        data[(key[0], key[1], 'value')] = value
    
    
    anonymous's avatar
    anonymous committed
    
    
    anonymous's avatar
    anonymous committed
    def validate_extra_date_new(key, field, data, optional, errors):
        '''Validate the date extra ``field`` and store it in ISO format.

        key: unused; part of the signature used by the callers.
        field: name of the extra to validate.
        data: flattened dict keyed by tuples; the extra's value is replaced by
            ``isoformat()`` of the parsed date on success.
        optional: when false, a missing or empty value records ``'empty'``.
        errors: dict that receives ``errors[field]`` on failure.
        '''
        value = _extract_value(data, field)

        if not value:
            if not optional:
                errors[field] = 'empty'
            return

        # Only the beginning of the value has to look like YYYY-MM-DD;
        # whatever follows is left to the date parser.
        if re.match(r'\d\d\d\d-\d\d-\d\d', value):
            try:
                dt = parse(value)
            except (ValueError, OverflowError):
                # dateutil signals unparseable input with ValueError and
                # out-of-range components with OverflowError.
                pass
            else:
                _set_value(data, field, dt.isoformat())
                return

        errors[field] = 'not a valid date'
    
    anonymous's avatar
    anonymous committed
    
    def validate_licenseAttributionByText(key, data, errors, context):
        '''Check that an attribution text is present exactly when required.

        A license counts as an attribution license when its title in the
        license register contains ``'Namensnennung'``.

        Raises ``toolkit.Invalid`` when such a license is selected but the
        extra ``licenseAttributionByText`` is empty or missing, and when a text
        is given although the license is not an attribution license.

        Side effects on ``data``: a ``licenseAttributionByText`` extra whose
        value is missing is removed. If no non-empty attribution is present,
        an extra with that key and an empty value is appended.
        '''
        register = model.Package.get_license_register()

        isByLicense = False
        for k in data:
            if len(k) > 0 and k[0] == 'license_id' and data[k] and \
                    not isinstance(data[k], Missing) and \
                    'Namensnennung' in register[data[k]].title:
                isByLicense = True
                break

        hasAttribution = False
        # Iterate over a snapshot because entries may be deleted below.
        for k in list(data):
            if len(k) == 3 and k[2] == 'key' and \
                    data[k] == 'licenseAttributionByText':
                value_key = (k[0], k[1], 'value')
                # Test membership first: subscripting a missing entry would
                # raise KeyError before the check could take effect.
                if value_key not in data or \
                        isinstance(data[value_key], Missing):
                    data.pop(value_key, None)
                    del data[k]
                else:
                    hasAttribution = data[value_key] != ''
                break

        if not hasAttribution:
            # NOTE(review): if the extra already exists with an empty value it
            # is kept and a second empty one is appended here; confirm that
            # this duplication is intended.
            new_index = next_extra_index(data)
            data[('extras', new_index, 'key')] = 'licenseAttributionByText'
            data[('extras', new_index, 'value')] = ''

        if isByLicense and not hasAttribution:
            raise toolkit.Invalid(
                'licenseAttributionByText: empty not allowed')

        if not isByLicense and hasAttribution:
            raise toolkit.Invalid(
                'licenseAttributionByText: text not allowed for this license')
    
    
    anonymous's avatar
    anonymous committed
    
    def known_spatial_uri(key, data, errors, context):
        '''Resolve the extra ``spatial_uri`` to a display text and a geometry.

        If ``spatial_uri_temp`` is present in ``data[('__extras',)]`` it is
        first copied into the extras.

        Empty URI: a polygon is taken from the extra ``spatial`` or, failing
        that, from the extras of ``context['package']``. Without a polygon
        ``toolkit.Invalid`` is raised if the config option
        ``ckanext.odsh.require_spatial_uri`` is true. Otherwise the polygon,
        if any, is appended as extra ``spatial`` and the function returns.

        Non-empty URI: the tab-separated mapping file configured as
        ``ckanext.odsh.spatial.mapping`` is searched for a row whose first
        column equals the URI. The second column is stored as extra
        ``spatial_text`` and the ``geometry`` member of the JSON in the third
        column as extra ``spatial``. ``toolkit.Invalid`` is raised when no row
        matches; a plain ``Exception`` when the mapping cannot be opened.
        '''
        form_extras = data.get(('__extras',))
        if form_extras and 'spatial_uri_temp' in form_extras:
            _copy_spatial_uri_temp_to_extras(data)
        value = _extract_value(data, 'spatial_uri')
        require_spatial_uri = toolkit.asbool(
            config.get('ckanext.odsh.require_spatial_uri', False)
        )
        error_message_spatial_uri_empty = 'spatial_uri: empty not allowed'

        if not value:
            # some harvesters might import a polygon directly...
            poly = _extract_value(data, 'spatial')

            pkg = context.get('package', None)
            if pkg and not poly:
                poly = pkg.extras.get('spatial', None)

            if (not poly) and require_spatial_uri:
                raise toolkit.Invalid(error_message_spatial_uri_empty)

            if poly:
                new_index = next_extra_index(data)
                data[('extras', new_index+1, 'key')] = 'spatial'
                data[('extras', new_index+1, 'value')] = poly
            # Nothing to look up without a URI.
            return

        mapping_file = config.get('ckanext.odsh.spatial.mapping')
        try:
            mapping_file = urllib2.urlopen(mapping_file)
        except Exception:
            raise Exception("Could not load spatial mapping file!")

        try:
            cr = csv.reader(mapping_file, delimiter="\t", encoding='utf-8')
            for row in cr:
                # 'row and' skips blank lines, which are read as empty rows.
                if row and row[0] == value:
                    spatial_text = row[1]
                    loaded = json.loads(row[2])
                    spatial = json.dumps(loaded['geometry'])
                    break
            else:
                raise toolkit.Invalid(
                    'spatial_uri: uri unknown')
        finally:
            # Release the handle on success and on every error path.
            mapping_file.close()

        new_index = next_extra_index(data)

        data[('extras', new_index, 'key')] = 'spatial_text'
        data[('extras', new_index, 'value')] = spatial_text
        data[('extras', new_index+1, 'key')] = 'spatial'
        data[('extras', new_index+1, 'value')] = spatial
    
    
    anonymous's avatar
    anonymous committed
    
    
    root's avatar
    root committed
    def _copy_spatial_uri_temp_to_extras(data):
        '''
        Store the spatial URI submitted with the form as extra ``spatial_uri``.

        The URI is read from ``spatial_uri_temp`` in ``data[('__extras',)]``,
        falling back to ``spatial_url_temp`` when the former is None. An
        existing ``spatial_uri`` extra is overwritten; otherwise a new extra
        is appended.
        '''
        uri = data.get(('__extras',)).get('spatial_uri_temp')
        if uri is None:
            uri = data.get(('__extras',)).get('spatial_url_temp')

        if _extract_value(data, 'spatial_uri') is None:
            # no such extra yet: append one at the next free index
            slot = next_extra_index(data)
            data[('extras', slot, 'key')] = 'spatial_uri'
            data[('extras', slot, 'value')] = uri
            return

        _set_value(data, 'spatial_uri', uri)
        
    
    
    anonymous's avatar
    anonymous committed
    def next_extra_index(data):
        '''Return the index following the highest one used under ``'extras'``.

        Looks at the second element of every key that has more than one part
        and starts with ``'extras'``. Returns 0 when there is no such key.
        '''
        used_positions = []
        for flat_key in data.keys():
            if len(flat_key) > 1 and flat_key[0] == 'extras':
                used_positions.append(flat_key[1])

        if not used_positions:
            return 0
        return max(used_positions) + 1
    
    
    
    anonymous's avatar
    anonymous committed
    def tag_name_validator(value, context):
        '''Return ``value`` if it matches the tag pattern, else raise Invalid.

        value: the tag name to check.
        context: unused; part of the validator signature.

        Raises ``toolkit.Invalid`` when the pattern below does not match.
        '''
        tagname_match = re.compile(r'[\w \-.\:\(\)\´\`\§]*$', re.UNICODE)
        if not tagname_match.match(value):
            raise toolkit.Invalid(_('Tag "%s" must be alphanumeric '
                                    'characters or symbols: -_.:()') % (value))
        return value
    
    
    anonymous's avatar
    anonymous committed
    
    
    anonymous's avatar
    anonymous committed
    def tag_string_convert(key, data, errors, context):
        '''Split the tags in ``data[key]`` and append them to the data dict.

        A string is split at commas; items are stripped and empty ones
        dropped. Any other value is used as the list of tags as it is. Each
        tag is stored as ``('tags', n, 'name')`` after the highest index
        already in use, then checked with ``tag_length_validator`` and
        ``tag_name_validator``.
        '''
        raw_tags = data[key]
        if isinstance(raw_tags, basestring):
            tags = []
            for piece in raw_tags.split(','):
                if piece.strip():
                    tags.append(piece.strip())
        else:
            tags = raw_tags

        used_indexes = [int(k[1]) for k in data.keys()
                        if len(k) == 3 and k[0] == 'tags']
        # -1 makes numbering start at 0 when no tag entry exists yet
        first_free = max(used_indexes + [-1]) + 1

        for offset, tag in enumerate(tags):
            data[('tags', first_free + offset, 'name')] = tag

        for tag in tags:
            toolkit.get_validator('tag_length_validator')(tag, context)
            tag_name_validator(tag, context)
    
    
    
    root's avatar
    root committed
    def _convert_subjectID_to_subjectText(subject_id, flattened_data):
        '''Append the extra ``subject_text`` that belongs to ``subject_id``.

        The text is looked up in the JSON file named by the config option
        ``ckanext.odsh.subject_mapping_file_path``.

        Returns ``flattened_data``; it is returned unchanged when
        ``subject_id`` is empty.

        Raises ``IOError`` when the mapping file cannot be read and
        ``ValueError`` when it is not valid JSON (both are logged and
        re-raised). Raises ``toolkit.Invalid`` when ``subject_id`` is not
        found in the mapping.
        '''
        if not subject_id:
            return flattened_data

        default_subject_mapping_file_path = '/usr/lib/ckan/default/src/ckanext-odsh/subject_mapping.json'
        subject_mapping_file_path = config.get(
            'ckanext.odsh.subject_mapping_file_path', default_subject_mapping_file_path)

        try:
            with open(subject_mapping_file_path) as mapping_json:
                subject_mapping = json.loads(mapping_json.read())
        except IOError:
            log.error(
                'Could not load subject mapping file from {}'
                .format(subject_mapping_file_path)
            )
            raise
        except ValueError:
            log.error(
                'Could not convert subject mapping file from json. \nSubject mapping file: {}'
                .format(subject_mapping_file_path)
            )
            raise

        try:
            subject_text = subject_mapping[subject_id]
        except (LookupError, TypeError):
            # Log before raising; a statement after the raise never runs.
            log.warning(
                'Subject_id "{}" not found in subject mapping dictionary.\nSubject mapping file: {}'
                .format(subject_id, subject_mapping_file_path)
            )
            raise toolkit.Invalid(_('Subject must be a known URI.'))

        new_index = next_extra_index(flattened_data)
        flattened_data[('extras', new_index, 'key')] = 'subject_text'
        flattened_data[('extras', new_index, 'value')] = subject_text
        return flattened_data
    
    
    def validate_subject(key, flattened_data, errors, context):
        '''Validate the subject in ``flattened_data[key]`` and add its text.

        An empty subject raises ``toolkit.Invalid`` unless the config option
        ``ckanext.odsh.require_subject`` (default true) is switched off. In
        all other cases the subject is passed on for conversion to its text.
        '''
        subject_id = flattened_data[key]
        subject_is_mandatory = toolkit.asbool(
            config.get('ckanext.odsh.require_subject', True)
        )
        if subject_is_mandatory and not subject_id:
            raise toolkit.Invalid(_('Subject must not be empty.'))
        _convert_subjectID_to_subjectText(subject_id, flattened_data)
    
    
    def validate_relatedPackage(data):
        '''Check that the package referenced by ``data`` can be looked up.

        An empty ``data`` is accepted without a lookup. Raises
        ``toolkit.Invalid`` when ``get_package_dict`` reports that the package
        was not found.
        '''
        if data:
            try:
                get_package_dict(data)
            # toolkit.ObjectNotFound is the toolkit's name for the not-found
            # error; no module named 'logic' is imported in this file.
            except toolkit.ObjectNotFound:
                raise toolkit.Invalid("relatedPackage: package '{}' not found".format(data))
    
    root's avatar
    root committed
    
    
    def validate_formats(data, errors):
        '''Return ``data`` if it is one of the accepted format codes.

        data: the format code to check.
        errors: unused.

        Raises ``toolkit.Invalid`` for any value that is not in the list.
        '''
        allowed_formats = (
            '7Z', 'AAB', 'AAC', 'AKN4EU', 'AKN4EU_ZIP', 'APK', 'APPX', 'ARC',
            'ARC_GZ', 'ARCINFO_COV', 'ARJ', 'ATOM', 'AZW', 'BIN', 'BITS', 'BMP',
            'BWF', 'BZIP2', 'CSS', 'CSV', 'DBF', 'DCR', 'DEB', 'DGN', 'DMG',
            'DMP', 'DOC', 'DOCX', 'DTD_SGML', 'DTD_XML', 'DWG', 'DXF', 'E00',
            'EAR', 'ECW', 'EPS', 'EPUB', 'ETSI_XML', 'EXE', 'FMX2', 'FMX3',
            'FMX4', 'FMX4_ZIP', 'GDB', 'GEOJSON', 'GEOTIFF', 'GIF', 'GML', 'GMZ',
            'GPKG', 'GRID', 'GRID_ASCII', 'GZIP', 'HDF', 'HDT', 'HTML', 'HTML5',
            'HTML_SIMPL', 'ICS', 'IMMC_XML', 'INDD', 'IPA', 'ISO', 'ISO_ZIP',
            'JAR', 'JATS', 'JPEG', 'JPEG2000', 'JS', 'JSON', 'JSON_LD', 'KML',
            'KMZ', 'LAS', 'LAZ', 'LEG', 'LHA', 'LPK', 'LZIP', 'LZMA', 'LZO',
            'MAP_PRVW', 'MAP_SRVC', 'MBOX', 'MDB', 'METS', 'METS_ZIP', 'MHTML',
            'MIF_MID', 'MOBI', 'MOP', 'MPEG2', 'MPEG4', 'MPEG4_AVC', 'MRSID',
            'MSG_HTTP', 'MSI', 'MXD', 'N3', 'NETCDF', 'OCTET', 'ODB', 'ODC',
            'ODF', 'ODG', 'ODP', 'ODS', 'ODT', 'OP_DATPRO', 'OVF', 'OWL', 'PDF',
            'PDF1X', 'PDFA1A', 'PDFA1B', 'PDFA2A', 'PDFA2B', 'PDFA3', 'PDFUA',
            'PDFX', 'PDFX1A', 'PDFX2A', 'PDFX4', 'PL', 'PNG', 'PPS', 'PPSX',
            'PPT', 'PPTX', 'PS', 'PSD', 'PWP', 'QGS', 'RAR', 'RDF', 'RDFA',
            'RDF_N_QUADS', 'RDF_N_TRIPLES', 'RDF_TRIG', 'RDF_TRIX', 'RDF_TURTLE',
            'RDF_XML', 'REST', 'RPM', 'RSS', 'RTF', 'SB3', 'SCHEMA_XML', 'SDMX',
            'SGML', 'SHP', 'SKOS_XML', 'SPARQLQ', 'SPARQLQRES', 'SQL', 'STL',
            'SVG', 'SWM', 'TAB', 'TAB_RSTR', 'TAR', 'TAR_GZ', 'TAR_XZ', 'TIFF',
            'TIFF_FX', 'TMX', 'TSV', 'TXT', 'UNGEN', 'WAR', 'WARC', 'WARC_GZ',
            'WCS_SRVC', 'WFS_SRVC', 'WIM', 'WMS_SRVC', 'WORLD', 'XHTML',
            'XHTML_SIMPL', 'XLIFF', 'XLS', 'XLSB', 'XLSM', 'XLSX', 'XML',
            'XSLFO', 'XSLT', 'XYZ', 'XZ', 'Z', 'ZIP',
        )
        if data in allowed_formats:
            return data

        raise toolkit.Invalid(_('Only formats on the list of the EU Publications Office are allowed.'))
    
    
    anonymous's avatar
    anonymous committed
    def get_validators():
        '''Return the validators of this module keyed by their registered name.'''
        return {
            'known_spatial_uri': known_spatial_uri,
            'odsh_tag_name_validator': tag_name_validator,
            'odsh_validate_extras': validate_extras,
            'validate_licenseAttributionByText': validate_licenseAttributionByText,
            'tpsh_validate_subject': validate_subject,
            'tpsh_validate_relatedPackage': validate_relatedPackage,
            'odsh_validate_format': validate_formats,
        }