Skip to content

Instantly share code, notes, and snippets.

@traverseda
Last active September 21, 2018 13:16
Show Gist options
  • Save traverseda/61dedd1a5f1ac3a8f9a825a0ecf300fc to your computer and use it in GitHub Desktop.
Save traverseda/61dedd1a5f1ac3a8f9a825a0ecf300fc to your computer and use it in GitHub Desktop.
"""A binary serializer inspired by rypc's brine.
"""
#from __future__ import annotations
import typing, collections
from io import BytesIO, BufferedReader
from typing import Union, Tuple, FrozenSet, Generator
from functools import singledispatch
# Values that are represented on the wire by their type id alone.
singleTons = Union[bool, None]

# Static-typing alias for the immutable values the encoder functions are
# annotated to accept.
immutableTypes = Union[
    singleTons,
    int,
    float,
    complex,
    str,
    bytes,
    slice,
    Tuple["immutableTypes", ...],
    FrozenSet["immutableTypes"]
]

# The position of each entry is its one-byte id in the encoded stream, so the
# order of this tuple is part of the format and must not change.
supportedTypes = (
    str,
    int,
    bytes,
    True,
    False,
    None,
    tuple,
    "end_tuple",
    frozenset,
    "end_frozenset",
)

# entry -> single-byte id
idsForTypes = dict(
    (entry, bytes((position,))) for position, entry in enumerate(supportedTypes)
)  # type: dict

# single-byte id -> entry (the inverse of idsForTypes)
typesForIds = dict(zip(idsForTypes.values(), idsForTypes.keys()))
@singledispatch
def encode(obj: immutableTypes) -> Generator[bytes, None, None]:
    """Serialize ``obj`` into a sequence of byte chunks.

    This is the ``singledispatch`` fallback. The per-type encoders are
    attached with ``encode.register``; this body only runs for objects whose
    type has no registered encoder.

    Raises:
        NotImplementedError: always; the message names the unsupported type.
    """
    # Same exception type as before; the message makes it possible to tell
    # which value inside a nested structure was rejected.
    raise NotImplementedError(
        "no encoder registered for type {!r}".format(type(obj).__name__)
    )
@encode.register(str)
def encode_str(text: str) -> Generator[bytes, None, None]:
    """Encode a ``str``.

    Yields, in order: the type id for ``str``, the length of the UTF-8
    payload in bytes (as an integer without its own type id), and the UTF-8
    payload itself.
    """
    yield idsForTypes[str]
    payload = text.encode("utf-8")
    # The length prefix counts encoded bytes, not characters.
    yield from encode_int(len(payload), include_hint=False)
    yield payload
@encode.register(bytes)
def encode_bytes(b: bytes) -> Generator[bytes, None, None]:
    """Encode a ``bytes`` object.

    Yields, in order: the type id for ``bytes``, the payload length (as an
    integer without its own type id), and the payload unchanged.
    """
    yield idsForTypes[bytes]
    yield from encode_int(len(b), include_hint=False)
    yield b
#https://retracile.net/blog/2012/01/20/20.30
@encode.register(int)
def encode_int(num: int, include_hint=True) -> Generator[bytes, None, None]:
    """Encode a non-negative integer as a variable-length byte string.

    The value is written in 7-bit groups, most significant group first.
    Every byte except the last has its high bit (128) set; the last byte is
    below 128 and terminates the number. Each continuation step subtracts
    one from the remaining quotient, which is what ``decode_int`` undoes
    with its ``+ 1``.

    Args:
        num: The integer to encode. Must be >= 0.
        include_hint: When true, the type id for ``int`` is yielded before
            the digits. Length prefixes pass ``False``.

    Yields:
        The optional type id, then a single ``bytes`` object with the digits.

    Raises:
        ValueError: if ``num`` is negative. The format has no sign
            representation, and a negative value would otherwise be written
            as an unrelated small positive number.
    """
    if num < 0:
        raise ValueError("cannot encode negative integer {!r}".format(num))
    if include_hint:
        yield idsForTypes[int]
    # Collect groups least-significant first and reverse once at the end,
    # instead of inserting at the front of the list on every step.
    groups = [num % 128]
    while num >= 128:
        num = (num // 128) - 1
        groups.append((num % 128) + 128)
    groups.reverse()
    yield bytes(groups)
@encode.register(bool)
def encode_bool(b: bool) -> Generator[bytes, None, None]:
    """Encode ``True`` or ``False``.

    Each boolean has its own one-byte id in ``idsForTypes``; that id is the
    whole encoding, with no payload after it.
    """
    type_id = idsForTypes[b]
    yield type_id
@encode.register(type(None))
def encode_None(b: None) -> Generator[bytes, None, None]:
    """Encode ``None``.

    The id stored for ``None`` in ``idsForTypes`` is the whole encoding,
    with no payload after it.
    """
    type_id = idsForTypes[b]
    yield type_id
@encode.register(tuple)
def encode_tuple(t: tuple) -> Generator[bytes, None, None]:
    """Encode a tuple.

    Yields the ``tuple`` type id, then the chunks produced by ``encode`` for
    each element in order, then the ``"end_tuple"`` marker. No element count
    is written; the closing marker delimits the tuple.
    """
    opening = idsForTypes[tuple]
    closing = idsForTypes["end_tuple"]
    yield opening
    for element in t:
        yield from encode(element)
    yield closing
@encode.register(frozenset)
def encode_frozenset(t: frozenset) -> Generator[bytes, None, None]:
    """Encode a frozenset.

    Yields the ``frozenset`` type id, then the chunks produced by ``encode``
    for each member in the set's iteration order, then the
    ``"end_frozenset"`` marker.
    """
    opening = idsForTypes[frozenset]
    closing = idsForTypes["end_frozenset"]
    yield opening
    for member in t:
        yield from encode(member)
    yield closing
#########Decode
def decode_str(stream: BufferedReader) -> str:
    """Decode a string whose type id has already been consumed.

    Reads the byte-length prefix with ``decode_int``, then that many bytes,
    and decodes them as UTF-8.

    Raises:
        EOFError: if the stream ends before the announced number of bytes
            has been read.
        UnicodeDecodeError: if the payload is not valid UTF-8.
    """
    expected = decode_int(stream)
    raw = stream.read(expected)
    # A short read means the input was truncated; returning the partial
    # payload would silently hand back corrupted data.
    if len(raw) != expected:
        raise EOFError(
            "expected {} bytes of string data, got {}".format(expected, len(raw))
        )
    return raw.decode("utf-8")
def decode_bytes(stream: BufferedReader) -> bytes:
    """Decode a bytes object whose type id has already been consumed.

    Reads the length prefix with ``decode_int``, then returns that many
    bytes from the stream.

    Raises:
        EOFError: if the stream ends before the announced number of bytes
            has been read.
    """
    expected = decode_int(stream)
    payload = stream.read(expected)
    # A short read means the input was truncated; do not return a partial
    # payload as if it were complete.
    if len(payload) != expected:
        raise EOFError(
            "expected {} bytes of data, got {}".format(expected, len(payload))
        )
    return payload
def decode_int(stream: BufferedReader) -> int:
    """Decode a variable-length integer written by ``encode_int``.

    Bytes are consumed one at a time. A byte with its high bit (128) set is
    a continuation byte; the first byte below 128 is the final group and
    ends the number. Nothing past that final byte is read.

    Args:
        stream: Binary stream positioned at the first byte of the integer
            (after the type id, if one was written).

    Returns:
        The decoded non-negative integer.

    Raises:
        EOFError: if the stream ends before the terminating byte is seen.
    """
    value = 0
    while True:
        chunk = stream.read(1)
        if not chunk:
            # Without this check an exhausted stream reads as the byte 0,
            # which would terminate the number with a bogus value.
            raise EOFError("stream ended inside a variable-length integer")
        group = chunk[0]
        value = (value * 128) + group
        if not group & 128:
            return value
        # Strip the continuation flag and undo the encoder's "- 1" bias.
        value = value - 128 + 1
def decode_tuple(stream: BufferedReader) -> tuple:
    """Decode a tuple whose opening type id has already been consumed.

    Peeks at the next byte before each element. When it is the
    ``"end_tuple"`` marker, the marker is consumed and the collected
    elements are returned; otherwise the next value is read with ``decode``.
    """
    closing = idsForTypes["end_tuple"]
    elements = []
    while True:
        upcoming = stream.peek(1)
        # peek() may return more than one byte; only the first one matters.
        if bytes([upcoming[0]]) == closing:
            stream.read(1)
            return tuple(elements)
        elements.extend(decode(stream))
def decode_frozenset(stream: BufferedReader) -> frozenset:
    """Decode a frozenset whose opening type id has already been consumed.

    Peeks at the next byte before each member. When it is the
    ``"end_frozenset"`` marker, the marker is consumed and the collected
    members are returned; otherwise the next value is read with ``decode``.
    """
    closing = idsForTypes["end_frozenset"]
    members = []
    while True:
        upcoming = stream.peek(1)
        # peek() may return more than one byte; only the first one matters.
        if bytes([upcoming[0]]) == closing:
            stream.read(1)
            return frozenset(members)
        members.extend(decode(stream))
#These could be lambda functions, but the type checker complains.
def decode_True(stream: BufferedReader) -> bool:
    """Return ``True``; the type id is the whole encoding, so ``stream`` is not read."""
    return True
def decode_False(stream: BufferedReader) -> bool:
    """Return ``False``; the type id is the whole encoding, so ``stream`` is not read."""
    return False
def decode_None(stream: BufferedReader) -> None:
    """Return ``None``; the type id is the whole encoding, so ``stream`` is not read."""
    return None
# Maps each decodable entry of supportedTypes to the function that reads its
# payload. The "end_tuple" / "end_frozenset" markers are deliberately absent:
# they are consumed by decode_tuple / decode_frozenset themselves.
decode_functions = dict((
    (str, decode_str),
    (int, decode_int),
    (bytes, decode_bytes),
    (tuple, decode_tuple),
    (frozenset, decode_frozenset),
    (True, decode_True),
    (False, decode_False),
    (None, decode_None),
))
@singledispatch
def decode(stream: BufferedReader) -> Generator[immutableTypes, None, None]:
    """Decode the next value from ``stream``.

    Reads one type-id byte, looks it up in ``typesForIds`` and yields the
    value produced by the matching entry of ``decode_functions``. An id that
    has no decode function (the end markers) is consumed and nothing is
    yielded. An id that is not in ``typesForIds`` raises ``KeyError``.
    """
    type_id = stream.read(1)
    kind = typesForIds[type_id]
    if kind not in decode_functions:
        return
    reader = decode_functions[kind]
    yield reader(stream)
@decode.register(bytes)
def decode_from_bytes(b: bytes) -> Generator[immutableTypes, None, None]:
    """Decode from an in-memory ``bytes`` object.

    Wraps ``b`` in a ``BufferedReader`` over a ``BytesIO`` and delegates to
    the stream implementation of ``decode``.
    """
    reader = BufferedReader(BytesIO(b))
    yield from decode(reader)
#encoded = b"".join(encode(("nar","bar",1,2,(True,False,None),frozenset({"bin","ban",b"bar"}))))
#print(encoded)
#print(*decode(encoded))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment