Created
December 19, 2019 00:01
-
-
Save traverseda/316b41732aa0a30f8a45e50d2ddeb55d to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"""A type-hinted messagepack-like serializer for immutable python types. | |
Could be extended to a generic type-hinted messagepack serializer without too much trouble. | |
If you need that, feel free to reach out. | |
https://github.com/msgpack/msgpack/blob/master/spec.md | |
""" | |
from typing import Union, Tuple, FrozenSet | |
baseImmutableTypes = Union[str,int,bool,float,bytes,slice,complex,None] #Ellipsis? | |
containerImmutableTypes = Union[FrozenSet['containerImmutableTypes'], | |
Tuple['containerImmutableTypes'], | |
baseImmutableTypes] | |
from functools import singledispatch | |
@singledispatch | |
def encode(obj) -> bytes: | |
raise NotImplemented("Can't encode object of type {}".format(type(obj))) | |
@encode.register | |
def encode_bool(obj: bool) -> bytes: | |
if obj: return b'\xc3' | |
else: return b'\xc2' | |
#160-219 | |
@encode.register | |
def encode_str(obj: str) -> bytes: | |
#Strings of various lenghts, accoring to the spec. | |
objBytes = obj.encode("utf-8") | |
if len(objBytes) < 31: #Fixstring | |
typeId = (160+len(objBytes)).to_bytes(1, byteorder='big') | |
return typeId+objBytes | |
elif len(objBytes) < 255: | |
return b'\xd9'+len(objBytes).to_bytes(1,byteorder='big')+objBytes | |
elif len(objBytes) < 65535: | |
return b'\xda'+len(objBytes).to_bytes(2,byteorder='big')+objBytes | |
elif len(objBytes) < 4294967295: | |
return b'\xdb'+len(objBytes).to_bytes(4,byteorder='big')+objBytes | |
else: | |
raise ValueError("Your string is too big to encode.\ | |
That means it's more than ~4.29GB!\ | |
That's way too big for this implementation on 2020 hardware.") | |
@decode.register(range(160,191)) | |
def decode_fixstr(data: bytes) -> str: | |
pass | |
@decode.register(range(217,219)) | |
def decode_str(data: bytes) -> str: | |
pass | |
@encode.register | |
def encode_none(obj: None) -> bytes: | |
return b'\xc0' | |
@encode.register | |
def encode_int(obj: int) -> bytes: | |
if 128 > obj > -1: #positive fixint | |
return obj.to_bytes(1,byteorder='big') | |
elif 0 > obj > -33: #negative fixint | |
return (256-obj).to_bytes(1,byteorder='big') | |
elif 256 > obj > -1: #uint8 | |
return b"\xcc"+obj.to_bytes(1,byteorder='big') | |
elif 65_537 > obj > -1: #uint16 | |
return b"\xcd"+obj.to_bytes(2,byteorder='big') | |
elif 4_294_967_295 > obj > -1: #uint32 | |
return b"\xce"+obj.to_bytes(4,byteorder='big') | |
elif 18_446_744_073_709_551_615 > obj > -1: #uint64 | |
return b"\xcf"+obj.to_bytes(8,byteorder='big') | |
elif 0 > obj > -129: #signed int8 | |
return b"\xd0"+obj.to_bytes(1,byteorder='big',signed=True) | |
elif -128 > obj > -32_769: #signed int16 | |
return b"\xd1"+obj.to_bytes(2,byteorder='big',signed=True) | |
elif -32_768 > obj > -2_147_483_649: #signed int32 | |
return b"\xd2"+obj.to_bytes(4,byteorder='big',signed=True) | |
elif -2_147_483_648 > obj > -9_223_372_036_854_775_808: #signed int64 | |
return b"\xd3"+obj.to_bytes(8,byteorder='big',signed=True) | |
raise ValueError("Integer is not encodable: "+str(obj)) | |
@decode.register(range(0,127)) | |
def decode_fixint(data: bytes) -> Tuple[bytes, int]: | |
return data[1:], data[0] | |
@decode.register(range(224,255)) | |
def decode_neg_fixint(data: bytes) -> Tuple[bytes, int]: | |
return data[1:], data[0]-256 | |
@decode.register(range(204,206)) | |
def decode_uint(data: bytes) -> Tuple[bytes, int]: | |
bytesLength = data[0]-203 | |
data = data[1:] | |
return data[bytesLength:], int.from_bytes(data[bytesLength:],byteorder="big") | |
@decode.register(range(207,211)) | |
def decode_int(data: bytes) -> Tuple[bytes, int]: | |
bytesLength = data[0]-207 | |
data = data[1:] | |
return data[bytesLength:], int.from_bytes(data[bytesLength:],byteorder="big",signed=True) | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment