Skip to content

Instantly share code, notes, and snippets.

@mkmik
Created August 24, 2011 11:08
Show Gist options
  • Save mkmik/1167801 to your computer and use it in GitHub Desktop.
Save mkmik/1167801 to your computer and use it in GitHub Desktop.
# ...
def setup_environ():
    """Grok the packages the application relies on, in a fixed order.

    The packages are grokked one after another; the order below is the
    order in which ``grok`` is invoked.
    """
    from grokcore.component.testing import grok

    grok_order = (
        # XXX: Not sure why this needs to be explicit--an ordering issue?
        'opennode.oms.endpoint.ssh.cmd.grokkers',
        # XXX: Not sure why this needs to be explicit--an ordering issue?
        'opennode.oms.security.grokkers',
        # Invokes the PermissionGrokker, which registers the
        # grokcore.security permissions.
        'grokcore.security.meta',
        'opennode.oms',
    )
    for dotted_name in grok_order:
        grok(dotted_name)
--- /tmp/ch_zope.py 2011-08-24 13:18:43.580550455 +0200
+++ /tmp/ch_our.py 2011-08-24 13:20:48.900550408 +0200
@@ -21,6 +21,8 @@
assert isinstance(set_permissions, dict)
self.set_permissions = set_permissions
+ self.interaction = None
+
def permission_id(self, name):
'See INameBasedChecker'
return self.get_permissions.get(name)
@@ -44,7 +46,7 @@
if permission is not None:
if permission is CheckerPublic:
return # Public
- if thread_local.interaction.checkPermission(permission, object):
+ if self.interaction.checkPermission(permission, object): # use local interaction
return # allowed
else:
__traceback_supplement__ = (TracebackSupplement, object)
@@ -56,10 +58,10 @@
def check(self, object, name):
'See IChecker'
permission = self.get_permissions.get(name)
if permission is not None:
if permission is CheckerPublic:
return # Public
- if thread_local.interaction.checkPermission(permission, object):
+ if self.interaction.checkPermission(permission, object): # use local interaction
return
else:
__traceback_supplement__ = (TracebackSupplement, object)
@@ -77,7 +80,7 @@
return value
checker = getattr(value, '__Security_checker__', None)
if checker is None:
- checker = selectChecker(value)
+ checker = _select_checker(value, self.interaction) # pass interaction
if checker is None:
return value
# ...
class Compute(Model):
    # Interfaces provided by instances of this model.
    implements(ICompute, IAttributeAnnotatable)
    # Per-attribute permissions: maps an attribute name to the permission
    # name handed to the ``permissions`` directive.
    permissions({'architecture': 'read'})
# ...
import martian
__all__ = ['permissions']
class permissions(martian.Directive):
    """Class directive used to declare the permissions of a class's attributes.

    Use it inside a class body, passing the permission declarations for
    that class's attributes.
    """

    # Value reported for classes that do not use the directive.
    default = None
    # The directive is scoped to class bodies.
    scope = martian.CLASS
    # Storage policy: a single value per class.
    store = martian.ONCE
import grokcore.security
import martian
import zope.schema
from grokcore.security import require
from zope.interface import implementedBy
from zope.security.checker import defineChecker
from opennode.oms.model.model.base import Model, IModel
from opennode.oms.model.model.compute import Compute, ICompute
from opennode.oms.security.checker import Checker
from opennode.oms.security.directives import permissions
class SecurityGrokker(martian.ClassGrokker):
    """Class grokker that turns ``permissions`` declarations into checkers.

    It is registered for every class (``martian.component(object)``) and
    receives the value of the ``permissions`` directive.
    """

    martian.component(object)
    martian.directive(permissions, name='permissions')

    def execute(self, factory, config, permissions, **kw):
        """Protect the attributes of ``factory`` listed in ``permissions``.

        Returns False when the class declares no permissions; otherwise
        defines a checker for the class, schedules one ``protectName``
        configuration action per declared attribute and returns True.
        """
        if not permissions:
            return False

        # Installing our own Checker is mandatory: without it zope's default
        # Checker implementation would be used, and that one does not play
        # well in async frameworks like twisted.
        defineChecker(factory, Checker({}, {}))

        for attr_name, required_permission in permissions.items():
            config.action(
                discriminator=('protectName', factory, attr_name),
                callable=grokcore.security.util.protect_getattr,
                args=(factory, attr_name, required_permission),
            )
        return True
from zope.interface import implements
from zope.security._proxy import _Proxy as Proxy
from zope.security.checker import _available_by_default, getCheckerForInstancesOf, CheckerPublic, TracebackSupplement
from zope.security.interfaces import INameBasedChecker, Unauthorized, ForbiddenAttribute
def _select_checker(value, interaction):
    """Return a checker for ``value`` bound to ``interaction``, or None.

    The checker registered for ``type(value)`` is looked up and a private
    copy of it is returned with its ``interaction`` attribute set, so the
    registered (shared) checker is never mutated.

    Returns None when the registered "checker" is a bare ``object()``
    marker (what is registered for "primitive" types like str), meaning the
    value must not be wrapped in a security proxy.

    When no checker is registered at all, a checker with no permissions is
    returned, so everything except the names that are available by default
    is forbidden.
    """
    checker = getCheckerForInstancesOf(type(value))

    if checker is None:
        # Nothing registered for this type. Cloning None used to blow up on
        # NoneType; fail closed instead with an empty (deny by default)
        # checker bound to the caller's interaction.
        fallback = Checker({})
        fallback.interaction = interaction
        return fallback

    # A bare object() marker is what is registered for "primitive" types
    # like str. Report it as "do not proxy" (None), which is what callers
    # test for, instead of handing the marker back as if it were a checker.
    if type(checker) is object:
        return None

    # Create a custom instance sharing the registered checker's state.
    # NOTE(review): this requires the registered checker to have an
    # instance __dict__ -- confirm no other kind of checker is registered.
    bound_checker = type(checker).__new__(type(checker))
    bound_checker.__dict__ = checker.__dict__.copy()
    bound_checker.interaction = interaction
    return bound_checker


def proxy_factory(value, interaction):
    """Wrap ``value`` in a security proxy whose checker uses ``interaction``.

    Values for which no proxying is wanted (``_select_checker`` returns
    None) are returned unchanged.
    """
    checker = _select_checker(value, interaction)
    if checker is None:
        return value
    return Proxy(value, checker)
class Checker(object):
    """Name-based security checker that carries its own interaction.

    Permission decisions are delegated to ``self.interaction`` rather than
    to a thread-local interaction. A freshly constructed checker has
    ``interaction`` set to None; ``_select_checker`` sets it on the copies
    it hands out.
    """

    implements(INameBasedChecker)

    def __init__(self, get_permissions, set_permissions=None):
        """Create a checker.

        ``get_permissions`` must be a dictionary. Its ``get`` is called
        with attribute names and must return a permission id, None, or the
        special marker CheckerPublic. None means access to the name is
        forbidden; CheckerPublic means access is granted without checking
        a permission.

        ``set_permissions`` is an optional dictionary used the same way
        for checking attribute assignment.
        """
        assert isinstance(get_permissions, dict)
        self.get_permissions = get_permissions
        assert set_permissions is None or isinstance(set_permissions, dict)
        self.set_permissions = set_permissions
        # No interaction is bound until someone assigns one.
        self.interaction = None

    def permission_id(self, name):
        """See INameBasedChecker."""
        return self.get_permissions.get(name)

    def setattr_permission_id(self, name):
        """See INameBasedChecker."""
        if not self.set_permissions:
            return None
        return self.set_permissions.get(name)

    def check_getattr(self, object, name):
        """See IChecker."""
        self.check(object, name)

    def check_setattr(self, object, name):
        """See IChecker.

        Raises ForbiddenAttribute when no permission is declared for
        ``name`` and Unauthorized when the interaction denies it.
        """
        permission = (
            self.set_permissions.get(name) if self.set_permissions else None
        )

        if permission is None:
            __traceback_supplement__ = (TracebackSupplement, object)
            raise ForbiddenAttribute(name, object)

        if permission is CheckerPublic:
            return  # public
        # Use the interaction bound to this checker.
        if self.interaction.checkPermission(permission, object):
            return  # allowed

        __traceback_supplement__ = (TracebackSupplement, object)
        raise Unauthorized(object, name, permission)

    def check(self, object, name):
        """See IChecker.

        Raises Unauthorized when a permission is declared for ``name`` but
        the interaction denies it, and ForbiddenAttribute when nothing is
        declared and the name is not available by default.
        """
        permission = self.get_permissions.get(name)

        if permission is None:
            if name in _available_by_default:
                return
            if name != '__iter__' or hasattr(object, name):
                __traceback_supplement__ = (TracebackSupplement, object)
                raise ForbiddenAttribute(name, object)
            return

        if permission is CheckerPublic:
            return  # public
        # Use the interaction bound to this checker.
        if self.interaction.checkPermission(permission, object):
            return

        __traceback_supplement__ = (TracebackSupplement, object)
        raise Unauthorized(object, name, permission)

    def proxy(self, value):
        """See IChecker."""
        if type(value) is Proxy:
            return value

        checker = getattr(value, '__Security_checker__', None)
        if checker is None:
            # Hand our interaction over to the checker chosen for the value.
            checker = _select_checker(value, self.interaction)
            if checker is None:
                return value

        return Proxy(value, checker)
from grokcore.security import Permission, name, title, description
class Read(Permission):
    """Permission named 'read'."""

    name("read")
class Modify(Permission):
    """Permission named 'modify'."""

    name("modify")
class Create(Permission):
    """Permission named 'create'."""

    name("create")
class Add(Permission):
    """Permission named 'add'."""

    name("add")
import unittest
from nose.tools import eq_, assert_raises
from zope.authentication.interfaces import IAuthentication
from zope.component import provideUtility, getUtility
from zope.interface import implements
from zope.security.interfaces import Unauthorized, ForbiddenAttribute
from zope.securitypolicy.principalpermission import principalPermissionManager as prinperG
from zope.securitypolicy.zopepolicy import ZopeSecurityPolicy
from opennode.oms.model.model.compute import Compute
from opennode.oms.security.checker import proxy_factory
from opennode.oms.security.principals import User
from opennode.oms.tests.util import run_in_reactor
class SessionStub(object):
    """Bare object exposing ``principal`` and ``interaction`` attributes.

    The tests add instances of it to a security policy interaction.
    """

    def __init__(self, principal=None):
        """Store ``principal``; ``interaction`` starts out as None."""
        self.interaction = None
        self.principal = principal
class DummyAuthenticationUtility:
    """Authentication utility for tests that knows two principals.

    Only the ids 'marko' and 'erik' resolve to a principal.
    """

    implements(IAuthentication)

    def getPrincipal(self, id):
        """Return a ``User`` for a known id.

        Raises PrincipalLookupError for any other id.
        """
        # Imported here because the module only imports IAuthentication from
        # zope.authentication.interfaces; without it the raise below failed
        # with a NameError instead of the intended lookup error.
        from zope.authentication.interfaces import PrincipalLookupError

        if id in ('marko', 'erik'):
            return User(id)
        raise PrincipalLookupError(id)
provideUtility(DummyAuthenticationUtility())
class SecurityTestCase(unittest.TestCase):
    """Checks that proxies built for different interactions are independent."""

    def _get_interaction(self, uid):
        """Return a ZopeSecurityPolicy holding one participation for ``uid``.

        The principal is obtained from the registered IAuthentication
        utility.
        """
        authentication = getUtility(IAuthentication, context=None)
        participation = SessionStub(authentication.getPrincipal(uid))
        policy = ZopeSecurityPolicy()
        policy.add(participation)
        return policy

    @run_in_reactor
    def test_test(self):
        """Same object, two principals: one may read, the other may not."""
        # Give the test principals some fake permissions.
        prinperG.grantPermissionToPrincipal('read', 'marko')
        prinperG.grantPermissionToPrincipal('zope.Nothing', 'erik')

        # One interaction per principal.
        marko_interaction = self._get_interaction('marko')
        erik_interaction = self._get_interaction('erik')

        # The object being secured; unproxied access works as usual.
        compute = Compute('linux', 'tux-for-test', 2000, 2000, 'active')
        eq_(compute.architecture, 'linux')

        # One proxy per interaction, both around the same object.
        as_marko = proxy_factory(compute, marko_interaction)
        as_erik = proxy_factory(compute, erik_interaction)

        # Authorized access.
        eq_(as_marko.architecture, 'linux')

        # Unauthorized access.
        with assert_raises(Unauthorized):
            eq_(as_erik.architecture, 'linux')

        # Access to an attribute with no declared permission is forbidden.
        with assert_raises(ForbiddenAttribute):
            eq_(as_marko.state, 'active')
class Checker(object):
    """Name-based security checker using the thread-local interaction.

    Permission decisions are delegated to ``thread_local.interaction``.
    """

    implements(INameBasedChecker)

    def __init__(self, get_permissions, set_permissions=None):
        """Create a checker.

        ``get_permissions`` must be a dictionary. Its ``get`` is called
        with attribute names and must return a permission id, None, or the
        special marker CheckerPublic. None means access to the name is
        forbidden; CheckerPublic means access is granted without checking
        a permission.

        ``set_permissions`` is an optional dictionary used the same way
        for checking attribute assignment.
        """
        assert isinstance(get_permissions, dict)
        self.get_permissions = get_permissions
        assert set_permissions is None or isinstance(set_permissions, dict)
        self.set_permissions = set_permissions

    def permission_id(self, name):
        """See INameBasedChecker."""
        return self.get_permissions.get(name)

    def setattr_permission_id(self, name):
        """See INameBasedChecker."""
        if not self.set_permissions:
            return None
        return self.set_permissions.get(name)

    def check_getattr(self, object, name):
        """See IChecker."""
        self.check(object, name)

    def check_setattr(self, object, name):
        """See IChecker.

        Raises ForbiddenAttribute when no permission is declared for
        ``name`` and Unauthorized when the interaction denies it.
        """
        permission = (
            self.set_permissions.get(name) if self.set_permissions else None
        )

        if permission is None:
            __traceback_supplement__ = (TracebackSupplement, object)
            raise ForbiddenAttribute(name, object)

        if permission is CheckerPublic:
            return  # public
        if thread_local.interaction.checkPermission(permission, object):
            return  # allowed

        __traceback_supplement__ = (TracebackSupplement, object)
        raise Unauthorized(object, name, permission)

    def check(self, object, name):
        """See IChecker.

        Raises Unauthorized when a permission is declared for ``name`` but
        the interaction denies it, and ForbiddenAttribute when nothing is
        declared and the name is not available by default.
        """
        permission = self.get_permissions.get(name)

        if permission is None:
            if name in _available_by_default:
                return
            if name != '__iter__' or hasattr(object, name):
                __traceback_supplement__ = (TracebackSupplement, object)
                raise ForbiddenAttribute(name, object)
            return

        if permission is CheckerPublic:
            return  # public
        if thread_local.interaction.checkPermission(permission, object):
            return

        __traceback_supplement__ = (TracebackSupplement, object)
        raise Unauthorized(object, name, permission)

    def proxy(self, value):
        """See IChecker."""
        if type(value) is Proxy:
            return value

        checker = getattr(value, '__Security_checker__', None)
        if checker is None:
            checker = selectChecker(value)
            if checker is None:
                return value

        return Proxy(value, checker)
# ...
def selectChecker(object):
    """Get a checker for the given object.

    Returns the appropriate checker, or None when the object should not be
    wrapped in a proxy.
    """
    # Care is needed here: we might be handed a proxy, in which case the
    # type cannot be used; on the other hand __class__ cannot be relied on
    # either, since not everything has one. The lookup therefore goes by
    # type(object) only.
    # TODO: we really need formal proxy introspection
    candidate = _getChecker(type(object), _defaultChecker)
    if candidate is NoProxy:
        return None

    # Anything that is not a Checker instance is called with the object to
    # obtain the next candidate, until a Checker (or a "no proxy" answer)
    # comes out.
    while not isinstance(candidate, Checker):
        candidate = candidate(object)
        if candidate is None or candidate is NoProxy:
            return None

    return candidate
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment