Newer
Older
import ckan.lib.base as base
from ckan.controllers.home import HomeController
from ckan.controllers.user import UserController
from ckan.controllers.organization import OrganizationController
from ckanext.harvest.controllers.view import ViewController as HarvestController
from ckan.controllers.feed import FeedController, ITEMS_LIMIT, _package_search, _create_atom_id
import ckan.lib.helpers as h
import ckan.authz as authz
from ckan.common import c
from ckan.common import c, request, config
import ckan.plugins.toolkit as toolkit
from ckanext.dcat.controllers import DCATController
class OdshRouteController(HomeController):
def info_page(self):
h.redirect_to('http://www.schleswig-holstein.de/odpinfo')
h.redirect_to('http://www.schleswig-holstein.de/odpstart')
class OdshUserController(UserController):
def index(self):
if not authz.is_sysadmin(c.user):
abort(404)
def me(self, locale=None):
if not c.user:
h.redirect_to(locale=locale, controller='user', action='login',
id=None)
user_ref = c.userobj.get_reference_preferred_for_uri()
h.redirect_to(locale=locale, controller='package', action='search')
def dashboard(self, id=None, offset=0):
if not authz.is_sysadmin(c.user):
abort(404)
return super(OdshUserController, self).dashboard(id, offset)
def dashboard_datasets(self):
if not authz.is_sysadmin(c.user):
abort(404)
return super(OdshUserController, self).dashboard_datasets(id)
if not c.user:
h.redirect_to(controller='user', action='login')
def follow(self, id):
if not authz.is_sysadmin(c.user):
abort(404)
def unfollow(self, id):
if not authz.is_sysadmin(c.user):
abort(404)
def activity(self, id, offset=0):
if not authz.is_sysadmin(c.user):
abort(404)
return super(OdshUserController, self).activity(id, offset)
def register(self, data=None, errors=None, error_summary=None):
if not authz.is_sysadmin(c.user):
abort(404)
return super(OdshUserController, self).register(data, errors, error_summary)
def edit_view(self, id, resource_id, view_id=None):
if not authz.is_sysadmin(c.user):
abort(403)
return super(OdshPackageController, self).edit_view(id, resource_id, view_id)
class OdshGroupController(OrganizationController):
action = super(OdshGroupController, self)._action(name)
def custom_org_list(context, data_dict):
query = data_dict['q']
result = action(context, data_dict)
for q in query.split(' '):
data_dict['q'] = q
result += action(context, data_dict)
return result
if name is 'group_list':
return custom_org_list
else:
return super(OdshGroupController, self)._action(name)
def action(self, logic_function, ver=None):
try:
function = logic.get_action(logic_function)
side_effect_free = getattr(function, 'side_effect_free', False)
request_data = self._get_request_data(
try_url_params=side_effect_free)
if isinstance(request_data, dict):
id = request_data.get('id', '')
if 'q' in request_data:
id = request_data['q']
if 'query' in request_data:
id = request_data['query']
class OdshDCATController(DCATController):
def read_catalog(self, _format):
matomo.create_matomo_request()
class OdshFeedController(FeedController):
def custom(self):
extra_fields = ['ext_startdate', 'ext_enddate',
'ext_bbox', 'ext_prev_extent']
q = request.params.get('q', u'')
fq = ''
search_params = {}
extras = {}
for (param, value) in request.params.items():
if param not in ['q', 'page', 'sort'] + extra_fields \
and len(value) and not param.startswith('_'):
search_params[param] = value
fq += ' %s:"%s"' % (param, value)
if param in extra_fields:
extras[param] = value
search_params['extras'] = extras
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
page = h.get_page_number(request.params)
limit = ITEMS_LIMIT
data_dict = {
'q': q,
'fq': fq,
'start': (page - 1) * limit,
'rows': limit,
'sort': request.params.get('sort', None),
'extras': extras
}
item_count, results = _package_search(data_dict)
navigation_urls = self._navigation_urls(request.params,
item_count=item_count,
limit=data_dict['rows'],
controller='feed',
action='custom')
feed_url = self._feed_url(request.params,
controller='feed',
action='custom')
atom_url = h._url_with_params('/feeds/custom.atom',
search_params.items())
alternate_url = self._alternate_url(request.params)
site_title = config.get('ckan.site_title', 'CKAN')
return self.output_feed(results,
feed_title=u'%s - Custom query' % site_title,
feed_description=u'Recently created or updated'
' datasets on %s. Custom query: \'%s\'' %
(site_title, q),
feed_link=alternate_url,
feed_guid=_create_atom_id(atom_url),
feed_url=feed_url,
def only_admin(func, *args, **kwargs):
if not authz.is_sysadmin(c.user):
abort(404)
return func(*args, **kwargs)
class MetaClass(type):
def __new__(meta, classname, bases, classDict):
newClassDict = {}
wdec = decorator.decorator(only_admin)
for attributeName, attribute in bases[0].__dict__.items():
if isinstance(attribute, FunctionType) and not attributeName.startswith('_'):
attribute = wdec(attribute)
newClassDict[attributeName] = attribute
return type.__new__(meta, classname, bases, newClassDict)
__metaclass__ = MetaClass # wrap all the methods