Newer
Older
from dateutil.parser import parse

anonymous
committed
import logging

Thorge Petersen
committed
from collections import OrderedDict
from ckan.types import Schema
from ckan.lib.plugins import DefaultTranslation
import ckan.plugins as p
import ckan.plugins.toolkit as tk
import ckanext.odsh.helpers as helpers_odsh
import ckanext.odsh.logic.action as action
import ckanext.odsh.validation as validation
import ckanext.odsh.search as search
import ckanext.odsh.tools as tools
from ckanext.odsh.views import default
# from ckanext.odsh.views import package
from ckanext.odsh.views import user
from ckanext.odsh.views import harvest
# from ckanext.dcat import blueprints as dcat_view
class OdshPlugin(p.SingletonPlugin, DefaultTranslation, tk.DefaultDatasetForm):
p.implements(p.IActions)
p.implements(p.IConfigurer)
p.implements(p.IDatasetForm)
p.implements(p.IFacets)
p.implements(p.IPackageController, inherit=True)
p.implements(p.ITemplateHelpers)
p.implements(p.ITranslation)
p.implements(p.IValidators)
p.implements(p.IResourceController, inherit=True)
p.implements(p.IBlueprint)
# IBlueprint
def get_blueprint(self):
# Default
bp_default = default.blueprint
rules = [
('/info_page', 'info_page', default.info_page),
('/home', 'start', default.start),
('/robots.txt', 'robots_txt', default.robots_txt),
('/not_found', 'not_found', default.not_found), ]
for rule in rules:
bp_default.add_url_rule(*rule)
# DCAT
# if tk.asbool(tk.config.get('ckanext.dcat.enable_rdf_endpoints', True)):
# helpers_odsh.odsh_remove_route(map, 'dcat_catalog')
# bp_default.add_url_rule('/catalog.<any("xml", "rdf", "n3", "ttl", "jsonld"):_format>', view_func=dcat_view.read_catalog, defaults={'_format': 'xml'}, methods=['GET'])
# Package
# bp_package = package.blueprint
# rules = [
# ('/dataset/<id>/resource/<resource_id>', 'redirect_dataset_resource', package.redirect_dataset_resource), ]
# for rule in rules:
# bp_package.add_url_rule(*rule)
# User
bp_user = user.blueprint
bp_user.add_url_rule(u'/user', endpoint='user_index',
view_func=user.index, strict_slashes=False)
# bp_user.add_url_rule(u'/user/register', view_func=user.register)
bp_harvest.add_url_rule(u'/harvest', view_func=harvest.search, strict_slashes=False)
bp_harvest.add_url_rule(u'/harvest/new', view_func=harvest.new)
bp_harvest.add_url_rule(u'/harvest/<id>', view_func=harvest.read)
bp_harvest.add_url_rule(u'/harvest/about/<id>', view_func=harvest.about)
# Feed
bp_feed = feed.blueprint
bp_feed.add_url_rule(u'/feeds/custom.atom', methods=[u'GET'], view_func=feed.custom)

Thorge Petersen
committed
return [bp_default, bp_user, bp_harvest, bp_feed] # bp_package
def get_actions(self):
return {'package_create': action.odsh_package_create,
'resource_create': action.odsh_resource_create, }
tk.add_template_directory(config_, 'templates')
tk.add_public_directory(config_, 'public')
tk.add_resource('assets', 'ckanext-odsh')
def package_types(self):
# This plugin doesn't handle any special package types, it just
# registers itself as the default (above).
return []
def is_fallback(self):
# Return True to register this plugin as the default handler for
# package types not handled by any other IDatasetForm plugin.
return True
def create_package_schema(self) -> Schema:
log.debug("OdshPlugin::create_package_schema")
schema: Schema = super(OdshPlugin, self).create_package_schema()
schema = self._update_schema(schema)
schema = self._update_create_or_update_package_schema(schema)
def update_package_schema(self) -> Schema:
log.debug("OdshPlugin::update_package_schema")
schema: Schema = super(OdshPlugin, self).update_package_schema()
schema = self._update_schema(schema)
schema = self._update_create_or_update_package_schema(schema)
def show_package_schema(self) -> Schema:
log.debug("OdshPlugin::show_package_schema")
schema: Schema = super(OdshPlugin, self).show_package_schema()
schema = self._update_show_package_schema(schema)

anonymous
committed
def _update_schema(self, schema: Schema) -> Schema:
schema.update({field: [tk.get_converter('not_empty')]})
for field in ['reference','applicableLegislation','hvdCategory', 'is_replaced_by', 'version_notes']:
schema.update({
field: [
tk.get_validator('ignore_missing'),
tk.get_converter('convert_to_extras')
'url': [tk.get_converter('not_empty')],
'format': [tk.get_converter('odsh_validate_format')],

anonymous
committed
tk.get_converter('known_spatial_uri'),
tk.get_converter('validate_licenseAttributionByText'),
{'__extras': [tk.get_converter('odsh_validate_extras')]})
return schema

anonymous
committed
def _update_create_or_update_package_schema(self, schema: Schema) -> Schema:
tk.get_validator('ignore_missing'),
tk.get_converter('convert_to_extras')
tk.get_validator('ignore_missing'),
tk.get_converter('convert_to_extras')
tk.get_validator('validate_relatedPackage'),
tk.get_converter('convert_to_extras')
tk.get_validator('ignore_missing'),
tk.get_converter('convert_to_extras')
],
'is_replaced_by': [
tk.get_validator('ignore_missing'),
tk.get_converter('convert_to_extras')
],
'version_notes': [
tk.get_validator('ignore_missing'),
tk.get_converter('convert_to_extras')
def _update_show_package_schema(self, schema: Schema) -> Schema:
tk.get_converter('convert_from_extras'),
tk.get_validator('ignore_missing')
tk.get_converter('convert_from_extras'),
tk.get_validator('ignore_missing')
tk.get_converter('convert_from_extras'),
tk.get_validator('ignore_missing')
tk.get_converter('convert_from_extras'),
tk.get_validator('ignore_missing')
tk.get_converter('convert_from_extras'),
tk.get_validator('ignore_missing')
],
'version_notes': [
tk.get_converter('convert_from_extras'),
tk.get_validator('ignore_missing')
tk.get_converter('convert_from_extras'),
tk.get_validator('ignore_missing')
tk.get_converter('convert_from_extras'),
tk.get_validator('ignore_missing')
tk.get_converter('convert_from_extras'),
tk.get_validator('ignore_missing')
return OrderedDict({'organization': _('Organizations'),
'groups': _('Category'),
'res_format': _('File format'),
'license_title': _('License'),
def group_facets(self, facets_dict, group_type, package_type):
return OrderedDict({'organization': _('Organizations'),
'res_format': _('File format'),
'license_title': _('License'),
'groups': _('Category')})
def organization_facets(self, facets_dict, organization_type, package_type):
return OrderedDict({'organization': _('Organizations'),
'res_format': _('File format'),
'license_title': _('License'),
'groups': _('Category')})

Thorge Petersen
committed
def after_dataset_show(self, context, pkg_dict):
'''
corrects missing relationships in pkg dict
adds the following key-value-pairs to pkg_dict:
# key: 'is_new', value: True if the dataset has been created within the last month
'''
pkg_dict = helpers_odsh.correct_missing_relationship(
helpers_odsh.get_pkg_relationships_from_model(pkg_dict)

Thorge Petersen
committed
def before_dataset_view(self, pkg_dict):
'''
adds the following key-value-pairs to pkg_dict:
# key: 'is_new', value: True if the dataset has been created within the last month
'''
self._update_is_new_in_pkg_dict(pkg_dict)
return pkg_dict

Thorge Petersen
committed
def after_dataset_create(self, context, resource):
if resource.get('package_id'):
tools.add_attributes_resources(context, resource)

Thorge Petersen
committed
def after_dataset_update(self, context, resource):
if resource.get('package_id'):
tools.add_attributes_resources(context, resource)
@staticmethod
def _update_is_new_in_pkg_dict(pkg_dict):
is_new = HelperPgkDict(pkg_dict).is_package_new()
pkg_dict.update({'is_new': is_new})

Thorge Petersen
committed
def before_dataset_index(self, dict_pkg):
# make special date fields solr conform
fields = ["issued", "temporal_start", "temporal_end"]
for field in fields:
field = 'extras_' + field
if field in dict_pkg and dict_pkg[field]:
d = parse(dict_pkg[field])
dict_pkg[field] = '{0.year:04d}-{0.month:02d}-{0.day:02d}T00:00:00Z'.format(
d)
if 'qa' in dict_pkg:
dict_pkg.pop('qa')
# Add the custom parameters to Solr's facet queries
# use several daterange queries agains temporal_start and temporal_end field
# TODO: use field of type date_range in solr index instead
def before_dataset_search(self, search_params):
return search.before_dataset_search(search_params)
def get_helpers(self):
# Template helper function names should begin with the name of the
# extension they belong to, to avoid clashing with functions from
# other extensions.
'odsh_main_groups': helpers_odsh.odsh_main_groups,
'odsh_now': helpers_odsh.odsh_now,
'odsh_group_id_selected': helpers_odsh.odsh_group_id_selected,
'odsh_get_facet_items_dict': helpers_odsh.odsh_get_facet_items_dict,
'odsh_openness_score_dataset_html': helpers_odsh.odsh_openness_score_dataset_html,
'odsh_get_resource_details': helpers_odsh.odsh_get_resource_details,
'odsh_get_resource_views': helpers_odsh.odsh_get_resource_views,
'odsh_get_bounding_box': helpers_odsh.odsh_get_bounding_box,
'odsh_get_spatial_text': helpers_odsh.odsh_get_spatial_text,
'odsh_render_datetime': helpers_odsh.odsh_render_datetime,
'odsh_resource_formats': helpers_odsh.odsh_resource_formats,
'odsh_encodeurl': helpers_odsh.odsh_encodeurl,
'odsh_extract_error': helpers_odsh.odsh_extract_error,
'odsh_extract_error_new': helpers_odsh.odsh_extract_error_new,
'odsh_extract_value_from_extras': helpers_odsh.odsh_extract_value_from_extras,

Thorge Petersen
committed
'extract_email': helpers_odsh.extract_email,
'odsh_create_checksum': helpers_odsh.odsh_create_checksum,
'presorted_license_options': helpers_odsh.presorted_license_options,
'odsh_has_more_facets': helpers_odsh.odsh_has_more_facets,
'odsh_public_url': helpers_odsh.odsh_public_url,
'odsh_spatial_extends_available': helpers_odsh.spatial_extends_available,
'odsh_public_resource_url': helpers_odsh.odsh_public_resource_url,
'odsh_show_testbanner': helpers_odsh.odsh_show_testbanner,
'get_daterange_prettified': helper_pkg_dict.get_daterange_prettified,
'get_language_of_package': helpers_odsh.get_language_of_package,
'get_language_icon': helpers_odsh.get_language_icon,
'short_name_for_category': helpers_odsh.short_name_for_category,
'get_spatial_for_selection': helpers_odsh.get_spatial_for_selection,
'get_language_for_selection': helpers_odsh.get_language_for_selection,
'get_resource_size': helpers_odsh.get_resource_size,
'get_address_org':helpers_odsh.get_address_org,
'get_body_mail':helpers_odsh.get_body_mail,

Thorge Petersen
committed
'odsh_load_applicable_legislations': helpers_odsh.odsh_load_applicable_legislations,
'odsh_load_hvd_categories': helpers_odsh.odsh_load_hvd_categories,
'odsh_load_mdk_sample_dataset': helpers_odsh.odsh_load_mdk_sample_dataset,
'odsh_load_raw_mdk_sample_dataset': helpers_odsh.odsh_load_raw_mdk_sample_dataset,
'format_resource_format': helpers_odsh.format_resource_format,
'odsh_matomo_enabled': helpers_odsh.odsh_show_testbanner,
'odsh_matomo_base_uri': helpers_odsh.odsh_matomo_base_uri,
'odsh_matomo_site_id': helpers_odsh.odsh_matomo_site_id,
scores = [['0OL'], ['0OL', '1RE'], ['0OL', '1RE', '2OF'], [
'0OL', '1RE', '2OF', '3URI'], ['0OL', '1RE', '2OF', '3URI', '4LD']]
def map_qa_score(self, dict_pkg):
if 'validated_data_dict' in dict_pkg and 'openness_score' in dict_pkg['validated_data_dict']:
d = json.loads(dict_pkg['validated_data_dict'])
score = -1
for r in d['resources']:
if ('qa' in r) and (type(r['qa']) is dict) and ('openness_score' in r['qa']):
s = int(r['qa']['openness_score'])
if s > score:
score = s
if score > 0:
dict_pkg['openness'] = OdshPlugin.scores[score-1]