-
Thorge Petersen authoredThorge Petersen authored
helpers.py 5.67 KiB
from string import lower
from operator import itemgetter
import ckan.lib.helpers as helpers
import ckan.model as model
import ckan.plugins.toolkit as toolkit
def get_collection(dataset_dict):
collection_id = get_collection_id(dataset_dict)
if collection_id:
return get_collection_info(collection_id, dataset_dict)
return None
def get_collection_info(collection_id, dataset_dict=None):
collection_dict = get_package_dict(collection_id)
dataset_names = get_dataset_names(collection_dict)
datasets_in_collection = get_datasets_from_solr(dataset_names)
collection_info = gather_collection_info(collection_dict, datasets_in_collection, dataset_dict)
return collection_info
def get_collection_id(dataset_dict):
relationships_dataset = dataset_dict.get('relationships_as_subject')
if relationships_dataset and len(relationships_dataset):
return relationships_dataset[0]['__extras']['object_package_id']
relationships_dataset = dataset_dict.get('relationships')
if relationships_dataset and len(relationships_dataset):
return relationships_dataset[0].get('object')
return None
def get_package_dict(name):
package = model.Package.get(name)
if package:
return package.as_dict()
else:
return None
def get_dataset_names(collection_dict):
collection_dict = get_package_dict(collection_dict.get('id')) # needed to get full package_dict
if collection_dict:
relationships_collection = collection_dict.get('relationships')
names_collection_members = [relationship.get('object') for relationship in relationships_collection]
return names_collection_members
else:
return []
def get_datasets_from_solr(dataset_names):
context = None
if not dataset_names:
return []
name_expression = ' OR '.join(dataset_names)
fq = 'name:({})'.format(name_expression)
sort = 'extras_issued asc'
# maximum possible number of results is 1000,
# see https://docs.ckan.org/en/ckan-2.7.3/api/index.html#ckan.logic.action.get.package_search
query_result = toolkit.get_action('package_search')(context, {
'fq': fq,
'sort': sort,
'rows': 1000,
})
results = query_result.get('results')
datasets_found = results if results else []
return datasets_found
def gather_collection_info(collection_dict, datasets_in_collection, dataset_dict=None):
url_collection = url_from_id(collection_dict.get('name'))
if not datasets_in_collection:
return {
'title': collection_dict.get('title'),
'url': url_collection,
'members': []
}
name_first_dataset = datasets_in_collection[0].get('name')
url_first_dataset = url_from_id(name_first_dataset)
name_last_dataset = datasets_in_collection[-1].get('name')
url_last_dataset = url_from_id(name_last_dataset)
name_collection = collection_dict.get('name')
persistent_link_last_member = url_last_member(name_collection)
if dataset_dict:
name_current_dataset = dataset_dict.get('name')
dataset_names = [d.get('name') for d in datasets_in_collection]
def get_predecessor():
try:
id_current = dataset_names.index(name_current_dataset)
except ValueError:
return None
if id_current > 0:
return dataset_names[id_current - 1]
return None
def get_successor():
try:
id_current = dataset_names.index(name_current_dataset)
except ValueError:
return None
if id_current < len(dataset_names) - 1:
return dataset_names[id_current + 1]
return None
name_predecessor = get_predecessor()
url_predecessor = url_from_id(name_predecessor) if name_predecessor else None
name_successor = get_successor()
url_successor = url_from_id(name_successor) if name_successor else None
else:
url_predecessor = url_successor = None
return {
'title': collection_dict.get('title'),
'url': url_collection,
'members': datasets_in_collection,
'first_member': {
'name': name_first_dataset,
'url': url_first_dataset,
},
'last_member': {
'name': name_last_dataset,
'url': url_last_dataset,
},
'predecessor': {
'url': url_predecessor,
},
'successor': {
'url': url_successor,
},
'persistent_link_last_member': persistent_link_last_member,
}
def url_from_id(package_id):
return helpers.url_for(controller='package', action='read', id=package_id)
def url_last_member(name_collection):
return helpers.url_for(
controller='ckanext.odsh.collection.controller:LatestDatasetController',
action='latest',
id=name_collection
)
def get_latest_dataset(collection_name):
collection_info = get_collection_info(collection_name)
latest_name = collection_info['last_member']['name']
return latest_name
def get_latest_resources_for_format(collection_name, resource_format):
collection_info = get_collection_info(collection_name)
members = collection_info.get('members')
if not members:
return None
latest_dataset = members[-1]
resources = latest_dataset.get('resources')
if not resources:
return None
resources_with_asked_type = [r for r in resources if r.get('format').upper() == resource_format.upper()]
resources_sorted = sorted(resources_with_asked_type, key=itemgetter('id','created'), reverse=True)
return resources_sorted[-1]