Created
May 17, 2022 07:53
-
-
Save bigspider/10df51401be3aa6120217c03c2836ffa to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from typing import Iterable, List, Mapping, Tuple, Generator | |
def find_all(text: str, pattern: str, start: int = 0) -> Generator[int, None, None]: | |
"""Generates all the positions of `pattern` as a substring of `text`, starting from index at least `start`.""" | |
while True: | |
start = text.find(pattern, start) | |
if start == -1: | |
return | |
yield start | |
start += len(pattern) | |
def find_first(text: str, start_pos: int, patterns: Iterable[str]) -> int: | |
"""Returns the position of the first occurrence of any of the elements in `patterns` as a substring of `text`, | |
or -1 if none of the patterns is found.""" | |
matches = (text.find(x, start_pos) for x in patterns) | |
return min((x for x in matches if x != -1), default=-1) | |
def find_key_end_position(desc: str, start_pos: int) -> int: | |
"""Assuming that `start_pos` is the beginning of a KEY expression (and not musig2), finds the position of the end | |
of the key expression, excluding (if present) the final derivation steps after an xpub. This is the information | |
that goes into an entry of the vector of key information of the wallet policy.""" | |
has_orig_info = True if desc[start_pos] == '[' else False | |
if has_orig_info: | |
closing_bracket_pos = desc.find("]", start_pos) | |
if closing_bracket_pos == -1: | |
raise Exception("Invalid descriptor: could not find closing ']'") | |
key_pos_start = closing_bracket_pos + 1 | |
else: | |
key_pos_start = start_pos | |
# find the earliest occurrence of ",", a ")" or a "/" (it must find at least 1) | |
end_pos = find_first(desc, key_pos_start, [",", ")", "/"]) | |
if end_pos == -1: | |
raise Exception( | |
"Invalid descriptor: cannot find the end of key expression") | |
return end_pos | |
class WalletPolicy(object): | |
"""Simple class to represent wallet policies. This is a toy implementation that does not parse the descriptor | |
template. A more robust implementation would build the abstract syntax tree of the template and of the descriptor, | |
allowing one to detect errors, and manipulate t semantically instead of relying on string manipulation.""" | |
def __init__(self, descriptor_template: str, keys_info: List[str]): | |
self.descriptor_template = descriptor_template | |
self.keys_info = keys_info | |
def to_descriptor(self) -> str: | |
"""Converts a wallet policy into the descriptor (with the /<M,N> syntax, if present).""" | |
desc = self.descriptor_template | |
# replace each "/**" with "/<0,1>/*" (TODO: remove if /** is not supported) | |
desc = desc.replace("/**", "/<0,1>/*") | |
# process all the @N expressions in decreasing order. This guarantees that string replacements | |
# works as expected (as any prefix expression is processed after). | |
for i in reversed(range(len(self.keys_info))): | |
desc = desc.replace(f"@{i}", self.keys_info[i]) | |
# there should not be any remaining "@" expressions | |
if desc.find("@") != -1: | |
return Exception("Invalid descriptor template: contains invalid key index") | |
return desc | |
@classmethod | |
def from_descriptor(cls, descriptor: str) -> 'WalletPolicy': | |
"""Converts a "reasonable" descriptor (with the /<M,N> syntax) into the corresponding wallet policy.""" | |
# list of pairs of integers, where the tuple (m,n) with m < n means a key expression starts at | |
# m (inclusive) and at n (exclusive) | |
key_expressions: List[Tuple[int, int]] = [] | |
key_with_orig_pos_start = None | |
def parse_key_expressions(only_first=False, handle_musig2=False): | |
# Starting at the position in `key_with_orig_pos_start`, parses a number of key expressions, and updates | |
# the `key_expressions` array accordingly. | |
# If `only_first` is `True`, it stops after parsing a single key expression. | |
# If `handle_musig2` is `True`, and a key expression is a `musig2` operator, it recursively parses | |
# the keys in the musig2 expression. `musig2` inside `musig2` is not allowed. | |
nonlocal key_with_orig_pos_start | |
if key_with_orig_pos_start is None: | |
raise Exception("Unexpected error") | |
while True: | |
if handle_musig2 and descriptor[key_with_orig_pos_start:].startswith("musig2"): | |
closing_parenthesis_pos = find_first( | |
descriptor, key_with_orig_pos_start, [")"]) | |
if closing_parenthesis_pos == -1: | |
raise Exception( | |
"Invalid descriptor: musig2 without closing parenthesis") | |
key_with_orig_pos_start = key_with_orig_pos_start + \ | |
len("musig2(") | |
parse_key_expressions( | |
only_first=False, handle_musig2=False) | |
key_pos_end = closing_parenthesis_pos + 1 | |
else: | |
key_pos_end = find_key_end_position( | |
descriptor, key_with_orig_pos_start) | |
key_expressions.append( | |
(key_with_orig_pos_start, key_pos_end)) | |
if descriptor[key_pos_end] == '/': | |
# find the actual end (comma or closing parenthesis) | |
key_pos_end = find_first( | |
descriptor, key_pos_end, [",", ")"]) | |
if key_pos_end == -1: | |
raise Exception( | |
"Invalid descriptor: unterminated key expression") | |
if descriptor[key_pos_end] == ',': | |
# There is another key expression, repeat from after the comma | |
key_with_orig_pos_start = key_pos_end + 1 | |
else: | |
break | |
if only_first: | |
break | |
# operators for which the KEY is the first argument | |
operators_key_first = ["pk", "pkh", "pk_h", "pk_k", "tr"] | |
# operators for which the KEY is everything except the first argument | |
operators_key_all_but_first = [ | |
"multi", "sortedmulti", "multi_a", "sortedmulti_a"] | |
for op in operators_key_first + operators_key_all_but_first: | |
for op_pos_start in find_all(descriptor, op + "("): | |
# ignore if not a whole word (otherwise "sortedmulti" would be found inside "multi") | |
if op_pos_start > 0 and 'a' <= desc[op_pos_start - 1] <= 'z': | |
continue | |
if op in operators_key_all_but_first: | |
# skip the first argument (we now it's not a KEY expression, so it does not have a comma) | |
first_comma_pos = descriptor.find(",", op_pos_start) | |
if first_comma_pos == -1: | |
raise Exception( | |
"Invalid descriptor: multi, sortedmulti, multi_a and sortedmulti_a must have at least two arguments") | |
key_with_orig_pos_start = 1 + first_comma_pos | |
else: | |
# other operators, the first argument is already a KEY expression | |
key_with_orig_pos_start = op_pos_start + len(op) + 1 | |
only_first = op in operators_key_first | |
parse_key_expressions( | |
only_first=only_first, handle_musig2=True) | |
result: List[str] = [] | |
keys: List[str] = [] | |
keys_to_idx: Mapping[str, int] = {} | |
prev_end = 0 | |
for start, end in sorted(key_expressions): | |
result.append(descriptor[prev_end:start]) | |
key = descriptor[start:end] | |
if key not in keys_to_idx: | |
idx = len(keys) | |
keys.append(key) | |
keys_to_idx[key] = idx | |
else: | |
idx = keys_to_idx[key] | |
result.append(f"@{idx}") | |
prev_end = end | |
result.append(descriptor[prev_end:]) | |
return cls("".join(result), keys) | |
if __name__ == "__main__": | |
descriptors = [ | |
"pkh([d34db33f/44'/0'/0']xpub6ERApfZwUNrhLCkDtcHTcxd75RbzS1ed54G1LkBUHQVHQKqhMkhgbmJbZRkrgZw4koxb5JaHWkY4ALHY2grBGRjaDMzQLcgJvLJuZZvRcEL/**)", | |
"wsh(multi(1,xpub661MyMwAqRbcFW31YEwpkMuc5THy2PSt5bDMsktWQcFF8syAmRUapSCGu8ED9W6oDMSgv6Zz8idoc4a6mr8BDzTJY47LJhkJ8UB7WEGuduB/**,xpub69H7F5d8KSRgmmdJg2KhpAK8SR3DjMwAdkxj3ZuxV27CprR9LgpeyGmXUbC6wb7ERfvrnKZjXoUmmDznezpbZb7ap6r1D3tgFxHmwMkQTPH/**))", | |
"tr(c6047f9441ed7d6d3045406e95c07cd85c778e4b8cef3ca7abac09b95c709ee5,{pk(fff97bd5755eeea420453a14355235d382f6472f8568a18b2f057a1460297556),pk(e493dbf1c10d80f3581e4904930b1404cc6c13900ee0758474fa94abe8c4cd13)})", | |
"tr(c6047f9441ed7d6d3045406e95c07cd85c778e4b8cef3ca7abac09b95c709ee5,sortedmulti_a(2,2f8bde4d1a07209355b4a7250a5c5128e88b84bddc619ab7cba8d569b240efe4,5cbdf0646e5db4eaa398f365f2ea7a0e3d419b7e0330e39ce92bddedcac4f9bc))", | |
"tr([11111111/44'/0'/0']xpub6CLZSUDtcUhJVDoPSY8pSRKi4W1RSSLBgwZ2AYmwTH9Yv5tPVFHZxJBUQ27QLLwHej6kfo9DQQbwaHmpXsQq59CjtsE2gNLHmojwgMrsQNe/**,{and_v(v:pk([22222222/44'/0'/0']xpub6CiztfGsUxmpwkWe6gvz8d5VHyFLDoiPpeUfWmQ2vWAhQL3Z1hhEc6PE4irFs4bzjS7dCB4yyinaubrCpFJq4bcKGCD4jjqTxaWiKAJ7mvJ/**),older(52596)),multi_a(2,[33333333/44'/0'/0']xpub6DTZd6od7is2wxXndmE7zaUifzFPwVKshVSGEZedfTJtUjfLyhy4hgCW15hvxRpGaDmtiFoJKaCEaSRfXrQBuYRx18zwquy46dwBsJnsrz2/**,[44444444/44'/0'/0']xpub6BnK4wFbPeLZM4VNjoUA4yLCru6kCT3bhDJNBhbzHLGp1fmgK6muz27h4drixJZeHG8vSS5U5EYyE3gE8ozG94iNg3NDYE8M5YafvhzhMR9/**)})", | |
"tr(musig2([33333333/44'/0'/0']xpub6DTZd6od7is2wxXndmE7zaUifzFPwVKshVSGEZedfTJtUjfLyhy4hgCW15hvxRpGaDmtiFoJKaCEaSRfXrQBuYRx18zwquy46dwBsJnsrz2,[44444444/44'/0'/0']xpub6BnK4wFbPeLZM4VNjoUA4yLCru6kCT3bhDJNBhbzHLGp1fmgK6muz27h4drixJZeHG8vSS5U5EYyE3gE8ozG94iNg3NDYE8M5YafvhzhMR9)/**,{and_v(v:pk([22222222/44'/0'/0']xpub6CiztfGsUxmpwkWe6gvz8d5VHyFLDoiPpeUfWmQ2vWAhQL3Z1hhEc6PE4irFs4bzjS7dCB4yyinaubrCpFJq4bcKGCD4jjqTxaWiKAJ7mvJ/**),older(52596)),pk([11111111/44'/0'/0']xpub6CLZSUDtcUhJVDoPSY8pSRKi4W1RSSLBgwZ2AYmwTH9Yv5tPVFHZxJBUQ27QLLwHej6kfo9DQQbwaHmpXsQq59CjtsE2gNLHmojwgMrsQNe/**)})", | |
] | |
for desc in descriptors: | |
# Demoes the conversion from a "sane" descriptor to a wallet policy | |
print(f"Descriptor:\n{desc}") | |
wp = WalletPolicy.from_descriptor(desc) | |
print(f"Policy descriptor template:\n{wp.descriptor_template}") | |
print(f"Keys:\n{wp.keys_info}") | |
print("======================================================\n") | |
# Converting back to descriptors also works, as long as we take care of /** | |
assert wp.to_descriptor().replace("/<0,1>/*", "/**") == desc |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment