Skip to content

Instantly share code, notes, and snippets.

@kyamagu
Created June 20, 2019 06:56
Show Gist options
  • Save kyamagu/18bfed1d5dbd4d419189adfb002a82f3 to your computer and use it in GitHub Desktop.
Save kyamagu/18bfed1d5dbd4d419189adfb002a82f3 to your computer and use it in GitHub Desktop.
Serializable primitive and custom python objects
Display the source blob
Display the rendered blob
Raw
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Serializable objects\n",
"\n",
"This notebook demonstrates serializable objects that can `read` and `write` in a binary format.\n",
"The model is compatible with Python 2 and 3.\n",
"\n",
"For Python 3.7+, the standard-library `dataclasses` module might fit the needs here."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Utils\n",
"\n",
"Helper functions for binary IO."
]
},
{
"cell_type": "code",
"execution_count": 1,
"metadata": {},
"outputs": [],
"source": [
"import io\n",
"import struct\n",
"\n",
"def read_bytes(fp, size):\n",
"    \"\"\"\n",
"    Read exactly `size` bytes from the binary stream `fp`.\n",
"\n",
"    Fails an assertion when the stream returns a different number of bytes.\n",
"    \"\"\"\n",
"    chunk = fp.read(size)\n",
"    actual = len(chunk)\n",
"    assert size == actual, 'read=%d, expected=%d' % (actual, size)\n",
"    return chunk\n",
"\n",
"\n",
"def write_bytes(fp, data):\n",
"    \"\"\"\n",
"    Write `data` to the binary stream `fp`.\n",
"\n",
"    Returns the number of bytes written, measured by how far the stream\n",
"    position advanced; fails an assertion if that differs from len(data).\n",
"    \"\"\"\n",
"    start = fp.tell()\n",
"    fp.write(data)\n",
"    written = fp.tell() - start\n",
"    expected = len(data)\n",
"    assert written == expected, 'written=%d, expected=%d' % (written, expected)\n",
"    return written\n",
" \n",
" \n",
"def read_fmt(fp, fmt):\n",
"    \"\"\"\n",
"    Read and unpack one struct record from `fp`.\n",
"\n",
"    `fmt` is either a struct format string or a precompiled struct.Struct.\n",
"    Returns the tuple of unpacked values.\n",
"    \"\"\"\n",
"    # Normalise to a compiled Struct so both input kinds share one code path.\n",
"    packer = fmt if isinstance(fmt, struct.Struct) else struct.Struct(fmt)\n",
"    return packer.unpack(read_bytes(fp, packer.size))\n",
"\n",
"\n",
"def write_fmt(fp, fmt, *args):\n",
"    \"\"\"\n",
"    Pack `args` according to `fmt` and write the result to `fp`.\n",
"\n",
"    `fmt` is either a struct format string or a precompiled struct.Struct.\n",
"    Returns the number of bytes written.\n",
"    \"\"\"\n",
"    # Normalise to a compiled Struct so both input kinds share one code path.\n",
"    packer = fmt if isinstance(fmt, struct.Struct) else struct.Struct(fmt)\n",
"    return write_bytes(fp, packer.pack(*args))\n",
"\n",
"\n",
"def read_padding(fp, length, divisor=1):\n",
"    \"\"\"\n",
"    Consume the padding that follows a record of `length` bytes.\n",
"\n",
"    Reads just enough bytes to round `length` up to a multiple of `divisor`\n",
"    and returns them (b'' when `length` is already aligned). Note that the\n",
"    default divisor here is 1, while write_padding defaults to 2.\n",
"    \"\"\"\n",
"    remainder = length % divisor\n",
"    if not remainder:\n",
"        return b''\n",
"    return read_bytes(fp, divisor - remainder)\n",
"\n",
"\n",
"def write_padding(fp, length, divisor=2):\n",
"    \"\"\"\n",
"    Write the zero bytes needed after a record of `length` bytes.\n",
"\n",
"    Emits just enough NUL bytes to round `length` up to a multiple of\n",
"    `divisor` and returns how many were written (0 when already aligned).\n",
"    \"\"\"\n",
"    remainder = length % divisor\n",
"    if not remainder:\n",
"        return 0\n",
"    return write_bytes(fp, b'\\x00' * (divisor - remainder))\n",
"\n",
"\n",
"def is_readable(fp, size=1):\n",
"    \"\"\"\n",
"    Tell whether at least `size` more bytes can be read from `fp`.\n",
"\n",
"    The stream is peeked and then rewound, so its position is unchanged.\n",
"    \"\"\"\n",
"    peeked = len(fp.read(size))\n",
"    # Step back by exactly what was consumed, relative to the current position.\n",
"    fp.seek(-peeked, io.SEEK_CUR)\n",
"    return peeked == size"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Serializable primitives"
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {},
"outputs": [],
"source": [
"class Serializable(object):\n",
"    \"\"\"\n",
"    Base serialization interface.\n",
"\n",
"    Subclasses implement `read` and `write` against a binary stream;\n",
"    `frombytes` and `tobytes` wrap them around an in-memory buffer.\n",
"    \"\"\"\n",
"\n",
"    @classmethod\n",
"    def read(cls, fp, *args, **kwargs):\n",
"        \"\"\"Deserialize an instance from the binary stream `fp`.\"\"\"\n",
"        raise NotImplementedError()\n",
"\n",
"    def write(self, fp, *args, **kwargs):\n",
"        \"\"\"Serialize this object into the binary stream `fp`.\"\"\"\n",
"        raise NotImplementedError()\n",
"\n",
"    @classmethod\n",
"    def frombytes(cls, data, *args, **kwargs):\n",
"        \"\"\"\n",
"        Deserialize an instance from the bytes `data`.\n",
"\n",
"        Extra arguments are forwarded to `read`.\n",
"        \"\"\"\n",
"        with io.BytesIO(data) as f:\n",
"            return cls.read(f, *args, **kwargs)\n",
"\n",
"    def tobytes(self, *args, **kwargs):\n",
"        \"\"\"\n",
"        Serialize this object and return the resulting bytes.\n",
"\n",
"        Extra arguments are forwarded to `write`.\n",
"        \"\"\"\n",
"        with io.BytesIO() as f:\n",
"            self.write(f, *args, **kwargs)\n",
"            return f.getvalue()\n",
"\n",
" \n",
"class Primitive(Serializable):\n",
"    \"\"\"\n",
"    Fixed-length value whose binary layout is given by the `_FMT` struct format.\n",
"    \"\"\"\n",
"    _FMT = None  # Concrete subclasses assign a struct.Struct here.\n",
"\n",
"    @classmethod\n",
"    def read(cls, fp, **kwargs):\n",
"        \"\"\"Unpack one value from `fp`; keyword arguments are accepted and ignored.\"\"\"\n",
"        fields = read_fmt(fp, cls._FMT)\n",
"        return cls(*fields)\n",
"\n",
"    def write(self, fp, **kwargs):\n",
"        \"\"\"Pack this value into `fp` and return the number of bytes written.\"\"\"\n",
"        size = write_fmt(fp, self._FMT, self)\n",
"        return size\n",
"\n",
"\n",
"# `bool` itself cannot be subclassed, so Bool derives from int instead.\n",
"class Bool(int, Primitive):\n",
"    \"\"\"Boolean value, struct format '>?'.\"\"\"\n",
"    _FMT = struct.Struct('>?')\n",
"\n",
"\n",
"class Int8(int, Primitive):\n",
"    \"\"\"Signed 8-bit integer, struct format '>b'.\"\"\"\n",
"    _FMT = struct.Struct('>b')\n",
"\n",
"\n",
"class UInt8(int, Primitive):\n",
"    \"\"\"Unsigned 8-bit integer, struct format '>B'.\"\"\"\n",
"    _FMT = struct.Struct('>B')\n",
"\n",
"\n",
"class Int16(int, Primitive):\n",
"    \"\"\"Signed 16-bit big-endian integer, struct format '>h'.\"\"\"\n",
"    _FMT = struct.Struct('>h')\n",
"\n",
"\n",
"class UInt16(int, Primitive):\n",
"    \"\"\"Unsigned 16-bit big-endian integer, struct format '>H'.\"\"\"\n",
"    _FMT = struct.Struct('>H')\n",
"\n",
"\n",
"class Int32(int, Primitive):\n",
"    \"\"\"Signed 32-bit big-endian integer, struct format '>i'.\"\"\"\n",
"    _FMT = struct.Struct('>i')\n",
"\n",
"\n",
"class UInt32(int, Primitive):\n",
"    \"\"\"Unsigned 32-bit big-endian integer, struct format '>I'.\"\"\"\n",
"    _FMT = struct.Struct('>I')\n",
"\n",
"\n",
"class Int64(int, Primitive):\n",
"    \"\"\"Signed 64-bit big-endian integer, struct format '>q'.\"\"\"\n",
"    _FMT = struct.Struct('>q')\n",
"\n",
"\n",
"class UInt64(int, Primitive):\n",
"    \"\"\"Unsigned 64-bit big-endian integer, struct format '>Q'.\"\"\"\n",
"    _FMT = struct.Struct('>Q')\n",
"\n",
"\n",
"class Float(float, Primitive):\n",
"    \"\"\"Single-precision big-endian float, struct format '>f'.\"\"\"\n",
"    _FMT = struct.Struct('>f')\n",
"\n",
"\n",
"class Double(float, Primitive):\n",
"    \"\"\"Double-precision big-endian float, struct format '>d'.\"\"\"\n",
"    _FMT = struct.Struct('>d')"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Examples:"
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {},
"outputs": [],
"source": [
"# Every primitive type must give back an equal value after a\n",
"# tobytes() / frombytes() round trip.\n",
"for cls, value in [\n",
"    (Bool, True),\n",
"    (Int8, -1), (UInt8, 1),\n",
"    (Int16, -1), (UInt16, 1),\n",
"    (Int32, -1), (UInt32, 1),\n",
"    (Int64, -1), (UInt64, 1),\n",
"    (Float, 1.), (Double, 1.),\n",
"]:\n",
"    restored = cls.frombytes(cls(value).tobytes())\n",
"    assert value == restored"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Arithmetic operators return plain Python primitives, not the serializable subclasses."
]
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"4.0"
]
},
"execution_count": 4,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"# Mixing an int subclass with a float subclass yields the float 4.0 shown in the output.\n",
"Int32(3) + Double(1.)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"`bool` is a special class and cannot be completely emulated."
]
},
{
"cell_type": "code",
"execution_count": 5,
"metadata": {
"scrolled": true
},
"outputs": [
{
"data": {
"text/plain": [
"False"
]
},
"execution_count": 5,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"# Identity check fails: Bool is an int subclass, so Bool(True) is not the `True` object.\n",
"Bool(True) is True"
]
},
{
"cell_type": "code",
"execution_count": 6,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"True"
]
},
"execution_count": 6,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"# Equality still holds even though identity does not.\n",
"Bool(True) == True"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Sequences\n",
"\n",
"Sequences are variable-length values, serialized with a length prefix."
]
},
{
"cell_type": "code",
"execution_count": 7,
"metadata": {},
"outputs": [],
"source": [
"from collections import OrderedDict\n",
"\n",
"# For Python 2 compatibility: probe for the Python 2 builtins `unicode` and\n",
"# `unichr`; when they are missing (NameError), alias them to `str` and `chr`.\n",
"try:\n",
" unicode\n",
" unichr\n",
"except NameError:\n",
" unicode = str\n",
" unichr = chr\n",
"\n",
"\n",
"class Bytes(bytes, Serializable):\n",
"    \"\"\"\n",
"    Raw bytes, serialized as a 2-byte big-endian length field followed by the payload.\n",
"    \"\"\"\n",
"    @classmethod\n",
"    def read(cls, fp, padding=1):\n",
"        \"\"\"Read one length-prefixed record and skip padding up to a multiple of `padding`.\"\"\"\n",
"        start = fp.tell()\n",
"        (length,) = read_fmt(fp, '>H')\n",
"        payload = read_bytes(fp, length)\n",
"        # The padding is computed over the whole record, header included.\n",
"        read_padding(fp, fp.tell() - start, padding)\n",
"        return cls(payload)\n",
"\n",
"    def write(self, fp, padding=1):\n",
"        \"\"\"Write the length, payload and padding; return the total bytes written.\"\"\"\n",
"        total = write_fmt(fp, '>H', len(self))\n",
"        total += write_bytes(fp, self)\n",
"        total += write_padding(fp, total, padding)\n",
"        return total\n",
"\n",
"\n",
"class PascalString(str, Serializable):\n",
"    \"\"\"\n",
"    Pascal string: a 1-byte length field followed by that many encoded bytes.\n",
"\n",
"    The record (length byte included) is padded to a multiple of `padding`.\n",
"    \"\"\"\n",
"    @classmethod\n",
"    def read(cls, fp, encoding='macroman', padding=2):\n",
"        \"\"\"Read one record from `fp` and decode its payload with `encoding`.\"\"\"\n",
"        offset = fp.tell()\n",
"        length = read_fmt(fp, '>B')[0]\n",
"        buffer = read_bytes(fp, length)\n",
"        read_padding(fp, fp.tell() - offset, padding)\n",
"        return cls(buffer.decode(encoding))\n",
"\n",
"    def write(self, fp, encoding='macroman', padding=2):\n",
"        \"\"\"\n",
"        Encode the string with `encoding` and write it to `fp`.\n",
"\n",
"        Returns the total number of bytes written, padding included.\n",
"        Strings longer than 255 encoded bytes raise struct.error.\n",
"        \"\"\"\n",
"        # The length field counts encoded bytes, not characters: the two\n",
"        # differ for multi-byte encodings, and using the character count would\n",
"        # truncate the payload and desynchronise the stream.\n",
"        encoded = self.encode(encoding)\n",
"        written = write_fmt(fp, '>B', len(encoded))\n",
"        written += write_bytes(fp, encoded)\n",
"        written += write_padding(fp, written, padding)\n",
"        return written\n",
"\n",
" \n",
"class Unicode(unicode, Serializable):\n",
"    \"\"\"\n",
"    Unicode string serialized as a 4-byte big-endian count of UTF-16 code\n",
"    units followed by the code units themselves (2 bytes each, big-endian).\n",
"    \"\"\"\n",
"    def __new__(cls, value, encoding='utf-8', *args, **kwargs):\n",
"        # On Python 2 (where bytes is str), decode byte strings to unicode.\n",
"        if bytes == str and isinstance(value, str):\n",
"            value = value.decode(encoding=encoding)\n",
"        return super(Unicode, cls).__new__(cls, value, *args, **kwargs)\n",
"\n",
"    @classmethod\n",
"    def read(cls, fp, padding=1):\n",
"        \"\"\"Read one record from `fp` and skip padding up to a multiple of `padding`.\"\"\"\n",
"        offset = fp.tell()\n",
"        length = read_fmt(fp, '>I')[0]\n",
"        raw = read_bytes(fp, 2 * length)\n",
"        try:\n",
"            # Decoding as UTF-16 recombines surrogate pairs, so characters\n",
"            # outside the Basic Multilingual Plane round-trip correctly.\n",
"            value = raw.decode('utf-16-be')\n",
"        except UnicodeDecodeError:\n",
"            # Unpaired surrogates are not valid UTF-16; fall back to mapping\n",
"            # every code unit to one character.\n",
"            value = u''.join(unichr(x) for x in struct.unpack('>%dH' % length, raw))\n",
"        read_padding(fp, fp.tell() - offset, padding)\n",
"        return cls(value)\n",
"\n",
"    def write(self, fp, padding=1):\n",
"        \"\"\"Write the record to `fp`; return the total bytes written, padding included.\"\"\"\n",
"        try:\n",
"            # Encoding as UTF-16 splits characters above U+FFFF into surrogate\n",
"            # pairs; packing ord(x) as a 16-bit value would fail for them.\n",
"            data = self.encode('utf-16-be')\n",
"        except UnicodeEncodeError:\n",
"            # Python 3 rejects lone surrogates unless told to pass them through.\n",
"            data = self.encode('utf-16-be', 'surrogatepass')\n",
"        # The length field counts 16-bit code units, not characters.\n",
"        written = write_fmt(fp, '>I', len(data) // 2)\n",
"        written += write_bytes(fp, data)\n",
"        written += write_padding(fp, written, padding)\n",
"        return written"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Examples:"
]
},
{
"cell_type": "code",
"execution_count": 8,
"metadata": {},
"outputs": [],
"source": [
"# Every sequence type must give back an equal value after a\n",
"# tobytes() / frombytes() round trip.\n",
"for cls, value in [\n",
"    (Bytes, b'\\x01\\x03'),\n",
"    (PascalString, 'abc'),\n",
"    (Unicode, u'あいう'),\n",
"]:\n",
"    restored = cls.frombytes(cls(value).tobytes())\n",
"    assert value == restored"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Containers\n",
"\n",
"Containers must know the types of their elements in order to read them back."
]
},
{
"cell_type": "code",
"execution_count": 9,
"metadata": {},
"outputs": [],
"source": [
"import attr\n",
"\n",
"class List(list, Serializable):\n",
"    \"\"\"\n",
"    Base list example, intended for inherited use.\n",
"\n",
"    Elements are stored back to back without a count or terminator, so\n",
"    reading consumes the stream until it is exhausted.\n",
"    \"\"\"\n",
"    _ELEMENT_TYPE = None  # Subclass must define the element type.\n",
"\n",
"    @classmethod\n",
"    def read(cls, fp, *args, **kwargs):\n",
"        \"\"\"\n",
"        Read elements from `fp` until no bytes remain.\n",
"\n",
"        Extra arguments are forwarded to the element type's `read`.\n",
"        \"\"\"\n",
"        self = cls()\n",
"        # Continue while any byte is left: requiring 4 readable bytes would\n",
"        # silently drop trailing elements shorter than 4 bytes (Int8, Int16).\n",
"        while is_readable(fp):\n",
"            self.append(cls._ELEMENT_TYPE.read(fp, *args, **kwargs))\n",
"        return self\n",
"\n",
"    def write(self, fp, *args, **kwargs):\n",
"        \"\"\"\n",
"        Write every element to `fp` in order and return the total bytes written.\n",
"\n",
"        Extra arguments are forwarded to each element's `write`.\n",
"        \"\"\"\n",
"        return sum(item.write(fp, *args, **kwargs) for item in self)\n",
" \n",
"\n",
"class Dict(OrderedDict, Serializable):\n",
"    \"\"\"\n",
"    Base dict example, intended for inherited use.\n",
"\n",
"    Entries are stored as key/value pairs back to back without a count or\n",
"    terminator, so reading consumes the stream until it is exhausted.\n",
"    \"\"\"\n",
"    _KEY_TYPE = None  # Subclass must define the key type.\n",
"    _VALUE_TYPE = None  # Subclass must define the value type.\n",
"\n",
"    @classmethod\n",
"    def read(cls, fp, *args, **kwargs):\n",
"        \"\"\"\n",
"        Read key/value pairs from `fp` until no bytes remain.\n",
"\n",
"        Extra arguments are forwarded to the value type's `read` only.\n",
"        \"\"\"\n",
"        self = cls()\n",
"        # Continue while any byte is left: requiring 4 readable bytes would\n",
"        # silently drop a trailing entry shorter than 4 bytes.\n",
"        while is_readable(fp):\n",
"            key = cls._KEY_TYPE.read(fp)\n",
"            value = cls._VALUE_TYPE.read(fp, *args, **kwargs)\n",
"            self[key] = value\n",
"        return self\n",
"\n",
"    def write(self, fp, *args, **kwargs):\n",
"        \"\"\"\n",
"        Write every key/value pair in insertion order; return the total bytes written.\n",
"\n",
"        Extra arguments are forwarded to each value's `write` only.\n",
"        \"\"\"\n",
"        written = 0\n",
"        for key, value in self.items():\n",
"            written += key.write(fp)\n",
"            written += value.write(fp, *args, **kwargs)\n",
"        return written\n",
" \n",
"\n",
"@attr.s(slots=True)\n",
"class MyClass(Serializable):\n",
"    \"\"\"\n",
"    Custom data class example.\n",
"\n",
"    Fields are serialized one after another in declaration order, each by\n",
"    its own declared type. For Python 3.7+, the attrs package can be\n",
"    replaced with dataclasses.\n",
"    \"\"\"\n",
"    a = attr.ib(type=Int32, converter=Int32)\n",
"    b = attr.ib(type=PascalString, converter=PascalString)\n",
"\n",
"    @classmethod\n",
"    def read(cls, fp, *args, **kwargs):\n",
"        \"\"\"Read each field with its declared type and build an instance.\"\"\"\n",
"        values = [\n",
"            field.type.read(fp, *args, **kwargs)\n",
"            for field in attr.fields(cls)\n",
"        ]\n",
"        return cls(*values)\n",
"\n",
"    def write(self, fp, *args, **kwargs):\n",
"        \"\"\"Write each field in declaration order; return the total bytes written.\"\"\"\n",
"        return sum(\n",
"            getattr(self, field.name).write(fp, *args, **kwargs)\n",
"            for field in attr.fields(type(self))\n",
"        )"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Examples:"
]
},
{
"cell_type": "code",
"execution_count": 10,
"metadata": {},
"outputs": [],
"source": [
"class Int32List(List):\n",
"    \"\"\"List whose elements are Int32 values.\"\"\"\n",
"    _ELEMENT_TYPE = Int32\n",
"\n",
"\n",
"class UInt32UnicodeDict(Dict):\n",
"    \"\"\"Ordered mapping from UInt32 keys to Unicode values.\"\"\"\n",
"    _KEY_TYPE = UInt32\n",
"    _VALUE_TYPE = Unicode\n",
"\n",
"\n",
"# Every container must compare equal to its own round-tripped copy.\n",
"for inst in [\n",
"    Int32List(Int32(x) for x in range(3)),\n",
"    UInt32UnicodeDict((UInt32(x), Unicode(u'%d' % x)) for x in range(3)),\n",
"    MyClass(23, 'foo'),\n",
"]:\n",
"    restored = type(inst).frombytes(inst.tobytes())\n",
"    assert inst == restored"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": []
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.6.2"
}
},
"nbformat": 4,
"nbformat_minor": 2
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment