Created
June 20, 2019 06:56
-
-
Save kyamagu/18bfed1d5dbd4d419189adfb002a82f3 to your computer and use it in GitHub Desktop.
Serializable primitive and custom python objects
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
{ | |
"cells": [ | |
{ | |
"cell_type": "markdown", | |
"metadata": {}, | |
"source": [ | |
"# Serializable objects\n", | |
"\n", | |
"This notebook demonstrates serializable objects that can `read` themselves from, and `write` themselves to, a binary format.\n", | |
"The model is compatible with Python 2 and 3.\n", | |
"\n", | |
"For Python 3.7+, `dataclasses` might fit the needs here." | |
] | |
}, | |
{ | |
"cell_type": "markdown", | |
"metadata": {}, | |
"source": [ | |
"## Utils\n", | |
"\n", | |
"Helper functions for binary IO." | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 1, | |
"metadata": {}, | |
"outputs": [], | |
"source": [ | |
"import io\n", | |
"import struct\n", | |
"\n", | |
"def read_bytes(fp, size):\n", | |
"    \"\"\"\n", | |
"    Read exactly `size` bytes from `fp`.\n", | |
"\n", | |
"    :param fp: readable binary file-like object.\n", | |
"    :param size: number of bytes that must be returned.\n", | |
"    :return: the bytes read.\n", | |
"    :raises AssertionError: if `fp.read` returned a different number of bytes.\n", | |
"    \"\"\"\n", | |
"    chunk = fp.read(size)\n", | |
"    actual = len(chunk)\n", | |
"    assert size == actual, 'read=%d, expected=%d' % (actual, size)\n", | |
"    return chunk\n", | |
"\n", | |
"\n", | |
"def write_bytes(fp, data):\n", | |
"    \"\"\"\n", | |
"    Write `data` to `fp` and verify the stream advanced by `len(data)`.\n", | |
"\n", | |
"    The written size is measured with `fp.tell()` before and after the write,\n", | |
"    so `fp` must support `tell`.\n", | |
"\n", | |
"    :param fp: writable binary file-like object.\n", | |
"    :param data: bytes to write.\n", | |
"    :return: number of bytes written.\n", | |
"    :raises AssertionError: if the position did not advance by `len(data)`.\n", | |
"    \"\"\"\n", | |
"    start = fp.tell()\n", | |
"    fp.write(data)\n", | |
"    end = fp.tell()\n", | |
"    written = end - start\n", | |
"    assert written == len(data), 'written=%d, expected=%d' % (written, len(data))\n", | |
"    return written\n", | |
" \n", | |
" \n", | |
"def read_fmt(fp, fmt):\n", | |
"    \"\"\"\n", | |
"    Read and unpack one struct record from `fp`.\n", | |
"\n", | |
"    :param fp: readable binary file-like object.\n", | |
"    :param fmt: a `struct` format string or a precompiled `struct.Struct`.\n", | |
"    :return: tuple of unpacked values.\n", | |
"    \"\"\"\n", | |
"    if not isinstance(fmt, struct.Struct):\n", | |
"        size = struct.calcsize(fmt)\n", | |
"        return struct.unpack(fmt, read_bytes(fp, size))\n", | |
"    return fmt.unpack(read_bytes(fp, fmt.size))\n", | |
"\n", | |
"\n", | |
"def write_fmt(fp, fmt, *args):\n", | |
"    \"\"\"\n", | |
"    Pack `args` with `fmt` and write the result to `fp`.\n", | |
"\n", | |
"    :param fp: writable binary file-like object.\n", | |
"    :param fmt: a `struct` format string or a precompiled `struct.Struct`.\n", | |
"    :param args: values to pack.\n", | |
"    :return: number of bytes written.\n", | |
"    \"\"\"\n", | |
"    if isinstance(fmt, struct.Struct):\n", | |
"        packed = fmt.pack(*args)\n", | |
"    else:\n", | |
"        packed = struct.pack(fmt, *args)\n", | |
"    return write_bytes(fp, packed)\n", | |
"\n", | |
"\n", | |
"def read_padding(fp, length, divisor=1):\n", | |
"    \"\"\"\n", | |
"    Consume the bytes that pad `length` up to a multiple of `divisor`.\n", | |
"\n", | |
"    NOTE(review): the default divisor here is 1 while `write_padding` defaults\n", | |
"    to 2; pass `divisor` explicitly to keep reads and writes symmetric.\n", | |
"\n", | |
"    :param fp: readable binary file-like object.\n", | |
"    :param length: number of bytes already consumed for the current item.\n", | |
"    :param divisor: alignment in bytes.\n", | |
"    :return: the padding bytes read, or `b''` when `length` is already aligned.\n", | |
"    \"\"\"\n", | |
"    excess = length % divisor\n", | |
"    if not excess:\n", | |
"        return b''\n", | |
"    return read_bytes(fp, divisor - excess)\n", | |
"\n", | |
"\n", | |
"def write_padding(fp, length, divisor=2):\n", | |
"    \"\"\"\n", | |
"    Write the zero bytes that pad `length` up to a multiple of `divisor`.\n", | |
"\n", | |
"    NOTE(review): the default divisor here is 2 while `read_padding` defaults\n", | |
"    to 1; pass `divisor` explicitly to keep reads and writes symmetric.\n", | |
"\n", | |
"    :param fp: writable binary file-like object.\n", | |
"    :param length: number of bytes already written for the current item.\n", | |
"    :param divisor: alignment in bytes.\n", | |
"    :return: number of padding bytes written (0 when already aligned).\n", | |
"    \"\"\"\n", | |
"    excess = length % divisor\n", | |
"    if not excess:\n", | |
"        return 0\n", | |
"    return write_bytes(fp, b'\\x00' * (divisor - excess))\n", | |
"\n", | |
"\n", | |
"def is_readable(fp, size=1):\n", | |
"    \"\"\"\n", | |
"    Tell whether at least `size` more bytes can be read, without consuming them.\n", | |
"\n", | |
"    :param fp: readable, seekable binary file-like object.\n", | |
"    :param size: number of bytes to probe for.\n", | |
"    :return: True if `size` bytes were available.\n", | |
"    \"\"\"\n", | |
"    peeked = len(fp.read(size))\n", | |
"    # whence=1: step back relative to the current position, undoing the probe.\n", | |
"    fp.seek(-peeked, 1)\n", | |
"    return peeked == size" | |
] | |
}, | |
{ | |
"cell_type": "markdown", | |
"metadata": {}, | |
"source": [ | |
"## Serializable primitives" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 2, | |
"metadata": {}, | |
"outputs": [], | |
"source": [ | |
"class Serializable(object):\n", | |
"    \"\"\"\n", | |
"    Base serialization interface.\n", | |
"\n", | |
"    Subclasses implement `read` and `write` on file-like objects; `frombytes`\n", | |
"    and `tobytes` wrap them with an in-memory `io.BytesIO` buffer and forward\n", | |
"    any extra arguments.\n", | |
"    \"\"\"\n", | |
"\n", | |
"    @classmethod\n", | |
"    def read(cls, fp, *args, **kwargs):\n", | |
"        \"\"\"\n", | |
"        Read an instance from `fp`. Must be overridden.\n", | |
"\n", | |
"        Extra arguments are accepted so that options forwarded by `frombytes`\n", | |
"        reach this stub and raise NotImplementedError rather than TypeError.\n", | |
"        \"\"\"\n", | |
"        raise NotImplementedError()\n", | |
"\n", | |
"    def write(self, fp, *args, **kwargs):\n", | |
"        \"\"\"\n", | |
"        Write this instance to `fp`. Must be overridden.\n", | |
"        \"\"\"\n", | |
"        raise NotImplementedError()\n", | |
"\n", | |
"    @classmethod\n", | |
"    def frombytes(cls, data, *args, **kwargs):\n", | |
"        \"\"\"\n", | |
"        Build an instance from `data` by calling `read` on an in-memory stream.\n", | |
"\n", | |
"        :param data: serialized bytes.\n", | |
"        :return: whatever `cls.read` returns.\n", | |
"        \"\"\"\n", | |
"        with io.BytesIO(data) as f:\n", | |
"            return cls.read(f, *args, **kwargs)\n", | |
"\n", | |
"    def tobytes(self, *args, **kwargs):\n", | |
"        \"\"\"\n", | |
"        Serialize this instance by calling `write` on an in-memory stream.\n", | |
"\n", | |
"        :return: the bytes written.\n", | |
"        \"\"\"\n", | |
"        with io.BytesIO() as f:\n", | |
"            self.write(f, *args, **kwargs)\n", | |
"            return f.getvalue()\n", | |
"\n", | |
" \n", | |
"class Primitive(Serializable):\n", | |
"    \"\"\"\n", | |
"    Fixed-length primitive types.\n", | |
"\n", | |
"    Subclasses set `_FMT` to the struct format used for both directions.\n", | |
"    \"\"\"\n", | |
"    _FMT = None\n", | |
"\n", | |
"    @classmethod\n", | |
"    def read(cls, fp, **kwargs):\n", | |
"        \"\"\"Unpack one `_FMT` record from `fp` and wrap it in `cls`.\"\"\"\n", | |
"        fields = read_fmt(fp, cls._FMT)\n", | |
"        return cls(*fields)\n", | |
"\n", | |
"    def write(self, fp, **kwargs):\n", | |
"        \"\"\"Pack this value with `_FMT`, write it to `fp`, return the byte count.\"\"\"\n", | |
"        fmt = self._FMT\n", | |
"        return write_fmt(fp, fmt, self)\n", | |
"\n", | |
"\n", | |
"# bool cannot be subclassed, so Bool derives from int and packs with the struct '?' format.\n", | |
"class Bool(int, Primitive):\n", | |
" _FMT = struct.Struct('>?')\n", | |
"\n", | |
"\n", | |
"# Signed 8-bit integer ('>b').\n", | |
"class Int8(int, Primitive):\n", | |
" _FMT = struct.Struct('>b')\n", | |
"\n", | |
"\n", | |
"# Unsigned 8-bit integer ('>B').\n", | |
"class UInt8(int, Primitive):\n", | |
" _FMT = struct.Struct('>B')\n", | |
"\n", | |
"\n", | |
"# Signed 16-bit big-endian integer ('>h').\n", | |
"class Int16(int, Primitive):\n", | |
" _FMT = struct.Struct('>h')\n", | |
"\n", | |
"\n", | |
"# Unsigned 16-bit big-endian integer ('>H').\n", | |
"class UInt16(int, Primitive):\n", | |
" _FMT = struct.Struct('>H')\n", | |
"\n", | |
"\n", | |
"# Signed 32-bit big-endian integer ('>i').\n", | |
"class Int32(int, Primitive):\n", | |
" _FMT = struct.Struct('>i')\n", | |
"\n", | |
"\n", | |
"# Unsigned 32-bit big-endian integer ('>I').\n", | |
"class UInt32(int, Primitive):\n", | |
" _FMT = struct.Struct('>I')\n", | |
"\n", | |
"\n", | |
"# Signed 64-bit big-endian integer ('>q').\n", | |
"class Int64(int, Primitive):\n", | |
" _FMT = struct.Struct('>q')\n", | |
"\n", | |
"\n", | |
"# Unsigned 64-bit big-endian integer ('>Q').\n", | |
"class UInt64(int, Primitive):\n", | |
" _FMT = struct.Struct('>Q')\n", | |
"\n", | |
"\n", | |
"# 32-bit big-endian float ('>f').\n", | |
"class Float(float, Primitive):\n", | |
" _FMT = struct.Struct('>f')\n", | |
"\n", | |
"\n", | |
"# 64-bit big-endian float ('>d').\n", | |
"class Double(float, Primitive):\n", | |
" _FMT = struct.Struct('>d')" | |
] | |
}, | |
{ | |
"cell_type": "markdown", | |
"metadata": {}, | |
"source": [ | |
"Examples:" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 3, | |
"metadata": {}, | |
"outputs": [], | |
"source": [ | |
"# Round-trip check: every value must survive tobytes() followed by frombytes().\n", | |
"for cls, value in [\n", | |
" (Bool, True),\n", | |
" (Int8, -1),\n", | |
" (UInt8, 1),\n", | |
" (Int16, -1),\n", | |
" (UInt16, 1),\n", | |
" (Int32, -1),\n", | |
" (UInt32, 1),\n", | |
" (Int64, -1),\n", | |
" (UInt64, 1),\n", | |
" (Float, 1.),\n", | |
" (Double, 1.),\n", | |
"]:\n", | |
" assert value == cls.frombytes(cls(value).tobytes())" | |
] | |
}, | |
{ | |
"cell_type": "markdown", | |
"metadata": {}, | |
"source": [ | |
"Arithmetic operators return plain Python primitives, not the serializable subclasses." | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 4, | |
"metadata": {}, | |
"outputs": [ | |
{ | |
"data": { | |
"text/plain": [ | |
"4.0" | |
] | |
}, | |
"execution_count": 4, | |
"metadata": {}, | |
"output_type": "execute_result" | |
} | |
], | |
"source": [ | |
"# Mixing the int and float subclasses uses built-in arithmetic and yields a plain float.\n", | |
"Int32(3) + Double(1.)" | |
] | |
}, | |
{ | |
"cell_type": "markdown", | |
"metadata": {}, | |
"source": [ | |
"`bool` is a special class and cannot be completely emulated." | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 5, | |
"metadata": { | |
"scrolled": true | |
}, | |
"outputs": [ | |
{ | |
"data": { | |
"text/plain": [ | |
"False" | |
] | |
}, | |
"execution_count": 5, | |
"metadata": {}, | |
"output_type": "execute_result" | |
} | |
], | |
"source": [ | |
"# Identity comparison: a Bool instance is not the built-in True singleton.\n", | |
"Bool(True) is True" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 6, | |
"metadata": {}, | |
"outputs": [ | |
{ | |
"data": { | |
"text/plain": [ | |
"True" | |
] | |
}, | |
"execution_count": 6, | |
"metadata": {}, | |
"output_type": "execute_result" | |
} | |
], | |
"source": [ | |
"# Equality still holds because Bool compares as an int.\n", | |
"Bool(True) == True" | |
] | |
}, | |
{ | |
"cell_type": "markdown", | |
"metadata": {}, | |
"source": [ | |
"## Sequences\n", | |
"\n", | |
"Sequences are variable-length types." | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 7, | |
"metadata": {}, | |
"outputs": [], | |
"source": [ | |
"from collections import OrderedDict\n", | |
"\n", | |
"# For Python 2 compatibility: `unicode` and `unichr` exist only on Python 2.\n", | |
"# When looking them up raises NameError, alias them to `str` and `chr`.\n", | |
"try:\n", | |
" unicode\n", | |
" unichr\n", | |
"except NameError:\n", | |
" unicode = str\n", | |
" unichr = chr\n", | |
"\n", | |
"\n", | |
"class Bytes(bytes, Serializable):\n", | |
"    \"\"\"\n", | |
"    Raw bytes, serialized as a 2-byte big-endian length field followed by the data.\n", | |
"    \"\"\"\n", | |
"    @classmethod\n", | |
"    def read(cls, fp, padding=1):\n", | |
"        \"\"\"\n", | |
"        Read the length field, the data, then any padding up to a multiple of `padding`.\n", | |
"        \"\"\"\n", | |
"        start = fp.tell()\n", | |
"        (size,) = read_fmt(fp, '>H')\n", | |
"        payload = read_bytes(fp, size)\n", | |
"        # Padding is computed over everything consumed so far, length field included.\n", | |
"        read_padding(fp, fp.tell() - start, padding)\n", | |
"        return cls(payload)\n", | |
"\n", | |
"    def write(self, fp, padding=1):\n", | |
"        \"\"\"\n", | |
"        Write the length field, the data and zero padding; return the total byte count.\n", | |
"        \"\"\"\n", | |
"        total = write_fmt(fp, '>H', len(self))\n", | |
"        total += write_bytes(fp, self)\n", | |
"        return total + write_padding(fp, total, padding)\n", | |
"\n", | |
"\n", | |
"class PascalString(str, Serializable):\n", | |
"    \"\"\"\n", | |
"    Pascal string: a 1-byte length field followed by that many encoded bytes.\n", | |
"    \"\"\"\n", | |
"    @classmethod\n", | |
"    def read(cls, fp, encoding='macroman', padding=2):\n", | |
"        \"\"\"\n", | |
"        Read the length byte, the encoded bytes and any padding, then decode.\n", | |
"\n", | |
"        :param fp: readable binary file-like object.\n", | |
"        :param encoding: codec used to decode the stored bytes.\n", | |
"        :param padding: alignment of the whole record in bytes.\n", | |
"        \"\"\"\n", | |
"        offset = fp.tell()\n", | |
"        length = read_fmt(fp, '>B')[0]\n", | |
"        buffer = read_bytes(fp, length)\n", | |
"        read_padding(fp, fp.tell() - offset, padding)\n", | |
"        return cls(buffer.decode(encoding))\n", | |
"\n", | |
"    def write(self, fp, encoding='macroman', padding=2):\n", | |
"        \"\"\"\n", | |
"        Encode the string and write length byte, encoded bytes and zero padding.\n", | |
"\n", | |
"        :param fp: writable binary file-like object.\n", | |
"        :param encoding: codec used to encode the string.\n", | |
"        :param padding: alignment of the whole record in bytes.\n", | |
"        :return: total number of bytes written.\n", | |
"        \"\"\"\n", | |
"        # The length field counts encoded bytes, not characters: with a\n", | |
"        # multi-byte encoding the two differ, and sizing by len(self) would\n", | |
"        # silently truncate the encoded data.\n", | |
"        encoded = self.encode(encoding)\n", | |
"        written = write_fmt(fp, '>B', len(encoded))\n", | |
"        written += write_bytes(fp, encoded)\n", | |
"        written += write_padding(fp, written, padding)\n", | |
"        return written\n", | |
"\n", | |
" \n", | |
"class Unicode(unicode, Serializable):\n", | |
"    \"\"\"\n", | |
"    Unicode string, serialized as a 4-byte length followed by one 2-byte value per character.\n", | |
"\n", | |
"    Each character is packed with the struct 'H' format, so characters whose\n", | |
"    `ord` exceeds 0xFFFF cannot be written.\n", | |
"    \"\"\"\n", | |
"    def __new__(cls, value, encoding='utf-8', *args, **kwargs):\n", | |
"        # Always use unicode for python 2 str.\n", | |
"        if bytes is str and isinstance(value, str):\n", | |
"            value = value.decode(encoding=encoding)\n", | |
"        return super(Unicode, cls).__new__(cls, value, *args, **kwargs)\n", | |
"\n", | |
"    @classmethod\n", | |
"    def read(cls, fp, padding=1):\n", | |
"        \"\"\"\n", | |
"        Read the length, the 2-byte values and any padding; return the decoded string.\n", | |
"        \"\"\"\n", | |
"        start = fp.tell()\n", | |
"        (count,) = read_fmt(fp, '>I')\n", | |
"        code_units = read_fmt(fp, '>%dH' % count)\n", | |
"        text = ''.join(unichr(unit) for unit in code_units)\n", | |
"        read_padding(fp, fp.tell() - start, padding)\n", | |
"        return cls(text)\n", | |
"\n", | |
"    def write(self, fp, padding=1):\n", | |
"        \"\"\"\n", | |
"        Write the length, one 2-byte value per character and zero padding; return the byte count.\n", | |
"        \"\"\"\n", | |
"        count = len(self)\n", | |
"        total = write_fmt(fp, '>I', count)\n", | |
"        total += write_fmt(fp, '>%dH' % count, *map(ord, self))\n", | |
"        total += write_padding(fp, total, padding)\n", | |
"        return total" | |
] | |
}, | |
{ | |
"cell_type": "markdown", | |
"metadata": {}, | |
"source": [ | |
"Examples:" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 8, | |
"metadata": {}, | |
"outputs": [], | |
"source": [ | |
"# Round-trip check for the variable-length types, using each type's default padding.\n", | |
"for cls, value in [\n", | |
" (Bytes, b'\\x01\\x03'),\n", | |
" (PascalString, 'abc'),\n", | |
" (Unicode, u'あいう'),\n", | |
"]:\n", | |
" assert value == cls.frombytes(cls(value).tobytes())" | |
] | |
}, | |
{ | |
"cell_type": "markdown", | |
"metadata": {}, | |
"source": [ | |
"## Containers\n", | |
"\n", | |
"Containers must know their element types." | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 9, | |
"metadata": {}, | |
"outputs": [], | |
"source": [ | |
"import attr\n", | |
"\n", | |
"class List(list, Serializable):\n", | |
"    \"\"\"\n", | |
"    Base list example, intended for inherited use.\n", | |
"\n", | |
"    Elements are written back to back with no count or terminator, so `read`\n", | |
"    keeps consuming elements until fewer than `_MIN_ELEMENT_SIZE` bytes remain.\n", | |
"    \"\"\"\n", | |
"    _ELEMENT_TYPE = None  # Subclass must define the element type.\n", | |
"    # Look-ahead used by `read` to decide whether another element follows.\n", | |
"    # Subclasses whose serialized elements can be shorter than 4 bytes must\n", | |
"    # lower this, otherwise trailing elements are silently left unread.\n", | |
"    _MIN_ELEMENT_SIZE = 4\n", | |
"\n", | |
"    @classmethod\n", | |
"    def read(cls, fp, *args, **kwargs):\n", | |
"        \"\"\"\n", | |
"        Read elements from `fp` until the stream runs out.\n", | |
"\n", | |
"        Extra arguments are forwarded to `_ELEMENT_TYPE.read`.\n", | |
"        \"\"\"\n", | |
"        self = cls()\n", | |
"        while is_readable(fp, cls._MIN_ELEMENT_SIZE):\n", | |
"            self.append(cls._ELEMENT_TYPE.read(fp, *args, **kwargs))\n", | |
"        return self\n", | |
"\n", | |
"    def write(self, fp, *args, **kwargs):\n", | |
"        \"\"\"\n", | |
"        Write every element in order and return the total byte count.\n", | |
"\n", | |
"        Extra arguments are forwarded to each element's `write`.\n", | |
"        \"\"\"\n", | |
"        return sum(item.write(fp, *args, **kwargs) for item in self)\n", | |
" \n", | |
"\n", | |
"class Dict(OrderedDict, Serializable):\n", | |
"    \"\"\"\n", | |
"    Base dict example, intended for inherited use.\n", | |
"\n", | |
"    Entries are written as key followed by value, back to back, so `read`\n", | |
"    keeps consuming entries until fewer than `_MIN_ENTRY_SIZE` bytes remain.\n", | |
"    \"\"\"\n", | |
"    _KEY_TYPE = None  # Subclass must define the key type.\n", | |
"    _VALUE_TYPE = None  # Subclass must define the value type.\n", | |
"    # Look-ahead used by `read` to decide whether another entry follows.\n", | |
"    # Subclasses whose serialized entries can be shorter than 4 bytes must\n", | |
"    # lower this, otherwise trailing entries are silently left unread.\n", | |
"    _MIN_ENTRY_SIZE = 4\n", | |
"\n", | |
"    @classmethod\n", | |
"    def read(cls, fp, *args, **kwargs):\n", | |
"        \"\"\"\n", | |
"        Read key/value pairs from `fp` until the stream runs out.\n", | |
"\n", | |
"        Extra arguments are forwarded to `_VALUE_TYPE.read` only; keys are\n", | |
"        read with their type's defaults.\n", | |
"        \"\"\"\n", | |
"        self = cls()\n", | |
"        while is_readable(fp, cls._MIN_ENTRY_SIZE):\n", | |
"            key = cls._KEY_TYPE.read(fp)\n", | |
"            self[key] = cls._VALUE_TYPE.read(fp, *args, **kwargs)\n", | |
"        return self\n", | |
"\n", | |
"    def write(self, fp, *args, **kwargs):\n", | |
"        \"\"\"\n", | |
"        Write every key/value pair in insertion order and return the total byte count.\n", | |
"\n", | |
"        Extra arguments are forwarded to each value's `write` only.\n", | |
"        \"\"\"\n", | |
"        written = 0\n", | |
"        for key, value in self.items():\n", | |
"            written += key.write(fp)\n", | |
"            written += value.write(fp, *args, **kwargs)\n", | |
"        return written\n", | |
" \n", | |
"\n", | |
"@attr.s(slots=True)\n", | |
"class MyClass(Serializable):\n", | |
"    \"\"\"\n", | |
"    Custom data class example.\n", | |
"\n", | |
"    Fields are read and written in declaration order, each through its own\n", | |
"    serializable type. For Python 3.7+, attrs package can be replaced with\n", | |
"    dataclasses.\n", | |
"    \"\"\"\n", | |
"    a = attr.ib(type=Int32, converter=Int32)\n", | |
"    b = attr.ib(type=PascalString, converter=PascalString)\n", | |
"\n", | |
"    @classmethod\n", | |
"    def read(cls, fp, *args, **kwargs):\n", | |
"        \"\"\"\n", | |
"        Read one value per declared field, using each field's `type`, and build the instance.\n", | |
"        \"\"\"\n", | |
"        return cls(*[field.type.read(fp, *args, **kwargs) for field in attr.fields(cls)])\n", | |
"\n", | |
"    def write(self, fp, *args, **kwargs):\n", | |
"        \"\"\"\n", | |
"        Write every field value in declaration order and return the total byte count.\n", | |
"        \"\"\"\n", | |
"        written = 0\n", | |
"        # recurse=False keeps the field values as they are: the default would\n", | |
"        # flatten nested attrs instances and containers into plain tuples/lists,\n", | |
"        # which have no `write` method.\n", | |
"        for item in attr.astuple(self, recurse=False):\n", | |
"            written += item.write(fp, *args, **kwargs)\n", | |
"        return written" | |
] | |
}, | |
{ | |
"cell_type": "markdown", | |
"metadata": {}, | |
"source": [ | |
"Examples:" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 10, | |
"metadata": {}, | |
"outputs": [], | |
"source": [ | |
"# Concrete list: every element is a 4-byte Int32.\n", | |
"class Int32List(List):\n", | |
" _ELEMENT_TYPE = Int32\n", | |
"\n", | |
"# Concrete dict: UInt32 keys mapped to Unicode values.\n", | |
"class UInt32UnicodeDict(Dict):\n", | |
" _KEY_TYPE = UInt32\n", | |
" _VALUE_TYPE = Unicode\n", | |
"\n", | |
"# Round-trip check: each container must equal what frombytes() rebuilds from its tobytes().\n", | |
"for inst in [\n", | |
" Int32List(Int32(x) for x in range(3)),\n", | |
" UInt32UnicodeDict((UInt32(x), Unicode(u'%d' % x)) for x in range(3)),\n", | |
" MyClass(23, 'foo'),\n", | |
"]:\n", | |
" assert inst == inst.__class__.frombytes(inst.tobytes())" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": null, | |
"metadata": {}, | |
"outputs": [], | |
"source": [] | |
} | |
], | |
"metadata": { | |
"kernelspec": { | |
"display_name": "Python 3", | |
"language": "python", | |
"name": "python3" | |
}, | |
"language_info": { | |
"codemirror_mode": { | |
"name": "ipython", | |
"version": 3 | |
}, | |
"file_extension": ".py", | |
"mimetype": "text/x-python", | |
"name": "python", | |
"nbconvert_exporter": "python", | |
"pygments_lexer": "ipython3", | |
"version": "3.6.2" | |
} | |
}, | |
"nbformat": 4, | |
"nbformat_minor": 2 | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment