@ept
Last active September 16, 2024 19:03

Decentralised access control project

Set-up

Set up a Python virtual environment in a new, empty directory:

brew install python # or equivalent on your OS
python3 -m venv venv
source venv/bin/activate
pip install pynacl

Then save the file below as acl.py and run it using python acl.py. It should print out that it ran one test, and "OK".

How it works

The code uses the Ed25519 algorithm from the PyNaCl package for digital signatures, the SHA-256 algorithm from the Python standard library as hash function, and the unittest module from the Python standard library for test cases.

The create_op(), add_op(), and remove_op() functions generate signed operations to create a group, add a member to a group, and remove a member from a group respectively. These operations are organised into a hash graph as described in the lecture. See TestAccessControlList.test_add_remove() at the end of the file for an example of how to use these functions.
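
As a rough illustration of the hash graph structure, the sketch below builds a few unsigned operations using only the standard library and checks whether two of them are concurrent. It is not the signed format used in acl.py: signatures and the creation nonce are omitted, and make_op(), op_hash(), ancestors(), and concurrent() are hypothetical helper names introduced only for this example.

```python
import json
from hashlib import sha256


def make_op(op_type, preds, **fields):
    """Encode an unsigned operation as bytes (hypothetical helper; acl.py signs its ops)."""
    op = {'type': op_type, 'preds': sorted(preds), **fields}
    return json.dumps(op, sort_keys=True).encode('utf-8')


def op_hash(op_bytes):
    """Hex-encoded SHA-256 hash of an encoded operation."""
    return sha256(op_bytes).hexdigest()


def ancestors(ops_by_hash, start):
    """Return the hashes of all ops reachable from ``start`` by following preds."""
    seen = set()
    stack = list(json.loads(ops_by_hash[start])['preds'])
    while stack:
        h = stack.pop()
        if h not in seen:
            seen.add(h)
            stack.extend(json.loads(ops_by_hash[h])['preds'])
    return seen


def concurrent(ops_by_hash, a, b):
    """Two distinct ops are concurrent if neither is an ancestor of the other."""
    return (a != b
            and a not in ancestors(ops_by_hash, b)
            and b not in ancestors(ops_by_hash, a))


# Same shape as the graph in test_add_remove(), with names instead of public keys
create = make_op('create', [])
add_b = make_op('add', [op_hash(create)], added_key='bob')
add_c = make_op('add', [op_hash(create)], added_key='carol')
rem_b = make_op('remove', [op_hash(add_b), op_hash(add_c)], removed_key='bob')
ops_by_hash = {op_hash(op): op for op in (create, add_b, add_c, rem_b)}

print(concurrent(ops_by_hash, op_hash(add_b), op_hash(add_c)))  # True
print(concurrent(ops_by_hash, op_hash(add_b), op_hash(rem_b)))  # False
```

Because each operation names its predecessors by hash, the ancestor relation can be recomputed by any device that holds the same set of operations, whatever order they arrived in.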

The interpret_ops() function takes a set of these access control operations and determines the set of public keys that are currently group members: that is, those keys that have been added and not subsequently removed again (the group creator is implicitly added). The function first checks all the signatures, and then checks that the hash graph is well-formed, raising an exception if it is not.

The algorithm in interpret_ops() is currently a very simple one, in which only the group creator is allowed to add and remove group members. This avoids the problems discussed in the lecture, where two group members concurrently remove each other. But it's also a very restrictive model. Your task is to make the algorithm more flexible or powerful; how exactly you do this is left up to you.
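
To see why concurrent mutual removal is a problem, consider a deliberately naive rule that applies removals one at a time and ignores any removal issued by someone who is no longer a member. This strawman is an assumption made for illustration; it is neither the algorithm in acl.py nor one from the lecture, and naive_apply() is a hypothetical name.

```python
def naive_apply(initial_members, removals):
    """Apply (remover, removed) pairs in the given order, ignoring removals by non-members."""
    members = set(initial_members)
    for remover, removed in removals:
        if remover in members:
            members.discard(removed)
    return members


# Alice and Bob concurrently remove each other; two devices see the ops in different orders
print(naive_apply({'alice', 'bob'}, [('alice', 'bob'), ('bob', 'alice')]))  # {'alice'}
print(naive_apply({'alice', 'bob'}, [('bob', 'alice'), ('alice', 'bob')]))  # {'bob'}
```

The two devices end up with different member sets, so an order-dependent rule like this one does not converge.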

What to do

Some suggestions (you don't have to do these in order, and feel free to do other things if you want):

  1. Explore the code and how it works by writing some more test cases. Include some tests of failure cases, e.g. an operation with an invalid signature, or an operation signed by an unauthorised key. You can use assertRaises() to check that a test case raises an expected exception.
  2. Integrate a notion of application messages into the algorithm. For example, you could make a chat room that only current members are allowed to post to. If a user is removed, any messages they posted while they were a member remain valid, but you should ignore any messages they post after they are removed or concurrently with their removal. You can do this by signing application messages and making them a part of the hash graph, just like access control operations.
  3. Change the access control algorithm to use the Matrix approach instead of the currently implemented one. That is, every user has a power level, a user is allowed to add other users at a power level less than or equal to their own, and a user is allowed to remove other users with a power level strictly less than their own. The group creator starts with a fixed power level, e.g. 100. What should happen if the same user is added multiple times with different power levels?
  4. Change the access control algorithm to use another conflict resolution approach, such as the seniority-based solution outlined in the lecture. You can find pseudocode in this draft paper.
  5. Instead of hand-writing test cases, you can also try property-based testing, which generates lots of random examples and checks that they produce the expected output. You could use the Hypothesis library for this.
  6. Implement a form of permission delegation: for example, you could have one group that represents a team of collaborators, and another group that represents the access control for a particular document or chat room. Instead of giving individual users access to the document or room, you could delegate access to the team, so that any member of the team can access the document/room, and the team membership can be managed separately. With this, you could even have a "team" representing all the devices belonging to one particular user; then a user could authorise and revoke devices without having to update the permissions on every document they have access to.
  7. A problem with the seniority-based access control algorithm is that a compromised key could be used long after it has been removed (even years later) to cause problems. Design an approach that would allow a removed key to become useless after some amount of time has elapsed.
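
For suggestion 3, the two power-level rules could be written down as small predicates before wiring them into interpret_ops(). This is only a sketch of the rules as stated in that suggestion: may_add(), may_remove(), and the levels dictionary are hypothetical, and it leaves open the question of what to do when the same user is added several times with different levels.

```python
CREATOR_LEVEL = 100  # example value from the suggestion; any fixed level would do


def may_add(actor_level, new_level):
    """An actor may add a user at a power level less than or equal to their own."""
    return new_level <= actor_level


def may_remove(actor_level, target_level):
    """An actor may remove only users whose power level is strictly below their own."""
    return target_level < actor_level


# Hypothetical power levels, keyed by name rather than public key for readability
levels = {'alice': CREATOR_LEVEL, 'bob': 50, 'carol': 50}

print(may_add(levels['alice'], 100))               # True
print(may_add(levels['bob'], 75))                  # False
print(may_remove(levels['alice'], levels['bob']))  # True
print(may_remove(levels['bob'], levels['carol']))  # False
```

With the strict inequality in may_remove(), two users at the same level cannot remove each other.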

Whatever you do, make sure that interpret_ops() always produces the same result regardless of the order in which it iterates over the set of operations. This is important to ensure convergence, i.e. that any two devices come to the same conclusion of who the group members are, regardless of the order in which they received the operations. Python randomises the iteration order of sets every time you start the Python process, so if you have a test that sometimes passes and sometimes fails, it could well be that you're accidentally relying on iteration order.
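
One possible way to avoid depending on iteration order is to process operations in a topological order of the hash graph, breaking ties by sorting on the operation hash. The sketch below assumes a dictionary from each hash to its list of predecessor hashes; deterministic_order() is a hypothetical helper, and the short strings stand in for real SHA-256 hashes.

```python
from itertools import permutations


def deterministic_order(preds_by_hash):
    """Return op hashes in topological order, breaking ties by sorting on the hash."""
    remaining = {h: set(preds) for h, preds in preds_by_hash.items()}
    order = []
    while remaining:
        # Ops whose predecessors have all been emitted already, in sorted order
        ready = sorted(h for h, preds in remaining.items() if not preds)
        if not ready:
            raise ValueError('cycle or missing predecessor in the graph')
        nxt = ready[0]
        order.append(nxt)
        del remaining[nxt]
        for preds in remaining.values():
            preds.discard(nxt)
    return order


# Placeholder hashes: one create op, two concurrent adds, one remove that follows both
graph = {'c1': [], 'a9': ['c1'], 'a2': ['c1'], 'r5': ['a9', 'a2']}

# Build the dictionary in every possible insertion order and collect the results
orders = {tuple(deterministic_order(dict(p))) for p in permutations(graph.items())}
print(orders)  # {('c1', 'a2', 'a9', 'r5')}
```

Every insertion order yields the same sequence, which is the property a test for convergence would want to check.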

import json
import secrets
import unittest
from hashlib import sha256
from nacl.signing import SigningKey, VerifyKey


def hex_hash(byte_str):
    """Returns the SHA-256 hash of byte string ``byte_str``, encoded as hexadecimal."""
    return sha256(byte_str).hexdigest()


def sign_msg(signing_key, message):
    """
    Takes a message ``message`` (given as a dictionary), JSON-encodes it, and signs it using
    ``signing_key``. Returns a byte string in which the first 32 bytes are the public key, the next
    64 bytes are the signature, and the rest is the JSON-encoded message.
    """
    public_key = signing_key.verify_key.encode()
    signed_msg = signing_key.sign(json.dumps(message).encode('utf-8'))
    return public_key + signed_msg


def verify_msg(signed_msg):
    """
    Takes a byte string of the form generated by ``sign_msg()``, checks the validity of the
    signature, and parses the JSON into a dictionary. The public key that signed the message is
    added to the dictionary under the key ``signed_by`` (as a hexadecimal string). Raises an
    exception if the signature is not valid.
    """
    public_key = VerifyKey(signed_msg[0:32])  # first 32 bytes are pubkey
    verified = json.loads(public_key.verify(signed_msg[32:]))
    return {**verified, 'signed_by': signed_msg[0:32].hex()}


def create_op(signing_key):
    """Returns a group creation operation signed by ``signing_key``."""
    return sign_msg(signing_key, {'type': 'create', 'nonce': secrets.token_hex(16)})


def add_op(signing_key, added_key, preds):
    """Returns an operation signed by ``signing_key``, which adds ``added_key`` to the group.
    ``preds`` is a list of hashes of immediate predecessor operations."""
    return sign_msg(signing_key, {'type': 'add', 'added_key': added_key, 'preds': preds})


def remove_op(signing_key, removed_key, preds):
    """Returns an operation signed by ``signing_key``, which removes ``removed_key`` from the group.
    ``preds`` is a list of hashes of immediate predecessor operations."""
    return sign_msg(signing_key, {'type': 'remove', 'removed_key': removed_key, 'preds': preds})


def transitive_succs(successors, hash):
    """
    Takes ``successors``, a dictionary from operation hashes to sets of successor hashes, and a
    ``hash`` to start from. Returns a set of all hashes that are reachable from that starting point.
    """
    result = {hash}
    for succ in successors.get(hash, []):
        result.update(transitive_succs(successors, succ))
    return result


def interpret_ops(ops):
    """
    Takes a set of access control operations and computes the currently authorised set of users.
    Throws an exception if something isn't right.
    """
    # Check all the signatures and parse all the JSON
    ops_by_hash = {hex_hash(op): verify_msg(op) for op in ops}
    parsed_ops = ops_by_hash.values()

    # Every op must be one of the expected types
    if any(op['type'] not in {'create', 'add', 'remove'} for op in parsed_ops):
        raise Exception('Every op must be either create, add, or remove')
    if any('added_key' not in op for op in parsed_ops if op['type'] == 'add'):
        raise Exception('Every add operation must have an added_key')
    if any('removed_key' not in op for op in parsed_ops if op['type'] == 'remove'):
        raise Exception('Every remove operation must have a removed_key')

    # Hash graph integrity: every op except the initial creation must reference at least one
    # predecessor operation, and all predecessors must exist in the set
    if any(len(op['preds']) == 0 for op in parsed_ops if op['type'] != 'create'):
        raise Exception('Every non-create op must have at least one predecessor')
    if any(pred not in ops_by_hash
           for op in parsed_ops if op['type'] != 'create'
           for pred in op['preds']):
        raise Exception('Every hash must resolve to another op in the set')

    # Get the set of successor hashes for each op
    successors = {}
    for hash, op in ops_by_hash.items():
        for pred in op.get('preds', []):
            successors[pred] = successors.get(pred, set()) | {hash}

    # Get the public key of the group creator
    create_ops = [(hash, op) for hash, op in ops_by_hash.items() if op['type'] == 'create']
    if len(create_ops) != 1:
        raise Exception('There must be exactly one create operation')
    create_hash, create_op = create_ops[0]

    # Only the group creator may sign add/remove ops (TODO: change this!)
    if any(op['signed_by'] != create_op['signed_by'] for op in parsed_ops):
        raise Exception('Only the group creator may sign add/remove operations')

    # Current group members are those who have been added, and not removed again by a remove
    # operation that is a transitive successor to the add operation.
    members = set()
    for hash, op in ops_by_hash.items():
        if op['type'] in {'create', 'add'}:
            added_key = op['signed_by'] if op['type'] == 'create' else op['added_key']
            succs = [ops_by_hash[succ] for succ in transitive_succs(successors, hash)]
            if not any(succ['type'] == 'remove' and succ['removed_key'] == added_key
                       for succ in succs):
                members.add(added_key)
    return members


class TestAccessControlList(unittest.TestCase):
    # Generate keys for all the participants
    private = {name: SigningKey.generate() for name in {'alice', 'bob', 'carol', 'dave'}}
    public = {name: key.verify_key.encode().hex() for name, key in private.items()}
    friendly_name = {public_key: name for name, public_key in public.items()}

    def test_add_remove(self):
        # Make some example ops
        create = create_op(self.private['alice'])
        add_b = add_op(self.private['alice'], self.public['bob'], [hex_hash(create)])
        add_c = add_op(self.private['alice'], self.public['carol'], [hex_hash(create)])
        rem_b = remove_op(self.private['alice'], self.public['bob'], [hex_hash(add_b), hex_hash(add_c)])

        # Compute group membership
        members = interpret_ops({create, add_b, add_c, rem_b})
        self.assertEqual({self.friendly_name[member] for member in members}, {'alice', 'carol'})


if __name__ == '__main__':
    unittest.main()