Skip to content

Instantly share code, notes, and snippets.

@traverseda
Created December 19, 2019 00:01
Show Gist options
  • Save traverseda/316b41732aa0a30f8a45e50d2ddeb55d to your computer and use it in GitHub Desktop.
Save traverseda/316b41732aa0a30f8a45e50d2ddeb55d to your computer and use it in GitHub Desktop.
"""A type-hinted messagepack-like serializer for immutable python types.
Could be extended to a generic type-hinted messagepack serializer without too much trouble.
If you need that, feel free to reach out.
https://github.com/msgpack/msgpack/blob/master/spec.md
"""
from typing import Union, Tuple, FrozenSet
# Scalar (leaf) immutable values the serializer is annotated to accept.
baseImmutableTypes = Union[str, int, bool, float, bytes, slice, complex, None]  # Ellipsis?

# Recursive alias: either a leaf value, or a frozenset / tuple whose members
# are themselves ``containerImmutableTypes``.  The tuple member is spelled
# ``Tuple[X, ...]`` (homogeneous, any length); a bare ``Tuple[X]`` would only
# describe tuples holding exactly one element.
containerImmutableTypes = Union[
    FrozenSet['containerImmutableTypes'],
    Tuple['containerImmutableTypes', ...],
    baseImmutableTypes,
]
from functools import singledispatch
@singledispatch
def encode(obj) -> bytes:
    """Serialize *obj* to bytes; this is the fallback for unsupported types.

    Type-specific encoders attach themselves with ``@encode.register``.
    This base implementation only runs when no registered encoder matches
    the type of *obj*.

    Raises:
        TypeError: always, naming the type that has no registered encoder.
    """
    # ``NotImplemented`` is the binary-operator sentinel, not an exception
    # class.  Calling it raised "'NotImplementedType' object is not callable"
    # and discarded this message.  TypeError keeps the exception class that
    # callers already observed while reporting the real cause.
    raise TypeError("Can't encode object of type {}".format(type(obj)))
@encode.register
def encode_bool(obj: bool) -> bytes:
    """Encode a bool as one byte: ``0xc3`` when truthy, ``0xc2`` otherwise."""
    return b'\xc3' if obj else b'\xc2'
# str family type bytes: fixstr 160-191 (0xa0-0xbf); str 8/16/32 217-219 (0xd9-0xdb)
@encode.register
def encode_str(obj: str) -> bytes:
    """Encode a str as UTF-8 using the smallest MessagePack str format.

    The length that selects the format is the UTF-8 byte length, not the
    number of characters.

    Returns:
        The type byte, then the big-endian length field (absent for fixstr),
        then the UTF-8 payload.

    Raises:
        ValueError: if the payload is longer than 2**32 - 1 bytes.
    """
    payload = obj.encode("utf-8")
    size = len(payload)
    # Every bound is inclusive: each format holds up to the largest value
    # that its length field can represent.
    if size <= 31:  # fixstr: the length is added to the base type byte 160
        return (160 + size).to_bytes(1, byteorder='big') + payload
    if size <= 0xFF:  # str 8
        return b'\xd9' + size.to_bytes(1, byteorder='big') + payload
    if size <= 0xFFFF:  # str 16
        return b'\xda' + size.to_bytes(2, byteorder='big') + payload
    if size <= 0xFFFF_FFFF:  # str 32
        return b'\xdb' + size.to_bytes(4, byteorder='big') + payload
    raise ValueError(
        "Your string is too big to encode. "
        "That means it's more than ~4.29GB! "
        "That's way too big for this implementation on 2020 hardware."
    )
# NOTE(review): ``decode`` is not defined in the visible part of this file.
# The registration assumes it maps every leading type byte in the given range
# to this function -- confirm against its definition.
@decode.register(range(160, 192))  # 160..191 inclusive; range() excludes its stop
def decode_fixstr(data: bytes) -> Tuple[bytes, str]:
    """Decode a fixstr (type bytes 160-191) from the front of *data*.

    Follows the convention of the integer decoders in this module:
    ``data[0]`` is the type byte and the result is ``(remaining, value)``.

    Raises:
        ValueError: if *data* is shorter than the length the type byte declares.
        UnicodeDecodeError: if the payload is not valid UTF-8.
    """
    # The payload length is the offset of the type byte from the base value 160.
    end = 1 + (data[0] - 160)
    if len(data) < end:
        raise ValueError("Truncated fixstr: expected {} payload bytes".format(end - 1))
    return data[end:], data[1:end].decode("utf-8")
# NOTE(review): ``decode`` is not defined in the visible part of this file.
# The registration assumes it maps every leading type byte in the given range
# to this function -- confirm against its definition.
@decode.register(range(217, 220))  # 217..219 inclusive; range() excludes its stop
def decode_str(data: bytes) -> Tuple[bytes, str]:
    """Decode a str 8 / str 16 / str 32 value from the front of *data*.

    Follows the convention of the integer decoders in this module:
    ``data[0]`` is the type byte and the result is ``(remaining, value)``.

    Raises:
        ValueError: if *data* ends before the length field or payload does.
        UnicodeDecodeError: if the payload is not valid UTF-8.
    """
    # Type bytes 217, 218, 219 carry a 1-, 2-, 4-byte big-endian length field.
    width = 1 << (data[0] - 217)
    start = 1 + width
    if len(data) < start:
        raise ValueError("Truncated str header: expected {} length bytes".format(width))
    size = int.from_bytes(data[1:start], byteorder="big")
    end = start + size
    if len(data) < end:
        raise ValueError("Truncated str: expected {} payload bytes".format(size))
    return data[end:], data[start:end].decode("utf-8")
@encode.register
def encode_none(obj: None) -> bytes:
    """Encode ``None`` as the single nil byte ``0xc0``; *obj* is not inspected."""
    nil_marker = bytes((0xc0,))
    return nil_marker
@encode.register
def encode_int(obj: int) -> bytes:
    """Encode an int using the smallest MessagePack integer format that fits.

    Non-negative values use positive fixint or uint 8/16/32/64; negative
    values use negative fixint or int 8/16/32/64.  Multi-byte payloads are
    big-endian.

    Raises:
        ValueError: if *obj* is outside ``-2**63 .. 2**64 - 1``.
    """
    if 0 <= obj < 128:  # positive fixint: the value is the type byte itself
        return obj.to_bytes(1, byteorder='big')
    if -32 <= obj < 0:  # negative fixint: two's complement in one byte (224..255)
        # 256 + obj, not 256 - obj: the latter exceeds one byte for every
        # negative input and made to_bytes raise OverflowError.
        return (256 + obj).to_bytes(1, byteorder='big')
    if obj > 0:
        unsigned_formats = ((b"\xcc", 1), (b"\xcd", 2), (b"\xce", 4), (b"\xcf", 8))
        for marker, width in unsigned_formats:
            # Exclusive upper bound 2**(8*width): the largest value is included
            # and nothing is accepted that overflows the field.
            if obj < 1 << (8 * width):
                return marker + obj.to_bytes(width, byteorder='big')
    else:
        signed_formats = ((b"\xd0", 1), (b"\xd1", 2), (b"\xd2", 4), (b"\xd3", 8))
        for marker, width in signed_formats:
            # Inclusive lower bound -2**(8*width - 1), the two's-complement minimum.
            if obj >= -(1 << (8 * width - 1)):
                return marker + obj.to_bytes(width, byteorder='big', signed=True)
    raise ValueError("Integer is not encodable: "+str(obj))
# NOTE(review): ``decode`` is not defined in the visible part of this file.
# The registration assumes it maps every leading type byte in the given range
# to this function -- confirm against its definition.
@decode.register(range(0, 128))  # 0..127 inclusive; range() excludes its stop
def decode_fixint(data: bytes) -> Tuple[bytes, int]:
    """Decode a positive fixint from the front of *data*.

    The type byte is the value itself, so only one byte is consumed.

    Returns:
        ``(remaining, value)``: the bytes after the type byte, and the value.
    """
    return data[1:], data[0]
# NOTE(review): ``decode`` is not defined in the visible part of this file.
# The registration assumes it maps every leading type byte in the given range
# to this function -- confirm against its definition.
@decode.register(range(224, 256))  # 224..255 inclusive, so 255 (-1) is covered
def decode_neg_fixint(data: bytes) -> Tuple[bytes, int]:
    """Decode a negative fixint (-32..-1) from the front of *data*.

    The type byte is the one-byte two's-complement form of the value, so
    subtracting 256 recovers it.

    Returns:
        ``(remaining, value)``: the bytes after the type byte, and the value.
    """
    return data[1:], data[0] - 256
# NOTE(review): ``decode`` is not defined in the visible part of this file.
# The registration assumes it maps every leading type byte in the given range
# to this function -- confirm against its definition.
@decode.register(range(204, 208))  # uint 8/16/32/64 type bytes 204..207
def decode_uint(data: bytes) -> Tuple[bytes, int]:
    """Decode a uint 8/16/32/64 value from the front of *data*.

    Returns:
        ``(remaining, value)``: the bytes after the payload, and the value.

    Raises:
        ValueError: if *data* ends before the payload does.
    """
    # Type bytes 204, 205, 206, 207 carry 1, 2, 4, 8 payload bytes.  The width
    # doubles per type byte rather than growing by one.
    width = 1 << (data[0] - 204)
    end = 1 + width
    if len(data) < end:
        raise ValueError("Truncated uint: expected {} payload bytes".format(width))
    # Decode the payload itself; everything after it is handed back untouched.
    return data[end:], int.from_bytes(data[1:end], byteorder="big")
# NOTE(review): ``decode`` is not defined in the visible part of this file.
# The registration assumes it maps every leading type byte in the given range
# to this function -- confirm against its definition.
@decode.register(range(208, 212))  # int 8/16/32/64 type bytes 208..211 (207 is uint 64)
def decode_int(data: bytes) -> Tuple[bytes, int]:
    """Decode a signed int 8/16/32/64 value from the front of *data*.

    Returns:
        ``(remaining, value)``: the bytes after the payload, and the value.

    Raises:
        ValueError: if *data* ends before the payload does.
    """
    # Type bytes 208, 209, 210, 211 carry 1, 2, 4, 8 payload bytes.  The width
    # doubles per type byte rather than growing by one.
    width = 1 << (data[0] - 208)
    end = 1 + width
    if len(data) < end:
        raise ValueError("Truncated int: expected {} payload bytes".format(width))
    # Decode the payload itself; everything after it is handed back untouched.
    return data[end:], int.from_bytes(data[1:end], byteorder="big", signed=True)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment