import csv
import json
import logging
import re
import pkg_resources
from itertools import count
from dateutil.parser import parse
import ckan.logic as logic
import ckan.model as model
import ckan.plugins.toolkit as toolkit
from ckan.lib.navl.dictization_functions import Missing
from ckanext.odsh.helpers_tpsh import get_package_dict

_ = toolkit._
log = logging.getLogger(__name__)

def _extract_value(data, field):
    key = None
    for k in data.keys():
        if data[k] == field:
            key = k
            break
    if key is None:
        return None
    return data[(key[0], key[1], 'value')]
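
# Illustration only (hypothetical data): CKAN validators receive "flattened"
# dicts whose keys are tuples such as ('extras', <index>, 'key'/'value'), e.g.
#   data = {('extras', 0, 'key'): 'issued', ('extras', 0, 'value'): '2020-01-01'}
#   _extract_value(data, 'issued')  # -> '2020-01-01'
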
def validate_extra_groups(data, requireAtLeastOne, errors):
    value = _extract_value(data, 'groups')
    if value is not None:
        # the extra key 'groups' holds a comma-separated list of group names
        groups = [g.strip() for g in value.split(',') if g.strip()]
        if not groups and requireAtLeastOne:
            errors['groups'] = 'at least one group needed'
            return
        for num, group in enumerate(groups):
            data[('groups', num, 'id')] = group
    elif not data.get(('groups', 0, 'id'), False) and \
            not data.get(('groups', 0, 'name'), False):
        # no groups at all, e.g. a dataset coming from a harvester
        if requireAtLeastOne:
            errors['groups'] = 'at least one group needed'
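
# Illustration only (hypothetical group names): an extra 'groups' value of
# 'gesundheit, umwelt' becomes
#   data[('groups', 0, 'id')] = 'gesundheit'
#   data[('groups', 1, 'id')] = 'umwelt'
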
def validate_extras(key, data, errors, context):
    extra_errors = {}
    isStaNord = ('id',) in data and data[('id',)][:7] == 'StaNord'
    harvesting = ('ignore_auth' in context) and (context['ignore_auth'] == True)
    owner_org = data[('owner_org',)]
    lenient_with = toolkit.config.get('ckanext.odsh.lenient_with', '')
    is_optional_temporal_start = toolkit.asbool(
        toolkit.config.get('ckanext.odsh.is_optional_temporal_start', False)
    ) or (harvesting and (owner_org in lenient_with))
    require_at_least_one_category = toolkit.asbool(
        toolkit.config.get('ckanext.odsh.require_at_least_one_category', False)
    )
    validate_extra_groups(
        data=data,
        requireAtLeastOne=require_at_least_one_category,
        errors=extra_errors
    )
    is_date_start_before_date_end(data, extra_errors)
    validate_extra_date_new(
        key=key,
        field='issued',
        data=data,
        optional=isStaNord,
        errors=extra_errors
    )
    validate_extra_date_new(
        key=key,
        field='temporal_start',
        data=data,
        optional=is_optional_temporal_start,
        errors=extra_errors
    )
    validate_extra_date_new(
        key=key,
        field='temporal_end',
        data=data,
        optional=True,
        errors=extra_errors
    )
    # report all collected per-field problems at once (aggregated message
    # format is an assumption; the original fragment never used extra_errors)
    if extra_errors:
        raise toolkit.Invalid(str(extra_errors))

def is_date_start_before_date_end(data, extra_errors):
    start_date = _extract_value(data, 'temporal_start')
    end_date = _extract_value(data, 'temporal_end')
    if start_date and end_date:
        if start_date > end_date:
            extra_errors['temporal_start'] = extra_errors['temporal_end'] = \
                'Please enter a valid period of time.'
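
# Note: the comparison above relies on both values already being normalised to
# ISO 8601 strings (see validate_extra_date_new below), where lexicographic
# order equals chronological order, e.g. '2020-01-02' > '2020-01-01'.
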

def _set_value(data, field, value):
    key = None
    for k in data.keys():
        if data[k] == field:
            key = k
            break
    if key is None:
        return None
    data[(key[0], key[1], 'value')] = value

def validate_extra_date_new(key, field, data, optional, errors):
    value = _extract_value(data, field)
    if not value:
        if not optional:
            errors[field] = 'empty'
        return
    if re.match(r'\d\d\d\d-\d\d-\d\d', value):
        try:
            # normalise the date to ISO 8601 via dateutil
            dt = parse(value)
            _set_value(data, field, dt.isoformat())
            return
        except ValueError:
            pass
    # anything that did not match or parse is flagged (message is an assumption)
    errors[field] = 'not a valid date'
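
# Illustration only (hypothetical values): '2020-1-2' fails the regex above,
# while '2020-01-02' is parsed and stored as '2020-01-02T00:00:00' by
# dateutil.parser.parse(value).isoformat().
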

def validate_licenseAttributionByText(key, data, errors, context):
    log.debug("Validating licenseAttributionByText")
    # does the selected license require an attribution text? In the license
    # register this is signalled by 'Namensnennung' (attribution) in the title.
    register = model.Package.get_license_register()
    isByLicense = False
    for k in data:
        if len(k) > 0 and k[0] == 'license_id' and data[k] and \
                not isinstance(data[k], Missing) and \
                'Namensnennung' in register[data[k]].title:
            isByLicense = True
            break
    hasAttribution = False
    for k in data:
        if data[k] == 'licenseAttributionByText':
            value_key = (k[0], k[1], 'value')
            if value_key not in data or isinstance(data[value_key], Missing):
                data.pop(value_key, None)
                del data[(k[0], k[1], 'key')]
                break
            else:
                value = data[value_key]
                hasAttribution = value != ''
                break
    if not hasAttribution:
        current_indexes = [k[1] for k in data.keys()
                           if len(k) > 1 and k[0] == 'extras']
        new_index = max(current_indexes) + 1 if current_indexes else 0
        data[('extras', new_index, 'key')] = 'licenseAttributionByText'
        data[('extras', new_index, 'value')] = ''
    if isByLicense and not hasAttribution:
        raise toolkit.Invalid(
            'licenseAttributionByText: empty not allowed')
    if not isByLicense and hasAttribution:
        raise toolkit.Invalid(
            'licenseAttributionByText: text not allowed for this license')
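
# Behaviour sketch: licenses whose title contains 'Namensnennung' must come
# with a non-empty licenseAttributionByText extra, and all other licenses must
# not carry one; both violations raise toolkit.Invalid.
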

def known_spatial_uri(key, data, errors, context):
    if data.get(('__extras',)) and 'spatial_uri_temp' in data.get(('__extras',)):
        _copy_spatial_uri_temp_to_extras(data)
    value = _extract_value(data, 'spatial_uri')
    require_spatial_uri = toolkit.asbool(
        toolkit.config.get('ckanext.odsh.require_spatial_uri', False)
    )
    error_message_spatial_uri_empty = 'spatial_uri: empty not allowed'
    if not value:
        # some harvesters might import a polygon directly...
        poly = _extract_value(data, 'spatial')
        has_old_uri = False
        pkg = context.get('package', None)
        if pkg:
            old_uri = pkg.extras.get('spatial_uri', None)
            has_old_uri = old_uri is not None and len(old_uri) > 0
            if not poly:
                poly = pkg.extras.get('spatial', None)
        if (not poly) and require_spatial_uri:
            raise toolkit.Invalid(error_message_spatial_uri_empty)
        # if has_old_uri and require_spatial_uri:
        #     raise toolkit.Invalid(error_message_spatial_uri_empty)
        if poly:
            new_index = next_extra_index(data)
            data[('extras', new_index+1, 'key')] = 'spatial'
            data[('extras', new_index+1, 'value')] = poly
        return
    # look up the URI in the tab-separated mapping file
    extension_path = pkg_resources.resource_filename('ckanext.odsh', '')
    mapping_path = toolkit.config.get(
        'ckanext.odsh.spatial.mapping',
        extension_path + '/resources/schleswig-holstein_geojson.csv')
    not_found = True
    spatial_text = str()
    spatial = str()
    with open(mapping_path, newline='') as mapping_file:
        cr = csv.reader(mapping_file, delimiter="\t")
        for row in cr:
            if row[0] == value:
                not_found = False
                spatial_text = row[1]
                loaded = json.loads(row[2])
                spatial = json.dumps(loaded['geometry'])
                break
    if not_found:
        raise toolkit.Invalid('spatial_uri: uri unknown')
    new_index = next_extra_index(data)
    data[('extras', new_index, 'key')] = 'spatial_text'
    data[('extras', new_index, 'value')] = spatial_text
    data[('extras', new_index+1, 'key')] = 'spatial'
    data[('extras', new_index+1, 'value')] = spatial

def _copy_spatial_uri_temp_to_extras(data):
    '''
    copy the field spatial_uri_temp or spatial_url_temp originating
    from the user interface to extras
    '''
    spatial_uri = data.get(('__extras',)).get('spatial_uri_temp')
    if spatial_uri is None:
        spatial_uri = data.get(('__extras',)).get('spatial_url_temp')
    is_spatial_uri_in_extras = _extract_value(data, 'spatial_uri') is not None
    if not is_spatial_uri_in_extras:
        next_index = next_extra_index(data)
        data[('extras', next_index, 'key')] = 'spatial_uri'
        data[('extras', next_index, 'value')] = spatial_uri
    else:
        _set_value(data, 'spatial_uri', spatial_uri)

def next_extra_index(data):
    current_indexes = [k[1] for k in data.keys()
                       if len(k) > 1 and k[0] == 'extras']
    return max(current_indexes) + 1 if current_indexes else 0
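
# Illustration only: with data = {('extras', 0, 'key'): 'issued', ('title',): 'x'},
# next_extra_index(data) returns 1; with no 'extras' keys at all it returns 0.
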

def tag_name_validator(value, context):
    tagname_match = re.compile(r'[\w \-.\:\(\)\´\`\§]*$', re.UNICODE)
    if not tagname_match.match(value):
        raise toolkit.Invalid(_('Tag "%s" must be alphanumeric '
                                'characters or symbols: -_.:()') % (value))
    return value
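
# Illustration only: 'Open Data (SH)' matches the pattern above, whereas a tag
# like 'foo/bar' raises Invalid because '/' is not in the allowed character set.
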

def tag_string_convert(key, data, errors, context):
    '''Takes a list of tags that is a comma-separated string (in data[key])
    and parses tag names. These are added to the data dict, enumerated. They
    are also validated.'''
    if isinstance(data[key], str):
        tags = [tag.strip()
                for tag in data[key].split(',')
                if tag.strip()]
    else:
        tags = data[key]
    current_index = max([int(k[1]) for k in data.keys()
                         if len(k) == 3 and k[0] == 'tags'] + [-1])
    for num, tag in zip(count(current_index + 1), tags):
        data[('tags', num, 'name')] = tag
    for tag in tags:
        toolkit.get_validator('tag_length_validator')(tag, context)
        tag_name_validator(tag, context)
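
# Illustration only (hypothetical flattened data): for
#   data = {('tag_string',): 'health, water'}
# tag_string_convert(('tag_string',), data, {}, {}) adds
#   ('tags', 0, 'name'): 'health' and ('tags', 1, 'name'): 'water'.
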

def _convert_subjectID_to_subjectText(subject_id, flattened_data):
    if not subject_id:
        return flattened_data
    extension_path = pkg_resources.resource_filename('ckanext.odsh', '')
    subject_mapping_file_path = toolkit.config.get(
        'ckanext.odsh.subject_mapping_file_path',
        extension_path + '/resources/subject_mapping.json')
    try:
        with open(subject_mapping_file_path) as mapping_json:
            subject_mapping = json.loads(mapping_json.read())
    except IOError:
        log.error(
            'Could not load subject mapping file from {}'
            .format(subject_mapping_file_path)
        )
        raise
    except ValueError:
        log.error(
            'Could not convert subject mapping file from json. \nSubject mapping file: {}'
            .format(subject_mapping_file_path)
        )
        raise
    try:
        subject_text = subject_mapping[subject_id]
    except KeyError:
        log.warning(
            'Subject_id "{}" not found in subject mapping dictionary.\nSubject mapping file: {}'
            .format(subject_id, subject_mapping_file_path)
        )
        raise toolkit.Invalid(_('Subject must be a known URI.'))
    new_index = next_extra_index(flattened_data)
    flattened_data[('extras', new_index, 'key')] = 'subject_text'
    flattened_data[('extras', new_index, 'value')] = subject_text
    return flattened_data

def validate_subject(key, flattened_data, errors, context):
    subject_id = flattened_data[key]
    require_subject = toolkit.asbool(
        toolkit.config.get('ckanext.odsh.require_subject', True)
    )
    if not require_subject:
        flattened_data = _convert_subjectID_to_subjectText(subject_id, flattened_data)
        return
    if not subject_id:
        raise toolkit.Invalid(_('Subject must not be empty.'))
    flattened_data = _convert_subjectID_to_subjectText(subject_id, flattened_data)

def validate_relatedPackage(data):
    if data:
        try:
            get_package_dict(data)
        except logic.NotFound:
            raise toolkit.Invalid("relatedPackage: package '{}' not found".format(data))

def validate_formats(data, errors):
    if not data:
        raise toolkit.Invalid('Missing format.')
    # controlled vocabulary of file types of the EU Publications Office
    if data.upper() not in ['7Z','AAB','AAC','AKN4EU','AKN4EU_ZIP','APK','APPX','ARC','ARCINFO_COV','ARC_GZ','ARJ','ATOM','AZW','BIN','BITS','BMP','BWF','BZIP2','CSS','CSV','DAPK','DBF','DCR','DEB','DGN','DMG','DMP','DOC','DOCX','DTD_SGML','DTD_XML','DWG','DXF','E00','EAR','ECW','EFORMS_XML','EPS','EPUB','ETSI_XML','EXE','FMX2','FMX3','FMX4','FMX4_ZIP','GDB','GEOJSON','GEOTIFF','GIF','GML','GMZ','GPKG','GRID','GRID_ASCII','GTFS','GZIP','HDF','HDT','HTML','HTML5','HTML_SIMPL','ICS','IMMC_XML','INDD','IPA','ISO','ISO_ZIP','JAR','JATS','JPEG','JPEG2000','JS','JSON','JSON_LD','KML','KMZ','LAS','LAZ','LEG','LHA','LPK','LZIP','LZMA','LZO','MAP_PRVW','MAP_SRVC','MBOX','MDB','METS','METS_ZIP','MHTML','MIF_MID','MOBI','MOP','MP3','MPEG2','MPEG4','MPEG4_AVC','MRSID','MSG_HTTP','MSI','MXD','N3','NETCDF','OAPK','OCTET','ODB','ODC','ODF','ODG','ODP','ODS','ODT','OP_DATPRO','OVF','OWL','PDF','PDF1X','PDFA1A','PDFA1B','PDFA2A','PDFA2B','PDFA3','PDFUA','PDFX','PDFX1A','PDFX2A','PDFX4','PL','PNG','PPS','PPSX','PPT','PPTX','PS','PSD','PWP','QGS','RAR','RDF','RDFA','RDF_N_QUADS','RDF_N_TRIPLES','RDF_THRIFT','RDF_TRIG','RDF_TRIX','RDF_TURTLE','RDF_XML','REST','RPM','RSS','RTF','SB3','SCHEMA_XML','SDMX','SGML','SHP','SKOS_XML','SPARQLQ','SPARQLQRES','SQL','STL','SVG','SWM','TAB','TAB_RSTR','TAR','TAR_GZ','TAR_XZ','TIFF','TIFF_FX','TMX','TSV','TXT','UNGEN','WAR','WARC','WARC_GZ','WAV','WCS_SRVC','WEBP','WFS_SRVC','WIM','WMS_SRVC','WMTS_SRVC','WORLD','XHTML','XHTML5','XHTML_SIMPL','XLIFF','XLS','XLSB','XLSM','XLSX','XML','XSLFO','XSLT','XYZ','XZ','YAML','Z','ZIP']:
        raise toolkit.Invalid(_('Only formats on the list of the EU Publications Office are allowed.'))
    return data
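
# Illustration only: validate_formats('csv', {}) returns 'csv' because the
# check is case-insensitive ('CSV' is on the list), while validate_formats('foo', {})
# raises Invalid.
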

# Mapping of validator names to the functions defined above. The enclosing
# get_validators() wrapper is an assumption; the original fragment only showed
# the dictionary entries.
def get_validators():
    return {
        'known_spatial_uri': known_spatial_uri,
        'odsh_tag_name_validator': tag_name_validator,
        'odsh_validate_extras': validate_extras,
        'validate_licenseAttributionByText': validate_licenseAttributionByText,
        'tpsh_validate_subject': validate_subject,
        'tpsh_validate_relatedPackage': validate_relatedPackage,
        'odsh_validate_format': validate_formats,
    }
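
# Sketch (assumed wiring, not part of this module): with the mapping above
# exposed via CKAN's IValidators plugin interface, schemas can look validators
# up by their registered names, e.g.
#   toolkit.get_validator('odsh_validate_format')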