Last active
September 21, 2018 13:16
-
-
Save traverseda/61dedd1a5f1ac3a8f9a825a0ecf300fc to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"""A binary serializer inspired by rypc's brine. | |
""" | |
#from __future__ import annotations | |
import typing, collections | |
from io import BytesIO, BufferedReader | |
from typing import Union, Tuple, FrozenSet, Generator | |
from functools import singledispatch | |
singleTons = Union[bool,None] | |
immutableTypes = Union[singleTons,int,float,complex,str,bytes,slice,Tuple["immutableTypes", ...],FrozenSet["immutableTypes"]] | |
supportedTypes = ( | |
str, | |
int, | |
bytes, | |
True, | |
False, | |
None, | |
tuple, | |
"end_tuple", | |
frozenset, | |
"end_frozenset", | |
) | |
idsForTypes = {t: idx.to_bytes(1,"big") for idx,t in enumerate(supportedTypes)} #type: dict | |
typesForIds = {v:k for k,v in idsForTypes.items()} | |
@singledispatch | |
def encode(obj:immutableTypes) -> Generator[bytes,None,None]: | |
raise NotImplementedError | |
@encode.register(str) | |
def encode_str(text:str) -> Generator[bytes,None,None]: | |
yield idsForTypes[str] | |
encodedText = text.encode("utf-8") | |
length = encode_int(len(encodedText),include_hint=False) | |
yield from length | |
yield encodedText | |
@encode.register(bytes) | |
def encode_bytes(b:bytes) -> Generator[bytes,None,None]: | |
yield idsForTypes[bytes] | |
length = encode_int(len(b),include_hint=False) | |
yield from length | |
yield b | |
#https://retracile.net/blog/2012/01/20/20.30 | |
@encode.register(int) | |
def encode_int(num:int, include_hint=True) -> Generator[bytes,None,None]: | |
if include_hint: | |
yield idsForTypes[int] | |
rval = [num%128] | |
while num >= 128: | |
num = (num//128)-1 | |
last = (num%128)+128 | |
rval.insert(0,last) | |
yield b"".join([i.to_bytes(1,"big") for i in rval]) | |
@encode.register(bool) | |
def encode_bool(b:bool) -> Generator[bytes,None,None]: | |
yield idsForTypes[b] | |
@encode.register(type(None)) | |
def encode_None(b:None) -> Generator[bytes,None,None]: | |
yield idsForTypes[b] | |
@encode.register(tuple) | |
def encode_tuple(t:tuple) -> Generator[bytes,None,None]: | |
yield idsForTypes[tuple] | |
for item in t: | |
yield from encode(item) | |
yield idsForTypes["end_tuple"] | |
@encode.register(frozenset) | |
def encode_frozenset(t:frozenset) -> Generator[bytes,None,None]: | |
yield idsForTypes[frozenset] | |
for item in t: | |
yield from encode(item) | |
yield idsForTypes["end_frozenset"] | |
#########Decode | |
def decode_str(stream:BufferedReader) -> str: | |
string_length = decode_int(stream) | |
newString = stream.read(string_length) | |
return newString.decode("utf-8") | |
def decode_bytes(stream:BufferedReader) -> bytes: | |
b_length = decode_int(stream) | |
b = stream.read(b_length) | |
return b | |
def decode_int(stream:BufferedReader) -> int: | |
rval = 0 | |
while True: | |
byte = stream.readline(1) | |
num = int.from_bytes(byte,"big") | |
rval = (rval * 128) + num | |
if not num & 128: | |
return (rval) | |
rval = rval - 128 + 1 | |
def decode_tuple(stream:BufferedReader) -> tuple: | |
def decode_tuple_inner(stream:BufferedReader) -> Generator[immutableTypes,None,None]: | |
while True: | |
currentTypeId = stream.peek(1) | |
currentTypeId = int.to_bytes(currentTypeId[0],1,"big") | |
if currentTypeId == idsForTypes["end_tuple"]: | |
stream.read(1) | |
break | |
yield from decode(stream) | |
return tuple(decode_tuple_inner(stream)) | |
def decode_frozenset(stream:BufferedReader) -> frozenset: | |
def decode_frozenset_inner(stream:BufferedReader) -> Generator[immutableTypes,None,None]: | |
while True: | |
currentTypeId = stream.peek(1) | |
currentTypeId = int.to_bytes(currentTypeId[0],1,"big") | |
if currentTypeId == idsForTypes["end_frozenset"]: | |
stream.read(1) | |
break | |
yield from decode(stream) | |
return frozenset(decode_frozenset_inner(stream)) | |
#These could be lambda functions, but the type checker complains. | |
def decode_True(stream): | |
return True | |
def decode_False(stream): | |
return False | |
def decode_None(stream): | |
return None | |
decode_functions = { | |
str: decode_str, | |
int: decode_int, | |
bytes: decode_bytes, | |
tuple: decode_tuple, | |
frozenset: decode_frozenset, | |
True: decode_True, | |
False: decode_False, | |
None: decode_None, | |
} | |
@singledispatch | |
def decode(stream:BufferedReader) -> Generator[immutableTypes,None,None]: | |
streamType = typesForIds[stream.read(1)] | |
if streamType in decode_functions: | |
yield decode_functions[streamType](stream) | |
@decode.register(bytes) | |
def decode_from_bytes(b:bytes) -> Generator[immutableTypes,None,None]: | |
yield from decode(BufferedReader(BytesIO(b))) | |
#encoded = b"".join(encode(("nar","bar",1,2,(True,False,None),frozenset({"bin","ban",b"bar"})))) | |
#print(encoded) | |
#print(*decode(encoded)) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment