Skip to content

Instantly share code, notes, and snippets.

@yrong
Last active June 3, 2026 04:00
Show Gist options
  • Select an option

  • Save yrong/3b6b0406dc759bc2d5d4bbbc4f819480 to your computer and use it in GitHub Desktop.

Select an option

Save yrong/3b6b0406dc759bc2d5d4bbbc4f819480 to your computer and use it in GitHub Desktop.
verify_beefy_leaf_order
#!/usr/bin/env python3
"""
verify_beefy_leaf_order.py
Independently verifies, against live Polkadot mainnet state, that the BEEFY
validator-set Merkle tree (the `keyset_commitment` consumed by the Snowbridge
Ethereum light client) is built with its leaves in **stash-AccountId-sorted
order** — i.e. a validator's Merkle leaf index equals its rank in the
ascending-sorted set of elected stash AccountIds.
It proves this the only way that leaves no room for inference: by recomputing
the on-chain `keyset_commitment` byte-for-byte from public state.
Chain of facts established:
1. Session.Validators (the active set) is fully ascending-sorted by AccountId.
2. Beefy.Authorities is index-aligned to Session.Validators (same count;
ordered by that external index, NOT by key bytes).
3. merkle_root( keccak256(eth_addr(key_i)) for i in Beefy.Authorities order )
== on-chain BeefyMmrLeaf.BeefyAuthorities.keyset_commitment.
All primitives (Keccak-256, secp256k1 decompression, twox128, the binary
Merkle builder) are pure-Python and self-tested against published / in-repo
test vectors before any network data is touched. No third-party crypto deps.
Edge case reproduced faithfully: BEEFY keys that are not valid secp256k1
points (e.g. placeholder "beefy..." keys present on mainnet) fail
`k256::PublicKey::from_sec1_bytes`; substrate's `BeefyEcdsaToEthereum::convert`
then returns `Vec::default()` (the EMPTY vector), so such a leaf is
keccak256(b"") — NOT keccak256([0u8; 20]).
Usage:
python3 verify_beefy_leaf_order.py [--rpc URL] [--at BLOCK_HASH]
Default RPC: https://rpc.polkadot.io
Exit code 0 on full success; non-zero if any assertion fails.
"""
import argparse
import json
import struct
import subprocess
import sys
# ---------------------------------------------------------------------------
# Keccak-256 (Ethereum variant: domain suffix 0x01, rate 136 bytes)
# ---------------------------------------------------------------------------
# One 64-bit round constant per permutation round (24 entries); keccak256()
# XORs entry `rnd` into lane A[0][0] as the last step of round `rnd`.
_KECCAK_RC = [
0x0000000000000001, 0x0000000000008082, 0x800000000000808A, 0x8000000080008000,
0x000000000000808B, 0x0000000080000001, 0x8000000080008081, 0x8000000000008009,
0x000000000000008A, 0x0000000000000088, 0x0000000080008009, 0x000000008000000A,
0x000000008000808B, 0x800000000000008B, 0x8000000000008089, 0x8000000000008003,
0x8000000000008002, 0x8000000000000080, 0x000000000000800A, 0x800000008000000A,
0x8000000080008081, 0x8000000000008080, 0x0000000080000001, 0x8000000080008008,
]
# Left-rotation amount for each lane, indexed [x][y]; keccak256() applies it
# while copying the state A into its scratch state B.
_KECCAK_ROT = [
[0, 36, 3, 41, 18], [1, 44, 10, 45, 2], [62, 6, 43, 15, 61],
[28, 55, 25, 21, 56], [27, 20, 39, 8, 14],
]
# Mask that truncates a Python int (unbounded) to its low 64 bits.
_M64 = (1 << 64) - 1
def keccak256(data: bytes) -> bytes:
    """Return the 32-byte Keccak-256 digest of ``data``.

    The message is padded with a 0x01 byte, zero-filled up to a multiple of
    the 136-byte rate, and 0x80 is OR-ed into the final byte. Each padded
    block is XOR-ed into a 5x5 state of 64-bit lanes and followed by the
    24-round permutation. The digest is the first four lanes, little-endian.
    """
    rate = 136

    def rotl64(value, shift):
        return ((value << shift) | (value >> (64 - shift))) & _M64

    # Padding: the 0x01 and 0x80 markers share one byte when only a single
    # byte of padding is needed.
    padded = bytearray(data)
    padded.append(0x01)
    padded.extend(bytes(-len(padded) % rate))
    padded[-1] |= 0x80

    state = [[0] * 5 for _ in range(5)]
    for start in range(0, len(padded), rate):
        chunk = padded[start:start + rate]

        # Absorb: lane k of the block goes to state[k % 5][k // 5].
        for lane in range(rate // 8):
            word = int.from_bytes(chunk[8 * lane:8 * lane + 8], "little")
            state[lane % 5][lane // 5] ^= word

        for round_constant in _KECCAK_RC:
            # Column parities are taken once, before any lane is modified.
            parity = [col[0] ^ col[1] ^ col[2] ^ col[3] ^ col[4] for col in state]
            for x in range(5):
                mix = parity[(x - 1) % 5] ^ rotl64(parity[(x + 1) % 5], 1)
                state[x] = [lane ^ mix for lane in state[x]]

            # Rotate every lane and move it to its permuted position.
            moved = [[0] * 5 for _ in range(5)]
            for x in range(5):
                for y in range(5):
                    moved[y][(2 * x + 3 * y) % 5] = rotl64(state[x][y], _KECCAK_ROT[x][y])

            # Non-linear step; the mask undoes the sign introduced by `~`.
            state = [
                [
                    moved[x][y] ^ (~moved[(x + 1) % 5][y] & moved[(x + 2) % 5][y] & _M64)
                    for y in range(5)
                ]
                for x in range(5)
            ]

            state[0][0] ^= round_constant

    return b"".join(state[x][0].to_bytes(8, "little") for x in range(4))
# ---------------------------------------------------------------------------
# secp256k1 point decompression + Ethereum address derivation
# Mirrors substrate's ECDSAExt::to_eth_address: keccak256(x || y)[12:].
# A 33-byte compressed key that k256 would reject yields the EMPTY leaf b"".
# ---------------------------------------------------------------------------
# Modulus for all coordinate arithmetic on secp256k1 keys in this file:
# 2**256 - 2**32 - 977.
_SECP_P = (1 << 256) - (1 << 32) - 977
def beefy_leaf_preimage(compressed_key: bytes) -> bytes:
    """Turn a compressed BEEFY key into the bytes that are hashed into its leaf.

    Returns the last 20 bytes of keccak256(x || y) for the decompressed point.
    Returns b"" when the input is not a 33-byte, 0x02/0x03-prefixed encoding
    of a point satisfying y^2 = x^3 + 7 (mod _SECP_P), mirroring the
    empty-vector fallback described in the module docstring.
    """
    if len(compressed_key) != 33:
        return b""
    prefix = compressed_key[0]
    if prefix != 2 and prefix != 3:
        return b""

    x = int.from_bytes(compressed_key[1:], "big")
    if not x < _SECP_P:
        return b""

    rhs = (x * x * x + 7) % _SECP_P
    # _SECP_P % 4 == 3, so when rhs is a square its root is rhs^((p+1)/4).
    root = pow(rhs, (_SECP_P + 1) // 4, _SECP_P)
    if pow(root, 2, _SECP_P) != rhs:
        return b""  # no y exists for this x: not a curve point

    # The prefix byte's low bit selects which of the two roots is meant.
    y = root if root % 2 == prefix % 2 else _SECP_P - root
    return keccak256(x.to_bytes(32, "big") + y.to_bytes(32, "big"))[-20:]
# ---------------------------------------------------------------------------
# twox128 (XXH64 with seeds 0 and 1, little-endian) -- for storage keys
# ---------------------------------------------------------------------------
# The five 64-bit multiplier/addend constants consumed by _xxh64() below.
_XP1, _XP2, _XP3, _XP4, _XP5 = (
11400714785074694791, 14029467366897019727, 1609587929392839161,
9650029242287828579, 2870177450012600261,
)
def _xxh64(data: bytes, seed: int = 0) -> int:
    """Return the 64-bit XXH64 hash of ``data`` for ``seed`` as an int."""

    def rotl(value, count):
        return ((value << count) | (value >> (64 - count))) & _M64

    def mix_lane(acc, lane):
        # One accumulator round: add lane*_XP2, rotate by 31, multiply by _XP1.
        return (rotl((acc + lane * _XP2) & _M64, 31) * _XP1) & _M64

    total = len(data)
    pos = 0

    if total >= 32:
        # Four independent accumulators consume the input 32 bytes at a time.
        accs = [
            (seed + _XP1 + _XP2) & _M64,
            (seed + _XP2) & _M64,
            seed & _M64,
            (seed - _XP1) & _M64,
        ]
        while total - pos >= 32:
            lanes = struct.unpack_from("<4Q", data, pos)
            accs = [mix_lane(acc, lane) for acc, lane in zip(accs, lanes)]
            pos += 32
        digest = sum(rotl(acc, bits) for acc, bits in zip(accs, (1, 7, 12, 18))) & _M64
        for acc in accs:
            digest = ((digest ^ mix_lane(0, acc)) * _XP1 + _XP4) & _M64
    else:
        digest = (seed + _XP5) & _M64

    digest = (digest + total) & _M64

    # Remaining input: whole 8-byte words, then at most one 4-byte word,
    # then single bytes.
    while total - pos >= 8:
        (word,) = struct.unpack_from("<Q", data, pos)
        digest = (rotl(digest ^ mix_lane(0, word), 27) * _XP1 + _XP4) & _M64
        pos += 8
    if total - pos >= 4:
        (half,) = struct.unpack_from("<I", data, pos)
        digest = (rotl((digest ^ (half * _XP1)) & _M64, 23) * _XP2 + _XP3) & _M64
        pos += 4
    for byte in data[pos:]:
        digest = (rotl((digest ^ (byte * _XP5)) & _M64, 11) * _XP1) & _M64

    # Final bit mixing.
    for shift, factor in ((33, _XP2), (29, _XP3)):
        digest ^= digest >> shift
        digest = (digest * factor) & _M64
    digest ^= digest >> 32
    return digest & _M64
def twox128(s: str) -> bytes:
    """Return 16 bytes: XXH64(seed=0) then XXH64(seed=1) of ``s``, each little-endian."""
    raw = s.encode()
    return b"".join(_xxh64(raw, seed).to_bytes(8, "little") for seed in (0, 1))
def storage_key(pallet: str, item: str) -> str:
    """Return the 0x-prefixed hex of twox128(pallet) followed by twox128(item)."""
    pallet_hex = twox128(pallet).hex()
    item_hex = twox128(item).hex()
    return f"0x{pallet_hex}{item_hex}"
# ---------------------------------------------------------------------------
# Binary Merkle tree (substrate utils/binary-merkle-tree semantics):
# - each leaf value is hashed once: keccak256(leaf_bytes)
# - pairs combined in order: keccak256(left || right)
# - on an odd row, the trailing node is PROMOTED unchanged to the next level
# - NOT sorted-pair; order is significant
# ---------------------------------------------------------------------------
def merkle_root(leaf_hashes):
    """Fold already-hashed leaves into a single root.

    Adjacent nodes are combined left-to-right as keccak256(left + right).
    When a level has an odd number of nodes, the last one is carried up
    unchanged. An empty input yields 32 zero bytes; a single leaf is its
    own root.
    """
    nodes = list(leaf_hashes)
    if len(nodes) == 0:
        return bytes(32)
    while len(nodes) != 1:
        parents = [
            keccak256(nodes[k] + nodes[k + 1])
            for k in range(0, len(nodes) - 1, 2)
        ]
        if len(nodes) % 2 == 1:
            parents.append(nodes[-1])  # unpaired trailing node moves up as-is
        nodes = parents
    return nodes[0]
def merkle_root_of_values(values):
    """Hash each raw value once with keccak256, then return merkle_root of the hashes."""
    hashed = map(keccak256, values)
    return merkle_root(hashed)
# ---------------------------------------------------------------------------
# Self-tests: prove every primitive against published / in-repo vectors
# BEFORE trusting any of it against live chain state.
# ---------------------------------------------------------------------------
def self_test():
    """Check every primitive in this file against fixed test vectors.

    Covers keccak256, beefy_leaf_preimage (secp256k1 decompression plus
    address derivation), storage_key (twox128) and the Merkle builder.
    Prints a one-line confirmation on success.

    Raises:
        AssertionError: if any primitive disagrees with its vector. The
            checks raise explicitly instead of using `assert`, so they are
            not stripped when Python runs with -O.
    """
    def expect(label, got, want):
        if got != want:
            raise AssertionError(
                f"self-test failed [{label}]: got {got!r}, expected {want!r}")

    def root_hex(values):
        return merkle_root_of_values(values).hex()

    # Keccak-256 known vectors
    expect("keccak256(b'')", keccak256(b"").hex(),
           "c5d2460186f7233c927e7db2dcc703c0e500b653ca82273b7bfad8045d85a470")
    expect("keccak256(b'abc')", keccak256(b"abc").hex(),
           "4e03657aea45a94fc7d47ba826c8d667c0d1e6e33a64a036ec44f58fa12d6c45")

    # secp256k1 + eth address: generator (privkey=1) and 2G (privkey=2)
    gx = bytes.fromhex("79be667ef9dcbbac55a06295ce870b07029bfcdb2dce28d959f2815b16f81798")
    expect("eth address of G", beefy_leaf_preimage(b"\x02" + gx).hex(),
           "7e5f4552091a69125d5dfcb7b8c2659029395bdf")
    gxi = int.from_bytes(gx, "big")
    gyi = int.from_bytes(
        bytes.fromhex("483ada7726a3c4655da4fbfc0e1108a8fd17b448a68554199c47d08ffb10d4b8"),
        "big")
    # Point doubling of G, done here so that 2G is derived rather than pasted.
    lam = (3 * gxi * gxi) * pow(2 * gyi, -1, _SECP_P) % _SECP_P
    x3 = (lam * lam - 2 * gxi) % _SECP_P
    y3 = (lam * (gxi - x3) - gyi) % _SECP_P
    comp2g = bytes([2 if y3 % 2 == 0 else 3]) + x3.to_bytes(32, "big")
    expect("eth address of 2G", beefy_leaf_preimage(comp2g).hex(),
           "2b5ad5c4795c026514f8317c7a215e218dccd6cf")

    # twox128 against the well-known Session.Validators key
    expect("storage_key(Session, Validators)", storage_key("Session", "Validators"),
           "0xcec5070d609dd3497f72bde07fc96ba088dcde934c658227ee1dfafcd6e16903")

    # Merkle builder against substrate utils/binary-merkle-tree test vectors
    addr_a = bytes.fromhex("E04CC55ebEE1cBCE552f250e85c57B70B2E2625b")
    addr_b = bytes.fromhex("25451A4de12dcCc2D166922fA938E900fCc4ED24")
    merkle_vectors = [
        ("merkle: 1 address", [addr_a],
         "aeb47a269393297f4b0a3c9c9cfd00c7a4195255274cf39d83dabc2fcc9ff3d7"),
        ("merkle: 2 addresses", [addr_a, addr_b],
         "697ea2a8fe5b03468548a7a413424a6292ab44a82a6f5cc594c3fa7dda7ce402"),
        ("merkle: a,b,c", [b"a", b"b", b"c"],
         "aff1208e69c9e8be9b584b07ebac4e48a1ee9d15ce3afe20b77a4d29e4175aa3"),
        ("merkle: a,b,a", [b"a", b"b", b"a"],
         "b8912f7269068901f231a965adfefbc10f0eedcfa61852b103efd54dac7db3d7"),
        ("merkle: a,b,a,b", [b"a", b"b", b"a", b"b"],
         "dc8e73fe6903148ff5079baecc043983625c23b39f31537e322cd0deee09fa9c"),
        ("merkle: a..j", [bytes([c]) for c in b"abcdefghij"],
         "fb3b3be94be9e983ba5e094c9c51a7d96a4fa2e5d8e891df00ca89ba05bb1239"),
    ]
    for label, values, want in merkle_vectors:
        expect(label, root_hex(values), want)

    print("[ok] self-tests passed: keccak256, secp256k1->eth, twox128, merkle builder")
# ---------------------------------------------------------------------------
# Minimal JSON-RPC over curl (public RPCs reject bare urllib clients)
# ---------------------------------------------------------------------------
class Rpc:
    """Minimal JSON-RPC 2.0 client that sends each request through `curl`."""

    def __init__(self, url):
        # Endpoint every request is POSTed to.
        self.url = url

    def call(self, method, params):
        """Send one JSON-RPC request and return its "result" member.

        Args:
            method: JSON-RPC method name.
            params: JSON-serialisable parameter list.

        Raises:
            RuntimeError: if curl is missing or exits non-zero, if the body
                is not a JSON object, if it carries an "error" member, or if
                it has no "result" member.
        """
        payload = json.dumps({"id": 1, "jsonrpc": "2.0", "method": method, "params": params})
        try:
            out = subprocess.run(
                # -S keeps curl's own error text on stderr despite -s; without
                # it the failure message below would always be empty.
                ["curl", "-sS", "--max-time", "30", "-H", "Content-Type: application/json",
                 "-d", payload, self.url],
                capture_output=True, text=True)
        except FileNotFoundError as err:
            raise RuntimeError("curl failed: executable not found on PATH") from err
        if out.returncode != 0:
            raise RuntimeError(f"curl failed (exit {out.returncode}): {out.stderr.strip()}")
        try:
            resp = json.loads(out.stdout)
        except json.JSONDecodeError as err:
            # e.g. an HTML error page from a proxy or rate limiter
            raise RuntimeError(
                f"non-JSON response from {self.url}: {out.stdout[:200]!r}") from err
        if not isinstance(resp, dict):
            raise RuntimeError(f"unexpected RPC response shape: {out.stdout[:200]!r}")
        if "error" in resp:
            raise RuntimeError(f"RPC error: {resp['error']}")
        if "result" not in resp:
            raise RuntimeError(f"RPC response has no 'result': {out.stdout[:200]!r}")
        return resp["result"]

    def storage(self, key_hex, at):
        """Return the `state_getStorage` result for ``key_hex`` at block hash ``at``."""
        return self.call("state_getStorage", [key_hex, at])
def decode_compact_len(buf):
    """Decode the SCALE compact integer at the start of ``buf``.

    The low two bits of the first byte select the encoding:
      0 -> 1 byte,  value in the upper 6 bits
      1 -> 2 bytes, little-endian, value in the upper 14 bits
      2 -> 4 bytes, little-endian, value in the upper 30 bits
      3 -> (first >> 2) + 4 following bytes hold the little-endian value

    Returns:
        (value, offset) where offset is the number of prefix bytes consumed.

    Raises:
        ValueError: if ``buf`` is empty or shorter than the prefix it
            announces. Slicing would otherwise decode a wrong value silently.
    """
    if not buf:
        raise ValueError("cannot decode SCALE compact integer from empty input")
    first = buf[0]
    mode = first & 3
    if mode == 3:
        width = 1 + (first >> 2) + 4
    else:
        width = 1 << mode  # 1, 2 or 4 bytes
    if len(buf) < width:
        raise ValueError(
            f"truncated SCALE compact integer: need {width} byte(s), have {len(buf)}")
    if mode == 3:
        return int.from_bytes(buf[1:width], "little"), width
    return int.from_bytes(buf[:width], "little") >> 2, width
def parse_vec_fixed(hex_str, elem_size):
    """Decode a 0x-prefixed hex SCALE `Vec<[u8; elem_size]>` into a list of bytes.

    Args:
        hex_str: hex string starting with "0x", as returned by the RPC.
        elem_size: byte length of each element.

    Returns:
        A list with one ``elem_size``-byte ``bytes`` object per element.

    Raises:
        ValueError: if ``hex_str`` is missing (None), lacks the "0x" prefix,
            is empty, or its body length does not equal count * elem_size.
            Raised explicitly rather than via `assert` so the validation
            also runs under `python -O`.
    """
    if not isinstance(hex_str, str) or not hex_str.startswith("0x"):
        raise ValueError(f"expected a 0x-prefixed hex string, got {hex_str!r}")
    raw = bytes.fromhex(hex_str[2:])
    if not raw:
        raise ValueError("empty SCALE value: no length prefix present")
    count, off = decode_compact_len(raw)
    body = raw[off:]
    if len(body) != count * elem_size:
        raise ValueError(f"length mismatch: {len(body)} != {count}*{elem_size}")
    return [body[i * elem_size:(i + 1) * elem_size] for i in range(count)]
def main():
    """Run the self-tests, then verify the three on-chain facts.

    1. Session.Validators is ascending-sorted.
    2. Beefy.Authorities has the same length and is not sorted by key bytes.
    3. The Merkle root rebuilt from Beefy.Authorities equals the stored
       keyset_commitment, and the stored id/len match the queried set.

    Every check is an explicit comparison (not `assert`), so none of them is
    skipped under `python -O`.

    Returns:
        0 if all checks pass, 1 as soon as one fails.

    Raises:
        AssertionError: from self_test() if a primitive fails its vectors.
        RuntimeError: if an RPC call fails or a required storage item is absent.
    """
    ap = argparse.ArgumentParser()
    ap.add_argument("--rpc", default="https://rpc.polkadot.io")
    ap.add_argument("--at", default=None,
                    help="block hash to query (default: current finalized head)")
    args = ap.parse_args()
    self_test()
    rpc = Rpc(args.rpc)

    def fail(reason):
        print(f"\n[FAIL] {reason}")
        return 1

    def fetch(pallet, item):
        # state_getStorage yields null for an absent key; stop with a clear
        # message instead of failing later on None[2:].
        value = rpc.storage(storage_key(pallet, item), at)
        if value is None:
            raise RuntimeError(f"storage item {pallet}.{item} is absent at block {at}")
        return value

    at = args.at or rpc.call("chain_getFinalizedHead", [])
    header = rpc.call("chain_getHeader", [at])
    if header is None:
        return fail(f"no header found for block hash {at}")
    print(f"[info] querying {args.rpc} at block #{int(header['number'], 16)} ({at})")

    # --- 1. Session.Validators must be ascending-sorted by AccountId ---
    validators = parse_vec_fixed(fetch("Session", "Validators"), 32)
    is_sorted = validators == sorted(validators)
    print(f"[1] Session.Validators: {len(validators)} accounts, "
          f"AccountId-ascending-sorted = {is_sorted}")
    if not is_sorted:
        return fail("active set is NOT AccountId-sorted (truncation/stake-ranked regime?)")

    # --- 2. Beefy.Authorities: same size, index-aligned, not key-sorted ---
    set_id = int.from_bytes(
        bytes.fromhex(fetch("Beefy", "ValidatorSetId")[2:])[:8], "little")
    authorities = parse_vec_fixed(fetch("Beefy", "Authorities"), 33)
    key_sorted = authorities == sorted(authorities)
    print(f"[2] Beefy.Authorities: {len(authorities)} keys (validator set id {set_id}), "
          f"sorted-by-key-bytes = {key_sorted} (expected False: order is external)")
    if len(authorities) != len(validators):
        return fail(f"Beefy.Authorities has {len(authorities)} keys but "
                    f"Session.Validators has {len(validators)} accounts")
    if key_sorted:
        return fail("Beefy.Authorities is sorted by key bytes; expected external ordering")

    # --- 3. Reconstruct keyset_commitment and compare to on-chain value ---
    commit_raw = bytes.fromhex(fetch("BeefyMmrLeaf", "BeefyAuthorities")[2:])
    # BeefyAuthoritySet { id: u64, len: u32, keyset_commitment: H256 }
    if len(commit_raw) < 44:
        return fail(f"BeefyAuthoritySet is {len(commit_raw)} bytes, expected at least 44")
    c_id = int.from_bytes(commit_raw[0:8], "little")
    c_len = int.from_bytes(commit_raw[8:12], "little")
    on_chain_root = commit_raw[12:44]

    # Derive each leaf preimage once; both the report and the root use it.
    leaves = [beefy_leaf_preimage(k) for k in authorities]
    invalid = [i for i, leaf in enumerate(leaves) if leaf == b""]
    recon = merkle_root_of_values(leaves)
    print(f"[3] BeefyMmrLeaf.BeefyAuthorities: id={c_id} len={c_len}")
    if invalid:
        print(f"    note: {len(invalid)} invalid/placeholder key(s) at indices {invalid} "
              f"-> leaf = keccak256(b'')")
    print(f"    reconstructed keyset_commitment: 0x{recon.hex()}")
    print(f"    on-chain keyset_commitment:      0x{on_chain_root.hex()}")

    # The stored commitment must describe the very set that was just hashed.
    if c_id != set_id:
        return fail(f"commitment id {c_id} != Beefy.ValidatorSetId {set_id}")
    if c_len != len(authorities):
        return fail(f"commitment len {c_len} != {len(authorities)} Beefy.Authorities keys")
    if recon != on_chain_root:
        return fail("Reconstruction does NOT match on-chain commitment.")

    print("\n[PASS] Reconstruction matches on-chain commitment.\n"
          "       => BEEFY Merkle leaf index == rank in AccountId-sorted validator set.")
    return 0
# Script entry point: the process exit status is whatever main() returns.
if __name__ == "__main__":
    raise SystemExit(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment