Skip to content

Instantly share code, notes, and snippets.

@t-wy
Last active September 24, 2024 07:23
Show Gist options
  • Save t-wy/778123fedd15513e4626ad27f07cb690 to your computer and use it in GitHub Desktop.
Save t-wy/778123fedd15513e4626ad27f07cb690 to your computer and use it in GitHub Desktop.
MessagePack-CSharp unpacker for Python
def messagepack_csharp_unpack(data: bytes) -> list:
    """
    Redistribution Notice:
        Properly attribute all entities listed below and request others to follow the same.
        Otherwise, DO NOT remove or modify this comment.
    Specification (MessagePack for C#):
        https://github.com/MessagePack-CSharp/MessagePack-CSharp
    Dependencies:
        msgpack: https://github.com/msgpack/msgpack-python
        lz4: https://github.com/python-lz4/python-lz4
    Implementation:
        @t-wy: https://github.com/t-wy
    """
    # Usage: feed the raw bytes, get back a list with every top-level object
    # found in the stream. Ext types 99 and 98 are expanded transparently.
    from msgpack import unpackb, Unpacker

    def inflate(expected_size: int, compressed: bytes) -> bytes:
        # lz4 is imported only when a compressed payload is actually met.
        from lz4.block import decompress
        return decompress(compressed, uncompressed_size=expected_size)

    def decode_ext(code, payload):
        if code == 98:
            # Payload is a run of msgpack integers: the length of each part.
            sizes = Unpacker(None, max_buffer_size=0)
            sizes.feed(payload)
            return tuple(sizes)
        if code != 99:
            raise ValueError
        # Payload is one msgpack integer (the uncompressed size) followed
        # directly by the LZ4 block. Integers may be used as map keys.
        header = Unpacker(None, max_buffer_size=0, strict_map_key=False)
        header.feed(payload)
        size = header.unpack()
        # tell() is only meaningful once unpack() has consumed the size.
        body = payload[header.tell():]
        return unpackb(inflate(size, body), strict_map_key=False)

    def merge_parts(items):
        # Only a list led by the tuple produced for ext 98 is a block array;
        # every other list is passed through untouched.
        if not items or type(items[0]) is not tuple:
            return items
        sizes, parts = items[0], items[1:]
        raw = b"".join(inflate(size, part) for size, part in zip(sizes, parts))
        return unpackb(raw, strict_map_key=False)

    stream = Unpacker(
        None,
        ext_hook=decode_ext,
        list_hook=merge_parts,
        max_buffer_size=0,
        strict_map_key=False,
    )
    stream.feed(data)
    return list(stream)
# Example: treating an unpacked msgpack array as a dataclass
from typing import List, Union, get_origin, get_args
from dataclasses import dataclass, is_dataclass
from enum import Enum
def conv_dataclass(target):
    """Convert the raw field values of a dataclass instance in place.

    Walks the annotations found on ``target`` and, for each field annotated
    with an ``Enum`` subclass or a dataclass, replaces the raw value with an
    instance of that type: ``cls(value)`` for enums, ``cls(*value)`` followed
    by a recursive conversion for dataclasses. ``List[T]``, ``Optional[T]``,
    ``Optional[List[T]]`` and ``List[Optional[T]]`` are looked through.
    Fields with any other annotation are left untouched.

    The conversion is safe to repeat: values that already are instances of
    the annotated dataclass are kept as they are.

    Args:
        target: The dataclass instance whose fields should be converted.

    Returns:
        ``target`` itself, after its fields have been replaced.
    """
    none_type = type(None)

    def strip_optional(tp):
        # Optional[X] is Union[X, None]: return (X, True), else (tp, False).
        if get_origin(tp) is Union:
            members = get_args(tp)
            if len(members) == 2 and none_type in members:
                inner = members[0] if members[1] is none_type else members[1]
                return inner, True
        return tp, False

    for field, cls in target.__annotations__.items():
        if cls is None:
            continue
        value = getattr(target, field)
        if value is None:
            continue

        cls, _ = strip_optional(cls)
        is_list = get_origin(cls) is list
        item_optional = False
        if is_list:
            item_types = get_args(cls)
            if not item_types:
                # Bare ``List`` names no element type, so nothing to convert.
                continue
            cls, item_optional = strip_optional(item_types[0])

        # Parameterized generics (e.g. dict[str, int]) can pass the
        # isinstance check on some versions but are rejected by issubclass.
        if not isinstance(cls, type) or get_origin(cls) is not None:
            continue

        if issubclass(cls, Enum):  # or cls is DateTime: # handle other data types
            convert = cls
        elif is_dataclass(cls):
            def convert(raw, cls=cls):
                # Already-built instances are kept so the pass can be repeated.
                if isinstance(raw, cls):
                    return raw
                return conv_dataclass(cls(*raw))
        else:
            continue

        if is_list:
            value = [
                None if item is None and item_optional else convert(item)
                for item in value
            ]
        else:
            value = convert(value)
        # object.__setattr__ also works on frozen dataclasses.
        object.__setattr__(target, field, value)
    return target
def messagepack_object(cls):
    """Class decorator: make ``cls`` a dataclass that converts its own fields.

    Installs a ``__post_init__`` that runs ``conv_dataclass`` on every new
    instance, then applies ``dataclass`` to the class. A ``__post_init__``
    defined directly on ``cls`` is no longer discarded: it is called after
    the conversion, so it sees the converted field values.

    Args:
        cls: The annotated class to decorate.

    Returns:
        The class returned by ``dataclass(cls)``.
    """
    # Only the class's own hook is chained; an inherited one keeps being
    # overridden, as before.
    own_post_init = cls.__dict__.get("__post_init__")

    def __post_init__(self, *args, **kwargs):
        # dataclass passes InitVar values to __post_init__; accept them and
        # hand them on to the class's own hook.
        conv_dataclass(self)
        if own_post_init is not None:
            own_post_init(self, *args, **kwargs)

    cls.__post_init__ = __post_init__
    return dataclass(cls)
@dataclass
class InnerClass:
    """Example element type: a plain dataclass with two integer fields."""
    f1: int
    f2: int
@messagepack_object
class OuterClass:
    """Example top-level type, constructed positionally from an unpacked list.

    The field order is the positional order of the values passed to the
    constructor.
    """
    a: int
    b: list
    # Each element is given as a positional list and becomes an InnerClass.
    c: List[InnerClass]
    d: str
# Positional payload: one value per OuterClass field, in field order.
obj = [1, [2, 3], [[4, 5], [6, 7]], "8"]
# Unpack the list into the constructor; the nested lists in the third
# position come out as InnerClass instances (see the output below).
print(OuterClass(*obj))
# output
# OuterClass(a=1, b=[2, 3], c=[InnerClass(f1=4, f2=5), InnerClass(f1=6, f2=7)], d='8')
# Example: adding key names back to objects that use indexed (positional) keys
def unnamed_dict(x):
    """Return a dict mapping each position in ``x`` to the item found there."""
    return dict(enumerate(x))


def list_of(_type):
    """Build a converter that applies ``_type`` to every item of a sequence."""
    def convert_items(x):
        return [call_or_convert(_type, xt) for xt in x]
    return convert_items


def nullable(_type):
    """Build a converter that keeps ``None`` and applies ``_type`` otherwise."""
    def convert_unless_none(x):
        if x is None:
            return None
        return call_or_convert(_type, x)
    return convert_unless_none


def identity(x):
    """Return ``x`` unchanged."""
    return x
def call_or_convert(struct, value):
    """Apply one struct description to one value.

    A callable ``struct`` is called with ``value``; anything else is treated
    as a key layout and handed to ``add_keys``.
    """
    if callable(struct):
        return struct(value)
    return add_keys(struct, value)
def add_keys(struct_dict, result):
    """Pair positional values with the keys of ``struct_dict``.

    Keys are matched to the items of ``result`` in order, and each value is
    converted through ``call_or_convert`` using the entry stored under its
    key. Entries whose description is ``...`` mark gaps and are left out.
    """
    named = {}
    for key, value in zip(struct_dict, result):
        spec = struct_dict[key]
        if spec is ...:  # for gaps, use ...
            continue
        named[key] = call_or_convert(spec, value)
    return named
# Positional payload: four values, matched in order to the keys given below.
obj = [1, [2, 3], [[4, 5], [6, 7]], "8"]
# Callables (int, list, str) are applied to the value at their position; the
# dict wrapped in list_of names the positions inside every element of "c".
print(add_keys({
    "a": int,
    "b": list,
    "c": list_of({
        "f1": int,
        "f2": int
    }),
    "d": str
}, obj))
# output
# {'a': 1, 'b': [2, 3], 'c': [{'f1': 4, 'f2': 5}, {'f1': 6, 'f2': 7}], 'd': '8'}
def LZ4_decompress(size: int, src: bytes) -> bytes:
    """Decompress one raw LZ4 block in pure Python.

    Polyfill if python-lz4 is not available, implemented by @t-wy.

    Args:
        size: Uncompressed size. Accepted so the signature matches the
            lz4-backed version; this implementation does not use it.
        src: The compressed block.

    Returns:
        The decompressed data.

    Raises:
        ValueError: If a match offset is zero or points before the start of
            the output produced so far.
        IndexError: If ``src`` ends in the middle of a length or an offset.
    """
    # A bytearray grows in place; appending to bytes copies the whole output
    # on every step.
    out = bytearray()
    pos = 0
    end = len(src)
    while pos < end:
        token = src[pos]
        pos += 1

        # Literal run: length in the high nibble, 15 means more bytes follow.
        literal_len = token >> 4
        if literal_len == 15:
            while True:
                extra = src[pos]
                pos += 1
                literal_len += extra
                if extra != 255:
                    break
        out += src[pos:pos + literal_len]
        pos += literal_len
        if pos >= end:
            # The last sequence carries literals only.
            break

        # Match: 2-byte little-endian distance back into the output.
        distance = src[pos] | (src[pos + 1] << 8)
        pos += 2

        # Match length: low nibble, same extension scheme, plus 4.
        match_len = token & 15
        if match_len == 15:
            while True:
                extra = src[pos]
                pos += 1
                match_len += extra
                if extra != 255:
                    break
        match_len += 4

        if not 0 < distance <= len(out):
            raise ValueError(
                f"invalid LZ4 match offset {distance} at output position {len(out)}"
            )
        start = len(out) - distance
        if distance >= match_len:
            # The source range lies entirely inside the existing output.
            out += out[start:start + match_len]
        else:
            # The match overlaps its own output: repeat the trailing pattern.
            pattern = out[start:]
            repeats = match_len // distance + 1
            out += (pattern * repeats)[:match_len]
    return bytes(out)
@t-wy
Copy link
Author

t-wy commented Aug 27, 2024

Updates:

  • Replaced msgpack.fallback.Unpacker with msgpack.Unpacker; the fallback class had been left in unnoticed for a long time ><
    It should now use the fast compiled class by default.
  • Made use of the hooks the unpacker provides (ext_hook, list_hook).
  • The LZ4 uncompressed size is now passed to the decompressor, since it is known.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment