@LukeB42
Last active March 7, 2026 23:06
#!/usr/bin/env python3
"""
law.py — Decentralised Legislative System over a DHT
=====================================================
A single-file implementation of a fully decentralised legislative system
where laws are proposed, drafted, and voted into existence by citizens
holding cryptographic identity keys issued by a birth-certificate authority.
OVERVIEW
--------
Laws move through three phases before enactment:

    intent-proposal -> open-drafting -> open-voting -> enacted
                                                    -> repealed (via amendment)
Amendments and repeals follow the same pipeline but reference an existing
enacted law. A competing amendment that enacts first voids all other active
proposals targeting the same law.
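
The pipeline above can be read as a small state machine. The following is a
minimal sketch, not the script's own code: the TRANSITIONS table and
can_transition() are hypothetical names, and the exact set of exits from each
phase (for example, that only drafting can be abandoned) is an assumption drawn
from the phase descriptions in this docstring.

```python
# Hypothetical transition table; phase names match the "phase" field values.
# Assumption: failed intents are deleted outright rather than moved to a phase.
TRANSITIONS = {
    "intent": {"drafting", "voided"},
    "drafting": {"voting", "voided", "abandoned"},
    "voting": {"enacted", "voided"},
    "enacted": set(),
    "voided": set(),
    "abandoned": set(),
}


def can_transition(current: str, target: str) -> bool:
    # Unknown phases have no outgoing transitions.
    return target in TRANSITIONS.get(current, set())


print(can_transition("intent", "drafting"))
print(can_transition("enacted", "voting"))
```

A table like this makes it cheap to reject a record whose phase jumped in a way
the pipeline does not allow.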
PHASES IN DETAIL
----------------
intent-proposal:
A citizen submits a title + initial body (or amendment body + commentary).
Other citizens cast intent-votes to signal the idea has enough support to
be worth drafting. If intent_threshold votes are not reached within
intent_window_days the proposal record is deleted from the network entirely
along with its blob — dead ideas cost nothing to store. Draft laws never
die this way; only failed intents are pruned.
open-drafting:
Citizens submit revised draft versions of the proposal text. Other citizens
nominate a specific draft version as the candidate by casting nominate-votes.
The first version to reach draft_threshold nominations becomes the candidate
and the proposal advances to open-voting. Draft blobs that lose are TTL'd.
There is no deadline on drafting — a proposal stays here until a version
wins enough nominations. It can only be killed by repeal of its target law
(for amendments) or explicit abandonment by the original proposer.
open-voting:
Citizens vote for or against the candidate version. If it reaches a strict
majority of the total electorate (nominally 51%; computed as
total_voters // 2 + 1 votes, with total_voters as reported by the signed
authority voter-count server) it is enacted. The enacted blob is permanent.
All other phase blobs for this proposal are TTL'd.
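
The enactment bar is simple integer arithmetic. As a sketch (votes_needed and
is_enacted are illustrative names, not functions in this file), the
(total_voters // 2) + 1 rule used by _try_enact looks like this:

```python
def votes_needed(total_voters: int) -> int:
    # Smallest whole number of votes strictly greater than half the electorate.
    return total_voters // 2 + 1


def is_enacted(votes_cast: int, total_voters: int) -> bool:
    return votes_cast >= votes_needed(total_voters)


for n in (3, 10, 1000):
    print(n, votes_needed(n))
```

With three citizens, two votes enact; with ten, six are needed; with a
thousand, 501. Note that this is measured against the whole electorate, not
against the number of votes actually cast.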
enacted:
The law is live. law:<id> points to the enacted blob hash. History of all
prior enacted hashes is preserved in the record for auditability.
voided:
A competing amendment enacted first. The proposal record is marked voided
with a voided_by field pointing to the new enacted hash. Blobs are TTL'd.
AMENDMENT & REPEAL
------------------
Amendment proposals must supply --law-id (the law being amended), --text
(the full new text of the law, i.e. the original with modifications), and
--commentary (free-text explanation of what changed and why). The proposal
stores the baseline_hash — the enacted hash of the law at proposal time —
so anyone can diff the original against the proposed change.
When an amendment enacts, all other active proposals (intent, drafting, or
voting phase) targeting the same law-id are immediately voided.
Repeal is a special amendment whose enacted text is the string "REPEALED".
The law:<id> record transitions to phase "repealed" and its blob is TTL'd.
The record stub is kept forever so the ID is never recycled.
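
Because the proposal keeps baseline_hash, a reader can fetch both blobs and
compare them. A minimal sketch using the standard library's difflib, assuming
the two texts have already been retrieved (diff_amendment is a hypothetical
helper, not part of this file):

```python
import difflib


def diff_amendment(baseline_text: str, proposed_text: str) -> list:
    # Line-based unified diff; lineterm="" keeps header lines free of newlines.
    return list(difflib.unified_diff(
        baseline_text.splitlines(),
        proposed_text.splitlines(),
        fromfile="baseline",
        tofile="proposed",
        lineterm="",
    ))


for line in diff_amendment(
    "All Mondays are hereby optional.",
    "All Mondays and Tuesdays are hereby optional.",
):
    print(line)
```

Since amendments carry the full new text rather than a patch, the diff is
always derived by the reader and never has to be trusted from the proposer.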
DATA MODEL
----------
proposal:<id> -> {
    id:               "<proposal-id>",
    type:             "original" | "amendment" | "repeal",
    amends:           "<law-id>",          # amendment/repeal only
    baseline_hash:    "<hash>",            # enacted hash at proposal time
    title:            "...",
    body_hash:        "<hash>",            # blob:<hash> = initial body text
    commentary:       "...",               # amendment/repeal only
    phase:            "intent" | "drafting" | "voting" | "enacted"
                      | "voided" | "abandoned",
    voided_by:        "<enacted_hash>",    # set on void
    intent_votes:     [ <vote>, ... ],
    intent_threshold: N,
    intent_deadline:  <unix>,
    expires_at:       <unix>,              # TTL for intent-phase failures
    draft_versions:   [ <hash>, ... ],
    draft_votes:      { <hash>: [<vote>, ...] },
    draft_candidate:  "<hash>",
    draft_threshold:  N,
    voting_votes:     [ <vote>, ... ],
    enacted_hash:     "<hash>",
    submitted_at:     <unix>,
    submitted_by:     "<citizen_id>",
}

law:<id> -> {
    law_id:           "<law-id>",
    phase:            "proposed" | "enacted" | "repealed",
    enacted_hash:     "<hash>",            # current enacted blob
    history:          [ "<hash>", ... ],   # all prior enacted hashes
    active_proposals: [ "<proposal-id>", ... ],  # proposals targeting this law
    proposal_id:      "<proposal-id>",     # proposal that enacted this version
    enacted_at:       <unix>,
}

blob:<hash> -> JSON string of { content, submitted_at }
    Permanent for enacted blobs. TTL'd for all others.

authority_pubkey -> authority Ed25519 public key PEM
voter_count      -> { host, port } of the authority voter-count server
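
Blob keys are content-addressed: the key is the SHA-256 of the serialised
blob, so any node can check that what it fetched is what was asked for. A
minimal sketch mirroring how cmd_propose builds a blob (make_blob and
blob_matches are illustrative names, not functions in this file):

```python
import hashlib
import json


def make_blob(content: str, submitted_at: int):
    # sort_keys gives a stable serialisation, so the same fields hash the same.
    blob = json.dumps({"content": content, "submitted_at": submitted_at}, sort_keys=True)
    return "blob:" + hashlib.sha256(blob.encode()).hexdigest(), blob


def blob_matches(key: str, blob: str) -> bool:
    # Recompute the digest and compare it with the key the blob was stored under.
    return key == "blob:" + hashlib.sha256(blob.encode()).hexdigest()


key, blob = make_blob("All Mondays are hereby banned.", 1700000000)
print(key[:21], blob_matches(key, blob))
```

Because submitted_at is part of the hashed JSON, two submissions of identical
text at different times get different hashes.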
BLOB TTL / FORGETTING
---------------------
Each node runs a periodic sweep (every 60 seconds) that removes any stored
key whose TTL has passed. TTLs are assigned according to these rules:
- Failed intent proposals: deleted after expires_at (intent_deadline + grace)
- Losing draft versions: blob given a one-hour TTL (INTENT_GRACE_SECS) once
  another version becomes draft_candidate
- Voided proposal blobs: given the same one-hour TTL on void
- Repealed law blobs: deleted after a grace period post-enactment
- Enacted blobs: never deleted
New nodes joining the network therefore never download dead-end data unless
they explicitly request historical blobs via `client drafts` or `client history`.
A `client read` fetches only the current enacted blob for a law.
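
The sweep itself is small. The sketch below isolates the idea from the
networking code: TTLStore is a hypothetical stand-in for the store/ttls pair on
DHTNode, and the clock is passed in explicitly so the behaviour is easy to
follow.

```python
class TTLStore:
    def __init__(self):
        self.store = {}  # key -> value
        self.ttls = {}   # key -> absolute expiry time

    def put(self, key, value, ttl=None, now=0.0):
        self.store[key] = value
        if ttl is None:
            self.ttls.pop(key, None)  # no TTL means the key is permanent
        else:
            self.ttls[key] = now + ttl

    def sweep(self, now):
        # Collect first, then delete, so the dict is not mutated while iterating.
        expired = [k for k, exp in self.ttls.items() if now > exp]
        for k in expired:
            self.store.pop(k, None)
            del self.ttls[k]
        return expired


s = TTLStore()
s.put("blob:draft", "losing draft", ttl=3600, now=0)
s.put("blob:law", "enacted text")
print(s.sweep(now=3601), sorted(s.store))
```

Re-putting a key without a TTL makes it permanent again, which is how a blob
can be promoted from temporary to kept.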
AUTHORITY VOTER-COUNT SERVER
-----------------------------
The authority runs a long-lived TCP server that responds to every connection
with a freshly signed JSON payload:
    {
        "total_voters":       <int>,
        "intent_threshold":   <int>,
        "intent_window_days": <int>,
        "draft_threshold":    <int>,
        "timestamp":          <unix>,
        "signature":          "<base64 Ed25519 sig>"
    }
The count is read from COUNTFILE on every request so it is always current.
Every `authority issue` atomically increments COUNTFILE.
Thresholds are baked into this payload so the whole network shares one
signed source of truth for what "enough support" means.
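
For the signature to verify, signer and verifier must serialise the payload to
exactly the same bytes. A minimal sketch of the canonical form that
_resolve_network_config checks against (signing_bytes is an illustrative name;
the Ed25519 verification step itself is left out here):

```python
import json


def signing_bytes(payload: dict) -> bytes:
    # Drop the signature field, then serialise with sorted keys so that
    # dictionary ordering cannot change the bytes being verified.
    body = {k: v for k, v in payload.items() if k != "signature"}
    return json.dumps(body, sort_keys=True).encode()


received = {"total_voters": 3, "timestamp": 1700000000, "signature": "abc"}
reordered = {"timestamp": 1700000000, "total_voters": 3}
print(signing_bytes(received) == signing_bytes(reordered))
```

These bytes are what would be handed to the public key's verify call, together
with the base64-decoded signature.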
SCALING: SMALL TEAM TO ENTIRE COUNTRY
--------------------------------------
The system is designed to work identically at any scale. The only difference
is in the authority-server configuration:
Small team (3-10 people):
    intent_threshold:   1      # one person can move a proposal to drafting
    intent_window_days: 1      # ideas must get support within a day
    draft_threshold:    2      # two nominations move a draft to voting
    total_voters:       ~10
    Nodes:              1-3 DHT nodes on local network or localhost

Organisation (10-1000 people):
    intent_threshold:   3-10
    intent_window_days: 3-7
    draft_threshold:    10-50
    total_voters:       ~hundreds
    Nodes:              5-20 DHT nodes, possibly across a VPN or LAN

City / region (10k-10M people):
    intent_threshold:   100-1000 (or ~0.01% of electorate)
    intent_window_days: 7-30
    draft_threshold:    1000+ (or ~0.1% of electorate)
    total_voters:       millions
    Nodes:              Thousands of always-on nodes. Citizens run personal
                        nodes or connect via a community node. The DHT
                        naturally shards data across the network; no node
                        stores everything.

Entire country (tens of millions):
    intent_threshold:   ~10,000 (or ~0.05% of electorate)
    intent_window_days: 30
    draft_threshold:    ~50,000 (or ~0.1% of electorate)
    total_voters:       tens of millions
    Nodes:              Hundreds of thousands of nodes. The Kademlia DHT
                        scales sub-linearly: routing tables stay small (O(log N)
                        buckets), replication stays at K=3 closest nodes per key,
                        and each node only stores a small slice of the keyspace.
                        The authority voter-count server should be replicated
                        behind a load balancer or replaced with a small committee
                        of co-signers for fault tolerance. Key issuance can be
                        distributed to regional offices each holding a delegated
                        signing key certified by the root authority.
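
Where a threshold is stated as a share of the electorate, it can be derived
from total_voters instead of being hand-picked. A minimal sketch, using whole
parts-per-million to avoid floating-point rounding (threshold_from_ppm is a
hypothetical helper and is not wired into the authority server):

```python
def threshold_from_ppm(total_voters: int, ppm: int, floor: int = 1) -> int:
    # Ceiling division on integers: -(-a // b) rounds a / b upwards.
    needed = -(-total_voters * ppm // 1_000_000)
    # Never go below the floor, so tiny electorates still need one supporter.
    return max(floor, needed)


print(threshold_from_ppm(20_000_000, 500))    # 0.05% of 20 million
print(threshold_from_ppm(50_000_000, 1000))   # 0.1% of 50 million
print(threshold_from_ppm(10, 100))            # small team falls back to the floor
```

Rounding up rather than down means the threshold never falls below the stated
share of the electorate.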
The network's TTL-based garbage collection becomes increasingly important at
scale. A country-sized network could have millions of failed intent proposals
per year. With TTLs, each node's storage footprint is bounded by the number
of active proposals, not the total historical proposal count. Only enacted law
blobs are permanent and replicated forever.
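
Which nodes hold a given key is decided by XOR distance between the key's ID
and each node's ID. A minimal sketch of that selection, reusing the same
32-bit SHA-256 prefix that _key_id and _node_id_from_port use (closest_nodes is
an illustrative name; the node's own method is _closest_peers):

```python
import hashlib

K = 3  # replication factor, as in the constants of this file


def key_id(key: str) -> int:
    # First 8 hex digits of SHA-256, read as a 32-bit integer.
    return int(hashlib.sha256(key.encode()).hexdigest()[:8], 16)


def closest_nodes(node_ids, key: str, k: int = K) -> list:
    target = key_id(key)
    # Smaller XOR means "closer" in the Kademlia sense.
    return sorted(node_ids, key=lambda nid: nid ^ target)[:k]


ids = [key_id(str(port)) for port in range(7001, 7011)]
print(closest_nodes(ids, "law:law-1"))
```

Each key lands on only K nodes however many nodes exist, which is why per-node
storage stays a small slice of the keyspace.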
QUICK START
-----------
# 1. Create the authority keypair (done once by the issuing body)
python law.py authority genkey --out authority.json
# 2. Issue citizen keys — each issuance atomically increments voters.count
python law.py authority issue --authority authority.json --id "alice" --out alice.json --countfile voters.count
python law.py authority issue --authority authority.json --id "bob" --out bob.json --countfile voters.count
python law.py authority issue --authority authority.json --id "carol" --out carol.json --countfile voters.count
# 3. Start DHT nodes (each in a separate terminal)
python law.py node --port 7001
python law.py node --port 7002 --bootstrap 127.0.0.1:7001
python law.py node --port 7003 --bootstrap 127.0.0.1:7001
# 4. Start the authority voter-count + threshold server (long-running)
python law.py authority publish-voter-count \\
--authority authority.json --countfile voters.count \\
--intent-threshold 2 --intent-window-days 7 --draft-threshold 2 \\
--port 7100
# 5. Publish authority pubkey + voter-count server address to the DHT
python law.py client publish-authority \\
--authority authority.json --vc-server 127.0.0.1:7100 --node 127.0.0.1:7001
# --- ORIGINAL LAW WORKFLOW ---
# 6. Propose a new law (intent phase)
python law.py client propose \\
--id "law-1" --title "Ban Mondays" \\
--text "All Mondays are hereby banned. Citizens shall not be compelled to attend meetings before 10am." \\
--citizen alice.json --node 127.0.0.1:7001
# 7. Cast intent votes to advance proposal to drafting
python law.py client intent-vote --id "law-1" --citizen bob.json --node 127.0.0.1:7001
python law.py client intent-vote --id "law-1" --citizen carol.json --node 127.0.0.1:7002
# 8. Submit a draft version
python law.py client draft \\
--id "law-1" \\
--text "All Mondays are hereby optional. No meetings before 10am. Violations carry a fine." \\
--citizen alice.json --node 127.0.0.1:7001
# 9. Nominate a draft version as the voting candidate (shows draft hashes)
python law.py client status --id "law-1" --node 127.0.0.1:7001
python law.py client nominate --id "law-1" --hash <draft_hash> --citizen bob.json --node 127.0.0.1:7001
python law.py client nominate --id "law-1" --hash <draft_hash> --citizen carol.json --node 127.0.0.1:7002
# 10. Vote on the candidate in open-voting
python law.py client vote --id "law-1" --citizen alice.json --node 127.0.0.1:7001
python law.py client vote --id "law-1" --citizen bob.json --node 127.0.0.1:7002
# 11. Read the enacted law
python law.py client read --id "law-1" --node 127.0.0.1:7001
# --- AMENDMENT WORKFLOW ---
# 12. Propose an amendment (intent phase)
python law.py client amend \\
--law-id "law-1" --id "law-1-amend-1" \\
--title "Extend Monday ban to Tuesdays" \\
--text "All Mondays and Tuesdays are hereby optional. No meetings before 10am." \\
--commentary "The original ban did not address the Tuesday problem. This amendment extends coverage." \\
--citizen alice.json --node 127.0.0.1:7001
# 13. Follow same intent-vote -> draft -> nominate -> vote pipeline as above
# --- REPEAL WORKFLOW ---
# 14. Propose a repeal
python law.py client repeal \\
--law-id "law-1" --id "law-1-repeal" \\
--title "Repeal the Monday ban" \\
--commentary "Mondays are fine actually." \\
--citizen carol.json --node 127.0.0.1:7001
# --- INSPECTION ---
python law.py client status --id "law-1" --node 127.0.0.1:7001
python law.py client proposals --node 127.0.0.1:7001
python law.py client drafts --id "law-1" --node 127.0.0.1:7001
python law.py client history --id "law-1" --node 127.0.0.1:7001
DEPENDENCIES
------------
pip install cryptography
"""
import argparse
import hashlib
import json
import socket
import sys
import threading
import time
from base64 import b64decode, b64encode
try:
    from cryptography.hazmat.primitives.asymmetric.ed25519 import (
        Ed25519PrivateKey,
        Ed25519PublicKey,
    )
    from cryptography.hazmat.primitives.serialization import (
        Encoding,
        NoEncryption,
        PrivateFormat,
        PublicFormat,
        load_pem_private_key,
        load_pem_public_key,
    )
    from cryptography.exceptions import InvalidSignature
except ImportError:
    sys.exit("Missing dependency — run: pip install cryptography")
# ---------------------------------------------------------------------------
# Constants
# ---------------------------------------------------------------------------
K = 3 # DHT replication factor
TIMEOUT = 4 # Socket timeout seconds
MAX_MSG = 262144 # 256 KB max message
TTL_SWEEP_INTERVAL = 60 # seconds between TTL sweep passes
INTENT_GRACE_SECS = 3600 # 1 hour grace after intent deadline before pruning
# ===========================================================================
# Crypto helpers
# ===========================================================================
def _priv_to_pem(key: Ed25519PrivateKey) -> str:
    return key.private_bytes(Encoding.PEM, PrivateFormat.PKCS8, NoEncryption()).decode()


def _pub_to_pem(key) -> str:
    # Takes a private key and serialises its public half.
    return key.public_key().public_bytes(Encoding.PEM, PublicFormat.SubjectPublicKeyInfo).decode()


def _load_priv(pem: str) -> Ed25519PrivateKey:
    return load_pem_private_key(pem.encode(), password=None)


def _load_pub(pem: str) -> Ed25519PublicKey:
    return load_pem_public_key(pem.encode())


def _sign(private_pem: str, data: bytes) -> str:
    return b64encode(_load_priv(private_pem).sign(data)).decode()


def _verify(public_pem: str, data: bytes, sig_b64: str) -> bool:
    try:
        _load_pub(public_pem).verify(b64decode(sig_b64), data)
        return True
    except Exception:
        return False


def _sha256(data: bytes) -> str:
    return hashlib.sha256(data).hexdigest()


def _node_id_from_port(port: int) -> int:
    return int(_sha256(str(port).encode())[:8], 16)


def _key_id(key: str) -> int:
    return int(_sha256(key.encode())[:8], 16)


def _xor_dist(a: int, b: int) -> int:
    return a ^ b
# ===========================================================================
# Countfile helpers (atomic integer stored in a plain text file)
# ===========================================================================
def _countfile_read(path: str) -> int:
    try:
        with open(path) as f:
            return int(f.read().strip())
    except (FileNotFoundError, ValueError):
        return 0


def _countfile_increment(path: str) -> int:
    import fcntl  # POSIX-only; imported lazily so other commands work elsewhere
    lock_path = path + ".lock"
    with open(lock_path, "w") as lf:
        fcntl.flock(lf, fcntl.LOCK_EX)
        try:
            count = _countfile_read(path) + 1
            with open(path, "w") as f:
                f.write(str(count))
            return count
        finally:
            fcntl.flock(lf, fcntl.LOCK_UN)
# ===========================================================================
# DHT Node
# ===========================================================================
class DHTNode:
    """
    Simplified Kademlia-inspired DHT node.

    Store keys:
        "proposal:<id>"    -> proposal record dict
        "law:<id>"         -> enacted law record dict
        "blob:<hash>"      -> raw content JSON string (may carry expires_at)
        "authority_pubkey" -> authority Ed25519 public key PEM
        "voter_count"      -> { host, port } of authority voter-count server
    """

    def __init__(self, port: int, bootstrap: str = None):
        self.port = port
        self.node_id = _node_id_from_port(port)
        self.store = {}   # key -> value
        self.ttls = {}    # key -> expires_at unix timestamp
        self.peers = {}   # node_id -> (host, port)
        self.lock = threading.Lock()
        self._running = False
        if bootstrap:
            host, p = bootstrap.split(":")
            pid = _node_id_from_port(int(p))
            self.peers[pid] = (host, int(p))

    # -------------------------------------------------------------------------
    # Lifecycle
    # -------------------------------------------------------------------------
    def start(self):
        self._running = True
        threading.Thread(target=self._serve, daemon=True).start()
        threading.Thread(target=self._ttl_sweep, daemon=True).start()
        time.sleep(0.1)
        if self.peers:
            self._bootstrap()
        print(f"[node:{self.port}] Started. ID={self.node_id:#010x} peers={len(self.peers)}")

    def _ttl_sweep(self):
        """Periodically remove expired keys from local store."""
        while self._running:
            time.sleep(TTL_SWEEP_INTERVAL)
            now = time.time()
            with self.lock:
                expired = [k for k, exp in self.ttls.items() if now > exp]
                for k in expired:
                    self.store.pop(k, None)
                    del self.ttls[k]
            if expired:
                print(f"[node:{self.port}] TTL sweep removed {len(expired)} expired key(s).")
    # -------------------------------------------------------------------------
    # Server
    # -------------------------------------------------------------------------
    def _serve(self):
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as srv:
            srv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            srv.bind(("0.0.0.0", self.port))
            srv.listen(32)
            srv.settimeout(1.0)
            while self._running:
                try:
                    conn, _ = srv.accept()
                    threading.Thread(target=self._handle, args=(conn,), daemon=True).start()
                except socket.timeout:
                    continue

    def _handle(self, conn):
        try:
            conn.settimeout(TIMEOUT)
            raw = b""
            # Messages are newline-terminated JSON, capped at MAX_MSG bytes.
            while len(raw) < MAX_MSG:
                chunk = conn.recv(4096)
                if not chunk:
                    break
                raw += chunk
                if raw.endswith(b"\n"):
                    break
            if raw.strip():
                resp = self._dispatch(json.loads(raw.decode()))
                conn.sendall((json.dumps(resp) + "\n").encode())
        except Exception as e:
            try:
                conn.sendall((json.dumps({"error": str(e)}) + "\n").encode())
            except Exception:
                pass
        finally:
            conn.close()

    def _dispatch(self, msg: dict) -> dict:
        cmd = msg.get("cmd")
        if cmd == "ping":
            self._register_peer(msg.get("sender_id"), msg.get("sender_host"), msg.get("sender_port"))
            return {"ok": True, "node_id": self.node_id}
        elif cmd == "get_peers":
            return {"peers": [[pid, h, p] for pid, (h, p) in self.peers.items()]}
        elif cmd == "find_node":
            return {"peers": self._closest_peers(msg["target_id"], exclude=msg.get("sender_id"))}
        elif cmd == "store":
            self._local_store(msg["key"], msg["value"], ttl=msg.get("ttl"))
            return {"ok": True}
        elif cmd == "find_value":
            key = msg["key"]
            with self.lock:
                if key in self.store:
                    return {"found": True, "value": self.store[key]}
            # Outside the lock: _closest_peers acquires it again.
            return {"found": False, "peers": self._closest_peers(_key_id(key))}
        elif cmd == "delete":
            with self.lock:
                self.store.pop(msg["key"], None)
                self.ttls.pop(msg["key"], None)
            return {"ok": True}
        else:
            return {"error": f"unknown command: {cmd}"}
    # -------------------------------------------------------------------------
    # Peer management
    # -------------------------------------------------------------------------
    def _register_peer(self, node_id, host, port):
        if None in (node_id, host, port) or node_id == self.node_id:
            return
        with self.lock:
            self.peers[node_id] = (host, port)

    def _closest_peers(self, target_id: int, exclude=None, n: int = K) -> list:
        with self.lock:
            ranked = sorted(
                [(pid, h, p) for pid, (h, p) in self.peers.items() if pid != exclude],
                key=lambda x: _xor_dist(x[0], target_id),
            )
        return [[pid, h, p] for pid, h, p in ranked[:n]]

    def _bootstrap(self):
        for pid, (h, p) in list(self.peers.items()):
            resp = DHTNode.rpc(h, p, {
                "cmd": "ping",
                "sender_id": self.node_id,
                "sender_host": "127.0.0.1",
                "sender_port": self.port,
            })
            if resp and resp.get("ok"):
                resp2 = DHTNode.rpc(h, p, {"cmd": "get_peers"})
                if resp2 and "peers" in resp2:
                    for entry in resp2["peers"]:
                        ph, pp = entry[1], entry[2]
                        ppid = _node_id_from_port(pp)
                        if ppid != self.node_id:
                            with self.lock:
                                self.peers[ppid] = (ph, pp)
    # -------------------------------------------------------------------------
    # RPC
    # -------------------------------------------------------------------------
    @staticmethod
    def rpc(host: str, port: int, msg: dict, timeout: int = TIMEOUT) -> dict:
        """Send one newline-terminated JSON message; return the reply or None on failure."""
        try:
            with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
                s.settimeout(timeout)
                s.connect((host, int(port)))
                s.sendall((json.dumps(msg) + "\n").encode())
                raw = b""
                while True:
                    chunk = s.recv(4096)
                    if not chunk:
                        break
                    raw += chunk
                    if raw.endswith(b"\n"):
                        break
                return json.loads(raw.decode())
        except Exception:
            return None
    # -------------------------------------------------------------------------
    # DHT put / get / delete
    # -------------------------------------------------------------------------
    def _local_store(self, key: str, value, ttl: int = None):
        with self.lock:
            self.store[key] = value
            if ttl is not None:
                self.ttls[key] = time.time() + ttl
            else:
                self.ttls.pop(key, None)

    def dht_put(self, key: str, value, ttl: int = None):
        self._local_store(key, value, ttl=ttl)
        target = _key_id(key)
        closest = self._closest_peers(target, n=K)
        for pid, h, p in closest:
            DHTNode.rpc(h, p, {"cmd": "store", "key": key, "value": value, "ttl": ttl})

    def dht_get(self, key: str):
        with self.lock:
            if key in self.store:
                # Check not locally expired
                if key not in self.ttls or time.time() < self.ttls[key]:
                    return self.store[key]
        target = _key_id(key)
        visited = set()
        queue = self._closest_peers(target, n=K)
        while queue:
            pid, h, p = queue.pop(0)
            if pid in visited:
                continue
            visited.add(pid)
            resp = DHTNode.rpc(h, p, {"cmd": "find_value", "key": key})
            if resp is None:
                continue
            if resp.get("found"):
                self._local_store(key, resp["value"])
                return resp["value"]
            for peer in resp.get("peers", []):
                if peer[0] not in visited:
                    queue.append(peer)
            queue.sort(key=lambda x: _xor_dist(x[0], target))
        return None

    def dht_delete(self, key: str):
        """Remove a key from this node and broadcast deletion to peers."""
        with self.lock:
            self.store.pop(key, None)
            self.ttls.pop(key, None)
        target = _key_id(key)
        closest = self._closest_peers(target, n=K)
        for pid, h, p in closest:
            DHTNode.rpc(h, p, {"cmd": "delete", "key": key})
# ===========================================================================
# DHT key helpers
# ===========================================================================
PROPOSAL_KEY = lambda pid: f"proposal:{pid}"
LAW_KEY = lambda lid: f"law:{lid}"
BLOB_KEY = lambda h: f"blob:{h}"
AUTH_KEY = "authority_pubkey"
VOTER_COUNT_KEY = "voter_count"
# ===========================================================================
# Authority voter-count + threshold server
# ===========================================================================
def query_voter_count_server(host: str, port: int) -> dict | None:
    try:
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
            s.settimeout(TIMEOUT)
            s.connect((host, int(port)))
            s.sendall(b"ping\n")
            raw = b""
            while True:
                chunk = s.recv(4096)
                if not chunk:
                    break
                raw += chunk
                if raw.endswith(b"\n"):
                    break
            return json.loads(raw.decode())
    except Exception:
        return None


def _resolve_network_config(node: DHTNode, auth_pub: str) -> dict | None:
    """
    Fetch and verify the signed authority config (voter count + thresholds).
    Returns the verified payload dict or None on failure.
    """
    vc_entry = node.dht_get(VOTER_COUNT_KEY)
    if vc_entry is None or auth_pub is None:
        return None
    if "host" in vc_entry and "port" in vc_entry:
        payload = query_voter_count_server(vc_entry["host"], vc_entry["port"])
        if payload is None:
            return None
    else:
        payload = vc_entry
    # The signature covers every field except the signature itself.
    body = {k: payload[k] for k in payload if k != "signature"}
    body_bytes = json.dumps(body, sort_keys=True).encode()
    if not _verify(auth_pub, body_bytes, payload["signature"]):
        return None
    return payload
# ===========================================================================
# Vote helper (shared across all voting actions)
# ===========================================================================
def _build_vote(action: str, target_id: str, target_hash: str,
                citizen: dict, auth_pub: str) -> dict:
    """
    Build and sign a vote payload. action is one of:
        "intent" | "nominate" | "vote"
    Verifies the citizen cert against auth_pub first.
    """
    cert = citizen["cert"]
    cert_sig = citizen["cert_sig"]
    cert_bytes = json.dumps(cert, sort_keys=True).encode()
    if not _verify(auth_pub, cert_bytes, cert_sig):
        sys.exit("[vote] Citizen certificate fails authority verification!")
    payload = {
        "action": action,
        "target_id": target_id,
        "target_hash": target_hash,
        "citizen_id": citizen["citizen_id"],
        "pubkey_pem": citizen["public_pem"],
        "cert": cert,
        "cert_sig": cert_sig,
        "timestamp": int(time.time()),
    }
    vote_bytes = json.dumps(payload, sort_keys=True).encode()
    payload["sig"] = _sign(citizen["private_pem"], vote_bytes)
    return payload


def _verify_vote(vote: dict, auth_pub: str) -> bool:
    cb = json.dumps(vote["cert"], sort_keys=True).encode()
    vb = json.dumps({k: vote[k] for k in vote if k != "sig"}, sort_keys=True).encode()
    return (_verify(auth_pub, cb, vote["cert_sig"]) and
            _verify(vote["pubkey_pem"], vb, vote["sig"]))


def _citizen_already_voted(votes: list, citizen_id: str) -> bool:
    return any(v.get("citizen_id") == citizen_id for v in votes)
# ===========================================================================
# Phase transition helpers
# ===========================================================================
def _try_advance_intent(node: DHTNode, proposal: dict, cfg: dict) -> bool:
    """Advance to drafting if intent threshold reached. Returns True if advanced."""
    if proposal["phase"] != "intent":
        return False
    threshold = proposal.get("intent_threshold", cfg.get("intent_threshold", 1))
    if len(proposal["intent_votes"]) >= threshold:
        proposal["phase"] = "drafting"
        # Drafts do not expire, so clear the intent-phase TTL on the body blob by
        # re-putting it without one. Deleting it here would leave body_hash
        # pointing at nothing.
        body_key = BLOB_KEY(proposal["body_hash"])
        body_blob = node.dht_get(body_key)
        if body_blob is not None:
            node.dht_put(body_key, body_blob)
        node.dht_put(PROPOSAL_KEY(proposal["id"]), proposal)
        print(f"[phase] Proposal '{proposal['id']}' advanced to OPEN DRAFTING.")
        return True
    return False
def _try_advance_drafting(node: DHTNode, proposal: dict, cfg: dict) -> bool:
    """Set draft_candidate if a version reaches draft threshold. Returns True if set."""
    if proposal["phase"] != "drafting":
        return False
    threshold = proposal.get("draft_threshold", cfg.get("draft_threshold", 1))
    for h, votes in proposal["draft_votes"].items():
        if len(votes) >= threshold and proposal.get("draft_candidate") != h:
            proposal["draft_candidate"] = h
            proposal["phase"] = "voting"
            proposal["voting_votes"] = []
            # TTL all losing draft blobs
            for other_h in proposal["draft_versions"]:
                if other_h != h:
                    losing_blob = node.dht_get(BLOB_KEY(other_h))
                    # Skip blobs that are already gone rather than storing None.
                    if losing_blob is not None:
                        node.dht_put(BLOB_KEY(other_h), losing_blob,
                                     ttl=INTENT_GRACE_SECS)
            node.dht_put(PROPOSAL_KEY(proposal["id"]), proposal)
            print(f"[phase] Proposal '{proposal['id']}' advanced to OPEN VOTING. Candidate: {h[:16]}...")
            return True
    return False
def _try_enact(node: DHTNode, proposal: dict, cfg: dict, total_voters: int) -> bool:
    """Enact the proposal if voting threshold (51%) reached. Returns True if enacted."""
    if proposal["phase"] != "voting":
        return False
    majority = (total_voters // 2) + 1
    if len(proposal["voting_votes"]) >= majority:
        enacted_hash = proposal["draft_candidate"]
        proposal["phase"] = "enacted"
        proposal["enacted_hash"] = enacted_hash
        law_id = proposal.get("amends") or proposal["id"]
        law_rec = node.dht_get(LAW_KEY(law_id)) or {
            "law_id": law_id, "phase": "enacted",
            "enacted_hash": None, "history": [], "enacted_at": None,
        }
        if law_rec["enacted_hash"]:
            law_rec["history"].append(law_rec["enacted_hash"])
        law_rec["enacted_hash"] = enacted_hash
        law_rec["phase"] = "repealed" if proposal["type"] == "repeal" else "enacted"
        law_rec["enacted_at"] = int(time.time())
        law_rec["proposal_id"] = proposal["id"]
        node.dht_put(LAW_KEY(law_id), law_rec)
        node.dht_put(PROPOSAL_KEY(proposal["id"]), proposal)
        # Void any other active proposals targeting the same law
        _void_competing(node, law_id, proposal["id"], enacted_hash)
        print(f"[phase] Proposal '{proposal['id']}' ENACTED. Law '{law_id}' updated.")
        return True
    return False


def _void_competing(node: DHTNode, law_id: str, enacted_proposal_id: str,
                    enacted_hash: str):
    """
    Mark all active proposals targeting law_id (other than the one just enacted)
    as voided. TTL their blobs.
    """
    # We don't have a proposal index, so we rely on callers tracking proposal IDs
    # via the law record's active_proposals list. For the PoC we store a list of
    # active proposal IDs on the law record itself.
    law_rec = node.dht_get(LAW_KEY(law_id))
    if law_rec is None:
        return
    for pid in list(law_rec.get("active_proposals", [])):
        if pid == enacted_proposal_id:
            continue
        prop = node.dht_get(PROPOSAL_KEY(pid))
        if prop and prop["phase"] in ("intent", "drafting", "voting"):
            prop["phase"] = "voided"
            prop["voided_by"] = enacted_hash
            node.dht_put(PROPOSAL_KEY(pid), prop)
            # TTL all blobs for this proposal
            for h in [prop.get("body_hash")] + prop.get("draft_versions", []):
                if h:
                    node.dht_put(BLOB_KEY(h), node.dht_get(BLOB_KEY(h)) or "",
                                 ttl=INTENT_GRACE_SECS)
            print(f"[void] Proposal '{pid}' voided by enactment of '{enacted_proposal_id}'.")
    law_rec["active_proposals"] = [enacted_proposal_id]
    node.dht_put(LAW_KEY(law_id), law_rec)
# ===========================================================================
# Client commands
# ===========================================================================
def _load_citizen_verified(citizen_file: str, node: DHTNode) -> tuple[dict, str]:
    """Load citizen JSON and verify against network authority key."""
    with open(citizen_file) as f:
        citizen = json.load(f)
    auth_pub = node.dht_get(AUTH_KEY)
    if auth_pub is None:
        sys.exit("[auth] Authority public key not found. Run publish-authority first.")
    cert_bytes = json.dumps(citizen["cert"], sort_keys=True).encode()
    if not _verify(auth_pub, cert_bytes, citizen["cert_sig"]):
        sys.exit("[auth] Citizen certificate invalid.")
    return citizen, auth_pub
def cmd_propose(node: DHTNode, args):
    citizen, auth_pub = _load_citizen_verified(args.citizen, node)
    cfg = _resolve_network_config(node, auth_pub) or {}
    # Check law ID not already taken
    if node.dht_get(LAW_KEY(args.id)) is not None:
        sys.exit(f"[propose] Law ID '{args.id}' already exists. Choose a different ID.")
    blob = json.dumps({"content": args.text, "submitted_at": int(time.time())}, sort_keys=True)
    bh = _sha256(blob.encode())
    now = int(time.time())
    window = int(cfg.get("intent_window_days", 7)) * 86400
    deadline = now + window
    proposal = {
        "id": args.id,
        "type": "original",
        "title": args.title,
        "body_hash": bh,
        "commentary": "",
        "phase": "intent",
        "intent_votes": [],
        "intent_threshold": int(cfg.get("intent_threshold", 1)),
        "intent_deadline": deadline,
        "expires_at": deadline + INTENT_GRACE_SECS,
        "draft_versions": [],
        "draft_votes": {},
        "draft_candidate": None,
        "draft_threshold": int(cfg.get("draft_threshold", 1)),
        "voting_votes": [],
        "enacted_hash": None,
        "submitted_at": now,
        "submitted_by": citizen["citizen_id"],
    }
    # Store blob with TTL (will be cleared if proposal advances to drafting)
    node.dht_put(BLOB_KEY(bh), blob, ttl=window + INTENT_GRACE_SECS)
    node.dht_put(PROPOSAL_KEY(args.id), proposal)
    # Register on law record
    law_rec = node.dht_get(LAW_KEY(args.id)) or {
        "law_id": args.id, "phase": "proposed",
        "enacted_hash": None, "history": [], "active_proposals": [],
    }
    if args.id not in law_rec.get("active_proposals", []):
        law_rec.setdefault("active_proposals", []).append(args.id)
    node.dht_put(LAW_KEY(args.id), law_rec)
    print(f"[propose] Proposal '{args.id}' submitted (intent phase).")
    print(f" Title : {args.title}")
    print(f" Deadline : {time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(deadline))}")
    print(f" Needs : {proposal['intent_threshold']} intent vote(s) to advance to drafting.")
def cmd_amend(node: DHTNode, args):
    citizen, auth_pub = _load_citizen_verified(args.citizen, node)
    cfg = _resolve_network_config(node, auth_pub) or {}
    law_rec = node.dht_get(LAW_KEY(args.law_id))
    if law_rec is None or law_rec.get("phase") not in ("enacted",):
        sys.exit(f"[amend] Law '{args.law_id}' is not currently enacted.")
    baseline_hash = law_rec["enacted_hash"]
    blob = json.dumps({"content": args.text, "submitted_at": int(time.time()),
                       "commentary": args.commentary,
                       "baseline_hash": baseline_hash}, sort_keys=True)
    bh = _sha256(blob.encode())
    now = int(time.time())
    window = int(cfg.get("intent_window_days", 7)) * 86400
    deadline = now + window
    proposal = {
        "id": args.id,
        "type": "amendment",
        "amends": args.law_id,
        "baseline_hash": baseline_hash,
        "title": args.title,
        "body_hash": bh,
        "commentary": args.commentary,
        "phase": "intent",
        "intent_votes": [],
        "intent_threshold": int(cfg.get("intent_threshold", 1)),
        "intent_deadline": deadline,
        "expires_at": deadline + INTENT_GRACE_SECS,
        "draft_versions": [],
        "draft_votes": {},
        "draft_candidate": None,
        "draft_threshold": int(cfg.get("draft_threshold", 1)),
        "voting_votes": [],
        "enacted_hash": None,
        "submitted_at": now,
        "submitted_by": citizen["citizen_id"],
    }
    node.dht_put(BLOB_KEY(bh), blob, ttl=window + INTENT_GRACE_SECS)
    node.dht_put(PROPOSAL_KEY(args.id), proposal)
    law_rec.setdefault("active_proposals", []).append(args.id)
    node.dht_put(LAW_KEY(args.law_id), law_rec)
    print(f"[amend] Amendment proposal '{args.id}' submitted for law '{args.law_id}'.")
    print(f" Title : {args.title}")
    print(f" Baseline : {baseline_hash[:16]}...")
    print(f" Deadline : {time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(deadline))}")
    print(f" Needs : {proposal['intent_threshold']} intent vote(s) to advance.")


def cmd_repeal(node: DHTNode, args):
    """Submit a repeal proposal against a currently enacted law."""
    citizen, auth_pub = _load_citizen_verified(args.citizen, node)
    cfg = _resolve_network_config(node, auth_pub) or {}
    law_rec = node.dht_get(LAW_KEY(args.law_id))
    if law_rec is None or law_rec.get("phase") != "enacted":
        sys.exit(f"[repeal] Law '{args.law_id}' is not currently enacted.")
    baseline_hash = law_rec["enacted_hash"]
    now = int(time.time())
    # A repeal carries the fixed body "REPEALED" instead of revised law text.
    blob = json.dumps({"content": "REPEALED", "submitted_at": now,
                       "commentary": args.commentary,
                       "baseline_hash": baseline_hash}, sort_keys=True)
    bh = _sha256(blob.encode())
    window = int(cfg.get("intent_window_days", 7)) * 86400
    deadline = now + window
    proposal = {
        "id": args.id,
        "type": "repeal",
        "amends": args.law_id,
        "baseline_hash": baseline_hash,
        "title": args.title,
        "body_hash": bh,
        "commentary": args.commentary,
        "phase": "intent",
        "intent_votes": [],
        "intent_threshold": int(cfg.get("intent_threshold", 1)),
        "intent_deadline": deadline,
        "expires_at": deadline + INTENT_GRACE_SECS,
        "draft_versions": [],
        "draft_votes": {},
        "draft_candidate": None,
        "draft_threshold": int(cfg.get("draft_threshold", 1)),
        "voting_votes": [],
        "enacted_hash": None,
        "submitted_at": now,
        "submitted_by": citizen["citizen_id"],
    }
    node.dht_put(BLOB_KEY(bh), blob, ttl=window + INTENT_GRACE_SECS)
    node.dht_put(PROPOSAL_KEY(args.id), proposal)
    # Avoid listing the same proposal twice if the command is re-run.
    if args.id not in law_rec.get("active_proposals", []):
        law_rec.setdefault("active_proposals", []).append(args.id)
    node.dht_put(LAW_KEY(args.law_id), law_rec)
    print(f"[repeal] Repeal proposal '{args.id}' submitted for law '{args.law_id}'.")
    print(f" Title : {args.title}")
    print(f" Deadline : {time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(deadline))}")


def cmd_intent_vote(node: DHTNode, args):
    """Cast one intent vote; the proposal advances to drafting once the threshold is met."""
    citizen, auth_pub = _load_citizen_verified(args.citizen, node)
    cfg = _resolve_network_config(node, auth_pub) or {}
    proposal = node.dht_get(PROPOSAL_KEY(args.id))
    if proposal is None:
        sys.exit(f"[intent-vote] Proposal '{args.id}' not found.")
    if proposal["phase"] != "intent":
        sys.exit(f"[intent-vote] Proposal '{args.id}' is not in intent phase (phase={proposal['phase']}).")
    if int(time.time()) > proposal["intent_deadline"]:
        sys.exit(f"[intent-vote] Intent deadline has passed for '{args.id}'.")
    if _citizen_already_voted(proposal["intent_votes"], citizen["citizen_id"]):
        sys.exit(f"[intent-vote] You have already cast an intent vote on '{args.id}'.")
    vote = _build_vote("intent", args.id, proposal["body_hash"], citizen, auth_pub)
    proposal["intent_votes"].append(vote)
    # The config was resolved a moment ago, so it is reused rather than fetched twice.
    _try_advance_intent(node, proposal, cfg)
    node.dht_put(PROPOSAL_KEY(args.id), proposal)
    n = len(proposal["intent_votes"])
    t = proposal["intent_threshold"]
    print(f"[intent-vote] '{citizen['citizen_id']}' voted. Intent votes: {n} / {t}")
    if proposal["phase"] == "drafting":
        print(" -> Proposal advanced to OPEN DRAFTING.")


def cmd_draft(node: DHTNode, args):
    """Submit a new draft version for a proposal in the drafting phase."""
    citizen, auth_pub = _load_citizen_verified(args.citizen, node)
    proposal = node.dht_get(PROPOSAL_KEY(args.id))
    if proposal is None:
        sys.exit(f"[draft] Proposal '{args.id}' not found.")
    if proposal["phase"] != "drafting":
        sys.exit(f"[draft] Proposal '{args.id}' is not in drafting phase (phase={proposal['phase']}).")
    blob = json.dumps({
        "content": args.text,
        "submitted_at": int(time.time()),
        "submitted_by": citizen["citizen_id"],
    }, sort_keys=True)
    bh = _sha256(blob.encode())
    # The hash covers the timestamp and submitter as well as the text, so this
    # check only catches an exact resubmission by the same citizen in the same second.
    if bh in proposal["draft_versions"]:
        sys.exit(f"[draft] Identical draft already exists: {bh[:16]}...")
    node.dht_put(BLOB_KEY(bh), blob)  # no TTL: draft blobs persist until a candidate is chosen
    proposal["draft_versions"].append(bh)
    proposal["draft_votes"].setdefault(bh, [])
    node.dht_put(PROPOSAL_KEY(args.id), proposal)
    print(f"[draft] Draft version submitted for '{args.id}'.")
    print(f" Hash : {bh}")
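

# --- Illustrative sketch, not part of the original file ----------------------
# cmd_draft hashes a JSON blob that contains the text, the submitter and the
# submission time. The helper below reproduces that recipe so the consequence
# is easy to see: the same text submitted at a different second, or by another
# citizen, gets a different hash. The name `_draft_hash_sketch` is hypothetical,
# and it assumes `_sha256` (defined elsewhere in this file) returns a hex
# SHA-256 digest of the bytes it is given.
import hashlib
import json


def _draft_hash_sketch(content, submitted_by, submitted_at):
    """Return the hex SHA-256 of a draft blob built the way cmd_draft builds it."""
    blob = json.dumps({
        "content": content,
        "submitted_at": submitted_at,
        "submitted_by": submitted_by,
    }, sort_keys=True)  # sort_keys keeps the serialisation stable across dict orderings
    return hashlib.sha256(blob.encode()).hexdigest()
# Hashing only the content would make the duplicate check text-based instead,
# at the cost of losing per-submission attribution in the hash itself.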


def cmd_nominate(node: DHTNode, args):
    """Nominate one draft version; the first to reach draft_threshold becomes the candidate."""
    citizen, auth_pub = _load_citizen_verified(args.citizen, node)
    cfg = _resolve_network_config(node, auth_pub) or {}
    proposal = node.dht_get(PROPOSAL_KEY(args.id))
    if proposal is None:
        sys.exit(f"[nominate] Proposal '{args.id}' not found.")
    if proposal["phase"] != "drafting":
        sys.exit(f"[nominate] Proposal '{args.id}' is not in drafting phase.")
    if args.hash not in proposal["draft_versions"]:
        sys.exit(f"[nominate] Hash {args.hash[:16]}... not found in draft versions.")
    # One nomination per citizen across ALL draft versions of this proposal
    all_nom_votes = [v for votes in proposal["draft_votes"].values() for v in votes]
    if _citizen_already_voted(all_nom_votes, citizen["citizen_id"]):
        sys.exit(f"[nominate] You have already nominated a draft for '{args.id}'.")
    vote = _build_vote("nominate", args.id, args.hash, citizen, auth_pub)
    # setdefault guards against a record whose vote list for this hash is missing.
    proposal["draft_votes"].setdefault(args.hash, []).append(vote)
    _try_advance_drafting(node, proposal, cfg)
    node.dht_put(PROPOSAL_KEY(args.id), proposal)
    n = len(proposal["draft_votes"][args.hash])
    t = proposal.get("draft_threshold", cfg.get("draft_threshold", 1))
    print(f"[nominate] '{citizen['citizen_id']}' nominated {args.hash[:16]}... Nominations: {n} / {t}")
    if proposal["phase"] == "voting":
        print(" -> Proposal advanced to OPEN VOTING.")


def cmd_vote(node: DHTNode, args):
    citizen, auth_pub = _load_citizen_verified(args.citizen, node)
    cfg = _resolve_network_config(node, auth_pub) or {}
    proposal = node.dht_get(PROPOSAL_KEY(args.id))
    if proposal is None:
        sys.exit(f"[vote] Proposal '{args.id}' not found.")
    if proposal["phase"] != "voting":
        sys.exit(f"[vote] Proposal '{args.id}' is not in open-voting phase (phase={proposal['phase']}).")
    if _citizen_already_voted(proposal["voting_votes"], citizen["citizen_id"]):
        sys.exit(f"[vote] You have already voted on '{args.id}'.")
    vote = _build_vote("vote", args.id, proposal["draft_candidate"], citizen, auth_pub)
    proposal["voting_votes"].append(vote)
    total_voters = cfg.get("total_voters")
    if total_voters:
        _try_enact(node, proposal, cfg, int(total_voters))
    node.dht_put(PROPOSAL_KEY(args.id), proposal)
    n = len(proposal["voting_votes"])
    majority = ((int(total_voters) // 2) + 1) if total_voters else "?"
    print(f"[vote] '{citizen['citizen_id']}' voted on '{args.id}'. Votes: {n} / {majority}")
    if proposal["phase"] == "enacted":
        print(" -> LAW ENACTED.")
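

# --- Illustrative sketch, not part of the original file ----------------------
# cmd_vote, cmd_read and cmd_status each compute the displayed enactment bar
# inline as (total // 2) + 1. A small helper under that same formula might look
# like this. The name `_majority_needed_sketch` is hypothetical, and whether
# `_try_enact` (defined elsewhere in this file) applies exactly this rule is an
# assumption.
def _majority_needed_sketch(total_voters):
    """Return the smallest vote count strictly greater than half, or None if the total is unknown."""
    if not total_voters:
        return None  # the callers above print "?" in this case
    return (int(total_voters) // 2) + 1
# With 10 registered voters the bar is 6; with 11 it is also 6, because integer
# division rounds the half down before one is added.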


def cmd_read(node: DHTNode, args):
    law_rec = node.dht_get(LAW_KEY(args.id))
    if law_rec is None:
        sys.exit(f"[read] Law '{args.id}' not found. It may still be in proposal phase.")
    phase = law_rec.get("phase", "unknown")
    if phase not in ("enacted", "repealed"):
        print(f"[read] Law '{args.id}' is not yet enacted (phase: {phase}).")
        return
    enacted_hash = law_rec["enacted_hash"]
    blob_raw = node.dht_get(BLOB_KEY(enacted_hash))
    if blob_raw is None:
        sys.exit(f"[read] Enacted blob not found on network (hash: {enacted_hash[:16]}...).")
    blob = json.loads(blob_raw)
    auth_pub = node.dht_get(AUTH_KEY)
    cfg = _resolve_network_config(node, auth_pub) or {}
    total = cfg.get("total_voters")
    proposal = node.dht_get(PROPOSAL_KEY(law_rec.get("proposal_id", ""))) or {}
    n_votes = len(proposal.get("voting_votes", []))
    majority = ((int(total) // 2) + 1) if total else "?"
    print(f"\n{'='*64}")
    print(f" Law : {args.id} [{phase.upper()}]")
    print(f" Hash : {enacted_hash}")
    print(f" Enacted : {time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(law_rec['enacted_at']))}")
    print(f" Versions : {len(law_rec.get('history', [])) + 1} (use `client history` for full chain)")
    print(f" Votes : {n_votes} / {majority}")
    print(f"{'='*64}")
    print(f"\n{blob['content']}\n")


def cmd_status(node: DHTNode, args):
    """Show the phase, thresholds and tallies for a proposal and/or its law record."""
    proposal = node.dht_get(PROPOSAL_KEY(args.id))
    law_rec = node.dht_get(LAW_KEY(args.id))
    auth_pub = node.dht_get(AUTH_KEY)
    cfg = _resolve_network_config(node, auth_pub) or {}
    if proposal is None and law_rec is None:
        sys.exit(f"[status] No proposal or law found with ID '{args.id}'.")
    print(f"\n{'='*64}")
    print(f" Status : {args.id}")
    print(f"{'='*64}")
    if proposal:
        phase = proposal["phase"]
        print(f" Type : {proposal['type']}")
        print(f" Phase : {phase.upper()}")
        print(f" Title : {proposal['title']}")
        print(f" Submitted : {time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(proposal['submitted_at']))} by {proposal['submitted_by']}")
        if proposal.get("amends"):
            # `or '?'` also covers a baseline_hash that is present but None.
            print(f" Amends : {proposal['amends']} (baseline: {(proposal.get('baseline_hash') or '?')[:16]}...)")
        if phase == "intent":
            n = len(proposal["intent_votes"])
            t = proposal["intent_threshold"]
            ddl = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(proposal["intent_deadline"]))
            print(f" Intent : {n} / {t} votes (deadline: {ddl})")
        elif phase == "drafting":
            print(f" Drafts : {len(proposal['draft_versions'])} version(s)")
            t = proposal.get("draft_threshold", cfg.get("draft_threshold", 1))
            for h in proposal["draft_versions"]:
                n = len(proposal["draft_votes"].get(h, []))
                tag = " <- CANDIDATE" if h == proposal.get("draft_candidate") else ""
                print(f" {h[:32]}... nominations: {n} / {t}{tag}")
        elif phase == "voting":
            n = len(proposal["voting_votes"])
            total = cfg.get("total_voters")
            majority = ((int(total) // 2) + 1) if total else "?"
            print(f" Candidate : {proposal['draft_candidate'][:32]}...")
            print(f" Votes : {n} / {majority}")
        elif phase == "enacted":
            print(f" Enacted : {proposal['enacted_hash'][:32]}...")
        elif phase == "voided":
            print(f" Voided by : {(proposal.get('voided_by') or '?')[:32]}...")
    if law_rec and law_rec.get("phase") in ("enacted", "repealed"):
        print(f"\n Current enacted hash : {law_rec['enacted_hash'][:32]}...")
        print(f" Law phase : {law_rec['phase'].upper()}")
        print(f" Prior versions : {len(law_rec.get('history', []))}")
    print()


def cmd_proposals(node: DHTNode, args):
    """List all active (non-expired) proposals held in this node's local store."""
    now = int(time.time())
    active = []
    with node.lock:
        keys = list(node.store.keys())
    for key in keys:
        if not key.startswith("proposal:"):
            continue
        # The lock is released by now, so the key may have been pruned; use .get().
        rec = node.store.get(key)
        if isinstance(rec, dict) and rec.get("phase") not in ("enacted", "voided", "abandoned"):
            if rec.get("phase") == "intent" and now > rec.get("expires_at", 0):
                continue  # expired intent, skip
            active.append(rec)
    if not active:
        print("[proposals] No active proposals found on this node.")
        return
    print(f"\n{'='*64}")
    print(f" Active Proposals ({len(active)})")
    print(f"{'='*64}")
    for p in sorted(active, key=lambda x: x.get("submitted_at", 0)):
        amends = f" [amends {p['amends']}]" if p.get("amends") else ""
        print(f" {p['id']:24s} [{p['phase']:10s}] {p['title'][:36]}{amends}")
    print()


def cmd_drafts(node: DHTNode, args):
    proposal = node.dht_get(PROPOSAL_KEY(args.id))
    if proposal is None:
        # Also check if it's a law ID and look up active proposals
        law_rec = node.dht_get(LAW_KEY(args.id))
        if law_rec:
            print(f"[drafts] '{args.id}' is a law ID. Showing active amendment proposals:")
            for pid in law_rec.get("active_proposals", []):
                p = node.dht_get(PROPOSAL_KEY(pid))
                if p and p["phase"] == "drafting":
                    print(f" Proposal: {pid} ({len(p['draft_versions'])} draft(s))")
        else:
            sys.exit(f"[drafts] No proposal or law found with ID '{args.id}'.")
        return
    if not proposal.get("draft_versions"):
        print(f"[drafts] No draft versions for '{args.id}' yet.")
        return
    print(f"\n{'='*64}")
    print(f" Draft versions for : {args.id} (phase: {proposal['phase']})")
    print(f"{'='*64}")
    t = proposal.get("draft_threshold", 1)
    for h in proposal["draft_versions"]:
        blob_raw = node.dht_get(BLOB_KEY(h))
        n_nom = len(proposal["draft_votes"].get(h, []))
        tag = " <- CANDIDATE" if h == proposal.get("draft_candidate") else ""
        ts = by = content = "?"
        if blob_raw:
            blob = json.loads(blob_raw)
            ts = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(blob["submitted_at"]))
            by = blob.get("submitted_by", "?")
            content = blob["content"][:72] + ("..." if len(blob["content"]) > 72 else "")
        print(f"\n {h}{tag}")
        print(f" Time : {ts} by {by}")
        print(f" Nominations : {n_nom} / {t}")
        print(f" Preview : {content}")
    print()


def cmd_history(node: DHTNode, args):
    """Print every enacted version of a law, oldest first."""
    law_rec = node.dht_get(LAW_KEY(args.id))
    if law_rec is None:
        sys.exit(f"[history] No law record found for '{args.id}'.")
    current_hash = law_rec.get("enacted_hash")
    all_hashes = law_rec.get("history", []) + ([current_hash] if current_hash else [])
    print(f"\n{'='*64}")
    print(f" Enactment history : {args.id} ({len(all_hashes)} version(s))")
    print(f"{'='*64}")
    for i, h in enumerate(all_hashes):
        is_current = h == current_hash
        blob_raw = node.dht_get(BLOB_KEY(h))
        tag = " <- CURRENT" if is_current else ""
        ts = content = "?"
        if blob_raw:
            blob = json.loads(blob_raw)
            # The blob only records when the text was submitted, not when it was enacted.
            ts = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(blob.get("submitted_at", 0)))
            content = blob["content"][:72] + ("..." if len(blob["content"]) > 72 else "")
        print(f"\n [{i+1}] {h}{tag}")
        print(f" Submitted : {ts}")
        print(f" Preview : {content}")
    print()
# ===========================================================================
# Authority commands
# ===========================================================================
def cmd_authority_genkey(args):
    priv = Ed25519PrivateKey.generate()
    priv_pem = _priv_to_pem(priv)
    pub_pem = _pub_to_pem(priv)
    payload = {"private_pem": priv_pem, "public_pem": pub_pem, "role": "authority"}
    with open(args.out, "w") as f:
        json.dump(payload, f, indent=2)
    print(f"[authority] Keypair saved to {args.out}")
    print(f"[authority] Fingerprint: {_sha256(pub_pem.encode())[:16]}")


def cmd_authority_issue(args):
    with open(args.authority) as f:
        auth = json.load(f)
    priv = Ed25519PrivateKey.generate()
    priv_pem = _priv_to_pem(priv)
    pub_pem = _pub_to_pem(priv)
    cert = {
        "citizen_id": args.id,
        "pubkey_pem": pub_pem,
        "issued_at": int(time.time()),
    }
    # The authority signs the canonical (sorted-key) JSON form of the certificate.
    cert_bytes = json.dumps(cert, sort_keys=True).encode()
    cert_sig = _sign(auth["private_pem"], cert_bytes)
    citizen = {
        "citizen_id": args.id,
        "private_pem": priv_pem,
        "public_pem": pub_pem,
        "cert": cert,
        "cert_sig": cert_sig,
        "authority_pub_pem": auth["public_pem"],
    }
    with open(args.out, "w") as f:
        json.dump(citizen, f, indent=2)
    new_count = _countfile_increment(args.countfile)
    print(f"[authority] Citizen '{args.id}' issued -> {args.out}")
    print(f"[authority] Voter count in '{args.countfile}': {new_count}")


def cmd_authority_publish_voter_count(args):
    """
    Long-running TCP server. Re-reads COUNTFILE on every connection and
    responds with a freshly signed config payload. The threshold values come
    from the command-line flags and stay fixed for the life of the process.
    """
    with open(args.authority) as f:
        auth = json.load(f)

    def _make_payload() -> dict:
        count = _countfile_read(args.countfile)
        body = {
            "total_voters": count,
            "intent_threshold": args.intent_threshold,
            "intent_window_days": args.intent_window_days,
            "draft_threshold": args.draft_threshold,
            "timestamp": int(time.time()),
        }
        # The signature covers the sorted-key JSON of the body without the signature field.
        body["signature"] = _sign(auth["private_pem"], json.dumps(body, sort_keys=True).encode())
        return body

    def _handle(conn):
        try:
            conn.settimeout(TIMEOUT)
            try:
                conn.recv(256)  # request content is ignored; any bytes trigger a reply
            except Exception:
                pass
            conn.sendall((json.dumps(_make_payload()) + "\n").encode())
        except Exception:
            pass  # a failed client must not take the server down
        finally:
            conn.close()

    srv_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    srv_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    srv_sock.bind(("0.0.0.0", args.port))
    srv_sock.listen(32)
    srv_sock.settimeout(1.0)  # wake up regularly so Ctrl+C is handled promptly
    initial = _countfile_read(args.countfile)
    majority = (initial // 2) + 1
    print(f"[authority] Voter-count server on port {args.port}")
    print(f"[authority] Countfile : {args.countfile} (current: {initial})")
    print(f"[authority] Simple majority needs : {majority} votes")
    print(f"[authority] intent_threshold : {args.intent_threshold}")
    print(f"[authority] intent_window_days : {args.intent_window_days}")
    print(f"[authority] draft_threshold : {args.draft_threshold}")
    print("[authority] Press Ctrl+C to stop.")
    try:
        while True:
            try:
                conn, _ = srv_sock.accept()
                threading.Thread(target=_handle, args=(conn,), daemon=True).start()
            except socket.timeout:
                continue
    except KeyboardInterrupt:
        print("\n[authority] Voter-count server stopped.")
    finally:
        srv_sock.close()
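

# --- Illustrative sketch, not part of the original file ----------------------
# The server above reads up to 256 bytes, replies with one JSON object followed
# by a newline, then closes the connection. A client for that exchange could be
# sketched as below. The name `_fetch_config_sketch` is hypothetical, and this
# is not the code of `_resolve_network_config` (defined elsewhere in this file).
# Signature checking is left to the caller: the function only returns the bytes
# that the signature should cover, assuming the signing recipe in _make_payload.
import json
import socket


def _fetch_config_sketch(host, port, timeout=5.0):
    """Fetch one payload and return (body, signature, signed_bytes)."""
    with socket.create_connection((host, port), timeout=timeout) as s:
        s.sendall(b"\n")  # any bytes will do; sending spares the server its recv timeout
        chunks = []
        while True:
            data = s.recv(4096)
            if not data:
                break  # server closed the connection
            chunks.append(data)
            if b"\n" in data:
                break  # newline terminates the payload
    payload = json.loads(b"".join(chunks).decode())
    signature = payload.pop("signature")
    # Rebuild the exact bytes that were signed: sorted-key JSON without the signature.
    signed_bytes = json.dumps(payload, sort_keys=True).encode()
    return payload, signature, signed_bytes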


def cmd_publish_authority(node: DHTNode, args):
    with open(args.authority) as f:
        auth = json.load(f)
    node.dht_put(AUTH_KEY, auth["public_pem"])
    fp = _sha256(auth["public_pem"].encode())[:16]
    print(f"[client] Authority public key published. Fingerprint: {fp}")
    if args.vc_server:
        # Split on the last colon only, so extra colons in the host do not break unpacking.
        host, port = args.vc_server.rsplit(":", 1)
        node.dht_put(VOTER_COUNT_KEY, {"host": host, "port": int(port)})
        print(f"[client] Voter-count server address published: {args.vc_server}")
# ===========================================================================
# Node runner
# ===========================================================================
def run_node(args):
    node = DHTNode(port=args.port, bootstrap=args.bootstrap or None)
    node.start()
    print(f"[node:{args.port}] Running. Press Ctrl+C to stop.")
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        node._running = False
        print(f"\n[node:{args.port}] Stopped.")
# ===========================================================================
# Ephemeral client node
# ===========================================================================
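def _ephemeral_node(node_addr: str) -> DHTNode:
    """Start a short-lived local node bootstrapped from the given host:port peer."""
    # Split on the last colon only, so extra colons in the host do not break unpacking.
    host, port = node_addr.rsplit(":", 1)
    # Ask the OS for a free port. The socket is closed before DHTNode binds,
    # so another process could in principle claim the port in between.
    with socket.socket() as s:
        s.bind(("", 0))
        free_port = s.getsockname()[1]
    node = DHTNode(port=free_port, bootstrap=f"{host}:{port}")
    node.start()
    time.sleep(0.3)  # give the bootstrap exchange a moment to complete
    return node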
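

# --- Illustrative sketch, not part of the original file ----------------------
# Both _ephemeral_node and cmd_publish_authority take a "host:port" string. A
# stricter parser that also accepts a bracketed IPv6 literal such as
# "[::1]:9000" might look like this. The name `_split_host_port_sketch` is
# hypothetical, and whether DHTNode can use IPv6 addresses at all is not
# something this file section shows.
def _split_host_port_sketch(addr):
    """Split 'host:port' on the last colon and return (host, int_port)."""
    host, sep, port = addr.rpartition(":")
    if not sep or not host:
        raise ValueError(f"expected host:port, got {addr!r}")
    if host.startswith("[") and host.endswith("]"):
        host = host[1:-1]  # strip IPv6 brackets
    return host, int(port)  # int() raises ValueError for a non-numeric port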


def run_client(args):
    node = _ephemeral_node(args.node)
    dispatch = {
        "publish-authority": cmd_publish_authority,
        "propose": cmd_propose,
        "amend": cmd_amend,
        "repeal": cmd_repeal,
        "intent-vote": cmd_intent_vote,
        "draft": cmd_draft,
        "nominate": cmd_nominate,
        "vote": cmd_vote,
        "read": cmd_read,
        "status": cmd_status,
        "proposals": cmd_proposals,
        "drafts": cmd_drafts,
        "history": cmd_history,
    }
    try:
        fn = dispatch.get(args.subcmd)
        if fn is None:
            sys.exit(f"Unknown client subcommand: {args.subcmd}")
        fn(node, args)
    finally:
        # Commands leave via sys.exit() on errors, so always signal the node to stop.
        node._running = False
# ===========================================================================
# CLI
# ===========================================================================
def main():
    parser = argparse.ArgumentParser(
        prog="law",
        description="Decentralised legislative system over a DHT with signed citizen votes.",
    )
    sub = parser.add_subparsers(dest="cmd", required=True)

    # -- authority --
    auth_p = sub.add_parser("authority", help="Authority key and server management")
    auth_sub = auth_p.add_subparsers(dest="subcmd", required=True)
    gk = auth_sub.add_parser("genkey", help="Generate authority keypair")
    gk.add_argument("--out", required=True)
    iss = auth_sub.add_parser("issue", help="Issue a citizen identity key (increments countfile)")
    iss.add_argument("--authority", required=True)
    iss.add_argument("--id", required=True, help="Citizen identifier")
    iss.add_argument("--out", required=True, help="Output JSON file")
    iss.add_argument("--countfile", required=True, help="Voter count file (auto-incremented)")
    pvc = auth_sub.add_parser("publish-voter-count",
                              help="Run the long-lived authority config server")
    pvc.add_argument("--authority", required=True)
    pvc.add_argument("--countfile", required=True)
    pvc.add_argument("--port", required=True, type=int)
    pvc.add_argument("--intent-threshold", required=True, type=int,
                     help="Votes needed to move intent -> drafting")
    pvc.add_argument("--intent-window-days", required=True, type=int,
                     help="Days an intent proposal has to reach threshold before expiring")
    pvc.add_argument("--draft-threshold", required=True, type=int,
                     help="Nominations needed to move a draft -> open voting")

    # -- node --
    node_p = sub.add_parser("node", help="Run a DHT node")
    node_p.add_argument("--port", type=int, required=True)
    node_p.add_argument("--bootstrap", default=None, help="Bootstrap peer host:port")

    # -- client --
    client_p = sub.add_parser("client", help="Interact with the network")
    client_p.add_argument("--node", required=True, help="DHT node host:port")
    client_sub = client_p.add_subparsers(dest="subcmd", required=True)
    pa = client_sub.add_parser("publish-authority", help="Publish authority pubkey to DHT")
    pa.add_argument("--authority", required=True)
    pa.add_argument("--vc-server", default=None, dest="vc_server",
                    help="Voter-count server host:port to advertise")
    pp = client_sub.add_parser("propose", help="Submit a new law intent proposal")
    pp.add_argument("--id", required=True, help="Unique proposal/law ID e.g. law-42")
    pp.add_argument("--title", required=True)
    pp.add_argument("--text", required=True, help="Initial body text")
    pp.add_argument("--citizen", required=True)
    am = client_sub.add_parser("amend", help="Submit an amendment proposal for an enacted law")
    am.add_argument("--id", required=True, help="Unique proposal ID e.g. law-42-amend-1")
    am.add_argument("--law-id", required=True, dest="law_id", help="Law being amended")
    am.add_argument("--title", required=True)
    am.add_argument("--text", required=True, help="Full revised text of the law")
    am.add_argument("--commentary", required=True, help="Explanation of changes")
    am.add_argument("--citizen", required=True)
    rp = client_sub.add_parser("repeal", help="Submit a repeal proposal for an enacted law")
    rp.add_argument("--id", required=True, help="Unique proposal ID e.g. law-42-repeal")
    rp.add_argument("--law-id", required=True, dest="law_id")
    rp.add_argument("--title", required=True)
    rp.add_argument("--commentary", required=True)
    rp.add_argument("--citizen", required=True)
    iv = client_sub.add_parser("intent-vote", help="Vote to advance a proposal from intent to drafting")
    iv.add_argument("--id", required=True)
    iv.add_argument("--citizen", required=True)
    dr = client_sub.add_parser("draft", help="Submit a draft version of a proposal")
    dr.add_argument("--id", required=True)
    dr.add_argument("--text", required=True)
    dr.add_argument("--citizen", required=True)
    nm = client_sub.add_parser("nominate", help="Nominate a draft version as the voting candidate")
    nm.add_argument("--id", required=True)
    nm.add_argument("--hash", required=True, help="Draft version hash to nominate")
    nm.add_argument("--citizen", required=True)
    vo = client_sub.add_parser("vote", help="Vote for the candidate in open-voting phase")
    vo.add_argument("--id", required=True)
    vo.add_argument("--citizen", required=True)
    rd = client_sub.add_parser("read", help="Read the current enacted text of a law")
    rd.add_argument("--id", required=True)
    st = client_sub.add_parser("status", help="Show phase, thresholds, and vote tallies")
    st.add_argument("--id", required=True)
    client_sub.add_parser("proposals", help="List all active proposals on this node")
    dv = client_sub.add_parser("drafts", help="Show all draft versions for a proposal")
    dv.add_argument("--id", required=True)
    hi = client_sub.add_parser("history", help="Show full enactment history of a law")
    hi.add_argument("--id", required=True)

    args = parser.parse_args()
    if args.cmd == "authority":
        if args.subcmd == "genkey":
            cmd_authority_genkey(args)
        elif args.subcmd == "issue":
            cmd_authority_issue(args)
        elif args.subcmd == "publish-voter-count":
            cmd_authority_publish_voter_count(args)
    elif args.cmd == "node":
        run_node(args)
    elif args.cmd == "client":
        run_client(args)


if __name__ == "__main__":
    main()