Skip to content

Instantly share code, notes, and snippets.

@killerstorm
Last active November 26, 2024 14:38
Show Gist options
  • Save killerstorm/f53c6c04c4fd9b081d5e6e7be2340d1b to your computer and use it in GitHub Desktop.
Save killerstorm/f53c6c04c4fd9b081d5e6e7be2340d1b to your computer and use it in GitHub Desktop.
Postchain Python client (AI-generated)
from typing import Dict, List, Union, Any, Tuple
from enum import Enum
import asn1
# Define GTV types
# RawGtv: the plain Python values gtv_auto() knows how to convert -- None,
# bool, bytes, str, int, and dicts/lists built from such values.
RawGtv = Union[None, bool, bytes, str, int, Dict[str, Any], List[Any]]
# DictPairTuple: a (key, value) pair, the shape gtv_dict() stores in its list.
DictPairTuple = Tuple[str, Any]
class GTVType(Enum):
    """Kinds of GTV value.

    The member values double as the ASN.1 context tag numbers written by
    ``_encode_asn_value`` and matched by ``_decode_typed_value``.
    """

    NULL = 0  # no value
    BYTE_ARRAY = 1  # bytes payload
    STRING = 2  # str payload
    INTEGER = 3  # int within the signed 64-bit range (see gtv_int)
    DICT = 4  # list of (key, GTV) pairs sorted by key (see gtv_dict)
    ARRAY = 5  # list of GTV values
    BIG_INTEGER = 6  # int without the 64-bit range check
class GTV:
    """A typed GTV value: a ``GTVType`` tag paired with its payload."""

    def __init__(self, type_: GTVType, value: Any):
        # `type` says how `value` must be interpreted and encoded.
        self.type, self.value = type_, value
def gtv_null() -> GTV:
    """Return a GTV of type NULL whose payload is ``None``."""
    return GTV(type_=GTVType.NULL, value=None)
def gtv_bytes(value: bytes) -> GTV:
    """Wrap *value* as a GTV of type BYTE_ARRAY."""
    return GTV(type_=GTVType.BYTE_ARRAY, value=value)
def gtv_str(value: str) -> GTV:
    """Wrap *value* as a GTV of type STRING."""
    return GTV(type_=GTVType.STRING, value=value)
def gtv_int(value: int) -> GTV:
    """Wrap *value* as a GTV of type INTEGER.

    Raises:
        ValueError: if *value* does not fit in a signed 64-bit integer.
    """
    lowest = -(1 << 63)
    highest = (1 << 63) - 1
    if value < lowest or value > highest:
        raise ValueError(f"Integer {value} out of range for GtvInteger")
    return GTV(GTVType.INTEGER, value)
def gtv_big_int(value: int) -> GTV:
    """Wrap *value* as a GTV of type BIG_INTEGER (no range check is applied)."""
    return GTV(type_=GTVType.BIG_INTEGER, value=value)
def gtv_array(values: List[Union[GTV, RawGtv]]) -> GTV:
    """Build a GTV of type ARRAY from *values*.

    Items that are already GTV instances are kept as they are; anything else
    is converted with ``gtv_auto``.
    """
    typed_values = []
    for item in values:
        if not isinstance(item, GTV):
            item = gtv_auto(item)
        typed_values.append(item)
    return GTV(GTVType.ARRAY, typed_values)
def gtv_dict(pairs: Dict[str, Union[GTV, RawGtv]]) -> GTV:
    """Build a GTV of type DICT from *pairs*.

    The payload is a list of ``(key, GTV)`` tuples sorted by key. Values that
    are already GTV instances are kept; anything else goes through
    ``gtv_auto``.

    Raises:
        TypeError: if any key is not a ``str``. Keys are later written as
            UTF8String by the encoder, so other key types cannot be
            represented.
    """
    for key in pairs:
        if not isinstance(key, str):
            raise TypeError(f"GTV dict keys must be str, got {type(key)}")

    sorted_pairs = []
    # Sort on the key alone so values never take part in the comparison.
    for key in sorted(pairs):
        value = pairs[key]
        typed_value = value if isinstance(value, GTV) else gtv_auto(value)
        sorted_pairs.append((key, typed_value))
    return GTV(GTVType.DICT, sorted_pairs)
def gtv_auto(value: RawGtv) -> GTV:
    """Pick the GTV type for a plain Python value and wrap it.

    ``None`` becomes NULL, ``bool`` becomes INTEGER 1/0, and an ``int`` that
    ``gtv_int`` rejects as out of range becomes BIG_INTEGER.

    Raises:
        TypeError: if the Python type has no GTV counterpart.
    """
    if value is None:
        return gtv_null()

    def _int_or_big(number):
        try:
            return gtv_int(number)
        except ValueError:
            return gtv_big_int(number)

    # Checked in order: bool must be tested before int because bool is a
    # subclass of int.
    converters = (
        (bytes, gtv_bytes),
        (bool, lambda flag: gtv_int(1 if flag else 0)),
        (str, gtv_str),
        (int, _int_or_big),
        (list, gtv_array),
        (dict, gtv_dict),
    )
    for python_type, convert in converters:
        if isinstance(value, python_type):
            return convert(value)
    raise TypeError(f"Cannot convert type {type(value)} to GTV")
def encode_value(value: Union[RawGtv, GTV]) -> bytes:
    """Serialise *value* with the asn1 encoder and return the output bytes.

    A plain Python value is first converted with ``gtv_auto``; a GTV instance
    is encoded as given.
    """
    encoder = asn1.Encoder()
    encoder.start()
    typed_arg = value if isinstance(value, GTV) else gtv_auto(value)
    _encode_asn_value(encoder, typed_arg)
    return encoder.output()
def _encode_asn_value(encoder: asn1.Encoder, typed_arg: GTV):
    """Write one GTV value (recursively) to *encoder*.

    Every value is wrapped in a context-class constructed tag whose number is
    the ``GTVType`` member value. Scalars are written inside that wrapper;
    arrays and dicts hold a Sequence of their children, and each dict entry
    is itself a Sequence of (UTF8String key, value).

    Raises:
        TypeError: if ``typed_arg.type`` is not a ``GTVType`` member. Without
            this check nothing would be written and the output would be
            silently truncated.
    """
    gtv_type = typed_arg.type
    if not isinstance(gtv_type, GTVType):
        raise TypeError(f"Unknown GTV type: {gtv_type}")

    # Universal ASN.1 type used for the payload of each scalar GTV type.
    scalar_numbers = {
        GTVType.NULL: asn1.Numbers.Null,
        GTVType.BYTE_ARRAY: asn1.Numbers.OctetString,
        GTVType.STRING: asn1.Numbers.UTF8String,
        GTVType.INTEGER: asn1.Numbers.Integer,
        GTVType.BIG_INTEGER: asn1.Numbers.Integer,
    }

    with encoder.construct(nr=gtv_type.value, cls=asn1.Classes.Context):
        if gtv_type in scalar_numbers:
            # NULL always writes None, whatever the GTV payload holds.
            payload = None if gtv_type == GTVType.NULL else typed_arg.value
            encoder.write(payload, scalar_numbers[gtv_type])
        elif gtv_type == GTVType.ARRAY:
            with encoder.construct(nr=asn1.Numbers.Sequence):
                for item in typed_arg.value:
                    _encode_asn_value(encoder, item)
        else:  # GTVType.DICT
            with encoder.construct(nr=asn1.Numbers.Sequence):
                for key, value in typed_arg.value:
                    with encoder.construct(nr=asn1.Numbers.Sequence):
                        encoder.write(key, asn1.Numbers.UTF8String)
                        _encode_asn_value(encoder, value)
def decode_value(data: bytes) -> GTV:
    """Decode ASN.1 DER bytes into a typed GTV.

    Raises:
        ValueError: if decoding fails for any reason. The original exception
            is chained as ``__cause__`` so the root failure stays visible.
    """
    try:
        decoder = asn1.Decoder()
        decoder.start(data)
        return _decode_typed_value(decoder)
    except Exception as e:
        # Callers get one exception type for any malformed input; `from e`
        # keeps the underlying error in the traceback.
        raise ValueError(f"Failed to decode GTV: {str(e)}") from e
def decode_raw_value(data: bytes) -> RawGtv:
    """Decode ASN.1 DER bytes and return the result as plain Python values."""
    typed = decode_value(data)
    return to_raw_value(typed)
def _decode_typed_value(decoder: asn1.Decoder) -> GTV:
    """Helper function to decode ASN.1 to typed GTV

    Reads one value from *decoder* at its current position and recurses for
    the children of arrays and dicts. The tag number is compared against the
    ``GTVType`` member values; the tag class itself is not inspected.

    Returns:
        The decoded GTV, or a NULL GTV when the decoder has no tag to peek at.

    Raises:
        ValueError: if the tag number matches no ``GTVType`` value, or if
            ``gtv_int`` rejects a decoded INTEGER as out of range.
    """
    tag = decoder.peek()
    if tag is None:
        return gtv_null()
    # NOTE(review): presumably tag.nr can carry class bits in some asn1
    # versions -- confirm against the library in use.
    tag_nr = tag.nr & ~0xe0 # Remove class bits
    if tag_nr == GTVType.NULL.value:
        # Step into the wrapper, consume the inner element, step back out.
        decoder.enter()
        decoder.read()
        decoder.leave()
        return gtv_null()
    elif tag_nr == GTVType.BYTE_ARRAY.value:
        decoder.enter()
        _, val = decoder.read()
        decoder.leave()
        return gtv_bytes(val)
    elif tag_nr == GTVType.STRING.value:
        decoder.enter()
        _, val = decoder.read()
        decoder.leave()
        # The reader may hand back bytes or str; normalise to str.
        val = val.decode('utf-8') if isinstance(val, bytes) else val
        return gtv_str(val)
    elif tag_nr == GTVType.INTEGER.value:
        decoder.enter()
        _, val = decoder.read()
        decoder.leave()
        return gtv_int(val)
    elif tag_nr == GTVType.BIG_INTEGER.value:
        decoder.enter()
        _, val = decoder.read()
        decoder.leave()
        return gtv_big_int(val)
    elif tag_nr == GTVType.ARRAY.value:
        decoder.enter()
        array = []
        if decoder.peek():
            decoder.enter() # Enter sequence
            # Decode children until nothing is left at this nesting level.
            while decoder.peek():
                array.append(_decode_typed_value(decoder))
            decoder.leave()
        decoder.leave()
        return gtv_array(array)
    elif tag_nr == GTVType.DICT.value:
        decoder.enter()
        dict_pairs = {}
        if decoder.peek():
            decoder.enter() # Enter sequence
            while decoder.peek():
                decoder.enter() # Enter pair sequence
                # Each pair holds the key first, then the typed value.
                _, key = decoder.read()
                key = key.decode('utf-8') if isinstance(key, bytes) else key
                value = _decode_typed_value(decoder)
                dict_pairs[key] = value
                decoder.leave()
            decoder.leave()
        decoder.leave()
        # gtv_dict re-sorts the pairs by key.
        return gtv_dict(dict_pairs)
    raise ValueError(f"Unexpected ASN.1 tag: {tag}")
def to_raw_value(gtv: GTV) -> RawGtv:
    """Unwrap a typed GTV into plain Python values, recursing into containers.

    Raises:
        TypeError: if ``gtv.type`` is not one of the known GTV types.
    """
    kind = gtv.type
    if kind == GTVType.NULL:
        return None
    if kind == GTVType.ARRAY:
        return list(map(to_raw_value, gtv.value))
    if kind == GTVType.DICT:
        raw = {}
        for key, child in gtv.value:
            raw[key] = to_raw_value(child)
        return raw
    scalar_kinds = (
        GTVType.BYTE_ARRAY,
        GTVType.STRING,
        GTVType.INTEGER,
        GTVType.BIG_INTEGER,
    )
    if kind in scalar_kinds:
        return gtv.value
    raise TypeError(f"Unknown GTV type: {gtv.type}")
def is_gtv_compatible(value: Any) -> bool:
    """Report whether ``gtv_auto`` can convert *value*.

    Only a ``TypeError`` from the conversion counts as "not compatible"; any
    other exception propagates.
    """
    try:
        gtv_auto(value)
    except TypeError:
        return False
    else:
        return True
import hashlib
from typing import List, Any, Union, Dict
from gtv import GTV, encode_value, gtv_auto, is_gtv_compatible, GTVType
# Hash prefix constants
# A single byte with one of these values is prepended to the data before it
# is hashed, so leaves, plain nodes and container heads hash differently.
HASH_PREFIX_LEAF = 1  # leaf: prefix + encoded GTV value
HASH_PREFIX_NODE = 0  # internal node: prefix + left hash + right hash
HASH_PREFIX_NODE_ARRAY = 7  # head node of an array
HASH_PREFIX_NODE_DICT = 8  # head node of a dict
def sha256(data: bytes) -> bytes:
    """Return the raw 32-byte SHA-256 digest of *data*."""
    hasher = hashlib.sha256()
    hasher.update(data)
    return hasher.digest()
class CryptoSystem:
    """Hashing backend used by the Merkle calculator; delegates to ``sha256``."""

    def digest(self, buffer: bytes) -> bytes:
        """Return the SHA-256 digest of *buffer*."""
        digest_bytes = sha256(buffer)
        return digest_bytes
class BinaryTreeElement:
    """Common base type for everything that can appear in the binary tree."""
class Node(BinaryTreeElement):
    """Internal tree node holding a left and a right child."""

    def __init__(self, left: BinaryTreeElement, right: BinaryTreeElement):
        self.left, self.right = left, right
class Leaf(BinaryTreeElement):
    """Tree leaf wrapping a single GTV value in ``content``."""

    def __init__(self, content: GTV):
        # The Merkle calculator encodes and hashes this value.
        self.content = content
class EmptyLeaf(BinaryTreeElement):
    """Placeholder leaf used where a container head has no child on a side."""
class ContainerNode(Node):
    """Head node of a container; also keeps the source items and their count."""

    def __init__(self, left: BinaryTreeElement, right: BinaryTreeElement, content: Any, size: int):
        super().__init__(left=left, right=right)
        self.content, self.size = content, size
class ArrayHeadNode(ContainerNode):
    """Head node marking the root of an array's subtree."""
class DictHeadNode(ContainerNode):
    """Head node marking the root of a dictionary's subtree."""
class BinaryTreeFactory:
    """Turns GTV values into the binary tree that the Merkle hash runs over."""

    @staticmethod
    def build_higher_layer(elements: List[BinaryTreeElement]) -> BinaryTreeElement:
        """Pair up neighbours layer by layer until a single root remains.

        An unpaired last element of a layer is carried up unchanged.

        Raises:
            ValueError: if *elements* is empty.
        """
        if not elements:
            raise ValueError("Cannot work on empty arrays")
        layer = elements
        while len(layer) > 1:
            paired = [
                Node(layer[pos], layer[pos + 1])
                for pos in range(0, len(layer) - 1, 2)
            ]
            if len(layer) % 2:
                # Odd count: the last element has no partner in this layer.
                paired.append(layer[-1])
            layer = paired
        return layer[0]

    @staticmethod
    def handle_array_container(items: List[Any]) -> BinaryTreeElement:
        """Build the head node for an array from its items."""
        if not items:
            return ArrayHeadNode(EmptyLeaf(), EmptyLeaf(), [], 0)
        subtrees = [BinaryTreeFactory.build_tree(item) for item in items]
        count = len(items)
        if len(subtrees) == 1:
            # A lone item sits on the left; the right side stays empty.
            return ArrayHeadNode(subtrees[0], EmptyLeaf(), items, count)
        root = BinaryTreeFactory.build_higher_layer(subtrees)
        if not isinstance(root, Node):
            return ArrayHeadNode(root, EmptyLeaf(), items, count)
        # The head node takes the place of the top-level node.
        return ArrayHeadNode(root.left, root.right, items, count)

    @staticmethod
    def handle_dict_container(items: List[tuple[str, GTV]]) -> BinaryTreeElement:
        """Build the head node for a dict from its (key, value) pairs."""
        if not items:
            return DictHeadNode(EmptyLeaf(), EmptyLeaf(), [], 0)
        subtrees = []
        for key, value in items:
            # Each pair contributes two leaves: the key, then the value tree.
            subtrees.extend(
                (Leaf(gtv_auto(key)), BinaryTreeFactory.build_tree(value))
            )
        count = len(items)
        if len(subtrees) == 2:
            return DictHeadNode(subtrees[0], subtrees[1], items, count)
        root = BinaryTreeFactory.build_higher_layer(subtrees)
        if not isinstance(root, Node):
            return DictHeadNode(root, EmptyLeaf(), items, count)
        return DictHeadNode(root.left, root.right, items, count)

    @staticmethod
    def build_tree(value: Union[GTV, Any]) -> BinaryTreeElement:
        """Build the tree for *value*, converting plain values with ``gtv_auto``."""
        gtv_value = value if isinstance(value, GTV) else gtv_auto(value)
        if gtv_value.type == GTVType.ARRAY:
            return BinaryTreeFactory.handle_array_container(gtv_value.value)
        if gtv_value.type == GTVType.DICT:
            return BinaryTreeFactory.handle_dict_container(gtv_value.value)
        return Leaf(gtv_value)
class MerkleHashCalculator:
    """Computes the Merkle hash of a tree built by ``BinaryTreeFactory``."""

    def __init__(self, crypto_system: CryptoSystem):
        self.crypto_system = crypto_system

    def calculate_node_hash(self, prefix: int, hash_left: bytes, hash_right: bytes) -> bytes:
        """Hash the prefix byte followed by the two child hashes."""
        prefix_byte = bytes([prefix])
        return self.crypto_system.digest(prefix_byte + hash_left + hash_right)

    def calculate_leaf_hash(self, value: GTV) -> bytes:
        """Hash the leaf prefix byte followed by the encoded GTV value."""
        encoded = encode_value(value)
        return self.crypto_system.digest(bytes([HASH_PREFIX_LEAF]) + encoded)

    def calculate_merkle_hash(self, element: BinaryTreeElement) -> bytes:
        """Return the hash of *element*, recursing through its children.

        An empty leaf hashes to 32 zero bytes; array and dict head nodes use
        their own prefixes, every other node uses the plain node prefix.
        """
        if isinstance(element, EmptyLeaf):
            return b"\x00" * 32
        if isinstance(element, Leaf):
            return self.calculate_leaf_hash(element.content)

        prefix = HASH_PREFIX_NODE
        if isinstance(element, ArrayHeadNode):
            prefix = HASH_PREFIX_NODE_ARRAY
        elif isinstance(element, DictHeadNode):
            prefix = HASH_PREFIX_NODE_DICT

        left_hash = self.calculate_merkle_hash(element.left)
        right_hash = self.calculate_merkle_hash(element.right)
        return self.calculate_node_hash(prefix, left_hash, right_hash)
def gtv_hash(value: Union[GTV, Any]) -> bytes:
    """Return the Merkle hash of *value*, converting plain values with ``gtv_auto``."""
    gtv_value = value if isinstance(value, GTV) else gtv_auto(value)
    root = BinaryTreeFactory.build_tree(gtv_value)
    return MerkleHashCalculator(CryptoSystem()).calculate_merkle_hash(root)
import hashlib
from typing import List, Any, Union
from gtv import GTV, encode_value, gtv_auto, GTVType
# A single byte with one of these values is prepended to the data before it
# is hashed, so leaves, plain nodes and container heads hash differently.
HASH_PREFIX_LEAF = 1  # leaf: prefix + encoded GTV value
HASH_PREFIX_NODE = 0  # internal node: prefix + left hash + right hash
HASH_PREFIX_NODE_ARRAY = 7  # top node of an array
HASH_PREFIX_NODE_DICT = 8  # top node of a dict
def sha256(data: bytes) -> bytes:
    """Return the raw 32-byte SHA-256 digest of *data*."""
    hasher = hashlib.sha256()
    hasher.update(data)
    return hasher.digest()
def calculate_leaf_hash(value: GTV) -> bytes:
    """Hash the leaf prefix byte followed by the encoded GTV value."""
    encoded = encode_value(value)
    return sha256(bytes([HASH_PREFIX_LEAF]) + encoded)
def calculate_node_hash(prefix: int, hash_left: bytes, hash_right: bytes) -> bytes:
    """Hash the prefix byte followed by the left and right child hashes."""
    prefix_byte = bytes([prefix])
    return sha256(prefix_byte + hash_left + hash_right)
def get_empty_hash() -> bytes:
    """Return the 32 zero bytes used as the hash of a missing child."""
    return b"\x00" * 32
def hash_array_elements(elements: List[GTV]) -> bytes:
    """Return the hash of an array given its element GTVs.

    Element hashes are paired layer by layer with the plain node prefix until
    at most two remain; those two become the children of the top node, which
    is hashed with the array prefix. A missing child is replaced by the empty
    hash, so an empty array hashes as two empty children and a single-element
    array as (element, empty).
    """
    element_hashes = [gtv_hash_simple(elem) for elem in elements]

    while len(element_hashes) > 2:  # build tree until we reach the top node
        paired = [
            calculate_node_hash(
                HASH_PREFIX_NODE,
                element_hashes[i],
                element_hashes[i + 1],
            )
            for i in range(0, len(element_hashes) - 1, 2)
        ]
        if len(element_hashes) % 2:
            # Odd count: the last hash is carried up unchanged.
            paired.append(element_hashes[-1])
        element_hashes = paired

    # Guard both sides on length: indexing [1] with fewer than two hashes
    # raised IndexError for an empty array.
    left = element_hashes[0] if element_hashes else get_empty_hash()
    right = element_hashes[1] if len(element_hashes) > 1 else get_empty_hash()
    return calculate_node_hash(HASH_PREFIX_NODE_ARRAY, left, right)
def hash_dict_elements(pairs: List[tuple[str, GTV]]) -> bytes:
    """Return the hash of a dict given its (key, value) pairs.

    Each pair contributes two hashes in order: the leaf hash of the key and
    the hash of the value. These are paired layer by layer with the plain
    node prefix until at most two remain; those two become the children of
    the top node, which is hashed with the dict prefix. A missing child is
    replaced by the empty hash, so an empty dict hashes as two empty
    children.
    """
    # Create list of hashes for all keys and values
    element_hashes = []
    for key, value in pairs:
        element_hashes.append(calculate_leaf_hash(gtv_auto(key)))
        element_hashes.append(gtv_hash_simple(value))

    while len(element_hashes) > 2:  # build tree until we reach top node
        paired = [
            calculate_node_hash(
                HASH_PREFIX_NODE,
                element_hashes[i],
                element_hashes[i + 1],
            )
            for i in range(0, len(element_hashes) - 1, 2)
        ]
        if len(element_hashes) % 2:
            # Odd count: the last hash is carried up unchanged.
            paired.append(element_hashes[-1])
        element_hashes = paired

    # Guard both sides on length: indexing [1] with no hashes raised
    # IndexError for an empty dict.
    left = element_hashes[0] if element_hashes else get_empty_hash()
    right = element_hashes[1] if len(element_hashes) > 1 else get_empty_hash()
    return calculate_node_hash(HASH_PREFIX_NODE_DICT, left, right)
def gtv_hash_simple(value: Union[GTV, Any]) -> bytes:
    """Hash *value* directly from element hashes, without building tree objects.

    Plain Python values are converted with ``gtv_auto`` first.
    """
    gtv_value = value if isinstance(value, GTV) else gtv_auto(value)
    if gtv_value.type == GTVType.ARRAY:
        return hash_array_elements(gtv_value.value)
    if gtv_value.type == GTVType.DICT:
        return hash_dict_elements(gtv_value.value)
    return calculate_leaf_hash(gtv_value)
def test_gtv_hash_implementations() -> bool:
    """Compare ``gtv_hash_simple`` with ``gtv_hash`` over a fixed set of values.

    Prints both hashes and returns False at the first mismatch; prints a
    success message and returns True when every case agrees.
    """
    from gtv_hash import gtv_hash

    test_cases = [
        None,
        "test",
        123,
        [1, 2, 3],
        {"a": 1, "b": 2},
        [{"x": [1, 2]}, "test", 123],
        {"complex": [1, {"a": None}, "test"]},
        bytes([1, 2, 3]),
    ]

    for test_case in test_cases:
        hash1 = gtv_hash(test_case)
        hash2 = gtv_hash_simple(test_case)
        if hash1 == hash2:
            continue
        print(f"Hash mismatch for value: {test_case}")
        print(f"Original: {hash1.hex()}")
        print(f"Simple: {hash2.hex()}")
        return False

    print("All test cases passed!")
    return True
# Run the comparison of both hash implementations when executed as a script.
if __name__ == "__main__":
    test_gtv_hash_implementations()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment