# import from third parties
from dateutil.parser import parse
import json
import logging
from collections import OrderedDict
from ckan.types import Schema

# imports from ckan
from ckan.lib.plugins import DefaultTranslation
import ckan.plugins as p
import ckan.plugins.toolkit as tk

# imports from this extension
import ckanext.odsh.helpers as helpers_odsh
import ckanext.odsh.helper_pkg_dict as helper_pkg_dict
from .helper_pkg_dict import HelperPgkDict
import ckanext.odsh.logic.action as action
import ckanext.odsh.validation as validation
import ckanext.odsh.search as search
import ckanext.odsh.tools as tools
from ckanext.odsh.views import default
# from ckanext.odsh.views import package
from ckanext.odsh.views import user
from ckanext.odsh.views import harvest
from ckanext.odsh.views import feed
# from ckanext.dcat import blueprints as dcat_view

log = logging.getLogger(__name__)

_ = tk._


class OdshPlugin(p.SingletonPlugin, DefaultTranslation, tk.DefaultDatasetForm):
    """CKAN plugin of the odsh extension.

    Registers blueprints, action overrides, dataset schemas, facets,
    template helpers and validators, and adjusts package dicts when they
    are shown and indexed.
    """

    p.implements(p.IActions)
    p.implements(p.IConfigurer)
    p.implements(p.IDatasetForm)
    p.implements(p.IFacets)
    p.implements(p.IPackageController, inherit=True)
    p.implements(p.ITemplateHelpers)
    p.implements(p.ITranslation)
    p.implements(p.IValidators)
    p.implements(p.IResourceController, inherit=True)
    p.implements(p.IBlueprint)

    # IBlueprint

    def get_blueprint(self):
        """Add the extension's URL rules and return its blueprints.

        Returns:
            The default, user, harvest and feed blueprints, in that order.
        """
        log.debug("OdshPlugin::get_blueprint")

        # Default
        bp_default = default.blueprint
        rules = [
            ('/info_page', 'info_page', default.info_page),
            ('/home', 'start', default.start),
            ('/robots.txt', 'robots_txt', default.robots_txt),
            ('/not_found', 'not_found', default.not_found),
        ]
        for rule in rules:
            bp_default.add_url_rule(*rule)

        # DCAT
        # if tk.asbool(tk.config.get('ckanext.dcat.enable_rdf_endpoints', True)):
        #     helpers_odsh.odsh_remove_route(map, 'dcat_catalog')
        #     bp_default.add_url_rule('/catalog.<any("xml", "rdf", "n3", "ttl", "jsonld"):_format>', view_func=dcat_view.read_catalog, defaults={'_format': 'xml'}, methods=['GET'])

        # Package
        # bp_package = package.blueprint
        # rules = [
        #     ('/dataset/<id>/resource/<resource_id>', 'redirect_dataset_resource', package.redirect_dataset_resource), ]
        # for rule in rules:
        #     bp_package.add_url_rule(*rule)

        # User
        bp_user = user.blueprint
        bp_user.add_url_rule(u'/user', endpoint='user_index',
                             view_func=user.index, strict_slashes=False)
        # bp_user.add_url_rule(u'/user/register', view_func=user.register)

        # Harvest
        bp_harvest = harvest.blueprint
        bp_harvest.add_url_rule(
            u'/harvest', view_func=harvest.search, strict_slashes=False)
        bp_harvest.add_url_rule(u'/harvest/new', view_func=harvest.new)
        bp_harvest.add_url_rule(u'/harvest/<id>', view_func=harvest.read)
        bp_harvest.add_url_rule(
            u'/harvest/about/<id>', view_func=harvest.about)

        # Feed
        bp_feed = feed.blueprint
        bp_feed.add_url_rule(u'/feeds/custom.atom',
                             methods=[u'GET'], view_func=feed.custom)

        return [bp_default, bp_user, bp_harvest, bp_feed]  # bp_package

    # IActions

    def get_actions(self):
        """Return the action functions this plugin provides, keyed by action name."""
        return {
            'package_create': action.odsh_package_create,
            'user_update': action.odsh_user_update,
            'user_create': action.odsh_user_create,
            'resource_create': action.odsh_resource_create,
        }

    # IConfigurer

    def update_config(self, config_):
        """Register the extension's template, public and asset directories."""
        tk.add_template_directory(config_, 'templates')
        tk.add_public_directory(config_, 'public')
        tk.add_resource('assets', 'ckanext-odsh')

    def after_map(self, map):
        """Return the given map unchanged."""
        return map

    def package_types(self):
        # This plugin doesn't handle any special package types, it just
        # registers itself as the default (above).
        return []

    def is_fallback(self):
        # Return True to register this plugin as the default handler for
        # package types not handled by any other IDatasetForm plugin.
        return True

    def create_package_schema(self) -> Schema:
        """Return the parent create schema extended with the odsh fields."""
        log.debug("OdshPlugin::create_package_schema")
        schema: Schema = super().create_package_schema()
        schema = self._update_schema(schema)
        schema = self._update_create_or_update_package_schema(schema)
        return schema

    def update_package_schema(self) -> Schema:
        """Return the parent update schema extended with the odsh fields."""
        log.debug("OdshPlugin::update_package_schema")
        schema: Schema = super().update_package_schema()
        schema = self._update_schema(schema)
        schema = self._update_create_or_update_package_schema(schema)
        return schema

    def show_package_schema(self) -> Schema:
        """Return the parent show schema extended with the odsh extras fields."""
        log.debug("OdshPlugin::show_package_schema")
        schema: Schema = super().show_package_schema()
        schema = self._update_show_package_schema(schema)
        return schema

    def _update_schema(self, schema: Schema) -> Schema:
        """Add the validators shared by the create and update schemas.

        Mutates ``schema`` in place and returns it.
        """
        for field in ['title', 'license_id']:
            schema.update({field: [tk.get_converter('not_empty')]})

        for field in ['reference', 'applicableLegislation', 'hvdCategory',
                      'is_replaced_by', 'version_notes']:
            schema.update({
                field: [
                    tk.get_validator('ignore_missing'),
                    tk.get_converter('convert_to_extras')
                ]
            })

        schema['resources'].update({
            'url': [tk.get_converter('not_empty')],
            'format': [tk.get_converter('odsh_validate_format')],
        })
        schema['extras'].update({
            'key': [
                tk.get_converter('known_spatial_uri'),
                tk.get_converter('validate_licenseAttributionByText'),
            ]
        })
        schema.update(
            {'__extras': [tk.get_converter('odsh_validate_extras')]})
        return schema

    def _update_create_or_update_package_schema(self, schema: Schema) -> Schema:
        """Add the fields that are stored as extras on create and update.

        Mutates ``schema`` in place and returns it. ``is_replaced_by`` and
        ``version_notes`` are set here with the same validators that
        ``_update_schema`` assigns to them.
        """
        schema.update({
            'language': [
                tk.get_validator('ignore_missing'),
                tk.get_converter('convert_to_extras')
            ],
            'thumbnail': [
                tk.get_validator('ignore_missing'),
                tk.get_converter('convert_to_extras')
            ],
            'relatedPackage': [
                tk.get_validator('validate_relatedPackage'),
                tk.get_converter('convert_to_extras')
            ],
            'accessibility': [
                tk.get_validator('ignore_missing'),
                tk.get_converter('convert_to_extras')
            ],
            'is_replaced_by': [
                tk.get_validator('ignore_missing'),
                tk.get_converter('convert_to_extras')
            ],
            'version_notes': [
                tk.get_validator('ignore_missing'),
                tk.get_converter('convert_to_extras')
            ],
        })
        return schema

    def _update_show_package_schema(self, schema: Schema) -> Schema:
        """Add the fields that are read back from extras when a package is shown.

        Mutates ``schema`` in place and returns it.
        """
        fields_from_extras = [
            'language',
            'thumbnail',
            'relatedPackage',
            'accessibility',
            'is_replaced_by',
            'version_notes',
            'reference',
            'hvdCategory',
            'applicableLegislation',
        ]
        for field in fields_from_extras:
            # A fresh list per field, so no two schema entries share one list.
            schema[field] = [
                tk.get_converter('convert_from_extras'),
                tk.get_validator('ignore_missing'),
            ]
        return schema

    # IFacets

    def dataset_facets(self, facets_dict, package_type):
        """Return the facets for dataset search; ``facets_dict`` is ignored."""
        return OrderedDict({
            'organization': _('Organizations'),
            'groups': _('Category'),
            'res_format': _('File format'),
            'license_title': _('License'),
            # 'tags': _('Tags'),
            'openness': _('Open-Data-Eigenschaften')
        })

    def group_facets(self, facets_dict, group_type, package_type):
        """Return the facets for group pages; ``facets_dict`` is ignored."""
        return OrderedDict({
            'organization': _('Organizations'),
            'res_format': _('File format'),
            'license_title': _('License'),
            'groups': _('Category')
        })

    def organization_facets(self, facets_dict, organization_type, package_type):
        """Return the facets for organization pages; ``facets_dict`` is ignored."""
        return OrderedDict({
            'organization': _('Organizations'),
            'res_format': _('File format'),
            'license_title': _('License'),
            'groups': _('Category')
        })

    # IPackageController

    def after_dataset_show(self, context, pkg_dict):
        """Correct missing relationships in ``pkg_dict`` and set its ``is_new`` key.

        ``is_new`` is taken from ``HelperPgkDict(pkg_dict).is_package_new()``.
        """
        pkg_dict = helpers_odsh.correct_missing_relationship(
            pkg_dict,
            helpers_odsh.get_pkg_relationships_from_model(pkg_dict)
        )
        self._update_is_new_in_pkg_dict(pkg_dict)
        return pkg_dict

    def before_dataset_view(self, pkg_dict):
        """Set the ``is_new`` key of ``pkg_dict`` and return the dict."""
        self._update_is_new_in_pkg_dict(pkg_dict)
        return pkg_dict

    # NOTE(review): the two hooks below are dataset hooks, but their parameter
    # is named `resource` and they only act when the dict carries a
    # 'package_id'. Presumably this logic was meant for resource dicts;
    # confirm whether it belongs in the IResourceController hooks instead.
    def after_dataset_create(self, context, resource):
        """Call ``tools.add_attributes_resources`` if the dict has a ``package_id``."""
        if resource.get('package_id'):
            tools.add_attributes_resources(context, resource)

    def after_dataset_update(self, context, resource):
        """Call ``tools.add_attributes_resources`` if the dict has a ``package_id``."""
        if resource.get('package_id'):
            tools.add_attributes_resources(context, resource)

    @staticmethod
    def _update_is_new_in_pkg_dict(pkg_dict):
        """Store ``HelperPgkDict(pkg_dict).is_package_new()`` under ``'is_new'``."""
        is_new = HelperPgkDict(pkg_dict).is_package_new()
        pkg_dict.update({'is_new': is_new})

    def before_dataset_index(self, dict_pkg):
        """Prepare a package dict for indexing.

        Rewrites the date extras as ``YYYY-MM-DDT00:00:00Z`` strings, sets
        the ``openness`` value via ``map_qa_score`` and removes the ``qa``
        key.
        """
        # make special date fields solr conform
        for name in ("issued", "temporal_start", "temporal_end"):
            key = 'extras_' + name
            if key in dict_pkg and dict_pkg[key]:
                d = parse(dict_pkg[key])
                dict_pkg[key] = '{0.year:04d}-{0.month:02d}-{0.day:02d}T00:00:00Z'.format(
                    d)

        self.map_qa_score(dict_pkg)

        if 'qa' in dict_pkg:
            dict_pkg.pop('qa')

        return dict_pkg

    # Add the custom parameters to Solr's facet queries
    # use several daterange queries against temporal_start and temporal_end field
    # TODO: use field of type date_range in solr index instead
    def before_dataset_search(self, search_params):
        """Delegate to ``search.before_dataset_search``."""
        return search.before_dataset_search(search_params)

    # ITemplateHelpers

    def get_helpers(self):
        # Template helper function names should begin with the name of the
        # extension they belong to, to avoid clashing with functions from
        # other extensions.
        return {
            'odsh_main_groups': helpers_odsh.odsh_main_groups,
            'odsh_now': helpers_odsh.odsh_now,
            'odsh_group_id_selected': helpers_odsh.odsh_group_id_selected,
            'odsh_get_facet_items_dict': helpers_odsh.odsh_get_facet_items_dict,
            'odsh_openness_score_dataset_html': helpers_odsh.odsh_openness_score_dataset_html,
            'odsh_get_resource_details': helpers_odsh.odsh_get_resource_details,
            'odsh_get_resource_views': helpers_odsh.odsh_get_resource_views,
            'odsh_get_bounding_box': helpers_odsh.odsh_get_bounding_box,
            'odsh_get_spatial_text': helpers_odsh.odsh_get_spatial_text,
            'odsh_render_datetime': helpers_odsh.odsh_render_datetime,
            'odsh_resource_formats': helpers_odsh.odsh_resource_formats,
            'odsh_encodeurl': helpers_odsh.odsh_encodeurl,
            'odsh_extract_error': helpers_odsh.odsh_extract_error,
            'odsh_extract_error_new': helpers_odsh.odsh_extract_error_new,
            'odsh_extract_value_from_extras': helpers_odsh.odsh_extract_value_from_extras,
            'extract_email': helpers_odsh.extract_email,
            'odsh_create_checksum': helpers_odsh.odsh_create_checksum,
            'presorted_license_options': helpers_odsh.presorted_license_options,
            'odsh_has_more_facets': helpers_odsh.odsh_has_more_facets,
            'odsh_public_url': helpers_odsh.odsh_public_url,
            'odsh_spatial_extends_available': helpers_odsh.spatial_extends_available,
            'odsh_public_resource_url': helpers_odsh.odsh_public_resource_url,
            'odsh_show_testbanner': helpers_odsh.odsh_show_testbanner,
            'get_daterange_prettified': helper_pkg_dict.get_daterange_prettified,
            'get_language_of_package': helpers_odsh.get_language_of_package,
            'get_language_icon': helpers_odsh.get_language_icon,
            'short_name_for_category': helpers_odsh.short_name_for_category,
            'get_spatial_for_selection': helpers_odsh.get_spatial_for_selection,
            'get_language_for_selection': helpers_odsh.get_language_for_selection,
            'get_resource_size': helpers_odsh.get_resource_size,
            'get_address_org': helpers_odsh.get_address_org,
            'get_body_mail': helpers_odsh.get_body_mail,
            'odsh_load_applicable_legislations': helpers_odsh.odsh_load_applicable_legislations,
            'odsh_load_hvd_categories': helpers_odsh.odsh_load_hvd_categories,
            'odsh_load_mdk_sample_dataset': helpers_odsh.odsh_load_mdk_sample_dataset,
            'odsh_load_raw_mdk_sample_dataset': helpers_odsh.odsh_load_raw_mdk_sample_dataset,
            'format_resource_format': helpers_odsh.format_resource_format,
            # NOTE(review): this maps to the test-banner helper; looks like a
            # copy-paste slip -- confirm which helper was intended.
            'odsh_matomo_enabled': helpers_odsh.odsh_show_testbanner,
            'odsh_matomo_base_uri': helpers_odsh.odsh_matomo_base_uri,
            'odsh_matomo_site_id': helpers_odsh.odsh_matomo_site_id,
        }

    # IValidators

    def get_validators(self):
        """Return the validators provided by ``validation.get_validators()``."""
        return validation.get_validators()

    # Entry ``i`` holds the labels stored under 'openness' for an openness
    # score of ``i + 1``; each entry extends the previous one by one label.
    scores = [['0OL'], ['0OL', '1RE'], ['0OL', '1RE', '2OF'], [
        '0OL', '1RE', '2OF', '3URI'], ['0OL', '1RE', '2OF', '3URI', '4LD']]

    def map_qa_score(self, dict_pkg):
        """Set ``dict_pkg['openness']`` from the best resource openness score.

        Reads the JSON string in ``dict_pkg['validated_data_dict']``, takes
        the highest ``qa.openness_score`` over its resources and stores the
        matching entry of ``OdshPlugin.scores``. Nothing is written when the
        string is missing or empty, does not mention ``openness_score``, or
        when no resource has a score above zero.

        Resources whose score cannot be converted to ``int`` are skipped
        rather than aborting the indexing of the whole dataset, and scores
        beyond the table are mapped to its last entry.
        """
        raw = dict_pkg.get('validated_data_dict')
        # Cheap substring test so packages without any score are not parsed.
        if not raw or 'openness_score' not in raw:
            return

        data = json.loads(raw)
        best = -1
        for res in data.get('resources') or []:
            qa = res.get('qa')
            if not isinstance(qa, dict) or 'openness_score' not in qa:
                continue
            try:
                value = int(qa['openness_score'])
            except (TypeError, ValueError):
                # Score is None or not numeric: ignore this resource.
                continue
            if value > best:
                best = value

        if best > 0:
            # Clamp so an unexpectedly large score cannot raise IndexError.
            index = min(best, len(OdshPlugin.scores)) - 1
            dict_pkg['openness'] = OdshPlugin.scores[index]