Skip to content

Instantly share code, notes, and snippets.

@pipermerriam
Last active February 27, 2019 14:30
Show Gist options
  • Save pipermerriam/ced4ed8f1fbab732120986f57db5069a to your computer and use it in GitHub Desktop.
Save pipermerriam/ced4ed8f1fbab732120986f57db5069a to your computer and use it in GitHub Desktop.

Simple Streamable Serialize

Motivation:

  • Streaming encoding and decoding
  • Compact
  • 1st class support for needed data types
    • scalars (block numbers)
    • fixed length byte-strings (block hashes, state roots)
    • fixed width integers (v, r, s from transaction signature)
    • dynamic length byte strings (log data)

This scheme is intended to:

  • be more compact than RLP.
  • support streaming encoding and decoding
  • highly compact

It explicitly does not provide:

  • ability to naively decode (requires prior knowledge of schema)
  • fast access to internal elements (you have to decode it first)

Meta functions for encoding and decoding

  • encode(t, v) -> binary
    • t is a type
    • v is a value of type t
    • result is raw binary
  • decode(t, b) -> v
    • t is a type
    • b is a binary stream
    • v is a value of type t

Unsigned Scalar: scalar and scalar<N>

  • If N specified:
    • N % 8 == 0
    • 8 <= N <= 256
  • Otherwise: N = 256
  • LEB128 encoded
  • Maximum of ceil(N / 7) bytes.

Unsigned Integer: uint<N>

  • N % 8 == 0
  • 8 <= N <= 256
  • fixed-width little endian encoding

FixedBytes: bytes<N>

fixed width byte string

Bytes: bytes

dynamic length byte string

  • Length prefixed encoding
    • length: scalar32
    • payload: encode('scalar32', len(v)) + v

Tuple: tuple(t0, t1, ..., tN)

fixed length heterogeneous iterable

  • values encoded sequentially
    • encode(t0, v[0]) + encode(t1, v[1]) + ... + encode(tN, v[N])
  • recursive

Array: <type>[]

dynamic length homogeneous iterable

  • Length prefixed encoding
    • length scalar32 (length is the number of elements in the array)
    • payload encode('scalar32', len(v)) + encode(t, v[0]) + encode(t, v[1]) + ... (t is the element type)
  • recursive """
import functools
import itertools
import math
import operator
from typing import (
Any,
IO,
Iterable,
Callable,
Tuple,
TypeVar,
)
from cytoolz import curry
class ParseError(Exception):
    """Raised when a binary stream cannot be decoded as the requested type."""
# Element type produced by the iterable-returning callable being wrapped.
T = TypeVar('T')


def to_tuple(fn: Callable[..., Iterable[T]]) -> Callable[..., Tuple[T, ...]]:
    """Decorator that collects the iterable returned by ``fn`` into a tuple.

    The wrapper forwards all positional and keyword arguments to ``fn``
    unchanged and keeps ``fn``'s metadata (name, docstring, ...).
    """
    def collect(*args, **kwargs) -> Tuple[T, ...]:  # type: ignore
        produced = fn(*args, **kwargs)
        return tuple(produced)

    return functools.wraps(fn)(collect)
# Bit masks used when decoding LEB128 bytes.
LOW_MASK = 0x7F  # low seven bits of a byte: the payload group
HIGH_MASK = 0x80  # high bit of a byte: set when another byte follows
def _validate_bit_size(bit_size: int) -> None:
assert bit_size % 8 == 8
assert 0 <= bit_size <= 256
@to_tuple
def _parse_unsigned_leb128(bit_size: int, stream: IO[bytes]) -> Iterable[int]:
    """Read one unsigned LEB128 integer from ``stream`` as shifted 7-bit groups.

    Each byte contributes its low seven bits, shifted into their final bit
    position; the groups are disjoint, so OR-ing them together gives the
    decoded integer. Reading stops at the first byte whose high bit is clear.

    Raises:
        ParseError: if the stream ends before the terminating byte, if more
            than ``ceil(bit_size / 7)`` bytes are used, or if the decoded
            value does not fit in ``bit_size`` bits.
    """
    # The limit is a number of BYTES; it must be scaled by 7 before being
    # compared with the bit shift, which advances by 7 per byte.
    max_bytes = (bit_size + 6) // 7
    for shift in itertools.count(0, 7):
        if shift >= 7 * max_bytes:
            raise ParseError("Parsed integer exceeds maximum bit size")
        byte = stream.read(1)
        if not byte:
            raise ParseError(
                "Unexpected end of stream while parsing LEB128 encoded integer"
            )
        value = byte[0]
        group = (value & LOW_MASK) << shift
        # The final byte may carry bits beyond ``bit_size`` (e.g. bits 32-34
        # for a 32-bit scalar); reject those instead of returning an
        # out-of-range integer.
        if group >> bit_size:
            raise ParseError("Parsed integer exceeds maximum bit size")
        yield group
        if not value & HIGH_MASK:
            break
def _read_exact(num_bytes: int, stream: IO[bytes]) -> bytes:
data = stream.read(num_bytes)
if len(data) != num_bytes:
raise ParseError(f"Insufficient bytes in stream: needed {num_bytes}, got {len(data)}")
#
# Scalars
#
@curry
def parse_scalar(bit_size: int, stream: IO[bytes]) -> int:
    """
    Decode an unsigned LEB128 integer of at most ``bit_size`` bits from
    ``stream``.

    https://en.wikipedia.org/wiki/LEB128
    """
    _validate_bit_size(bit_size)
    # Each group already sits at its final bit position, so combining them
    # is a plain bitwise OR starting from zero.
    result = 0
    for group in _parse_unsigned_leb128(bit_size, stream):
        result = result | group
    return result
@curry
def serialize_scalar(bit_size: int, value: int) -> bytes:
    """
    Encode a non-negative integer of at most ``bit_size`` bits as unsigned
    LEB128.

    The value is emitted seven bits at a time, least significant group
    first; every byte except the last has its high bit set to signal that
    another byte follows.

    https://en.wikipedia.org/wiki/LEB128

    Raises:
        ValueError: if ``value`` is negative or needs more than ``bit_size``
            bits.
    """
    _validate_bit_size(bit_size)
    if value < 0 or value >> bit_size:
        raise ValueError(
            f"Value {value} cannot be encoded as an unsigned {bit_size}-bit scalar"
        )
    encoded = bytearray()
    remaining = value
    while True:
        group = remaining & LOW_MASK
        remaining >>= 7
        if not remaining:
            # Last group: leave the continuation bit clear.
            encoded.append(group)
            return bytes(encoded)
        encoded.append(group | HIGH_MASK)
#
# Unsigned Integer
#
@curry
def parse_uint(bit_size: int, stream: IO[bytes]) -> int:
    """
    Decode a fixed-width little-endian unsigned integer of ``bit_size`` bits
    from ``stream``.

    Exactly ``bit_size // 8`` bytes are read through ``_read_exact``.
    """
    _validate_bit_size(bit_size)
    # ``_read_exact`` needs the stream to read from, and the byte order name
    # must be spelled 'little' for ``int.from_bytes`` to accept it.
    data = _read_exact(bit_size // 8, stream)
    return int.from_bytes(data, 'little')
@curry
def serialize_uint(bit_size: int, value: int) -> bytes:
    """
    Encode ``value`` as a fixed-width little-endian unsigned integer that
    occupies ``bit_size // 8`` bytes.
    """
    _validate_bit_size(bit_size)
    byte_width = bit_size // 8
    return value.to_bytes(byte_width, byteorder='little')
#
# Fixed Bytes
#
@curry
def parse_fixed_bytes(num_bytes: int, stream: IO[bytes]) -> bytes:
    """
    Read a fixed-width byte string of ``num_bytes`` bytes from ``stream``.
    """
    payload = _read_exact(num_bytes, stream)
    return payload
@curry
def serialize_fixed_bytes(num_bytes: int, value: bytes) -> bytes:
    """
    Encode a fixed-width byte string: ``value`` is returned unchanged after
    asserting that it is exactly ``num_bytes`` long.
    """
    actual_length = len(value)
    assert actual_length == num_bytes
    return value
#
# Dynamic Bytes
#
@curry
def parse_bytes(stream: IO[bytes]) -> bytes:
    """
    Decode a dynamic-length byte string: a 32-bit scalar length prefix
    followed by that many payload bytes.
    """
    return parse_fixed_bytes(parse_scalar(32, stream), stream)
@curry
def serialize_bytes(value: bytes) -> bytes:
    """
    Encode a dynamic-length byte string as its length (32-bit scalar)
    followed by the raw bytes.
    """
    length_prefix = serialize_scalar(32, len(value))
    return length_prefix + value
# Type of a stream parser: takes a binary stream and returns the decoded value.
ParseFn = Callable[[IO[bytes]], Any]
#
# Array
#
@curry
def parse_array(item_parser: ParseFn, stream: IO[bytes]) -> Tuple[Any, ...]:
    """
    Decode a dynamic-length array: a 32-bit scalar element count followed by
    that many items, each read with ``item_parser``.
    """
    return _parse_array(parse_scalar(32, stream), item_parser, stream)
@to_tuple
def _parse_array(length: int, item_parser: ParseFn, stream: IO[bytes]) -> Iterable[Any]:
    """
    Apply ``item_parser`` to ``stream`` ``length`` times, producing the
    parsed items in order.
    """
    yield from (item_parser(stream) for _ in range(length))
# Type of a value serializer: takes a value and returns its encoded bytes.
SerializeFn = Callable[[Any], bytes]
@curry
def serialize_array(item_serializer: SerializeFn, values: Tuple[Any, ...]) -> bytes:
    """
    Encode a dynamic-length array as its element count (32-bit scalar)
    followed by each item encoded with ``item_serializer``.
    """
    count_prefix = serialize_scalar(32, len(values))
    encoded_items = map(item_serializer, values)
    return count_prefix + b''.join(encoded_items)
#
# Tuple
#
@curry
def parse_tuple(element_parsers: Tuple[ParseFn, ...], stream: IO[bytes]) -> Tuple[Any, ...]:
    """
    Decode a fixed-length tuple by running each parser in
    ``element_parsers`` against ``stream`` in order.
    """
    decoded = _parse_tuple(element_parsers, stream)
    return decoded
@to_tuple
def _parse_tuple(element_parsers: Tuple[ParseFn, ...], stream: IO[bytes]) -> Iterable[Any]:
    """
    Apply each parser in ``element_parsers`` to ``stream`` in order,
    producing one parsed element per parser.

    The function itself is a generator (hence the ``Iterable`` return
    annotation); the ``to_tuple`` decorator turns the result into a tuple.
    """
    for parser in element_parsers:
        yield parser(stream)
@curry
def serialize_tuple(element_serializers: Tuple[SerializeFn, ...], values: Tuple[Any, ...]) -> bytes:
    """
    Encode a fixed-length tuple by concatenating each value encoded with the
    serializer at the same position.

    ``element_serializers`` is a sequence with one serializer per element of
    ``values``; the two must have the same length (checked with an
    assertion).
    """
    assert len(element_serializers) == len(values)
    return b''.join(
        element_serializer(element)
        for element_serializer, element in zip(element_serializers, values)
    )
#########
#
# API
#
#########
def encode(type_: str, value: Any) -> bytes:
    """
    Placeholder for the public encoding entry point.

    Not implemented yet: the arguments are ignored and ``None`` is returned.
    """
    return None
def decode(type_: str, raw: bytes) -> Any:
    """
    Placeholder for the public decoding entry point.

    Not implemented yet: the arguments are ignored and ``None`` is returned.
    """
    return None
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment