Last active
September 19, 2024 12:38
-
-
Save Natim/77c8f61c1d42e476cef8 to your computer and use it in GitHub Desktop.
How to handle our permission schema with Redis
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: utf-8 -*- | |
from __future__ import print_function | |
import json | |
import redis | |
# Shared Redis client used by the helpers in this module: local server,
# default port, database 1.
r = redis.StrictRedis(
    host='localhost',
    port=6379,
    db=1,
)
# Permission inheritance table.
#
# Each key is a permission type ("<object type>:<permission>", or a bare
# "<children>:create" permission).  Each value maps an object level
# (bucket / group / collection / record) to the permission names which,
# when held at that level, also grant the permission named by the key.
PERMISSIONS_INHERITANCE = {
    'bucket:write': {
        'bucket': ['write'],
    },
    'bucket:read': {
        'bucket': ['write', 'read'],
    },
    'groups:create': {
        'bucket': ['write', 'groups:create'],
    },
    'collections:create': {
        'bucket': ['write', 'collections:create'],
    },
    'group:write': {
        'bucket': ['write'],
        'group': ['write'],
    },
    'group:read': {
        'bucket': ['write', 'read'],
        'group': ['write', 'read'],
    },
    'collection:write': {
        'bucket': ['write'],
        'collection': ['write'],
    },
    'collection:read': {
        'bucket': ['write', 'read'],
        'collection': ['write', 'read'],
    },
    'records:create': {
        'bucket': ['write'],
        'collection': ['write', 'records:create'],
    },
    'record:write': {
        'bucket': ['write'],
        'collection': ['write'],
        'record': ['write'],
    },
    'record:read': {
        'bucket': ['write', 'read'],
        'collection': ['write', 'read'],
        'record': ['write', 'read'],
    },
}
def add_to_group(principal, group):
    """Record that ``principal`` belongs to ``group``.

    The relation is written in both directions: the principal is added to
    the set stored under the group's key, and the group is added to the set
    stored under the principal's key.
    """
    group_key = 'principal:%s' % group
    principal_key = 'principal:%s' % principal
    r.sadd(group_key, principal)
    r.sadd(principal_key, group)
def remove_from_group(principal, group):
    """Undo ``add_to_group``: drop the relation from both stored sets."""
    group_key = 'principal:%s' % group
    principal_key = 'principal:%s' % principal
    r.srem(group_key, principal)
    r.srem(principal_key, group)
def get_user_principals(user_id):
    """Return the set of principals for ``user_id``.

    The result is the content of the user's ``principal:`` set with the
    user id itself always included.
    """
    stored = r.smembers('principal:%s' % user_id)
    return stored | {user_id}
def build_perm_set_id(obj_type, perm, obj_parts):
    """Build the Redis key of the permission set for one object level.

    ``obj_parts`` is an object key split on ``/`` (for a key such as
    ``/buckets/blog/collections/articles`` the first component is the empty
    string produced by the leading slash).  The key is truncated to the
    ancestor of type ``obj_type`` and ``perm`` is appended.

    Raises ``KeyError`` when ``obj_type`` is not a known object type.
    """
    # Number of '/'-separated components (leading empty one included) that
    # identify an object of each type.  'group' sits at the same depth as
    # 'collection' and is needed for the group entries of the inheritance
    # table.
    PARTS_LENGTH = {
        'bucket': 3,
        'group': 5,
        'collection': 5,
        'record': 7,
    }
    obj_path = '/'.join(obj_parts[:PARTS_LENGTH[obj_type]])
    return 'permission:%s:%s' % (obj_path, perm)
def get_obj_type_from_key(obj_key):
    """Return 'record', 'collection' or 'bucket' for the given object key.

    Only the resource-name segments of the path are inspected (``buckets``,
    ``collections``, ``records`` in ``/buckets/<id>/collections/<id>/...``),
    never the ids, so an object whose id happens to contain one of those
    words is not misclassified.  The deepest recognised segment wins.

    A key with no deeper recognised segment than ``buckets`` (for instance
    a group key) is reported as 'bucket'.

    Raises ``ValueError`` when no recognised segment is present.
    """
    # Path alternates name/id/name/id...; keep the names only.
    resource_names = obj_key.strip('/').split('/')[0::2]
    # Ordered from deepest to shallowest so the most specific type wins.
    known_types = (
        ('records', 'record'),
        ('collections', 'collection'),
        ('buckets', 'bucket'),
    )
    for resource_name, obj_type in known_types:
        if resource_name in resource_names:
            return obj_type
    raise ValueError('%s key is invalid' % obj_key)
def get_perm_keys(permission):
    """Return the set of Redis keys whose members hold ``permission``.

    ``permission`` has the form ``<object key>:<perm>``, e.g.
    ``/buckets/blog/collections/articles:write``.  The inheritance table is
    consulted to list, for the object and each of its ancestors, every
    permission set that grants the requested permission.

    Raises ``ValueError`` for an unrecognised object key and ``KeyError``
    for a permission absent from the inheritance table.
    """
    obj_key, perm = permission.split(':', 1)
    obj_parts = obj_key.split('/')
    obj_type = get_obj_type_from_key(obj_key)

    perm_type = '%s:%s' % (obj_type, perm)
    if perm_type not in PERMISSIONS_INHERITANCE:
        # Creation permissions ('groups:create', 'collections:create',
        # 'records:create') are keyed in the table without an object-type
        # prefix; fall back to the bare permission name so they resolve.
        perm_type = perm

    return {
        build_perm_set_id(level, granting_perm, obj_parts)
        for level, granting_perms in PERMISSIONS_INHERITANCE[perm_type].items()
        for granting_perm in granting_perms
    }
def get_user_permission(user_id, permission):
    """Return the principals of ``user_id`` that are granted ``permission``.

    All the permission sets that grant ``permission`` are unioned into a
    temporary key, which is intersected with the user's principal set and
    then deleted.  Note that the user id is added to its own principal set
    as part of the call, and that write is not undone afterwards.
    """
    granting_keys = get_perm_keys(permission)
    user_key = 'principal:%s' % user_id
    scratch_key = 'temp_perm:%s' % permission

    pipe = r.pipeline()
    pipe.sadd(user_key, user_id)
    pipe.sunionstore(scratch_key, *granting_keys)
    pipe.sinter(scratch_key, user_key)
    pipe.delete(scratch_key)
    # Only the reply of the third command (the intersection) is of interest.
    _, _, matching, _ = pipe.execute()
    return matching
def has_permission(user_id, permission):
    """Return True when at least one of the user's principals is granted
    ``permission``, False otherwise."""
    return bool(get_user_permission(user_id, permission))
def add_permission(principal, permission):
    """Grant ``permission`` to ``principal``.

    ``permission`` has the form ``<object key>:<perm>``; the principal is
    added to the Redis set stored under ``permission:<permission>``.
    """
    perm_set_key = 'permission:%s' % permission
    r.sadd(perm_set_key, principal)
def remove_permission(principal, permission):
    """Revoke ``permission`` from ``principal``.

    The principal is removed from the Redis set stored under
    ``permission:<permission>``.
    """
    perm_set_key = 'permission:%s' % permission
    r.srem(perm_set_key, principal)
def get_object_permission(obj_key):
    """Return the permissions set directly on ``obj_key``.

    The result maps each permission name found under
    ``permission:<obj_key>:*`` to the members of the matching set.
    """
    perm_keys = r.keys('permission:%s:*' % obj_key)

    # Fetch every set in a single pipeline; replies come back in key order.
    pipe = r.pipeline()
    for perm_key in perm_keys:
        pipe.smembers(perm_key)
    member_sets = pipe.execute()

    # Key layout is "permission:<obj_key>:<perm>"; keep the <perm> part.
    return {
        perm_key.split(':', 2)[-1]: principals
        for perm_key, principals in zip(perm_keys, member_sets)
    }
def get_object_principals(obj_key):
    """Return the principals able to read and to write ``obj_key``.

    The result has two entries, 'read' and 'write', each holding the union
    of every permission set (inherited ones included) granting that
    permission on the object.
    """
    readers = r.sunion(*get_perm_keys('%s:read' % obj_key))
    writers = r.sunion(*get_perm_keys('%s:write' % obj_key))
    return {'read': readers, 'write': writers}
def get_permission_principals(permission):
    """Return the union of every permission set granting ``permission``."""
    granting_keys = get_perm_keys(permission)
    return r.sunion(*granting_keys)
def set_object(obj_key, data=None):
    """Simulate the creation of an object or sub-object.

    A JSON document holding the object's id, its type (derived from the
    key) and ``data`` (an empty dict when falsy) is stored under
    ``object:<obj_key>``.
    """
    payload = json.dumps({
        "id": obj_key,
        "type": get_obj_type_from_key(obj_key),
        "data": data or {},
    })
    r.set('object:%s' % obj_key, payload)
def del_object(obj_key):
    """Simulate the removal of an object together with its sub-objects.

    Deletions are queued on a pipeline and sent at the end: first the
    object itself, then every key matching the sub-object pattern, then
    every key matching the object's own permission-set pattern.
    """
    pipe = r.pipeline()
    pipe.delete('object:%s' % obj_key)

    patterns = (
        '*:%s/*' % obj_key,  # anything stored for the sub-objects
        '*:%s:*' % obj_key,  # permission sets of the object itself
    )
    for pattern in patterns:
        cursor = 0
        while True:
            cursor, batch = r.scan(cursor, pattern)
            for key in batch:
                pipe.delete(key)
            # A cursor of 0 marks the end of the iteration.
            if cursor == 0:
                break

    pipe.execute()
if __name__ == '__main__':
    # Demo walk-through; it ends by flushing the whole Redis database.

    # Principals
    user_id = "/users/natim"
    user2_id = "/users/alexis"
    group_id = "/buckets/blog/groups/moderators"

    # Objects
    bucket_id = "/buckets/blog"
    collection_id = "/buckets/blog/collections/articles"
    collection2_id = "/buckets/blog/collections/comments"
    record_id = "/buckets/blog/collections/articles/records/yo"
    record2_id = "/buckets/blog/collections/comments/records/foobar"

    # See what happens with groups
    add_to_group(user_id, group_id)
    print(group_id, get_user_principals(user_id))

    # Create some objects
    for obj_id in (bucket_id, collection_id, collection2_id,
                   record_id, record2_id):
        set_object(obj_id)

    # Add an admin to the bucket
    add_permission(user_id, '%s:write' % bucket_id)
    print("add admin to the bucket", record2_id,
          get_object_principals(record2_id))

    # Add moderators to the articles and comments
    for moderated_id in (collection_id, collection2_id):
        add_permission(group_id, '%s:write' % moderated_id)
    print("add group moderator", record2_id,
          get_object_principals(record2_id))

    # Add alexis as author of a comment
    add_permission(user2_id, '%s:write' % record2_id)
    print("add alexis", record2_id, get_object_principals(record2_id))

    # Remove the user from the group
    remove_from_group(user_id, group_id)
    print(get_user_principals(user_id))

    # Can alexis edit the article?
    article_write = '%s:write' % record_id
    print("Can alexis edit the article?",
          get_user_principals(user2_id)
          & get_permission_principals(article_write),
          has_permission(user2_id, article_write))

    print("Add alexis to the moderator group")
    add_to_group(user2_id, group_id)
    print("Can alexis edit the article?",
          get_user_principals(user2_id)
          & get_permission_principals(article_write),
          has_permission(user2_id, article_write))

    # Remove the bucket
    del_object(bucket_id)
    print(r.keys())

    # Clean up: wipe everything in the selected database.
    r.flushdb()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment