Last active
September 24, 2024 07:23
-
-
Save t-wy/778123fedd15513e4626ad27f07cb690 to your computer and use it in GitHub Desktop.
MessagePack-CSharp unpacker for Python
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
def messagepack_csharp_unpack(data: bytes) -> list:
    """
    Redistribution Notice:
        Properly attribute all entities listed below and request others to follow the same.
        Otherwise, DO NOT remove or modify this comment.
    Specification (MessagePack for C#):
        https://github.com/MessagePack-CSharp/MessagePack-CSharp
    Dependencies:
        msgpack: https://github.com/msgpack/msgpack-python
        lz4: https://github.com/python-lz4/python-lz4
    Implementation:
        @t-wy: https://github.com/t-wy
    """
    # Purpose: decode every top-level MessagePack object contained in ``data``
    # and return them, in order, as a list. Extension types 99 and 98 are
    # expanded by LZ4-decompressing their payload and unpacking the result.
    # Raises ValueError when an extension type other than 98 / 99 is met.
    from msgpack import unpackb, Unpacker

    def LZ4_decompress(size: int, src: bytes) -> bytes:
        # Imported here so lz4 is only required once compressed data shows up.
        from lz4.block import decompress
        return decompress(src, uncompressed_size=size)

    def ext_hook(code, data):
        if code == 99:
            # Payload layout: one msgpack integer (the uncompressed size)
            # immediately followed by the LZ4 block itself.
            header = Unpacker(None, max_buffer_size=0, strict_map_key=False)  # integer may be used as key
            header.feed(data)
            size = header.unpack()  # make sure to call unpack before tell
            compressed = data[header.tell():]
            return unpackb(LZ4_decompress(size, compressed), strict_map_key=False)
        if code == 98:  # list of integers specifying lengths of each part
            lengths = Unpacker(None, max_buffer_size=0)
            lengths.feed(data)
            # Returned as a tuple so check_98 can tell this marker apart from
            # an ordinary decoded list.
            return tuple(lengths)
        raise ValueError(f"unsupported MessagePack extension type code: {code}")

    def check_98(lst):
        # A list whose first element is the tuple produced for ext 98 holds the
        # compressed parts in its remaining elements: decompress each part to
        # its announced size, concatenate, and unpack the joined bytes.
        if len(lst) > 0 and type(lst[0]) is tuple:
            joined = b"".join(
                LZ4_decompress(size, part) for size, part in zip(lst[0], lst[1:])
            )
            return unpackb(joined, strict_map_key=False)
        return lst

    unpacker = Unpacker(
        None,
        ext_hook=ext_hook,
        list_hook=check_98,
        max_buffer_size=0,
        strict_map_key=False,
    )
    unpacker.feed(data)
    return list(unpacker)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# an example implementation of treating the msgpack object as a dataclass | |
from dataclasses import dataclass, is_dataclass
from enum import Enum
from typing import List, Union, get_origin, get_args
def conv_dataclass(target):
    """Convert raw field values of *target* in place according to its annotations.

    For every annotated field whose value is not ``None``:

    * an ``Enum`` subclass annotation turns the value into ``EnumClass(value)``;
    * a dataclass annotation turns the value (a sequence of positional
      arguments) into ``conv_dataclass(DataClass(*value))``;
    * a ``List[...]`` of either applies the same conversion to each element,
      leaving ``None`` elements untouched.

    ``Optional[...]`` wrappers are looked through both around the list and
    around the element type. Fields with any other annotation are left as is.

    Returns *target* itself so the call can be chained.
    """

    def strip_optional(annotation):
        # Optional[X] is Union[X, None]; reduce it to X. Any other union is
        # returned unchanged (and later skipped, since it is not a class).
        if get_origin(annotation) is Union:
            members = get_args(annotation)
            if len(members) == 2 and type(None) in members:
                return next(m for m in members if m is not type(None))
        return annotation

    for name, cls in target.__annotations__.items():
        if cls is None:
            continue
        value = getattr(target, name)
        if value is None:
            continue

        cls = strip_optional(cls)
        is_list = get_origin(cls) is list
        if is_list:
            element_types = get_args(cls)
            if not element_types:
                # Bare ``typing.List`` carries no element type to convert to.
                continue
            cls = strip_optional(element_types[0])

        if not isinstance(cls, type):
            continue
        if issubclass(cls, Enum):  # or cls is DateTime: # handle other data types
            convert = cls
        elif is_dataclass(cls):
            # Bind the class as a default so the helper does not depend on the
            # loop variable being rebound later.
            def convert(raw, _cls=cls):
                return conv_dataclass(_cls(*raw))
        else:
            continue

        if is_list:
            converted = [None if item is None else convert(item) for item in value]
        else:
            converted = convert(value)
        # object.__setattr__ bypasses any __setattr__ defined on the class
        # (for example the one generated for frozen dataclasses).
        object.__setattr__(target, name, converted)
    return target
def messagepack_object(cls):
    """Class decorator: make *cls* a dataclass that converts its fields on creation.

    A ``__post_init__`` hook is installed that runs ``conv_dataclass`` on the
    new instance. If *cls* itself already defines ``__post_init__``, that
    method is called afterwards instead of being silently discarded.

    Returns the result of ``dataclass(cls)``.
    """
    # Only look at the class's own namespace: an inherited hook may itself be
    # a wrapper installed by this decorator, and must not run a second time.
    own_post_init = cls.__dict__.get("__post_init__")

    def __post_init__(self):
        conv_dataclass(self)
        if own_post_init is not None:
            own_post_init(self)

    cls.__post_init__ = __post_init__
    return dataclass(cls)
@dataclass
class InnerClass:
    """Example nested record, built from a two-element sequence ``[f1, f2]``."""

    f1: int
    f2: int
@messagepack_object
class OuterClass:
    """Example top-level record; ``c`` is converted to ``InnerClass`` objects."""

    a: int
    b: list
    c: List[InnerClass]
    d: str
# Demo: positional data, shaped like an unpacked array, becomes a typed object.
obj = [1, [2, 3], [[4, 5], [6, 7]], "8"]
print(OuterClass(*obj))
# output
# OuterClass(a=1, b=[2, 3], c=[InnerClass(f1=4, f2=5), InnerClass(f1=6, f2=7)], d='8')
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# an example implementation of adding keys back for indexed key | |
def unnamed_dict(x):
    """Key each element of *x* by its position: ``[a, b] -> {0: a, 1: b}``."""
    return dict(enumerate(x))


def list_of(_type):
    """Return a converter applying *_type* to every element of a sequence.

    *_type* is either a callable or a structure dict; each element is passed
    through ``call_or_convert(_type, element)``.
    """
    def convert_each(x):
        return [call_or_convert(_type, xt) for xt in x]
    return convert_each


def nullable(_type):
    """Return a converter like *_type* that passes ``None`` through unchanged."""
    def convert_unless_none(x):
        return None if x is None else call_or_convert(_type, x)
    return convert_unless_none


def identity(x):
    """Return *x* unchanged; use it for fields that need no conversion."""
    return x
def call_or_convert(struct, value):
    """Convert *value* using *struct*.

    A callable *struct* is called with *value*; anything else is treated as a
    structure dict and handed to ``add_keys(struct, value)``.
    """
    if callable(struct):
        return struct(value)
    return add_keys(struct, value)
def add_keys(struct_dict, result):
    """Attach the keys of *struct_dict* to the positional values in *result*.

    Keys and values are paired in order. Each value is converted with
    ``call_or_convert`` using the entry stored under its key. An entry of
    ``...`` (Ellipsis) marks a gap: that position is consumed but left out of
    the returned dict. Pairing stops at the shorter of the two inputs.
    """
    return {
        key: call_or_convert(struct, value)
        for (key, struct), value in zip(struct_dict.items(), result)
        if struct is not ...  # for gaps, use ...
    }
# Demo: positional data, shaped like an unpacked array, gets its field names back.
obj = [1, [2, 3], [[4, 5], [6, 7]], "8"]
print(add_keys({
    "a": int,
    "b": list,
    "c": list_of({
        "f1": int,
        "f2": int
    }),
    "d": str
}, obj))
# output
# {'a': 1, 'b': [2, 3], 'c': [{'f1': 4, 'f2': 5}, {'f1': 6, 'f2': 7}], 'd': '8'}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
def LZ4_decompress(size: int, src: bytes) -> bytes:
    """Decompress one raw LZ4 block in pure Python.

    Polyfill if python-lz4 is not available, implemented by @t-wy.

    Args:
        size: Expected uncompressed size. Accepted so the signature matches
            the lz4-backed helper; this implementation does not enforce it.
        src: The compressed block.

    Returns:
        The decompressed bytes.

    Raises:
        ValueError: If a match refers to offset 0 or to data before the start
            of the output (corrupt input).
        IndexError: If the block ends in the middle of a length or offset.
    """

    def read_length(nibble, pos):
        # A nibble of 15 means extension bytes follow: each one is added to
        # the length, and a value of 255 means yet another byte follows.
        length = nibble
        if nibble == 15:
            while True:
                extra = src[pos]
                pos += 1
                length += extra
                if extra != 255:
                    break
        return length, pos

    out = bytearray()  # appended in place; avoids re-copying the output per sequence
    pos = 0
    end = len(src)
    while pos < end:
        token = src[pos]
        pos += 1

        # Literal run: the token's high nibble gives its length.
        literal_len, pos = read_length(token >> 4, pos)
        out += src[pos:pos + literal_len]
        pos += literal_len
        if pos >= end:
            # The final sequence consists of literals only.
            break

        # Match: 2-byte little-endian distance back into the output so far.
        distance = src[pos] | (src[pos + 1] << 8)
        pos += 2
        # The token's low nibble stores the match length minus 4.
        match_len, pos = read_length(token & 15, pos)
        match_len += 4

        if distance == 0 or distance > len(out):
            raise ValueError(
                f"invalid LZ4 match offset {distance} with {len(out)} bytes decoded"
            )
        start = len(out) - distance
        if match_len <= distance:
            # simple copy
            out += out[start:start + match_len]
        else:
            # The match overlaps the bytes it is producing, so the trailing
            # `distance` bytes repeat until match_len bytes have been emitted.
            pattern = out[start:]
            repeats = match_len // distance + 1
            out += (pattern * repeats)[:match_len]
    return bytes(out)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Updates:
Replaced msgpack.fallback.Unpacker
by msgpack.Unpacker
after being left unnoticed for a long time >< Now it should be using the fast CPython class by default.