Created
August 24, 2011 11:08
-
-
Save mkmik/1167801 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# ... | |
def setup_environ(): | |
from grokcore.component.testing import grok | |
grok('opennode.oms.endpoint.ssh.cmd.grokkers') # XXX: Not sure why this needs to be explicit--an ordering issue? | |
grok('opennode.oms.security.grokkers') # XXX: Not sure why this needs to be explicit--an ordering issue? | |
grok('grokcore.security.meta') # invoke the PermissionGrokker which will register groksecurity permissions. | |
grok('opennode.oms') |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
--- /tmp/ch_zope.py 2011-08-24 13:18:43.580550455 +0200 | |
+++ /tmp/ch_our.py 2011-08-24 13:20:48.900550408 +0200 | |
@@ -21,6 +21,8 @@ | |
assert isinstance(set_permissions, dict) | |
self.set_permissions = set_permissions | |
+ self.interaction = None | |
+ | |
def permission_id(self, name): | |
'See INameBasedChecker' | |
return self.get_permissions.get(name) | |
@@ -44,7 +46,7 @@ | |
if permission is not None: | |
if permission is CheckerPublic: | |
return # Public | |
- if thread_local.interaction.checkPermission(permission, object): | |
+ if self.interaction.checkPermission(permission, object): # use local interaction | |
return # allowed | |
else: | |
__traceback_supplement__ = (TracebackSupplement, object) | |
@@ -56,10 +58,10 @@ | |
def check(self, object, name): | |
'See IChecker' | |
permission = self.get_permissions.get(name) | |
if permission is not None: | |
if permission is CheckerPublic: | |
return # Public | |
- if thread_local.interaction.checkPermission(permission, object): | |
+ if self.interaction.checkPermission(permission, object): # use local interaction | |
return | |
else: | |
__traceback_supplement__ = (TracebackSupplement, object) | |
@@ -77,7 +80,7 @@ | |
return value | |
checker = getattr(value, '__Security_checker__', None) | |
if checker is None: | |
- checker = selectChecker(value) | |
+ checker = _select_checker(value, self.interaction) # pass interaction | |
if checker is None: | |
return value | |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# ... | |
class Compute(Model): | |
implements(ICompute, IAttributeAnnotatable) | |
permissions(dict(architecture='read')) | |
# ... |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import martian | |
__all__ = ['permissions'] | |
class permissions(martian.Directive): | |
"""Use this directive in a class in order to set it's attribute's permissions.""" | |
scope = martian.CLASS | |
store = martian.ONCE | |
default = None |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import grokcore.security | |
import martian | |
import zope.schema | |
from grokcore.security import require | |
from zope.interface import implementedBy | |
from zope.security.checker import defineChecker | |
from opennode.oms.model.model.base import Model, IModel | |
from opennode.oms.model.model.compute import Compute, ICompute | |
from opennode.oms.security.checker import Checker | |
from opennode.oms.security.directives import permissions | |
class SecurityGrokker(martian.ClassGrokker): | |
martian.component(object) | |
martian.directive(permissions, name='permissions') | |
def execute(self, factory, config, permissions, **kw): | |
if not permissions: | |
return False | |
# mandatory, otherwise zope's default Checker impl will be used | |
# which doesn't play well in async frameworks like twisted. | |
defineChecker(factory, Checker({},{})) | |
for name, permission in permissions.items(): | |
config.action( | |
discriminator=('protectName', factory, name), | |
callable=grokcore.security.util.protect_getattr, | |
args=(factory, name, permission), | |
) | |
return True |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from zope.interface import implements | |
from zope.security._proxy import _Proxy as Proxy | |
from zope.security.checker import _available_by_default, getCheckerForInstancesOf, CheckerPublic, TracebackSupplement | |
from zope.security.interfaces import INameBasedChecker, Unauthorized, ForbiddenAttribute | |
def _select_checker(value, interaction): | |
checker = getCheckerForInstancesOf(type(value)) | |
# handle checkers for "primitive" types like str | |
if type(checker) is object: | |
return checker | |
# create a custom instance | |
bound_checker = type(checker).__new__(type(checker)) | |
bound_checker.__dict__ = checker.__dict__.copy() | |
bound_checker.interaction = interaction | |
return bound_checker | |
def proxy_factory(value, interaction): | |
return Proxy(value, _select_checker(value, interaction)) | |
class Checker(object): | |
implements(INameBasedChecker) | |
def __init__(self, get_permissions, set_permissions=None): | |
"""Create a checker | |
A dictionary must be provided for computing permissions for | |
names. The dictionary get will be called with attribute names | |
and must return a permission id, None, or the special marker, | |
CheckerPublic. If None is returned, then access to the name is | |
forbidden. If CheckerPublic is returned, then access will be | |
granted without checking a permission. | |
An optional setattr dictionary may be provided for checking | |
set attribute access. | |
""" | |
assert isinstance(get_permissions, dict) | |
self.get_permissions = get_permissions | |
if set_permissions is not None: | |
assert isinstance(set_permissions, dict) | |
self.set_permissions = set_permissions | |
self.interaction = None | |
def permission_id(self, name): | |
'See INameBasedChecker' | |
return self.get_permissions.get(name) | |
def setattr_permission_id(self, name): | |
'See INameBasedChecker' | |
if self.set_permissions: | |
return self.set_permissions.get(name) | |
def check_getattr(self, object, name): | |
'See IChecker' | |
self.check(object, name) | |
def check_setattr(self, object, name): | |
'See IChecker' | |
if self.set_permissions: | |
permission = self.set_permissions.get(name) | |
else: | |
permission = None | |
if permission is not None: | |
if permission is CheckerPublic: | |
return # Public | |
if self.interaction.checkPermission(permission, object): # use local interaction | |
return # allowed | |
else: | |
__traceback_supplement__ = (TracebackSupplement, object) | |
raise Unauthorized(object, name, permission) | |
__traceback_supplement__ = (TracebackSupplement, object) | |
raise ForbiddenAttribute(name, object) | |
def check(self, object, name): | |
'See IChecker' | |
permission = self.get_permissions.get(name) | |
if permission is not None: | |
if permission is CheckerPublic: | |
return # Public | |
if self.interaction.checkPermission(permission, object): # use local interaction | |
return | |
else: | |
__traceback_supplement__ = (TracebackSupplement, object) | |
raise Unauthorized(object, name, permission) | |
elif name in _available_by_default: | |
return | |
if name != '__iter__' or hasattr(object, name): | |
__traceback_supplement__ = (TracebackSupplement, object) | |
raise ForbiddenAttribute(name, object) | |
def proxy(self, value): | |
'See IChecker' | |
if type(value) is Proxy: | |
return value | |
checker = getattr(value, '__Security_checker__', None) | |
if checker is None: | |
checker = _select_checker(value, self.interaction) # pass interaction | |
if checker is None: | |
return value | |
return Proxy(value, checker) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from grokcore.security import Permission, name, title, description | |
class Read(Permission): | |
name('read') | |
class Modify(Permission): | |
name('modify') | |
class Create(Permission): | |
name('create') | |
class Add(Permission): | |
name('add') |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import unittest | |
from nose.tools import eq_, assert_raises | |
from zope.authentication.interfaces import IAuthentication | |
from zope.component import provideUtility, getUtility | |
from zope.interface import implements | |
from zope.security.interfaces import Unauthorized, ForbiddenAttribute | |
from zope.securitypolicy.principalpermission import principalPermissionManager as prinperG | |
from zope.securitypolicy.zopepolicy import ZopeSecurityPolicy | |
from opennode.oms.model.model.compute import Compute | |
from opennode.oms.security.checker import proxy_factory | |
from opennode.oms.security.principals import User | |
from opennode.oms.tests.util import run_in_reactor | |
class SessionStub(object): | |
def __init__(self, principal=None): | |
self.principal = principal | |
self.interaction = None | |
class DummyAuthenticationUtility: | |
implements(IAuthentication) | |
def getPrincipal(self, id): | |
if id == 'marko': | |
return User(id) | |
elif id == 'erik': | |
return User(id) | |
raise PrincipalLookupError(id) | |
provideUtility(DummyAuthenticationUtility()) | |
class SecurityTestCase(unittest.TestCase): | |
def _get_interaction(self, uid): | |
auth = getUtility(IAuthentication, context=None) | |
interaction = ZopeSecurityPolicy() | |
sess = SessionStub(auth.getPrincipal(uid)) | |
interaction.add(sess) | |
return interaction | |
@run_in_reactor | |
def test_test(self): | |
# setup some fake permissions to the test principals | |
prinperG.grantPermissionToPrincipal('read', 'marko') | |
prinperG.grantPermissionToPrincipal('zope.Nothing', 'erik') | |
# set up interactions | |
interaction_marko = self._get_interaction('marko') | |
interaction_erik = self._get_interaction('erik') | |
# get the object being secured | |
compute = Compute('linux', 'tux-for-test', 2000, 2000, 'active') | |
eq_(compute.architecture, 'linux') | |
# get the proxies for the corresponding interactions | |
compute_proxy_marko = proxy_factory(compute, interaction_marko) | |
compute_proxy_erik = proxy_factory(compute, interaction_erik) | |
# check an authorized access | |
eq_(compute_proxy_marko.architecture, 'linux') | |
# check an unauthorized access | |
with assert_raises(Unauthorized): | |
eq_(compute_proxy_erik.architecture, 'linux') | |
# check a default unauthorized access | |
with assert_raises(ForbiddenAttribute): | |
eq_(compute_proxy_marko.state, 'active') |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class Checker(object): | |
implements(INameBasedChecker) | |
def __init__(self, get_permissions, set_permissions=None): | |
"""Create a checker | |
A dictionary must be provided for computing permissions for | |
names. The dictionary get will be called with attribute names | |
and must return a permission id, None, or the special marker, | |
CheckerPublic. If None is returned, then access to the name is | |
forbidden. If CheckerPublic is returned, then access will be | |
granted without checking a permission. | |
An optional setattr dictionary may be provided for checking | |
set attribute access. | |
""" | |
assert isinstance(get_permissions, dict) | |
self.get_permissions = get_permissions | |
if set_permissions is not None: | |
assert isinstance(set_permissions, dict) | |
self.set_permissions = set_permissions | |
def permission_id(self, name): | |
'See INameBasedChecker' | |
return self.get_permissions.get(name) | |
def setattr_permission_id(self, name): | |
'See INameBasedChecker' | |
if self.set_permissions: | |
return self.set_permissions.get(name) | |
def check_getattr(self, object, name): | |
'See IChecker' | |
self.check(object, name) | |
def check_setattr(self, object, name): | |
'See IChecker' | |
if self.set_permissions: | |
permission = self.set_permissions.get(name) | |
else: | |
permission = None | |
if permission is not None: | |
if permission is CheckerPublic: | |
return # Public | |
if thread_local.interaction.checkPermission(permission, object): | |
return # allowed | |
else: | |
__traceback_supplement__ = (TracebackSupplement, object) | |
raise Unauthorized(object, name, permission) | |
__traceback_supplement__ = (TracebackSupplement, object) | |
raise ForbiddenAttribute(name, object) | |
def check(self, object, name): | |
'See IChecker' | |
permission = self.get_permissions.get(name) | |
if permission is not None: | |
if permission is CheckerPublic: | |
return # Public | |
if thread_local.interaction.checkPermission(permission, object): | |
return | |
else: | |
__traceback_supplement__ = (TracebackSupplement, object) | |
raise Unauthorized(object, name, permission) | |
elif name in _available_by_default: | |
return | |
if name != '__iter__' or hasattr(object, name): | |
__traceback_supplement__ = (TracebackSupplement, object) | |
raise ForbiddenAttribute(name, object) | |
def proxy(self, value): | |
'See IChecker' | |
if type(value) is Proxy: | |
return value | |
checker = getattr(value, '__Security_checker__', None) | |
if checker is None: | |
checker = selectChecker(value) | |
if checker is None: | |
return value | |
return Proxy(value, checker) | |
# ... | |
def selectChecker(object): | |
"""Get a checker for the given object | |
The appropriate checker is returned or None is returned. If the | |
return value is None, then object should not be wrapped in a proxy. | |
""" | |
# We need to be careful here. We might have a proxy, in which case | |
# we can't use the type. OTOH, we might not be able to use the | |
# __class__ either, since not everything has one. | |
# TODO: we really need formal proxy introspection | |
#if type(object) is Proxy: | |
# # Is this already a security proxy? | |
# return None | |
checker = _getChecker(type(object), _defaultChecker) | |
#checker = _getChecker(getattr(object, '__class__', type(object)), | |
# _defaultChecker) | |
if checker is NoProxy: | |
return None | |
while not isinstance(checker, Checker): | |
checker = checker(object) | |
if checker is NoProxy or checker is None: | |
return None | |
return checker |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment