Skip to content

Instantly share code, notes, and snippets.

@Lohann
Last active October 18, 2024 21:36
Show Gist options
  • Save Lohann/c2869d23c757f45bb0a33545046b2340 to your computer and use it in GitHub Desktop.
Save Lohann/c2869d23c757f45bb0a33545046b2340 to your computer and use it in GitHub Desktop.
Threshold Signatures implementation based on FROST using schorr signatures over the RSA group
import hashlib
from random import SystemRandom
from collections import namedtuple
'''
FROST: Flexible Round-Optimized Schnorr Threshold Signatures
Implementation based in this paper:
https://eprint.iacr.org/2020/852.pdf
'''
def extended_euclidean_algorithm(a, b):
"""
Returns a three-tuple (gcd, x, y) such that
a * x + b * y == gcd, where gcd is the greatest
common divisor of a and b.
This function implements the extended Euclidean
algorithm and runs in O(log b) in the worst case.
"""
s, old_s = 0, 1
t, old_t = 1, 0
r, old_r = b, a
while r != 0:
quotient = old_r // r
old_r, r = r, old_r - quotient * r
old_s, s = s, old_s - quotient * s
old_t, t = t, old_t - quotient * t
return old_r, old_s, old_t
def inverse_of(n, p):
"""
Returns the multiplicative inverse of
n modulo p.
This function returns an integer m such that
(n * m) % p == 1.
"""
gcd, x, y = extended_euclidean_algorithm(n, p)
assert (n * x + p * y) % p == gcd
if gcd != 1:
# Either n is 0, or p is not a prime number.
raise ValueError(
'{} has no multiplicative inverse '
'modulo {}'.format(n, p))
else:
return x % p
class RsaGroup:
p = 5717
q = 1429
g = 6
rng = SystemRandom()
@classmethod
def _generator(cls):
return cls.g
@classmethod
def _random_scalar(cls):
return cls.rng.randint(1, cls.q)
@classmethod
def _int_to_scalar(cls, value):
return value % cls.q
@classmethod
def _scalar_add(cls, a, b):
return (a + b) % cls.q
@classmethod
def _scalar_neg(cls, scalar):
return (cls.q - scalar) % cls.q
@classmethod
def _scalar_mul(cls, a, b):
return (a * b) % cls.q
@classmethod
def _scalar_inverse(cls, scalar):
return inverse_of(scalar, cls.q)
@classmethod
def _element_add(cls, a, b):
return (a * b) % cls.p
@classmethod
def _mul_element_by_scalar(cls, element, scalar):
return pow(element, scalar, cls.p)
@classmethod
def _digest_message(cls, message):
message = to_bytes(message)
digest = hashlib.sha256(message).digest()
return int.from_bytes(digest, 'big') % cls.q
@classmethod
def _serialize_element(cls, element):
return element.to_bytes((cls.p.bit_length() + 7) // 8, 'big')
@classmethod
def _serialize_scalar(cls, scalar):
return scalar.to_bytes((cls.q.bit_length() + 7) // 8, 'big')
class Scalar(RsaGroup):
def __init__(self, value):
if not isinstance(value, int):
raise f"Invalid value type for scalar: {type(value)}"
self.value = self._int_to_scalar(value)
@classmethod
def identity(cls):
return Scalar(cls._int_to_scalar(0))
@classmethod
def digest(cls, message):
return Scalar(cls._digest_message(message))
@classmethod
def random(cls):
scalar = cls._random_scalar()
return Scalar(scalar)
def __bytes__(self):
return self._serialize_scalar(self.value)
def __add__(self, other):
if not isinstance(other, Scalar):
raise "Can only add scalar by another scalar"
return Scalar(self._scalar_add(self.value, other.value))
def __sub__(self, other):
if not isinstance(other, Scalar):
raise "Can only add scalar by another scalar"
return Scalar(self._scalar_add(self.value, self._scalar_neg(other.value)))
def __mul__(self, other):
if isinstance(other, Scalar):
return Scalar(self._scalar_mul(self.value, other.value))
elif isinstance(other, int):
return Scalar(self._scalar_mul(self.value, other))
elif type(other) == Element:
return Element(self._mul_element_by_scalar(other.value, self.value))
raise f"Cannot multiply scalar by type: {type(other)}"
def __truediv__(self, other):
if not isinstance(other, Scalar):
raise Exception("Can only divide a scalar by another scalar")
return self.__mul__(Scalar(self._scalar_inverse(other.value)))
def __str__(self):
return f"Scalar({self.value})"
def __repr__(self):
return self.__str__()
def __eq__(self, other):
if not isinstance(other, Scalar):
return False
return self.value == other.value
def __ne__(self, other):
if not isinstance(other, Scalar):
return True
return self.value != other.value
class Element(RsaGroup):
def __init__(self, value):
if isinstance(value, Scalar):
self.value = self.generator() * value
elif isinstance(value, int):
self.value = value
@classmethod
def identity(cls):
return Element(1)
@classmethod
def generator(cls):
return Element(cls._generator())
def __bytes__(self):
return self._serialize_element(self.value)
def __add__(self, other):
if not isinstance(other, Element):
raise "Can only add an Element by another Element"
return Element(self._element_add(self.value, other.value))
def __mul__(self, other):
if not isinstance(other, Scalar):
raise f"Cannot multiply element by {type(other)}"
return Element(self._mul_element_by_scalar(self.value, other.value))
def __str__(self):
return f"Element({self.value})"
def __repr__(self):
return self.__str__()
def __eq__(self, other):
if not isinstance(other, Element):
return False
return self.value == other.value
def __ne__(self, other):
if not isinstance(other, Element):
return True
return self.value != other.value
def evaluate_polynomial(x, coefficients, identity=0):
"""
Evaluate a polynomial
f(x) = (c0 * x^0) + (c1 * x^1) + (c2 * x^2) ... + (ci * x^i)
"""
value = identity
for coefficient in reversed(coefficients[1:]):
value += coefficient
value *= x
value += coefficients[0]
return value
def compute_lagrange_coefficient(x, x_set):
one = Scalar(1) if isinstance(x, Scalar) else 1
num = one
den = one
x_found = False
for x_j in x_set:
if x == x_j:
x_found = True
continue
num *= x_j
den *= x_j - x
if not x_found:
raise "x not found"
return num * (one / den)
def to_bytes(obj):
if isinstance(obj, str):
return obj.encode(encoding="utf-8")
if isinstance(obj, int):
return obj.to_bytes((obj.bit_length() + 7) // 8, "little")
if isinstance(obj, bytes):
return obj
return bytes(obj)
def to_hex(obj):
obj_bytes = to_bytes(obj)
return hex(int.from_bytes(obj_bytes, "big"))
# Let C(m, T) denote a commitment to m E {0,1}* using the random string T
def challenge(r, pubkey, message):
if not isinstance(pubkey, Element):
raise "invalid pubkey"
if not isinstance(r, Element):
raise "invalid r"
h = bytes(r) + bytes(pubkey) + to_bytes(message)
return Scalar.digest(h)
def schnorr_signature(secret, message, r=None):
if r is None:
r = Scalar.random()
if not isinstance(secret, Scalar) or not isinstance(r, Scalar):
raise "invalid secret or r"
pubkey = Element.generator() * secret
signature_r = Element.generator() * r
e = challenge(signature_r, pubkey, message)
signature_z = r + (secret * e)
assert Element.generator() * signature_z == signature_r + (pubkey * e)
return signature_z, signature_r
def verify_schnorr_signature(message, pubkey, signature):
(z, r) = signature
e = challenge(r, pubkey, message)
return (Element.generator() * z) == (r + (pubkey * e))
# p and q denote large primes such that q divides p - 1
assert RsaGroup.p > RsaGroup.q
assert (RsaGroup.p - 1) % RsaGroup.q == 0
# G is the unique subgroup of Zp of order q, and g is a generator of G
assert pow(RsaGroup.g, RsaGroup.q, RsaGroup.p) == 1
# total of members in the group
n = 5
# Minimum number of participants necessary to sign a tx:
k = 3
# It will be assumed that n > (2k - 1). As (k, n)-threshold schemes allow at most k - 1 cheating participants,
# this means that a majority of the participants is assumed to be honest.
# assert n > ((k * 2) - 1)
# It will be assumed that the members of the group have previously agreed on
# the primes p and q and the generator g of Gq
###########################################
### Selecting and Distributing the Keys ###
###########################################
class SigningCommitments:
def __init__(self, member_id, message, hiding, binding):
self.member_id = member_id
self.message = message
self.hiding = hiding
self.binding = binding
self.signature = None
def nonces(self):
return self.hiding, self.binding
def __bytes__(self):
return bytes(Scalar(self.member_id)) + bytes(self.hiding) + bytes(self.binding) + to_bytes(message)
class Member:
def __init__(self, identifier, min_signers, max_signers):
self.identifier = identifier
self.min_signers = min_signers
self.max_signers = max_signers
self._secret = None
self._coefficients = []
self._commitments = {}
self._shares = {}
self._messages = {}
# Generate Polynomial coefficients
for i in range(0, min_signers):
polynomial = Scalar.random()
self._coefficients.append(polynomial)
# Calculate coefficient commitments
commitments = []
for coefficient in self._coefficients:
commitment = Element.generator() * coefficient
commitments.append(commitment)
self._commitments[identifier] = commitments
# Compute self key share
share = self.evaluate_polynomial_for(identifier)
self.validate_share(self.identifier, share)
def coefficients(self):
return self._coefficients.copy()
def commitments(self):
return self._commitments[self.identifier].copy()
def add_commitments(self, member):
self._commitments[member.identifier] = member.commitments()
def evaluate_polynomial_for(self, x):
x = Scalar(x)
return evaluate_polynomial(x, self._coefficients, Scalar.identity())
def expected_polynomial_from(self, member_id, x=None):
if member_id not in self._commitments:
raise Exception(f"Didn't receive the commitment from member: {member_id}")
if x is None:
x = self.identifier
x = Scalar(x)
return evaluate_polynomial(x, self._commitments[member_id], Element.identity())
def public_key_share_of(self, identifier):
if len(self._commitments) < self.max_signers:
raise Exception("Cannot compute public key share, didn't receive all commitments yet")
result = Element.identity()
for x in self._commitments.keys():
result += self.expected_polynomial_from(x, identifier)
return result
def validate_share(self, member_index, share):
expect = self.expected_polynomial_from(member_index)
actual = Element.generator() * share
if actual != expect:
raise Exception(f"Invalid share, expect {expect} found {actual}")
self._shares[member_index] = share
# After receiving all shares, the member can compute his final secret share
if len(self._shares) == self.max_signers:
self._secret = Scalar.identity()
for share in self._shares.values():
self._secret += share
def secret_share(self):
if self._secret is None:
raise Exception("Cannot compute secret share, member did not receive all shares yet")
return self._secret
def sign(self, message):
secret = self.secret_share()
return schnorr_signature(secret, message)
def compute_signing_commitment(self, nonces, message):
# Generate the hiding and biding commitments
hiding_commitment = Element.generator() * nonces.hiding
binding_commitment = Element.generator() * nonces.binding
commitments = SigningCommitments(self.identifier, message, hiding_commitment, binding_commitment)
# Sign the commitments
commitments.signature = schnorr_signature(self.secret_share(), commitments)
return commitments
def verify_commitment(self, commitment):
if not isinstance(commitment, SigningCommitments):
raise Exception(f"invalid commitment, expect type SigningCommitments, found: {commitment}")
if commitment.hiding == Element.generator() or commitment.binding == Element.generator():
raise Exception("hiding and binding values cannot be zero")
member_pubkey = self.public_key_share_of(commitment.member_id)
return verify_schnorr_signature(commitment, member_pubkey, commitment.signature)
def public_key_share(self):
return Element.generator() * self.secret_share()
SigningNonces = namedtuple("SigningNonces", "hiding binding")
print(f"p = {RsaGroup.p}")
print(f"q = {RsaGroup.q}")
print(f"g = {RsaGroup.g}")
print(f"G(x) = g^x mod p\n")
print(f"max signers = {n}")
print(f"min signers = {k}")
print(f"All members generates {k} random coefficients")
members = []
for identifier in range(1, n + 1):
member = Member(identifier, k, n)
# Print member information
coefficients = list(map(lambda c: str(c.value), member.coefficients()))
commitments = list(map(lambda c: str(c.value), member.commitments()))
fx = " + ".join(f"({c} * x^{e})" for (e, c) in enumerate(coefficients))
gx = " * ".join(f"({c}^(x^{e}) mod p)" for (e, c) in enumerate(commitments))
cx = ", ".join(f"g^{c} mod p" for c in coefficients)
coefficients = ", ".join(coefficients)
commitments = ", ".join(commitments)
print(f" - Member {identifier}")
print(f" - sample {k} random coefficients: ({coefficients})")
print(f" - Compute coefficient commitments:")
print(f" commitments = ({cx})")
print(f" = ({commitments})")
print(f" - define functions: ")
print(f" f{identifier}(x) = {fx}")
print(f" g{identifier}(x) = ( {gx} ) mod p")
members.append(member)
print("\nAll members broadcast its coefficients commitments to all other members...")
for member in members:
print(f" - Member {member.identifier} broadcasts the commitments: {member.commitments()}")
for other in members:
if member.identifier == other.identifier:
continue
member.add_commitments(other)
print("\nall members now have enough information to compute the group public key Y:")
print(" Y = {}".format(" + ".join(list(map(lambda m: f"g{m.identifier}(0)", members)))))
print(" Y = {}".format(" + ".join(list(map(lambda m: f"{m.commitments()[0]}", members)))))
group_pubkey = Element.identity()
for member in members:
group_pubkey += member.commitments()[0]
print(f" Y = {group_pubkey}")
print("\nall members have enough information to compute other participants public keys {Y0, Y1, ... Yn}:")
# Any member can calculate the public key of any other member
for member in members:
print(f" - Compute Member {member.identifier} public key Y{member.identifier}:")
identifiers = list(map(lambda m: m.identifier, members))
gx = " + ".join(f"g{id}({member.identifier})" for id in identifiers)
print(f" - Y{member.identifier} = {gx}")
pubkey_share = Element.identity()
for identifier in identifiers:
pubkey_share += member.expected_polynomial_from(identifier)
print(f" - Y{member.identifier} = {pubkey_share}")
print("\nAfter receiving all commitments, each member privately sends a secret share to each other participant...")
for member in members:
print(f" - Member {member.identifier}")
for other in members:
if member.identifier == other.identifier:
continue
# Calculate share
share = member.evaluate_polynomial_for(other.identifier)
# Send share to participant
other.validate_share(member.identifier, share)
print(
f" - computes f{member.identifier}({other.identifier}) and privately sends the result to member {other.identifier}: {share}")
print("\nAll participants now have enough information to compute their secret shares {s0, s1, s2, ... sn}")
for member in members:
share = member.secret_share()
fx = " + ".join(f"f{m.identifier}({member.identifier})" for m in members)
rx = " + ".join(f"{m.evaluate_polynomial_for(member.identifier).value}" for m in members)
print(f" - Member {member.identifier} computes his secret share s{member.identifier}:")
print(f" - s{member.identifier} = {fx}")
print(f" - s{member.identifier} = {rx}")
print(f" - s{member.identifier} = {share}")
# Sanity Check
for member in members:
for other in members:
expected = other.public_key_share_of(member.identifier)
assert expected == member.public_key_share()
assert Element.generator() * member.secret_share() == expected
print(f"\nNo less than {k} participants can recovery the group secret")
print("group secret = {0}".format(" + ".join(f"{m.coefficients()[0]}" for m in members)))
group_secret = Scalar.identity()
for member in members:
group_secret += member.coefficients()[0]
print(f"group secret = {group_secret}")
assert group_pubkey == group_secret * Element.generator()
#############
## SIGNING ##
#############
print("\nAny group between k and n member can sign a message")
message = "hello world"
print(f"message: {message}")
print(f"encoded: {to_hex(message)}")
# Perform SIGN operation
signing_group = []
# Choose any group between k and n members
for i in range(k):
# Add Member to signing group
signing_group.append(members[i])
print("\nSigning group {{{0}}}".format(", ".join(f"Member {m.identifier}" for m in signing_group)))
# Each member commits to a random hiding and binding values
print("\nEach participant i in the signing group generates a random pair of nonces (di, ei)")
group_nonces = {}
group_nonce_commitments = {}
for member in signing_group:
# Random nonces
nonces = SigningNonces(hiding=Scalar.random(), binding=Scalar.random())
group_nonces[member.identifier] = nonces
# Generate commitments
signing_commitment = member.compute_signing_commitment(nonces, commitments)
assert signing_commitment.member_id == member.identifier
assert Element.generator() * nonces.hiding == signing_commitment.hiding
assert Element.generator() * nonces.binding == signing_commitment.binding
print(f" - Member {member.identifier}")
print(f" 1. Sample a random pair of single-use nonces (d{member.identifier}, e{member.identifier}):")
print(f" - d{member.identifier} = {nonces.hiding}")
print(f" - e{member.identifier} = {nonces.binding}")
print(f" 2. Derive the nonce commitment shares (D{member.identifier}, E{member.identifier}):")
print(f" - D{member.identifier} = g^{nonces.hiding.value} mod p = {signing_commitment.hiding.value}")
print(f" - E{member.identifier} = g^{nonces.binding.value} mod p = {signing_commitment.binding.value}")
group_nonce_commitments[member.identifier] = signing_commitment
print("\nall members in the signing group broadcasts their nonce commitments to the other participants")
for member in signing_group:
(hiding, binding) = group_nonce_commitments[member.identifier].nonces()
members_ids = filter(lambda m: m.identifier != member.identifier, signing_group)
members_ids = map(lambda m: f"Member {m.identifier}", members_ids)
members_ids = ", ".join(members_ids)
print(f" - Member {member.identifier} sends the nonces (E{member.identifier}, D{member.identifier}) to {members_ids}")
print("\nall members now have enough information to compute the group commitment R and binding factors {p0, ... pi}\n")
# Calculate binding factor prefix
shared_binding_prefix = bytes(group_pubkey)
# The message is hashed to force the variable-length message
# into a fixed-length byte string
shared_binding_prefix += bytes(Scalar.digest(message))
for member in signing_group:
commitments = group_nonce_commitments[member.identifier]
assert commitments.member_id == member.identifier
shared_binding_prefix += bytes(Scalar.digest(commitments))
print("Compute the binding prefix L:")
print(" L = Y | H1(m) | H1(1 | D1 | E1) | H1(2 | D2 | E2) ... + H1(n | Dn | En)")
print(f" L = {to_hex(shared_binding_prefix)}\n")
# Calculate binding factors
binding_factors = {}
for member in signing_group:
commitments = group_nonce_commitments[member.identifier]
assert commitments.member_id == member.identifier
binding_factor = Scalar.digest(shared_binding_prefix + bytes(Scalar(member.identifier)))
binding_factors[member.identifier] = binding_factor
print(f" - Computes Member {member.identifier} binding factor:")
print(f" p{member.identifier} = H1({member.identifier} | L) = {binding_factor}")
# Calculate group commitment
group_commitment = Element.identity()
for commitments in group_nonce_commitments.values():
(hiding, binding) = commitments.nonces()
# The following check prevents a party from accidentally revealing their share.
assert hiding != Element.identity() and binding != Element.identity()
binding_factor = binding_factors[commitments.member_id]
group_commitment += hiding
group_commitment += binding * binding_factor
print("\nCompute the group commitment R:")
print(" R = D1 + (E1 * p1) + D2 + (E2 * p2) ... + Dn + (En * pn)")
print(f" = {group_commitment}")
print("\nCompute the challenge c:")
group_challenge = challenge(group_commitment, group_pubkey, message)
print(f" c = H2(R | Y | m)")
print(f" = {group_challenge}")
# Each participant signs the message using its share
print(f"\nAll members in the signing group signs the message producing the signature shares (z0, z1, ... zn)")
signature_shares = {}
for member in signing_group:
ids = list(map(lambda m: Scalar(m.identifier), signing_group))
secret = member.secret_share()
lambda_i = compute_lagrange_coefficient(Scalar(member.identifier), ids)
(hiding, binding) = group_nonces[member.identifier]
binding_factor = binding_factors[member.identifier]
signature = hiding
signature += binding * binding_factor
signature += lambda_i * secret * group_challenge
signature_shares[member.identifier] = signature
i = member.identifier
print(f" - Member {i}")
print(f" l{i} = {i}th lagrange coefficient = {lambda_i}")
print(f" z{i} = d{i} + (e{i} * p{i}) + (s{i} * l{i} * c)")
print(f" z{i} = {hiding.value} + ({binding.value} * {binding_factor.value}) + ({secret.value} * {lambda_i} * {group_challenge.value})")
print(f" z{i} = {signature}")
# Each member can verify other member signatures
print("\nAny member can verify other member signature shares: ")
for member in signing_group:
ids = list(map(lambda m: Scalar(m.identifier), signing_group))
i = member.identifier
(hiding, binding) = group_nonce_commitments[i].nonces()
binding_factor = binding_factors[i]
lambda_i = compute_lagrange_coefficient(Scalar(i), ids)
public_key = member.public_key_share()
signature_commitment = hiding
signature_commitment += binding * binding_factor
signature_commitment += (lambda_i * public_key) * group_challenge
signature = signature_shares[i]
print(f" - Verify Member {i} signature share z{i}: ")
print(f" 1 - Compute Z{i}: ")
print(f" Z{i} = E{i} + D{i}^p{i} + Y{i}^(l{i} * c)")
print(f" Z{i} = {hiding.value} * ({binding.value}^{binding_factor.value}) * {public_key.value}^({lambda_i} + {group_challenge.value}) mod p")
print(f" Z{i} = {signature_commitment}")
print(f" 2 - g^z{i} mod p must be equal to Z{i}: ")
print(f" g^z{i} mod p = {Element.generator() * signature}")
assert signature_commitment == (Element.generator() * signature)
# Aggregate signature
print("\nAn aggregator collect all signatures {z0, ... zn} and sum into one signature (z, R):")
signature_r = group_commitment
signature_z = Scalar.identity()
for signature_share in signature_shares.values():
signature_z += signature_share
print("z = {}".format(" + ".join(f"z{m.identifier}" for m in signing_group)))
print("z = {}".format(signature_z))
print(f"signature = (z, R)")
print(f"signature = ({signature_z}, {signature_r})")
print(f"is signature valid: {verify_schnorr_signature(message, group_pubkey, (signature_z, signature_r))}")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment