Skip to content
Snippets Groups Projects
helpers.py 5.64 KiB
Newer Older
  • Learn to ignore specific revisions
  • from string import lower
    
    
    Benjamin Becker's avatar
    Benjamin Becker committed
    import ckan.lib.helpers as helpers
    
    import ckan.model as model
    
    import ckan.plugins.toolkit as toolkit
    
    from profilehooks import timecall, profile
    
    def get_collection(dataset_dict):
        '''
        Return the collection info for the collection referenced by
        dataset_dict, or None if get_collection_id yields no collection id.
        '''
        collection_id = get_collection_id(dataset_dict)
        if not collection_id:
            return None
        return get_collection_info(collection_id, dataset_dict)
    
        
    
    def get_collection_info(collection_id, dataset_dict=None):
        '''
        Build the collection info dict for the collection with the given id.

        Loads the collection's package dict, resolves the names of its member
        datasets, fetches those datasets via get_datasets_from_solr and hands
        everything to gather_collection_info. dataset_dict is passed through
        unchanged.
        '''
        package = get_package_dict(collection_id)
        members = get_datasets_from_solr(get_dataset_names(package))
        return gather_collection_info(package, members, dataset_dict)
    
    
    
    @timecall(immediate=True)
    def get_collection_id(dataset_dict):
        '''
        Return the 'object_package_id' stored under '__extras' of the first
        entry in dataset_dict['relationships_as_subject'].

        Returns None when the key is missing, None or an empty list.
        '''
        relationships_dataset = dataset_dict.get('relationships_as_subject')
        # A missing key yields None; test truthiness instead of calling len(),
        # which would raise TypeError on None.
        if not relationships_dataset:
            return None
        return relationships_dataset[0]['__extras']['object_package_id']
    
    
    @timecall(immediate=True)
    def get_relationships(name):
        '''
        Placeholder without an implementation; always returns None.
        '''
        return None
    
    
    
    def get_package_dict(name):
        '''
        Look up a package with model.Package.get(name) and return the result
        of its as_dict() method.
        '''
        package = model.Package.get(name)
        return package.as_dict()
    
    
    
    @timecall(immediate=True)
    def get_dataset_names(collection_dict):
        '''
        Return the 'object' value of every entry in
        collection_dict['relationships'], in their original order.
        '''
        member_names = []
        for entry in collection_dict.get('relationships'):
            member_names.append(entry.get('object'))
        return member_names
    
    
    def get_datasets_from_solr(dataset_names):
        '''
        Fetch the datasets with the given names through the package_search
        action.

        The search uses a filter query of the form
            fq=name:(name1 OR name2)
        sorted by 'name asc,extras_issued asc' and limited to 1000 rows.

        :param dataset_names: iterable of dataset names (may be empty or None)
        :returns: list with the 'results' of the search; an empty list if no
            names were given or nothing was found
        '''
        dataset_names = list(dataset_names or [])
        if not dataset_names:
            # Without names the filter would degenerate to "name:()", so there
            # is nothing sensible to search for.
            return []

        context = None # see https://docs.ckan.org/en/ckan-2.7.3/extensions/plugins-toolkit.html#ckan.plugins.toolkit.ckan.plugins.toolkit.get_action

        fq = 'name:({})'.format(' OR '.join(dataset_names))
        sort = 'name asc,extras_issued asc'

        query_result = toolkit.get_action('package_search')(context, {
            'fq': fq,
            'sort': sort,
            'rows': 1000, # maximum possible number of results, see https://docs.ckan.org/en/ckan-2.7.3/api/index.html#ckan.logic.action.get.package_search
        })

        # Normalise a missing/None/empty 'results' entry to an empty list
        return query_result.get('results') or []
    
    
    
    def gather_collection_info(collection_dict, datasets_in_collection, dataset_dict=None):
        '''
        Assemble the info dict describing a collection.

        :param collection_dict: package dict of the collection; its 'title'
            and 'name' are read
        :param datasets_in_collection: list of member dataset dicts in the
            order that defines first/last/predecessor/successor
        :param dataset_dict: optional dict of the dataset currently viewed;
            when given, predecessor and successor URLs are computed relative
            to it
        :returns: dict with the keys 'title', 'members', 'first_member',
            'last_member', 'predecessor', 'successor' (each of the four with
            a 'url' entry that may be None) and 'persistent_link_last_member'
        '''
        def url_from_name(name):
            return helpers.url_for(controller='package', action='read', id=name)

        dataset_names = [d.get('name') for d in datasets_in_collection]

        # An empty collection has neither a first nor a last member
        url_first_dataset = url_from_name(dataset_names[0]) if dataset_names else None
        url_last_dataset = url_from_name(dataset_names[-1]) if dataset_names else None

        persistent_link_last_member = helpers.url_for(
            controller='ckanext.odsh.collection.controller:LatestDatasetController',
            action='latest',
            id=collection_dict.get('name')
        )

        url_predecessor = url_successor = None
        if dataset_dict:
            name_current_dataset = dataset_dict.get('name')
            # The current dataset may be absent from the member list; in that
            # case there is no neighbour to link to.
            if name_current_dataset in dataset_names:
                position = dataset_names.index(name_current_dataset)
                if position > 0:
                    name_predecessor = dataset_names[position - 1]
                    if name_predecessor:
                        url_predecessor = url_from_name(name_predecessor)
                # Position 0 is a valid index: the first member does have a
                # successor whenever the collection has more than one member.
                if position < len(dataset_names) - 1:
                    name_successor = dataset_names[position + 1]
                    if name_successor:
                        url_successor = url_from_name(name_successor)

        return {
            'title': collection_dict.get('title'),
            'members': datasets_in_collection,
            'first_member': {
                'url': url_first_dataset,
            },
            'last_member': {
                'url': url_last_dataset,
            },
            'predecessor': {
                'url': url_predecessor,
            },
            'successor': {
                'url': url_successor,
            },
            'persistent_link_last_member': persistent_link_last_member,
        }
    
    
    
    
    
    
    
    # Helpers for resolving the latest dataset and the latest resources of a collection
    
    
    @timecall(immediate=True)
    def get_latest_dataset(collection_name):
        '''
        Return the name of the latest dataset in the given collection.

        Member datasets for which helpers.check_access('package_show', ...)
        is false are skipped. Among the rest, the one with the greatest
        'issued' extra wins; ties are broken by the greater dataset name.

        :param collection_name: name (or id) of the collection package
        :returns: name of the latest dataset, or None if the collection has
            no accessible member
        '''
        # get_dataset_names reads the 'relationships' entry of the package
        # dict, so the collection has to be loaded first.
        collection_dict = get_package_dict(collection_name)
        member_names = get_dataset_names(collection_dict)

        latest_name = None
        latest_key = None
        for item in member_names:
            item_pkt_dict = get_package_dict(item)
            if not helpers.check_access('package_show', item_pkt_dict):
                continue
            extras = item_pkt_dict.get('extras') or {}
            # Rank by (issued, name). A missing 'issued' is mapped to '' so it
            # sorts before any real value without ordering None against
            # strings.
            item_key = (extras.get('issued') or '', item)
            if latest_key is None or latest_key < item_key:
                latest_name = item
                latest_key = item_key
        return latest_name
    
    
    
    @timecall(immediate=True)
    def get_latest_resources_for_type(collection_name, type):
        '''
        Return one resource dict from the latest dataset of a collection.

        Every resource of the latest dataset is offered to
        is_latest_resources together with the requested type and the
        'created'/'id' values of the best candidate so far; the last resource
        accepted by that check is returned.
        NOTE(review): presumably this selects the newest resource whose
        format matches `type` -- confirm against is_latest_resources.

        :param collection_name: name of the collection package
        :param type: value passed to is_latest_resources as the wanted type
        :returns: the selected resource dict, or None if the collection has
            no latest dataset or no resource was accepted
        '''
        latest_dataset_name = get_latest_dataset(collection_name)
        if latest_dataset_name is None:
            # No accessible dataset in the collection, hence no resource
            return None

        latest_dataset = get_package_dict(latest_dataset_name)
        latest_resource = latest_created = latest_id = None
        # Treat a missing 'resources' entry like an empty resource list
        for resource in latest_dataset.get('resources') or []:
            resource_format = resource.get('format')
            resource_created = resource.get('created')
            resource_id = resource.get('id')
            if is_latest_resources(resource_format, type, resource_created, latest_created, resource_id, latest_id):
                latest_id = resource_id
                latest_created = resource_created
                latest_resource = resource
        return latest_resource