diff --git a/ckanext/odsh/logic/auth.py b/ckanext/odsh/logic/auth.py
index 460ed331cbb1130306df5e5bcf05995bd45e1da0..d3b3b2a9b0c83a82330cba904950d90a80a2e849 100644
--- a/ckanext/odsh/logic/auth.py
+++ b/ckanext/odsh/logic/auth.py
@@ -1,23 +1,45 @@
-import ckan.logic.auth +import ckan.logic.auth.get as get +import ckan.logic.auth.update as update +import ckan.logic.auth.delete as delete +import ckan.logic.auth.create as create import ckan.plugins as p def _is_sysadmin(context): return context["auth_user_obj"].sysadmin +def allow_sysadmin_only(original_auth_function): + def _decorator(func): + def wrapped_auth_function(context, data_dict=None): + if not _is_sysadmin(context): + return {"success": False} + return original_auth_function(context, data_dict=data_dict) + + return wrapped_auth_function + + return _decorator + +@allow_sysadmin_only(get.user_list) def user_list(context, data_dict): - if not _is_sysadmin(context): - return {"success": False} - return ckan.logic.auth.get.user_list(context, data_dict) + pass +@allow_sysadmin_only(update.user_update) def user_update(context, data_dict): - if not _is_sysadmin(context): - return {"success": False} - return ckan.logic.auth.update.user_update(context, data_dict) + pass + +@allow_sysadmin_only(create.user_create) +def user_create(context, data_dict): + pass + +@allow_sysadmin_only(create.user_invite) +def user_invite(context, data_dict): + pass def get_auth_functions(): return { "user_list": user_list, "user_update": user_update, + "user_create": user_create, + "user_invite": user_invite, }
\ No newline at end of file
diff --git a/ckanext/odsh/tests_tpsh/test_auth.py b/ckanext/odsh/tests_tpsh/test_auth.py
index f719da4f57564d61627777c6900b2aebc254067c..0c2e0e895520d5c05ab54cd62425f67dd64dcc78 100644
--- a/ckanext/odsh/tests_tpsh/test_auth.py
+++ b/ckanext/odsh/tests_tpsh/test_auth.py
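Review bot: `wrapped_auth_function` relies on `_is_sysadmin` (a context line above the change), which dereferences `context["auth_user_obj"].sysadmin` with no guard, and `user_create`/`user_invite` are exactly the actions now placed behind it that an unauthenticated caller is most likely to reach; if CKAN ever hands the wrapper a context whose `auth_user_obj` is `None` or missing (presumably the anonymous case — confirm against the caller), the check raises instead of denying. Make the deny path explicit and fail closed inside the wrapper: `auth_user = context.get("auth_user_obj")` / `if auth_user is None or not auth_user.sysadmin:` / `return {"success": False}`. Separately, `import ckan.logic.auth.delete as delete` is never used in this hunk and `user_delete` is neither wrapped nor registered in `get_auth_functions`, while the accompanying tests exercise `user_delete` — if the intent is to pin it to sysadmins here rather than rely on core CKAN's default, add `@allow_sysadmin_only(delete.user_delete)` on a `user_delete` stub and register it; otherwise drop the import. Finally, `_decorator` ignores `func` entirely and returns a bare closure, so the `pass` bodies are never executed and the registered callable loses the stub's name; a comment on `_decorator` such as `# the decorated stub is only a registration anchor; its body is never called` would stop the next reader from filling the stubs in.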
@@ -23,13 +23,19 @@ class TestAuthorization: assert response.status_code == 403 assert "Zugriff nicht erlaubt" in response - def test_user_list_not_accessible_by_regular_user(self): + def test_user_actions_not_accessible_by_regular_user(self): + def assert_not_authorized(action, context, data_dict): + with pytest.raises(NotAuthorized): + logic.check_access(action, context, data_dict=data_dict) + user = factories.User() username = user["name"] - with pytest.raises(NotAuthorized): - logic.check_access("user_list", {"user": username}, {}) - with pytest.raises(NotAuthorized): - logic.check_access("user_update", {"user": username}, {"id": username}) + + assert_not_authorized("user_list", {"user": username}, {}) + assert_not_authorized("user_update", {"user": username}, {"id": username}) + assert_not_authorized("user_delete", {"user": username}, {"id": username}) + assert_not_authorized("user_create", {"user": username}, {"name": "foo"}) + assert_not_authorized("user_invite", {"user": username}, {}) def test_user_list_accessible_for_sysadmin(self): adminuser = factories.Sysadmin()
@@ -38,6 +44,9 @@ class TestAuthorization: username = user["name"] logic.check_access("user_list", {"user": adminusername}, {}) logic.check_access("user_update", {"user": adminusername}, {"id": username}) + logic.check_access("user_delete", {"user": adminusername}, {"id": username}) + logic.check_access("user_create", {"user": adminusername}, {"name": "foo"}) + logic.check_access("user_invite", {"user": adminusername}, {})
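Review bot: The negative test in `test_user_actions_not_accessible_by_regular_user` only covers a logged-in regular user; since self-registration via `user_create` is the one action CKAN normally opens to anonymous callers and this commit closes it, the anonymous case is the one most worth pinning, e.g. `assert_not_authorized("user_create", {"user": ""}, {"name": "foo"})` and the same for `user_invite` (the exact way this test setup represents an anonymous context may differ — confirm). `user_delete` is asserted here although no `user_delete` auth function is registered by the extension in this commit, so that assertion pins core CKAN's default rather than this module; a comment `# user_delete is not overridden by the extension; this pins CKAN core's default` makes the dependency explicit. Also `test_user_list_accessible_for_sysadmin` in the context lines now checks several actions, so renaming it to `test_user_actions_accessible_for_sysadmin` keeps it consistent with the renamed negative test; its body continues in the following hunk.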