|
#! /usr/bin/env python3 |
|
from bech32 import bech32_encode, CHARSET |
|
from binascii import hexlify, unhexlify |
|
from decimal import Decimal |
|
|
|
import bitstring |
|
import hashlib |
|
import secp256k1 |
|
import time |
|
|
|
|
|
# BOLT #11: |
|
# |
|
# A writer MUST encode `amount` as a positive decimal integer with no |
|
# leading zeroes, SHOULD use the shortest representation possible. |
|
def shorten_amount(amount): |
|
""" Given an amount in bitcoin, shorten it |
|
""" |
|
# Convert to pico initially |
|
amount = int(amount * 10 ** 12) |
|
units = ["p", "n", "u", "m", ""] |
|
for unit in units: |
|
if amount % 1000 == 0: |
|
amount //= 1000 |
|
else: |
|
break |
|
return str(amount) + unit |
|
|
|
|
|
# Bech32 spits out array of 5-bit values. Shim here. |
|
def u5_to_bitarray(arr): |
|
ret = bitstring.BitArray() |
|
for a in arr: |
|
ret += bitstring.pack("uint:5", a) |
|
return ret |
|
|
|
|
|
def bitarray_to_u5(barr): |
|
assert barr.len % 5 == 0 |
|
ret = [] |
|
s = bitstring.ConstBitStream(barr) |
|
while s.pos != s.len: |
|
ret.append(s.read(5).uint) |
|
return ret |
|
|
|
|
|
# Tagged field containing BitArray |
|
def tagged(char, l): |
|
# Tagged fields need to be zero-padded to 5 bits. |
|
while l.len % 5 != 0: |
|
l.append("0b0") |
|
return ( |
|
bitstring.pack( |
|
"uint:5, uint:5, uint:5", |
|
CHARSET.find(char), |
|
(l.len / 5) / 32, |
|
(l.len / 5) % 32, |
|
) |
|
+ l |
|
) |
|
|
|
|
|
# Tagged field containing bytes |
|
def tagged_bytes(char, l): |
|
return tagged(char, bitstring.BitArray(l)) |
|
|
|
|
|
def lnencode(addr, privkey): |
|
if addr.amount: |
|
amount = Decimal(str(addr.amount)) |
|
# We can only send down to millisatoshi. |
|
if amount * 10 ** 12 % 10: |
|
raise ValueError( |
|
"Cannot encode {}: too many decimal places".format(addr.amount) |
|
) |
|
|
|
amount = addr.currency + shorten_amount(amount) |
|
else: |
|
amount = addr.currency if addr.currency else "" |
|
|
|
hrp = "ln" + amount |
|
|
|
# Start with the timestamp |
|
data = bitstring.pack("uint:35", addr.date) |
|
|
|
# Payment hash |
|
data += tagged_bytes("p", addr.paymenthash) |
|
tags_set = set() |
|
|
|
for k, v in addr.tags: |
|
# BOLT #11: |
|
# |
|
# A writer MUST NOT include more than one `d`, `h`, `n` or `x` fields, |
|
if k in ("d", "h", "n", "x"): |
|
if k in tags_set: |
|
raise ValueError("Duplicate '{}' tag".format(k)) |
|
|
|
if k == "r": |
|
route = bitstring.BitArray() |
|
for step in v: |
|
pubkey, channel, feebase, feerate, cltv = step |
|
route.append( |
|
bitstring.BitArray(pubkey) |
|
+ bitstring.BitArray(channel) |
|
+ bitstring.pack("intbe:32", feebase) |
|
+ bitstring.pack("intbe:32", feerate) |
|
+ bitstring.pack("intbe:16", cltv) |
|
) |
|
data += tagged("r", route) |
|
elif k == "d": |
|
data += tagged_bytes("d", v.encode()) |
|
elif k == "x": |
|
# Get minimal length by trimming leading 5 bits at a time. |
|
expirybits = bitstring.pack("intbe:64", v)[4:64] |
|
while expirybits.startswith("0b00000"): |
|
expirybits = expirybits[5:] |
|
data += tagged("x", expirybits) |
|
elif k == "h": |
|
data += tagged_bytes("h", hashlib.sha256(v.encode("utf-8")).digest()) |
|
elif k == "n": |
|
data += tagged_bytes("n", v) |
|
elif k == "v": # lnurl |
|
data += tagged_bytes("v", v.encode()) |
|
else: |
|
# FIXME: Support unknown tags? |
|
raise ValueError("Unknown tag {}".format(k)) |
|
|
|
tags_set.add(k) |
|
|
|
# BOLT #11: |
|
# |
|
# A writer MUST include either a `d` or `h` field, and MUST NOT include |
|
# both. |
|
if "d" in tags_set and "h" in tags_set: |
|
raise ValueError("Cannot include both 'd' and 'h'") |
|
if not "d" in tags_set and not "h" in tags_set: |
|
raise ValueError("Must include either 'd' or 'h'") |
|
|
|
# We actually sign the hrp, then data (padded to 8 bits with zeroes). |
|
privkey = secp256k1.PrivateKey(bytes(unhexlify(privkey))) |
|
sig = privkey.ecdsa_sign_recoverable( |
|
bytearray([ord(c) for c in hrp]) + data.tobytes() |
|
) |
|
# This doesn't actually serialize, but returns a pair of values :( |
|
sig, recid = privkey.ecdsa_recoverable_serialize(sig) |
|
data += bytes(sig) + bytes([recid]) |
|
|
|
return bech32_encode(hrp, bitarray_to_u5(data)) |
|
|
|
|
|
class LnAddr(object): |
|
def __init__( |
|
self, paymenthash=None, amount=None, currency="bc", tags=None, date=None |
|
): |
|
self.date = int(time.time()) if not date else int(date) |
|
self.tags = [] if not tags else tags |
|
self.unknown_tags = [] |
|
self.paymenthash = paymenthash |
|
self.signature = None |
|
self.pubkey = None |
|
self.currency = currency |
|
self.amount = amount |
|
|
|
def __str__(self): |
|
return "LnAddr[{}, amount={}{} tags=[{}]]".format( |
|
hexlify(self.pubkey.serialize()).decode("utf-8"), |
|
self.amount, |
|
self.currency, |
|
", ".join([k + "=" + str(v) for k, v in self.tags]), |
|
) |