Newer
Older

Thorge Petersen
committed
import ckan.logic as logic
import ckan.plugins.toolkit as toolkit
import ckan.model as model
import pkg_resources
from dateutil.parser import parse
from ckan.lib.navl.dictization_functions import Missing
from ckanext.odsh.helpers import get_package_dict

Thorge Petersen
committed
from ckanext.odsh.helpers import odsh_resource_formats
if data[k] == field:
key = k
break
if key is None:
return None
return data[(key[0], key[1], 'value')]
def validate_extra_groups(data, requireAtLeastOne, errors):
if value != None:
# 'value != None' means the extra key 'groups' was found,
groups = [g.strip() for g in value.split(',') if value.strip()]
for num, group in zip(list(range(len(groups))), groups):
if not data.get(('groups', 0, 'id'), False) and \
not data.get(('groups', 0, 'name'), False):
isStaNord = ('id',) in data and data[('id',)][:7] == 'StaNord'
harvesting = ('ignore_auth' in context) and (
context['ignore_auth'] == True)
owner_org = data[('owner_org',)]
lenient_with = toolkit.config.get('ckanext.odsh.lenient_with', '')
toolkit.config.get('ckanext.odsh.is_optional_temporal_start', False)
toolkit.config.get('ckanext.odsh.require_at_least_one_category', False)
data=data,
requireAtLeastOne=require_at_least_one_category,
validate_extra_date_new(
key=key,
field='issued',
data=data,
optional=isStaNord,
errors=extra_errors
)
validate_extra_date_new(
key=key,
field='temporal_start',
data=data,
errors=extra_errors
)
validate_extra_date_new(
key=key,
field='temporal_end',
data=data,
optional=True,
errors=extra_errors
)
def is_date_start_before_date_end(data, extra_errors):
start_date = _extract_value(data, 'temporal_start')
end_date = _extract_value(data, 'temporal_end')
if start_date and end_date:
if start_date > end_date:
extra_errors['temporal_start'] = extra_errors['temporal_end'] = 'Please enter a valid period of time.'
if data[k] == field:
key = k
break
if key is None:
return None
data[(key[0], key[1], 'value')] = value
def validate_extra_date_new(key, field, data, optional, errors):
value = _extract_value(data, field)
if not value:
if not optional:
errors[field] = 'empty'
return
else:
if re.match(r'\d\d\d\d-\d\d-\d\d', value):
try:
_set_value(data, field, dt.isoformat())
return
except ValueError:
pass
def validate_licenseAttributionByText(key, data, errors, context):
log.debug("Validating licenseAttributionByText")
for k in data:
if len(k) > 0 and k[0] == 'license_id' and data[k] and not isinstance(data[k], Missing) and \
for k in data:
if data[k] == 'licenseAttributionByText':
if isinstance(data[(k[0], k[1], 'value')], Missing) or (k[0], k[1], 'value') not in data:
del data[(k[0], k[1], 'value')]
del data[(k[0], k[1], 'key')]
break
else:
value = data[(k[0], k[1], 'value')]
hasAttribution = value != ''
break

anonymous
committed
if not hasAttribution:
current_indexes = [k[1] for k in list(data.keys())

anonymous
committed
if len(k) > 1 and k[0] == 'extras']
new_index = max(current_indexes) + 1 if current_indexes else 0

anonymous
committed
data[('extras', new_index, 'key')] = 'licenseAttributionByText'
data[('extras', new_index, 'value')] = ''

anonymous
committed
raise toolkit.Invalid(

anonymous
committed
def known_spatial_uri(key, data, errors, context):
if data.get(('__extras',)) and 'spatial_uri_temp' in data.get(('__extras',)):
_copy_spatial_uri_temp_to_extras(data)
toolkit.config.get('ckanext.odsh.require_spatial_uri', False)
)
error_message_spatial_uri_empty = 'spatial_uri: empty not allowed'
# some harvesters might import a polygon directly...
poly = _extract_value(data, 'spatial')
has_old_uri = False
pkg = context.get('package', None)
if pkg:
old_uri = pkg.extras.get('spatial_uri', None)
has_old_uri = old_uri != None and len(old_uri) > 0
if not poly:
poly = pkg.extras.get('spatial', None)
if not poly and require_spatial_uri:
# raise toolkit.Invalid(error_message_spatial_uri_empty)
if poly:
new_index = next_extra_index(data)
data[('extras', new_index+1, 'key')] = 'spatial'
data[('extras', new_index+1, 'value')] = poly
extension_path = pkg_resources.resource_filename('ckanext.odsh', '')
mapping_path = toolkit.config.get('ckanext.odsh.spatial.mapping',
extension_path + '/resources/schleswig-holstein_geojson.csv')
not_found = True
spatial_text = str()
spatial = str()
try:
with open(mapping_path, newline='') as mapping_file:
cr = csv.reader(mapping_file, delimiter="\t")
for row in cr:
if row and len(row) > 2 and row[0] == value:
not_found = False
spatial_text = row[1]
loaded = json.loads(row[2])
spatial = json.dumps(loaded.get('geometry', {}))
break
if not_found:
raise toolkit.Invalid('spatial_uri: uri unknown')
except (IOError, json.decoder.JSONDecodeError, KeyError) as e:
log.error(f"Error processing spatial mapping: {e}")
raise toolkit.Invalid("Error processing spatial mapping")
data[('extras', new_index, 'key')] = 'spatial_text'
data[('extras', new_index, 'value')] = spatial_text
data[('extras', new_index+1, 'key')] = 'spatial'
data[('extras', new_index+1, 'value')] = spatial
Copy the field spatial_uri_temp or
extras_data = data.get(('__extras',))
spatial_uri = extras_data.get(
'spatial_uri_temp') or extras_data.get('spatial_url_temp')
if _extract_value(data, 'spatial_uri') is None:
next_index = next_extra_index(data)
data[('extras', next_index, 'key')] = 'spatial_uri'
data[('extras', next_index, 'value')] = spatial_uri
else:
_set_value(data, 'spatial_uri', spatial_uri)
current_indexes = [k[1] for k in list(data.keys())
if len(k) > 1 and k[0] == 'extras']
return max(current_indexes) + 1 if current_indexes else 0
def validate_relatedPackage(data):
if data:
try:
get_package_dict(data)
except logic.NotFound:
raise toolkit.Invalid(
"relatedPackage: package '{}' not found".format(data))
def validate_formats(data, errors):
if not data:
raise toolkit.Invalid('Missing format.')

Thorge Petersen
committed
if not any(data.upper() == obj['key'] for obj in odsh_resource_formats()):
raise toolkit.Invalid(
_('Only formats on the list of the EU Publications Office are allowed.'))
def tag_name_validator(value, context):
"""Validate tag name to ensure it is non-empty and contains no line breaks.
Replaces any newlines with spaces before validation.
# Replace all newlines (\n, \r) with spaces
value = re.sub(r'[\r\n]+', ' ', value).strip()
# Ensure the tag is non-empty
if not value:
raise toolkit.Invalid(_('Invalid tag: Tags cannot be empty.'))
return value
'known_spatial_uri': known_spatial_uri,
'odsh_validate_extras': validate_extras,
'validate_licenseAttributionByText': validate_licenseAttributionByText,
'odsh_validate_format': validate_formats,
'tag_name_validator': tag_name_validator,