Skip to content
Snippets Groups Projects
helpers.py 5.67 KiB
from string import lower
from operator import itemgetter

import ckan.lib.helpers as helpers
import ckan.model as model
import ckan.plugins.toolkit as toolkit


def get_collection(dataset_dict):
    """Return the collection info for the collection of *dataset_dict*.

    The collection id is looked up with ``get_collection_id``. If no id is
    found, ``None`` is returned; otherwise the result of
    ``get_collection_info`` for that id and dataset is returned.
    """
    collection_id = get_collection_id(dataset_dict)
    if not collection_id:
        return None
    return get_collection_info(collection_id, dataset_dict)


def get_collection_info(collection_id, dataset_dict=None):
    """Build the info dict for the collection identified by *collection_id*.

    Loads the collection's package dict, resolves the names of its member
    datasets, fetches those datasets via ``get_datasets_from_solr`` and hands
    everything to ``gather_collection_info``. *dataset_dict* is passed
    through unchanged (it is optional there as well).
    """
    collection = get_package_dict(collection_id)
    members = get_datasets_from_solr(get_dataset_names(collection))
    return gather_collection_info(collection, members, dataset_dict)


def get_collection_id(dataset_dict):
    """Return the id or name of the collection referenced by *dataset_dict*.

    Two places are checked, in this order:

    1. the first entry of ``relationships_as_subject``, read from
       ``['__extras']['object_package_id']``;
    2. the first entry of ``relationships``, read from its ``object`` key.

    Returns ``None`` when neither list is present or both are empty.
    """
    as_subject = dataset_dict.get('relationships_as_subject')
    if as_subject:
        first_relationship = as_subject[0]
        return first_relationship['__extras']['object_package_id']

    relationships = dataset_dict.get('relationships')
    return relationships[0].get('object') if relationships else None


def get_package_dict(name):
    """Return ``model.Package.get(name).as_dict()``, or ``None`` if no package is found."""
    package = model.Package.get(name)
    return package.as_dict() if package else None


def get_dataset_names(collection_dict):
    """Return the ``object`` value of every relationship of the collection.

    Returns an empty list when the collection's package cannot be loaded.
    """
    # The package is re-read by id because that is needed to get the full
    # package dict.
    full_collection_dict = get_package_dict(collection_dict.get('id'))
    if not full_collection_dict:
        return []

    relationships = full_collection_dict.get('relationships')
    return [relationship.get('object') for relationship in relationships]


def get_datasets_from_solr(dataset_names):
    """Fetch the datasets named in *dataset_names* via ``package_search``.

    The names are OR-ed together into a single ``name:(...)`` filter query
    and the search is sorted by ``extras_issued`` ascending. Returns the
    ``results`` list of the search, or an empty list when *dataset_names* is
    empty or the search yields no results.
    """
    if not dataset_names:
        return []

    search_params = {
        'fq': 'name:({})'.format(' OR '.join(dataset_names)),
        'sort': 'extras_issued asc',
        # maximum possible number of results is 1000, see
        # https://docs.ckan.org/en/ckan-2.7.3/api/index.html#ckan.logic.action.get.package_search
        'rows': 1000,
    }

    package_search = toolkit.get_action('package_search')
    # The action is called with None as its context.
    response = package_search(None, search_params)

    return response.get('results') or []


def gather_collection_info(collection_dict, datasets_in_collection, dataset_dict=None):
    """Assemble the info dict describing a collection and its members.

    For a collection without members only ``title``, ``url`` and an empty
    ``members`` list are returned. Otherwise the dict additionally contains
    ``first_member`` and ``last_member`` (name and url of the first and last
    entry of *datasets_in_collection*), ``predecessor`` and ``successor``
    (each holding a ``url`` that may be ``None``) and
    ``persistent_link_last_member``.

    The predecessor and successor are the neighbours of *dataset_dict* in
    *datasets_in_collection*, matched by ``name``. Both urls are ``None``
    when *dataset_dict* is not given, is not a member, or has no neighbour
    on that side.
    """
    collection_name = collection_dict.get('name')
    collection_title = collection_dict.get('title')
    collection_url = url_from_id(collection_name)

    # An empty collection only gets the three basic keys.
    if not datasets_in_collection:
        return {
            'title': collection_title,
            'url': collection_url,
            'members': []
        }

    first_name = datasets_in_collection[0].get('name')
    last_name = datasets_in_collection[-1].get('name')

    predecessor_name = None
    successor_name = None
    if dataset_dict:
        member_names = [member.get('name') for member in datasets_in_collection]
        try:
            position = member_names.index(dataset_dict.get('name'))
        except ValueError:
            # The current dataset is not among the members: no neighbours.
            pass
        else:
            if position > 0:
                predecessor_name = member_names[position - 1]
            if position < len(member_names) - 1:
                successor_name = member_names[position + 1]

    predecessor_url = url_from_id(predecessor_name) if predecessor_name else None
    successor_url = url_from_id(successor_name) if successor_name else None

    return {
        'title': collection_title,
        'url': collection_url,
        'members': datasets_in_collection,
        'first_member': {
            'name': first_name,
            'url': url_from_id(first_name),
        },
        'last_member': {
            'name': last_name,
            'url': url_from_id(last_name),
        },
        'predecessor': {
            'url': predecessor_url,
        },
        'successor': {
            'url': successor_url,
        },
        'persistent_link_last_member': url_last_member(collection_name),
    }

def url_from_id(package_id):
    """Return the url of the ``package`` controller's ``read`` action for *package_id*."""
    route_args = {
        'controller': 'package',
        'action': 'read',
        'id': package_id,
    }
    return helpers.url_for(**route_args)

def url_last_member(name_collection):
    """Return the url of the ``latest`` action of ``LatestDatasetController`` for a collection."""
    route_args = {
        'controller': 'ckanext.odsh.collection.controller:LatestDatasetController',
        'action': 'latest',
        'id': name_collection,
    }
    return helpers.url_for(**route_args)


def get_latest_dataset(collection_name):
    """Return the name of the last member of the collection, or ``None``.

    ``gather_collection_info`` leaves out the ``last_member`` entry for a
    collection without members. In that case ``None`` is returned instead of
    raising ``KeyError``, matching what ``get_latest_resources_for_format``
    does for an empty collection.
    """
    collection_info = get_collection_info(collection_name)
    last_member = collection_info.get('last_member')
    if not last_member:
        return None
    return last_member['name']


def get_latest_resources_for_format(collection_name, resource_format):
    """Return one resource of the collection's last member in the given format.

    The format comparison is case-insensitive. ``None`` is returned when the
    collection has no members, the last member has no resources, or none of
    its resources has the requested format. Resources without a ``format``
    value are skipped rather than raising.
    """
    collection_info = get_collection_info(collection_name)
    members = collection_info.get('members')
    if not members:
        return None

    resources = members[-1].get('resources')
    if not resources:
        return None

    wanted_format = resource_format.upper()
    matching_resources = [
        resource for resource in resources
        # ``format`` may be missing or None; treat that as an empty string.
        if (resource.get('format') or '').upper() == wanted_format
    ]
    if not matching_resources:
        return None

    # NOTE(review): sorting descending and taking the last element picks the
    # resource with the smallest ('id', 'created') key. That is the original
    # selection and is kept as is; confirm it is the intended "latest".
    resources_sorted = sorted(
        matching_resources, key=itemgetter('id', 'created'), reverse=True
    )
    return resources_sorted[-1]