Skip to content
Snippets Groups Projects
Select Git revision
  • 941558501c24852f72e620aa539a9cb6bf484a22
  • main default protected
  • OZG-7970-AlfaCodeFlow
  • OZG-7985-Statistik-Datenfreigabe
  • OZG-8073-date-component
  • OZG-7856_schadcode_scanner
  • OZG-8305-Create-webpack-sbom
  • tooltip-improvements
  • ods-remove-class-inputs
  • release-info
  • release-administration
  • release
  • OZG-7714-UpgradeKeycloakDependencyTo25
  • OZG-8086-Admin-Datenanfrage-erstellen
  • OZG-8086-Datenanfrage-Umbenennung
  • mongodb-7-0-16-e2e
  • OZG-6220-Bescheid-speichern-ohne-Postfach
  • OZG-7922-KeycloakOperatorExceptions
  • OZG-8142-poc-cards
  • OZG-8086-E2E
  • OZG-8086-E2E2
  • 1.11.0-info
  • 1.11.0-administration
  • 2.26.0-alfa
  • 1.10.0-info
  • 1.10.0-administration
  • 2.25.0-alfa
  • 1.9.0-info
  • 1.9.0-administration
  • 2.24.0-alfa
  • 1.8.0-info
  • 1.8.0-administration
  • 2.23.0-alfa
  • 1.7.0-info
  • 1.7.0-administration
  • 2.22.0-alfa
  • 1.6.0-info
  • 1.6.0-administration
  • 2.21.0-alfa
  • 1.5.0-info
  • 1.5.0-administration
41 results

admin-settings.module.spec.ts

Blame
  • helpers.py 5.34 KiB
    import logging
    import traceback
    import ast
    import ckan.plugins.toolkit as toolkit
    import ckan.logic as logic
    import ckan.model as model
    import json
    from ckan.common import c
    import datetime
    from dateutil import parser
    from ckan.common import config
    import urllib
    import hashlib
    import re
    
    get_action = logic.get_action
    log = logging.getLogger(__name__)
    
    
    def odsh_openness_score_dataset_html(dataset):
        score = 0
        #dataset = json.loads(dataset)
        resources = dataset.get('resources')
        if resources is None:
            return 0
        for resource in resources:
            r_qa = resource.get('qa')
            if r_qa:
                try:
                    qa = None
                    # r_qa might be a string of a dictionary when 'dataset' is send from solr
                    if isinstance(r_qa, basestring):
                        qa = ast.literal_eval(r_qa)
                    else:
                        qa = r_qa
                    resource_score = qa.get('openness_score')
                    if resource_score > score:
                        score = resource_score
                except AttributeError, e:
                    log.error('Error while calculating openness score %s: %s\nException: %s',
                              e.__class__.__name__,  unicode(e), traceback.format_exc())
        return score
    
    
    def odsh_get_resource_details(resource_id):
        resource_details = toolkit.get_action('resource_show')(
            data_dict={'id': resource_id})
        return resource_details
    
    
    def odsh_get_resource_views(pkg_dict, resource):
    
        context = {'model': model, 'session': model.Session,
                   'user': c.user, 'for_view': True,
                   'auth_user_obj': c.userobj}
        return get_action('resource_view_list')(
            context, {'id': resource['id']})
    
    
    def odsh_get_bounding_box(pkg_dict):
        try:
            extras = pkg_dict.get('extras')
            spatial = None
            for f in extras:
                if 'key' in f and f['key'] == 'spatial':
                    spatial = f['value']
                    break
    
            if spatial is not None:
                d = json.loads(spatial)
                if 'coordinates' in d:
                    coords = d['coordinates']
                    return compute_bounding_box(coords)
        except Exception, e:
            log.error('Error while bounding box %s: %s\nException: %s',
                      e.__class__.__name__,  unicode(e), traceback.format_exc())
        return None
    
    
    def compute_bounding_box(coords):
        if len(coords) == 0:
            return None
    
        coords = [c for sublist in coords for c in sublist]
        if type(coords[0][0]) == list:
            # multipolygon
            coords = [c for sublist in coords for c in sublist]
    
        minx = min(coords, key=lambda t: t[0])[0]
        maxx = max(coords, key=lambda t: t[0])[0]
        miny = min(coords, key=lambda t: t[1])[1]
        maxy = max(coords, key=lambda t: t[1])[1]
    
        return [maxx, minx, maxy, miny]
    
    
    def odsh_get_spatial_text(pkg_dict):
        extras = pkg_dict.get('extras')
        spatial = None
        if extras is None:
            return None
        for f in extras:
            if 'key' in f and f['key'] == 'spatial_text':
                spatial = f['value']
                return spatial
        return None
    
    def extend_search_convert_local_to_utc_timestamp(str_timestamp):
        if not str_timestamp:
            return None 
    
        if not re.match(r'\d\d\d\d-\d\d-\d\d', str_timestamp):
            raise 'wrong format'
        
        dt = parser.parse(str_timestamp, dayfirst=False).isoformat()
    
        return dt+"Z"
    
    def odsh_render_datetime(datetime_, fromIso=True):
        date_format='{0.day:02d}.{0.month:02d}.{0.year:4d}'
        if not datetime_:
            return ''
        if not re.match(r'\d\d\d\d-\d\d-\d\d', datetime_):
            return ''
        try:
            if fromIso:
                DATETIME_FORMAT = '%Y-%m-%dT%H:%M:%S'
            else:
                DATETIME_FORMAT = '%Y-%m-%d'
    
            dt = parser.parse(datetime_, dayfirst=False)
            return date_format.format(dt)
    
        except:
            return ''
    
    def odsh_upload_known_formats():
        value = config.get('ckanext.odsh.upload_formats', [])
        value = toolkit.aslist(value)
        return value
    
    
    def odsh_encodeurl(url):
        return urllib.quote(url, safe='')
    
    
    def odsh_create_checksum(in_string):
        hashstring = hashlib.md5(in_string.encode('utf-8')).hexdigest()
        return int(hashstring, base=16)
    
    
    def odsh_extract_error(key, errors):
        if not errors or not ('extras' in errors):
            return None
        ext = errors['extras']
        for item in ext:
            if 'key' in item:
                for error in item['key']:
                    if error.startswith(key):
                        return error.replace(key+':', '')
    
    
    def odsh_extract_value_from_extras(extras, key):
        if not extras:
            return None
        for item in extras:
            if 'key' in item and item['key'].lower() == key.lower():
                if 'value' in item:
                    return item['value']
                return None
    
    
    def presorted_license_options(existing_license_id=None):
        '''Returns [(l.title, l.id), ...] for the licenses configured to be
        offered. Always includes the existing_license_id, if supplied.
        '''
        register = model.Package.get_license_register()
        licenses = register.values()
        license_ids = [license.id for license in licenses]
        if existing_license_id and existing_license_id not in license_ids:
            license_ids.insert(0, existing_license_id)
        return [('','')]+[
            (license_id,
             register[license_id].title if license_id in register else license_id)
            for license_id in license_ids]