Newer
Older
import urllib.request
import urllib.error
import urllib.parse

Thorge Petersen
committed
import ckan.logic as logic
from itertools import count
from dateutil.parser import parse
import ckan.plugins.toolkit as toolkit
import ckan.model as model
from ckan.lib.navl.dictization_functions import Missing
from ckanext.odsh.helpers import get_package_dict

Thorge Petersen
committed
from ckanext.odsh.helpers import odsh_resource_formats
if data[k] == field:
key = k
break
if key is None:
return None
return data[(key[0], key[1], 'value')]
def validate_extra_groups(data, requireAtLeastOne, errors):
# Checks the comma-separated group list carried in `value` against the
# ('groups', ...) entries of the flattened dict `data`.
# NOTE(review): this view of the file omits lines of the function -- the
# assignment of `value` (presumably read from the 'groups' extra) and the
# bodies of the loop and of the last condition are not visible, so only the
# visible statements are described here.
if value != None:
# 'value != None' means the extra key 'groups' was found,
# Split on commas and strip surrounding whitespace from every entry.
# NOTE(review): the filter tests `value.strip()` (the whole string) rather
# than `g.strip()`, so empty entries such as the middle one in "a,,b" are
# kept -- confirm whether that is intended.
groups = [g.strip() for g in value.split(',') if value.strip()]
# Walk the group names together with their position in the list.
for num, group in zip(list(range(len(groups))), groups):
# True when the first 'groups' entry has neither an id nor a name.
if not data.get(('groups', 0, 'id'), False) and \
not data.get(('groups', 0, 'name'), False):
isStaNord = ('id',) in data and data[('id',)][:7] == 'StaNord'
harvesting = ('ignore_auth' in context) and (
context['ignore_auth'] == True)
owner_org = data[('owner_org',)]
lenient_with = tk.config.get('ckanext.odsh.lenient_with', '')
tk.config.get('ckanext.odsh.is_optional_temporal_start', False)
tk.config.get('ckanext.odsh.require_at_least_one_category', False)
data=data,
requireAtLeastOne=require_at_least_one_category,
validate_extra_date_new(
key=key,
field='issued',
data=data,
optional=isStaNord,
errors=extra_errors
)
validate_extra_date_new(
key=key,
field='temporal_start',
data=data,
errors=extra_errors
)
validate_extra_date_new(
key=key,
field='temporal_end',
data=data,
optional=True,
errors=extra_errors
)
def is_date_start_before_date_end(data, extra_errors):
    """Flag the temporal extent as invalid when it starts after it ends.

    Reads the ``temporal_start`` and ``temporal_end`` extras from ``data``.
    When both are set and the start compares greater than the end, the same
    message is stored in ``extra_errors`` under both field names.

    Nothing is returned; ``extra_errors`` is modified in place.
    """
    period_start = _extract_value(data, 'temporal_start')
    period_end = _extract_value(data, 'temporal_end')

    # Only a fully specified period can be checked.
    if period_start and period_end and period_start > period_end:
        message = 'Please enter a valid period of time.'
        for field in ('temporal_start', 'temporal_end'):
            extra_errors[field] = message
if data[k] == field:
key = k
break
if key is None:
return None
data[(key[0], key[1], 'value')] = value
def validate_extra_date_new(key, field, data, optional, errors):
# Checks the date stored in `data` under the extra named `field`.
# An empty value is recorded as errors[field] = 'empty' unless `optional`
# is truthy; a value that begins with a YYYY-MM-DD pattern is written back
# through _set_value() as an ISO formatted string.
# NOTE(review): lines are missing from this view -- `dt` is never assigned
# in the visible code (presumably the parsed form of `value`), and what
# happens after a failed match or a ValueError is not shown.
value = _extract_value(data, field)
if not value:
if not optional:
errors[field] = 'empty'
return
else:
# re.match only anchors at the start of the string, so characters that
# follow the date part do not make this check fail.
if re.match(r'\d\d\d\d-\d\d-\d\d', value):
try:
_set_value(data, field, dt.isoformat())
return
except ValueError:
# Swallowed here; the handling that follows is outside this view.
pass
def validate_licenseAttributionByText(key, data, errors, context):
# Inspects the flattened dict `data` for a 'licenseAttributionByText' extra
# and, when no non-empty attribution text is found, appends an empty one at
# the next free extras index before raising toolkit.Invalid.
# NOTE(review): this view omits lines of the function -- the license_id
# condition below continues on a line that is not visible, and the argument
# of the final raise is cut off. Version-control residue lines ("anonymous",
# "committed") are interleaved and are not part of the program.
log.debug("Validating licenseAttributionByText")
for k in data:
if len(k) > 0 and k[0] == 'license_id' and data[k] and not isinstance(data[k], Missing) and \
for k in data:
# Find the extras entry whose stored key name is 'licenseAttributionByText';
# its value lives under the sibling tuple (k[0], k[1], 'value').
if data[k] == 'licenseAttributionByText':
# NOTE(review): the value is indexed before the `not in data` test runs, so
# a missing 'value' entry raises KeyError instead of reaching that test; the
# `del` on the value below would raise as well in that case -- confirm.
if isinstance(data[(k[0], k[1], 'value')], Missing) or (k[0], k[1], 'value') not in data:
del data[(k[0], k[1], 'value')]
del data[(k[0], k[1], 'key')]
break
else:
value = data[(k[0], k[1], 'value')]
# An empty string counts as "no attribution given".
hasAttribution = value != ''
break

anonymous
committed
# No attribution text: add an empty 'licenseAttributionByText' extra at the
# index following the highest extras index currently present (0 if none).
if not hasAttribution:
current_indexes = [k[1] for k in list(data.keys())

anonymous
committed
if len(k) > 1 and k[0] == 'extras']
new_index = max(current_indexes) + 1 if current_indexes else 0

anonymous
committed
data[('extras', new_index, 'key')] = 'licenseAttributionByText'
data[('extras', new_index, 'value')] = ''

anonymous
committed
raise toolkit.Invalid(

anonymous
committed
def known_spatial_uri(key, data, errors, context):
# Validator for the spatial reference of a dataset: looks a URI up in a
# tab-separated mapping file and stores the matching display text and
# geometry as the 'spatial_text' and 'spatial' extras of `data`.
# Raises toolkit.Invalid when the URI is required but empty, when it is not
# found in the mapping, or when the mapping cannot be read or parsed.
# NOTE(review): this view omits lines of the function -- the assignments of
# `value`, `require_spatial_uri` and of the `new_index` used at the end are
# not visible, so the control flow between the visible statements is only
# partly known.
# A 'spatial_uri_temp' entry under ('__extras',) is moved into the extras first.
if data.get(('__extras',)) and 'spatial_uri_temp' in data.get(('__extras',)):
_copy_spatial_uri_temp_to_extras(data)
tk.config.get('ckanext.odsh.require_spatial_uri', False)
)
error_message_spatial_uri_empty = 'spatial_uri: empty not allowed'
# some harvesters might import a polygon directly...
poly = _extract_value(data, 'spatial')
has_old_uri = False
# Fall back to what is stored on the package held in the context, if any.
pkg = context.get('package', None)
if pkg:
old_uri = pkg.extras.get('spatial_uri', None)
has_old_uri = old_uri != None and len(old_uri) > 0
if not poly:
poly = pkg.extras.get('spatial', None)
# Neither a polygon nor (presumably) a URI is available although one is required.
if (not poly) and require_spatial_uri:
raise toolkit.Invalid(error_message_spatial_uri_empty)
# raise toolkit.Invalid(error_message_spatial_uri_empty)
if poly:
new_index = next_extra_index(data)
# NOTE(review): the polygon is written at new_index+1, leaving new_index
# itself unused in the visible code -- confirm the offset is intended.
data[('extras', new_index+1, 'key')] = 'spatial'
data[('extras', new_index+1, 'value')] = poly
# The mapping file defaults to a CSV shipped inside the extension package and
# can be overridden with the 'ckanext.odsh.spatial.mapping' config option.
extension_path = pkg_resources.resource_filename('ckanext.odsh', '')
mapping_path = tk.config.get('ckanext.odsh.spatial.mapping',
extension_path + '/resources/schleswig-holstein_geojson.csv')
not_found = True
spatial_text = str()
spatial = str()
try:
with open(mapping_path, newline='') as mapping_file:
cr = csv.reader(mapping_file, delimiter="\t")
# Row layout as used below: column 0 is compared with `value`, column 1 is
# the display text, column 2 is JSON whose 'geometry' member is kept.
for row in cr:
if row and len(row) > 2 and row[0] == value:
not_found = False
spatial_text = row[1]
loaded = json.loads(row[2])
# Re-serialise only the geometry; an absent member becomes '{}'.
spatial = json.dumps(loaded.get('geometry', {}))
break
if not_found:
raise toolkit.Invalid('spatial_uri: uri unknown')
except (IOError, json.decoder.JSONDecodeError, KeyError) as e:
log.error(f"Error processing spatial mapping: {e}")
raise toolkit.Invalid("Error processing spatial mapping")
# Store the text and the geometry in two consecutive extras slots.
data[('extras', new_index, 'key')] = 'spatial_text'
data[('extras', new_index, 'value')] = spatial_text
data[('extras', new_index+1, 'key')] = 'spatial'
data[('extras', new_index+1, 'value')] = spatial
copy the field spatial_uri_temp or
spatial_url_temp originating
from the user interface to extras
'''
spatial_uri = data.get(('__extras',)).get('spatial_uri_temp')
if spatial_uri is None:
spatial_uri = data.get(('__extras',)).get('spatial_url_temp')
is_spatial_uri_in_extras = _extract_value(data, 'spatial_uri') is not None
if not is_spatial_uri_in_extras:
next_index = next_extra_index(data)
data[('extras', next_index, 'key')] = 'spatial_uri'
data[('extras', next_index, 'value')] = spatial_uri
else:
_set_value(data, 'spatial_uri', spatial_uri)
current_indexes = [k[1] for k in list(data.keys())
if len(k) > 1 and k[0] == 'extras']
return max(current_indexes) + 1 if current_indexes else 0
def validate_relatedPackage(data):
    """Ensure that the package referenced by ``data`` can be looked up.

    A falsy ``data`` is accepted without any lookup. Otherwise ``data`` is
    handed to ``get_package_dict``.

    Raises:
        toolkit.Invalid: if the lookup raises ``logic.NotFound``.
    """
    if not data:
        return

    try:
        get_package_dict(data)
    except logic.NotFound:
        message = "relatedPackage: package '{}' not found".format(data)
        raise toolkit.Invalid(message)
def validate_formats(data, errors):
    """Check that a resource format is one of the allowed formats.

    ``data`` is compared, upper-cased, with the ``'key'`` of every entry
    returned by ``odsh_resource_formats()``. ``errors`` is not used.

    Raises:
        toolkit.Invalid: if ``data`` is falsy or matches no entry's key.
    """
    if not data:
        raise toolkit.Invalid('Missing format.')

    # Stop at the first entry whose key equals the upper-cased format.
    for entry in odsh_resource_formats():
        if data.upper() == entry['key']:
            return

    raise toolkit.Invalid(
        _('Only formats on the list of the EU Publications Office are allowed.'))
# One or more characters, none of which is a newline. Used with fullmatch()
# so that the whole string has to satisfy it.
_TAG_NAME_PATTERN = re.compile(r'[^\n]+')


def tag_name_validator(value, context):
    """Allow tag names to contain any characters but no newlines.

    The whole string must match. The previous pattern ended in ``$``, which
    also matches just before a trailing newline, so a tag such as
    ``"abc\\n"`` was accepted although it contains a line break.

    Args:
        value: the tag name to check.
        context: the validator context; not used.

    Returns:
        ``value`` unchanged when it is non-empty and free of newlines.

    Raises:
        toolkit.Invalid: if ``value`` is empty or contains a newline.
    """
    if not _TAG_NAME_PATTERN.fullmatch(value):
        raise toolkit.Invalid(
            _('Invalid tag: "%s". Tags cannot contain line breaks.') % (value))
    return value
'known_spatial_uri': known_spatial_uri,
'odsh_validate_extras': validate_extras,
'validate_licenseAttributionByText': validate_licenseAttributionByText,
'odsh_validate_format': validate_formats,
'tag_name_validator': tag_name_validator,