Last active
November 26, 2024 14:38
-
-
Save killerstorm/f53c6c04c4fd9b081d5e6e7be2340d1b to your computer and use it in GitHub Desktop.
Postchain Python client (AI-generated)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from typing import Dict, List, Union, Any, Tuple | |
from enum import Enum | |
import asn1 | |
# GTV type aliases.
# RawGtv: plain Python values that gtv_auto() knows how to wrap in a GTV.
RawGtv = Union[None, bool, bytes, str, int, Dict[str, Any], List[Any]]
# DictPairTuple: one (key, value) dictionary entry.
DictPairTuple = Tuple[str, Any]
class GTVType(Enum):
    """Kinds of value a GTV can hold.

    The member values double as the context-class ASN.1 tag numbers that
    ``_encode_asn_value`` writes and ``_decode_typed_value`` matches on.
    """

    # Scalar kinds
    NULL = 0
    BYTE_ARRAY = 1
    STRING = 2
    INTEGER = 3
    # Container kinds
    DICT = 4
    ARRAY = 5
    # Integers outside the range accepted by gtv_int()
    BIG_INTEGER = 6
class GTV:
    """A value paired with the GTVType tag that says how to interpret it."""

    def __init__(self, type_: GTVType, value: Any):
        # `type` selects the encoding branch; `value` is the payload whose
        # shape depends on it (e.g. a list of GTVs for ARRAY, a list of
        # (key, GTV) tuples for DICT as built by gtv_dict()).
        self.type, self.value = type_, value
def gtv_null() -> GTV:
    """Return a GTV of type NULL carrying ``None``."""
    return GTV(type_=GTVType.NULL, value=None)
def gtv_bytes(value: bytes) -> GTV:
    """Wrap *value* as a GTV of type BYTE_ARRAY (no copy, no validation)."""
    return GTV(type_=GTVType.BYTE_ARRAY, value=value)
def gtv_str(value: str) -> GTV:
    """Wrap *value* as a GTV of type STRING (no validation)."""
    return GTV(type_=GTVType.STRING, value=value)
def gtv_int(value: int) -> GTV:
    """Wrap *value* as a GTV of type INTEGER.

    Raises:
        ValueError: if *value* does not fit in a signed 64-bit integer.
    """
    max_int64 = (1 << 63) - 1
    min_int64 = -(1 << 63)
    if value > max_int64 or value < min_int64:
        raise ValueError(f"Integer {value} out of range for GtvInteger")
    return GTV(GTVType.INTEGER, value)
def gtv_big_int(value: int) -> GTV:
    """Wrap *value* as a GTV of type BIG_INTEGER (no range check)."""
    return GTV(type_=GTVType.BIG_INTEGER, value=value)
def gtv_array(values: List[Union[GTV, RawGtv]]) -> GTV:
    """Build a GTV of type ARRAY from *values*.

    Items that are already GTV instances are kept as they are; anything
    else is converted with gtv_auto().
    """
    typed_values = []
    for item in values:
        if not isinstance(item, GTV):
            item = gtv_auto(item)
        typed_values.append(item)
    return GTV(GTVType.ARRAY, typed_values)
def gtv_dict(pairs: Dict[str, Union[GTV, RawGtv]]) -> GTV:
    """Build a GTV of type DICT from *pairs*.

    The result's ``value`` is a list of ``(key, GTV)`` tuples ordered by key
    (Python string ordering). Values that are already GTV instances are kept
    as they are; anything else is converted with gtv_auto().

    Raises:
        TypeError: if any key is not a ``str``. Keys are written as UTF-8
            strings by the encoder, so rejecting other key types here avoids
            a late failure (or a silently wrong hash) further down the line.
    """
    for key in pairs:
        if not isinstance(key, str):
            raise TypeError(f"GTV dict keys must be str, got {type(key)}")

    # Keys are unique, so ordering by key alone is equivalent to ordering the
    # (key, value) tuples and never needs to compare values.
    sorted_pairs = [
        (key, value if isinstance(value, GTV) else gtv_auto(value))
        for key, value in sorted(pairs.items(), key=lambda pair: pair[0])
    ]
    return GTV(GTVType.DICT, sorted_pairs)
def gtv_auto(value: RawGtv) -> GTV:
    """Pick the GTV type for a plain Python value and wrap it.

    ``None`` becomes NULL, ``bool`` becomes INTEGER 1/0, ``int`` becomes
    INTEGER when gtv_int() accepts it and BIG_INTEGER otherwise; ``bytes``,
    ``str``, ``list`` and ``dict`` map to their matching constructors.

    Raises:
        TypeError: if *value* is of none of the supported types.
    """
    if value is None:
        return gtv_null()

    def from_bool(flag):
        return gtv_int(1 if flag else 0)

    def from_int(number):
        # gtv_int() signals "does not fit" with ValueError.
        try:
            return gtv_int(number)
        except ValueError:
            return gtv_big_int(number)

    # Order matters: bool is a subclass of int, so it must be tested first.
    converters = (
        (bytes, gtv_bytes),
        (bool, from_bool),
        (str, gtv_str),
        (int, from_int),
        (list, gtv_array),
        (dict, gtv_dict),
    )
    for python_type, convert in converters:
        if isinstance(value, python_type):
            return convert(value)
    raise TypeError(f"Cannot convert type {type(value)} to GTV")
def encode_value(value: Union[RawGtv, GTV]) -> bytes:
    """Serialize *value* with the ``asn1`` encoder and return the bytes.

    A plain Python value is first converted with gtv_auto(); a GTV is
    encoded as given.
    """
    typed_arg = value if isinstance(value, GTV) else gtv_auto(value)
    encoder = asn1.Encoder()
    encoder.start()
    _encode_asn_value(encoder, typed_arg)
    return encoder.output()
def _encode_asn_value(encoder: asn1.Encoder, typed_arg: GTV):
    """Write one GTV (recursively, for containers) into *encoder*.

    Every value is wrapped in a constructed element whose class is Context
    and whose tag number is the GTVType value. Inside that wrapper:

    * NULL / BYTE_ARRAY / STRING / INTEGER / BIG_INTEGER write one primitive
      (Null, OctetString, UTF8String, Integer, Integer respectively);
    * ARRAY writes a Sequence of encoded items;
    * DICT writes a Sequence of Sequences, each holding the key as a
      UTF8String followed by the encoded value.

    Raises:
        TypeError: if ``typed_arg.type`` is not a known GTVType. Previously
            such a value was skipped silently, producing truncated output.
    """
    primitive_numbers = {
        GTVType.NULL: asn1.Numbers.Null,
        GTVType.BYTE_ARRAY: asn1.Numbers.OctetString,
        GTVType.STRING: asn1.Numbers.UTF8String,
        GTVType.INTEGER: asn1.Numbers.Integer,
        GTVType.BIG_INTEGER: asn1.Numbers.Integer,
    }
    gtv_type = typed_arg.type
    is_primitive = gtv_type in primitive_numbers

    # Reject unknown types before anything is written, so the encoder is
    # never left with a half-open wrapper.
    if not is_primitive and gtv_type not in (GTVType.ARRAY, GTVType.DICT):
        raise TypeError(f"Unknown GTV type: {gtv_type}")

    with encoder.construct(nr=gtv_type.value, cls=asn1.Classes.Context):
        if is_primitive:
            # NULL always writes None regardless of what the GTV carries.
            payload = None if gtv_type == GTVType.NULL else typed_arg.value
            encoder.write(payload, primitive_numbers[gtv_type])
        elif gtv_type == GTVType.ARRAY:
            with encoder.construct(nr=asn1.Numbers.Sequence):
                for item in typed_arg.value:
                    _encode_asn_value(encoder, item)
        else:  # GTVType.DICT
            with encoder.construct(nr=asn1.Numbers.Sequence):
                for key, value in typed_arg.value:
                    with encoder.construct(nr=asn1.Numbers.Sequence):
                        encoder.write(key, asn1.Numbers.UTF8String)
                        _encode_asn_value(encoder, value)
def decode_value(data: bytes) -> GTV:
    """Decode bytes produced by encode_value() back into a typed GTV.

    Raises:
        ValueError: on any failure while decoding. The original exception is
            attached as ``__cause__`` so the root error stays visible in
            tracebacks.
    """
    try:
        decoder = asn1.Decoder()
        decoder.start(data)
        return _decode_typed_value(decoder)
    except Exception as e:
        # Deliberately broad: every decoder/parsing error is reported to the
        # caller as a single ValueError.
        raise ValueError(f"Failed to decode GTV: {e}") from e
def decode_raw_value(data: bytes) -> RawGtv:
    """Decode *data* with decode_value() and unwrap it to plain Python values."""
    typed = decode_value(data)
    return to_raw_value(typed)
def _decode_typed_value(decoder: asn1.Decoder) -> GTV:
    """Read the next element from *decoder* and return it as a typed GTV.

    The element's tag number selects the GTVType; the payload sits inside
    that wrapper element. Containers recurse for their members. When the
    decoder has nothing left to peek at, a NULL GTV is returned.

    Raises:
        ValueError: if the tag number matches no GTVType (also raised by
            gtv_int() when an INTEGER payload is out of range).
    """
    tag = decoder.peek()
    if tag is None:
        return gtv_null()

    def read_wrapped():
        # Step into the wrapper, take its single payload, step back out.
        decoder.enter()
        _, payload = decoder.read()
        decoder.leave()
        return payload

    def as_text(raw):
        return raw.decode('utf-8') if isinstance(raw, bytes) else raw

    tag_nr = tag.nr & ~0xe0  # Remove class bits

    if tag_nr == GTVType.NULL.value:
        # The payload is discarded, so the read result is not unpacked.
        decoder.enter()
        decoder.read()
        decoder.leave()
        return gtv_null()

    if tag_nr == GTVType.BYTE_ARRAY.value:
        return gtv_bytes(read_wrapped())

    if tag_nr == GTVType.STRING.value:
        return gtv_str(as_text(read_wrapped()))

    if tag_nr == GTVType.INTEGER.value:
        return gtv_int(read_wrapped())

    if tag_nr == GTVType.BIG_INTEGER.value:
        return gtv_big_int(read_wrapped())

    if tag_nr == GTVType.ARRAY.value:
        items = []
        decoder.enter()
        if decoder.peek():
            decoder.enter()  # the inner sequence holding the items
            while decoder.peek():
                items.append(_decode_typed_value(decoder))
            decoder.leave()
        decoder.leave()
        return gtv_array(items)

    if tag_nr == GTVType.DICT.value:
        entries = {}
        decoder.enter()
        if decoder.peek():
            decoder.enter()  # the inner sequence holding the pairs
            while decoder.peek():
                decoder.enter()  # one (key, value) pair
                _, raw_key = decoder.read()
                entry_key = as_text(raw_key)
                entries[entry_key] = _decode_typed_value(decoder)
                decoder.leave()
            decoder.leave()
        decoder.leave()
        return gtv_dict(entries)

    raise ValueError(f"Unexpected ASN.1 tag: {tag}")
def to_raw_value(gtv: GTV) -> RawGtv:
    """Unwrap a typed GTV into plain Python values, recursing into containers.

    Raises:
        TypeError: if ``gtv.type`` is not a known GTVType.
    """
    kind = gtv.type
    scalar_kinds = (
        GTVType.BYTE_ARRAY,
        GTVType.STRING,
        GTVType.INTEGER,
        GTVType.BIG_INTEGER,
    )
    if kind == GTVType.NULL:
        return None
    if kind in scalar_kinds:
        return gtv.value
    if kind == GTVType.ARRAY:
        return [to_raw_value(item) for item in gtv.value]
    if kind == GTVType.DICT:
        # DICT payloads are (key, GTV) tuples.
        return {key: to_raw_value(item) for key, item in gtv.value}
    raise TypeError(f"Unknown GTV type: {gtv.type}")
def is_gtv_compatible(value: Any) -> bool:
    """Return True when gtv_auto() can convert *value*, False on TypeError."""
    try:
        gtv_auto(value)
    except TypeError:
        return False
    else:
        return True
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import hashlib | |
from typing import List, Any, Union, Dict | |
from gtv import GTV, encode_value, gtv_auto, is_gtv_compatible, GTVType | |
# Single-byte prefixes prepended to the data before hashing, so that
# different kinds of tree element never hash the same input.
HASH_PREFIX_LEAF = 1        # leaf holding an encoded GTV
HASH_PREFIX_NODE = 0        # plain internal node
HASH_PREFIX_NODE_ARRAY = 7  # root node of an array container
HASH_PREFIX_NODE_DICT = 8   # root node of a dictionary container
def sha256(data: bytes) -> bytes:
    """Return the raw 32-byte SHA-256 digest of *data*."""
    hasher = hashlib.sha256()
    hasher.update(data)
    return hasher.digest()
class CryptoSystem:
    """Hashing backend used by the Merkle hash calculator."""

    def digest(self, buffer: bytes) -> bytes:
        """Return the SHA-256 digest of *buffer*."""
        digest_bytes = sha256(buffer)
        return digest_bytes
class BinaryTreeElement:
    """Common base type for everything that can appear in the Merkle tree."""
class Node(BinaryTreeElement):
    """Internal Merkle tree node with a left and a right child."""

    def __init__(self, left: BinaryTreeElement, right: BinaryTreeElement):
        self.left, self.right = left, right
class Leaf(BinaryTreeElement):
    """Tree leaf that carries a single GTV value in ``content``."""

    def __init__(self, content: GTV):
        # The GTV whose encoding is hashed for this leaf.
        self.content = content
class EmptyLeaf(BinaryTreeElement):
    """Placeholder leaf used where a container node has no child."""
class ContainerNode(Node):
    """Node that stands for a whole container (array or dictionary)."""

    def __init__(self, left: BinaryTreeElement, right: BinaryTreeElement, content: Any, size: int):
        super().__init__(left, right)
        # `content` keeps the container's original items; `size` their count.
        self.content, self.size = content, size
class ArrayHeadNode(ContainerNode):
    """Root node of an array; hashed with the array-specific prefix."""
class DictHeadNode(ContainerNode):
    """Root node of a dictionary; hashed with the dict-specific prefix."""
class BinaryTreeFactory:
    """Turns GTV values into binary trees of Leaf / Node elements."""

    @staticmethod
    def build_higher_layer(elements: List[BinaryTreeElement]) -> BinaryTreeElement:
        """Pair up *elements* layer by layer until a single root remains.

        Neighbours are combined left to right into Node objects; an unpaired
        last element is carried up to the next layer unchanged.

        Raises:
            ValueError: if *elements* is empty.
        """
        if not elements:
            raise ValueError("Cannot work on empty arrays")

        layer = elements
        while len(layer) > 1:
            next_layer = [
                Node(layer[index], layer[index + 1])
                for index in range(0, len(layer) - 1, 2)
            ]
            if len(layer) % 2:
                # Odd count: the last element has no partner on this layer.
                next_layer.append(layer[-1])
            layer = next_layer
        return layer[0]

    @staticmethod
    def handle_array_container(items: List[Any]) -> BinaryTreeElement:
        """Build the ArrayHeadNode for *items*.

        An empty array gets two EmptyLeaf children; a single item becomes the
        left child with an EmptyLeaf on the right; otherwise the children of
        the root built from all items become the head node's children.
        """
        if not items:
            return ArrayHeadNode(EmptyLeaf(), EmptyLeaf(), [], 0)

        leaves = [BinaryTreeFactory.build_tree(item) for item in items]
        count = len(items)

        if len(leaves) == 1:
            return ArrayHeadNode(leaves[0], EmptyLeaf(), items, count)

        tree_root = BinaryTreeFactory.build_higher_layer(leaves)
        if isinstance(tree_root, Node):
            # Replace the plain root Node by the head node, reusing its children.
            left, right = tree_root.left, tree_root.right
        else:
            left, right = tree_root, EmptyLeaf()
        return ArrayHeadNode(left, right, items, count)

    @staticmethod
    def handle_dict_container(items: List[tuple[str, GTV]]) -> BinaryTreeElement:
        """Build the DictHeadNode for the (key, value) pairs in *items*.

        Each pair contributes two consecutive leaves: the key (converted with
        gtv_auto) followed by the tree built for the value.
        """
        if not items:
            return DictHeadNode(EmptyLeaf(), EmptyLeaf(), [], 0)

        leaves = []
        for key, value in items:
            leaves.extend((Leaf(gtv_auto(key)), BinaryTreeFactory.build_tree(value)))
        count = len(items)

        if len(leaves) == 2:
            # Exactly one pair: key on the left, value on the right.
            return DictHeadNode(leaves[0], leaves[1], items, count)

        tree_root = BinaryTreeFactory.build_higher_layer(leaves)
        if isinstance(tree_root, Node):
            left, right = tree_root.left, tree_root.right
        else:
            left, right = tree_root, EmptyLeaf()
        return DictHeadNode(left, right, items, count)

    @staticmethod
    def build_tree(value: Union[GTV, Any]) -> BinaryTreeElement:
        """Build the tree for *value*, converting it with gtv_auto if needed.

        Arrays and dictionaries are expanded recursively; every other GTV
        becomes a single Leaf.
        """
        gtv_value = value if isinstance(value, GTV) else gtv_auto(value)

        if gtv_value.type == GTVType.ARRAY:
            return BinaryTreeFactory.handle_array_container(gtv_value.value)
        if gtv_value.type == GTVType.DICT:
            return BinaryTreeFactory.handle_dict_container(gtv_value.value)
        return Leaf(gtv_value)
class MerkleHashCalculator:
    """Computes Merkle hashes over trees built by BinaryTreeFactory."""

    def __init__(self, crypto_system: CryptoSystem):
        # Provides digest(bytes) -> bytes for every hash computed here.
        self.crypto_system = crypto_system

    def calculate_node_hash(self, prefix: int, hash_left: bytes, hash_right: bytes) -> bytes:
        """Hash ``prefix byte + hash_left + hash_right``."""
        payload = bytes([prefix]) + hash_left + hash_right
        return self.crypto_system.digest(payload)

    def calculate_leaf_hash(self, value: GTV) -> bytes:
        """Hash the leaf prefix byte followed by the encoded *value*."""
        payload = bytes([HASH_PREFIX_LEAF]) + encode_value(value)
        return self.crypto_system.digest(payload)

    def calculate_merkle_hash(self, element: BinaryTreeElement) -> bytes:
        """Return the hash of *element*, recursing through its children.

        An EmptyLeaf hashes to 32 zero bytes, a Leaf to its leaf hash, and
        any node to the node hash of its children using the prefix that
        matches the node's class.
        """
        if isinstance(element, EmptyLeaf):
            return bytes(32)
        if isinstance(element, Leaf):
            return self.calculate_leaf_hash(element.content)

        # Container head nodes use their own prefix; everything else is a
        # plain internal node.
        prefix = HASH_PREFIX_NODE
        if isinstance(element, ArrayHeadNode):
            prefix = HASH_PREFIX_NODE_ARRAY
        elif isinstance(element, DictHeadNode):
            prefix = HASH_PREFIX_NODE_DICT

        left_hash = self.calculate_merkle_hash(element.left)
        right_hash = self.calculate_merkle_hash(element.right)
        return self.calculate_node_hash(prefix, left_hash, right_hash)
def gtv_hash(value: Union[GTV, Any]) -> bytes:
    """Return the Merkle hash of *value*.

    A plain Python value is converted with gtv_auto() first; the tree is
    then built by BinaryTreeFactory and hashed with a default CryptoSystem.
    """
    gtv_value = value if isinstance(value, GTV) else gtv_auto(value)
    calculator = MerkleHashCalculator(CryptoSystem())
    root = BinaryTreeFactory.build_tree(gtv_value)
    return calculator.calculate_merkle_hash(root)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import hashlib | |
from typing import List, Any, Union | |
from gtv import GTV, encode_value, gtv_auto, GTVType | |
# Single-byte prefixes prepended to the data before hashing.
HASH_PREFIX_LEAF = 1        # leaf holding an encoded GTV
HASH_PREFIX_NODE = 0        # internal node combining two hashes
HASH_PREFIX_NODE_ARRAY = 7  # top node of an array
HASH_PREFIX_NODE_DICT = 8   # top node of a dictionary
def sha256(data: bytes) -> bytes:
    """Return the raw 32-byte SHA-256 digest of *data*."""
    hasher = hashlib.sha256()
    hasher.update(data)
    return hasher.digest()
def calculate_leaf_hash(value: GTV) -> bytes:
    """Hash the leaf prefix byte followed by the encoded *value*."""
    prefix = bytes([HASH_PREFIX_LEAF])
    encoded = encode_value(value)
    return sha256(prefix + encoded)
def calculate_node_hash(prefix: int, hash_left: bytes, hash_right: bytes) -> bytes:
    """Hash ``prefix byte + hash_left + hash_right``."""
    payload = bytes([prefix]) + hash_left + hash_right
    return sha256(payload)
def get_empty_hash() -> bytes:
    """Return the 32 zero bytes used in place of a missing child hash."""
    return b"\x00" * 32
def hash_array_elements(elements: List[GTV]) -> bytes:
    """Return the hash of an array given its element GTVs.

    Element hashes are combined pairwise, layer by layer, with the plain
    node prefix until at most two remain; those two become the children of
    the top node, which is hashed with the array prefix. A missing child is
    replaced by the empty hash, so:

    * an empty array hashes two empty hashes (previously this raised
      IndexError);
    * a single-element array hashes the element with an empty right child.
    """
    element_hashes = [gtv_hash_simple(elem) for elem in elements]

    # Reduce until only the two children of the top node are left.
    while len(element_hashes) > 2:
        element_hashes = [
            calculate_node_hash(
                HASH_PREFIX_NODE,
                element_hashes[i],
                element_hashes[i + 1],
            )
            if i + 1 < len(element_hashes)
            else element_hashes[i]  # unpaired last hash moves up unchanged
            for i in range(0, len(element_hashes), 2)
        ]

    # Pad to exactly two children; covers the 0- and 1-element cases.
    padded = element_hashes + [get_empty_hash(), get_empty_hash()]
    return calculate_node_hash(HASH_PREFIX_NODE_ARRAY, padded[0], padded[1])
def hash_dict_elements(pairs: List[tuple[str, GTV]]) -> bytes:
    """Return the hash of a dictionary given its (key, value) pairs.

    Each pair contributes two consecutive hashes: the leaf hash of the key
    (converted with gtv_auto) and the hash of the value. These are combined
    pairwise, layer by layer, with the plain node prefix until at most two
    remain; those become the children of the top node, which is hashed with
    the dict prefix. A missing child is replaced by the empty hash, so an
    empty dictionary hashes two empty hashes (previously this raised
    IndexError).
    """
    element_hashes = []
    for key, value in pairs:
        element_hashes.append(calculate_leaf_hash(gtv_auto(key)))
        element_hashes.append(gtv_hash_simple(value))

    # Reduce until only the two children of the top node are left.
    while len(element_hashes) > 2:
        element_hashes = [
            calculate_node_hash(
                HASH_PREFIX_NODE,
                element_hashes[i],
                element_hashes[i + 1],
            )
            if i + 1 < len(element_hashes)
            else element_hashes[i]  # unpaired last hash moves up unchanged
            for i in range(0, len(element_hashes), 2)
        ]

    # Pad to exactly two children; covers the empty-dictionary case.
    padded = element_hashes + [get_empty_hash(), get_empty_hash()]
    return calculate_node_hash(HASH_PREFIX_NODE_DICT, padded[0], padded[1])
def gtv_hash_simple(value: Union[GTV, Any]) -> bytes:
    """Return the hash of *value* without materialising a tree.

    A plain Python value is converted with gtv_auto() first. Arrays and
    dictionaries are delegated to their dedicated helpers; any other GTV is
    hashed as a leaf.
    """
    gtv_value = value if isinstance(value, GTV) else gtv_auto(value)

    if gtv_value.type == GTVType.ARRAY:
        return hash_array_elements(gtv_value.value)
    if gtv_value.type == GTVType.DICT:
        return hash_dict_elements(gtv_value.value)
    return calculate_leaf_hash(gtv_value)
def test_gtv_hash_implementations() -> bool:
    """Compare gtv_hash_simple() against gtv_hash() on sample values.

    Prints both hashes and returns False at the first mismatch; prints a
    success message and returns True when every sample agrees.
    """
    # Imported here so the module can be loaded without gtv_hash present.
    from gtv_hash import gtv_hash

    test_cases = [
        None,
        "test",
        123,
        [1, 2, 3],
        {"a": 1, "b": 2},
        [{"x": [1, 2]}, "test", 123],
        {"complex": [1, {"a": None}, "test"]},
        bytes([1, 2, 3]),
    ]

    for test_case in test_cases:
        hash1 = gtv_hash(test_case)
        hash2 = gtv_hash_simple(test_case)
        if hash1 == hash2:
            continue
        print(f"Hash mismatch for value: {test_case}")
        print(f"Original: {hash1.hex()}")
        print(f"Simple: {hash2.hex()}")
        return False

    print("All test cases passed!")
    return True
if __name__ == "__main__":
    # Propagate the outcome through the exit status (0 = all hashes match,
    # 1 = mismatch) so that a failing run is visible to shells and CI.
    raise SystemExit(0 if test_gtv_hash_implementations() else 1)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment