Created
October 21, 2017 05:35
-
-
Save king1600/401cf47f2654f0daec6112d5fb91b606 to your computer and use it in GitHub Desktop.
JPack: The most of JSON and erlpack/etf
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
JPack: The most of JSON and erlpack/etf | |
____.__________ _____ _________ ____ __. | |
| |\______ \/ _ \ \_ ___ \| |/ _| | |
| | | ___/ /_\ \/ \ \/| < | |
/\__| | | | / | \ \___| | \ | |
\________| |____| \____|__ /\______ /____|__ \ | |
\/ \/ \/ | |
Copyright(c) 2017 king1600 (https://github.com/king1600) | |
Permission is hereby granted, free of charge, to any person obtaining a | |
copy of this softwareand associated documentation files (the "Software"), | |
to deal in the Software without restriction, including without limitation | |
the rights to use, copy, modify, merge, publish, distribute, sublicense, | |
and/or sell copies of the Software, and to permit persons to whom the Software | |
is furnished to do so, subject to the following conditions: | |
The above copyright notice and this permission notice shall be included in | |
all copies or substantial portions of the Software. | |
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, | |
INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR | |
PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE | |
FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, | |
ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. | |
===================================================================================================== | |
Example: | |
from jpack import pack, unpack | |
byte_data = pack({'a': 1234, 'b': [3.14, None, True]}, False) | |
dict_and_bool = unpack(byte_data) | |
""" | |
import struct | |
import collections | |
""" Integer Constants """ | |
INT_MAX = 0xFFFFFFFF | |
INT_MIN = -0x7FFFFFFF | |
SHORT_MAX = 0xFFFF | |
SHORT_MIN = -0x7FFF | |
FLOAT_MAX = 1e+37 | |
FLOAT_MIN = 1e-37 | |
""" JPack Types """ | |
NULL = 0 | |
OBJECT = 1 | |
ARRAY = 2 | |
STRING = 3 | |
BYTE = 4 | |
SHORT = 5 | |
INT = 6 | |
LONG = 7 | |
FLOAT = 8 | |
DOUBLE = 9 | |
BOOL = 10 | |
SIGNED = 0 | |
UNSIGNED = 1 | |
""" Buffer Struct """ | |
class Buffer: | |
__slots__ = ('data', 'length', 'size') | |
def __init__(self, data=None, length=None, size=None): | |
self.data, self.length, self.size = data, length, size | |
""" Buffer Error """ | |
class BufferException(Exception): | |
pass | |
def buf_grow_size(buf : Buffer, size : int): | |
""" Make sure buffer has enough data to write to """ | |
# new buffer, create with a little over requested size | |
if buf.data is None: | |
buf.length = 0 | |
buf.size = int((buf.length + size) * 15 / 10) | |
buf.data = bytearray(buf.size) | |
# data will overflow, increase buffer with room to spare | |
elif size + buf.length > buf.size and buf.data is not None: | |
buf.size = int((buf.length + size) * 15 / 10) | |
resized = bytearray(buf.size) | |
resized[:buf.length] = buf.data | |
buf.data = resized | |
def buf_assert_size(buf : Buffer, size : int): | |
""" Check if buffer has enough data to read """ | |
remain_after = buf.size - (buf.length + size) | |
if remain_after < 0: | |
raise BufferException("Buffer underflow: {}".format(abs(remain_after))) | |
def buf_read(buf : Buffer, num_bytes : int): | |
""" Read num_bytes from the buffer """ | |
buf_assert_size(buf, num_bytes) | |
data = buf.data[buf.length : buf.length + num_bytes] | |
buf.length += num_bytes | |
return data | |
def buf_write(buf : Buffer, data : collections.Iterable, num_bytes : int): | |
""" Write num_bytes from data into buffer """ | |
buf_grow_size(buf, num_bytes) | |
buf.data[buf.length : buf.length + num_bytes] = data | |
buf.length += num_bytes | |
def buf_unpack(buf : Buffer, fmt : str): | |
""" Struct.unpack data from the buffer """ | |
num_bytes = struct.calcsize(fmt) | |
buf_assert_size(buf, num_bytes) | |
value, = struct.unpack_from(fmt, buf.data, buf.length) | |
buf.length += num_bytes | |
return value | |
def buf_pack(buf : Buffer, fmt : str, *values : tuple): | |
""" Struct.pack data into the buffer """ | |
size = struct.calcsize(fmt) * len(values) | |
buf_grow_size(buf, size) | |
struct.pack_into(fmt, buf.data, buf.length, *values) | |
buf.length += size | |
def _pack(buf : Buffer, var : object): | |
""" Internal method to append an object to a buffer """ | |
# append null : TAG + 0 bytes | |
if var is None: | |
buf_write(buf, (NULL << 1,), 1) | |
# append bool : TAG + 0 bytes (bool stored in first bit of TAG) | |
elif isinstance(var, bool): | |
buf_write(buf, ((BOOL << 1) | (1 if var else 0),), 1) | |
# append string : TAG + (2-4 bytes in size) + (num_bytes in string) | |
elif isinstance(var, str): | |
data = var.encode() | |
size = len(data) | |
sign = SIGNED if size > SHORT_MAX else UNSIGNED | |
buf_write(buf, ((STRING << 1) | sign,), 1) | |
buf_pack(buf, '!I' if sign == SIGNED else '!H', size) | |
buf_write(buf, data, size) | |
# append float (TAG + 4 bytes) or double (TAG + 8 bytes) | |
elif isinstance(var, float): | |
is_32bit = False # TODO: find a way to check this | |
ftype = FLOAT if is_32bit else DOUBLE | |
fformat = '!f' if is_32bit else '!d' | |
size = 1 + struct.calcsize(fformat) | |
buf_write(buf, (ftype << 1,), 1) | |
buf_pack(buf, fformat, var) | |
# append object : TAG + (2-4 bytes in pairs) + _pack(key & value) | |
elif isinstance(var, dict): | |
pairs = var.keys() | |
size = len(pairs) | |
sign = SIGNED if size > SHORT_MAX else UNSIGNED | |
buf_write(buf, ((OBJECT << 1) | sign,), 1) | |
buf_pack(buf, '!I' if sign == SIGNED else '!H', size) | |
for key in pairs: | |
_pack(buf, key) | |
_pack(buf, var[key]) | |
# append list : TAG + (2-4 bytes in length) + _pack(each_value) | |
elif isinstance(var, collections.Iterable): | |
array = list(var) | |
size = len(array) | |
sign = SIGNED if size > SHORT_MAX else UNSIGNED | |
buf_write(buf, ((ARRAY << 1) | sign,), 1) | |
buf_pack(buf, '!I' if sign == SIGNED else '!H', size) | |
for item in array: | |
_pack(buf, item) | |
# append number : TAG + (num_bytes in integer value) | |
elif isinstance(var, int): | |
sign = SIGNED if var < 0 else UNSIGNED | |
if -0xFF <= var <= 0xFF: | |
itype, iformat = BYTE, ('!b' if sign == SIGNED else '!B') | |
elif SHORT_MIN <= var <= SHORT_MAX: | |
itype, iformat = SHORT, ('!h' if sign == SIGNED else '!H') | |
elif INT_MIN <= var <= INT_MAX: | |
itype, iformat = INT, ('!i' if sign == SIGNED else '!I') | |
else: | |
itype, iformat = LONG, ('!q' if sign == SIGNED else '!Q') | |
size = 1 + struct.calcsize(iformat) | |
buf_write(buf, ((itype << 1) | sign,), 1) | |
buf_pack(buf, iformat, var) | |
def _unpack(buf : Buffer): | |
""" Read and Unpack a value from the buffer """ | |
btag = buf_read(buf, 1)[0] | |
bsign = btag & 1 | |
btype = btag >> 1 | |
# unpack null | |
if btype == NULL: | |
return None | |
# unpack a boolean | |
elif btype == BOOL: | |
return True if bsign == 1 else False | |
# unpack a float or double | |
elif btype == FLOAT or btype == DOUBLE: | |
return buf_unpack(buf, '!f' if btype == FLOAT else '!d') | |
# unpack a string | |
elif btype == STRING: | |
length = buf_unpack(buf, '!I' if bsign == SIGNED else '!H') | |
return buf_read(buf, length).decode()[:length] | |
# unpack an array / list | |
elif btype == ARRAY: | |
length = buf_unpack(buf, '!I' if bsign == SIGNED else '!H') | |
return [_unpack(buf) for i in range(length)] | |
# unpack an object / structure | |
elif btype == OBJECT: | |
obj = {} | |
pairs = buf_unpack(buf, '!I' if bsign == SIGNED else '!H') | |
for i in range(pairs): | |
key = _unpack(buf) | |
obj[key] = _unpack(buf) | |
return obj | |
# unpack a number | |
elif btype in (BYTE, SHORT, INT, LONG): | |
if btype == BYTE: | |
fmt = '!b' if bsign == SIGNED else '!B' | |
elif btype == SHORT: | |
fmt = '!h' if bsign == SIGNED else '!H' | |
elif btype == INT: | |
fmt = '!i' if bsign == SIGNED else '!I' | |
else: | |
fmt = '!q' if bsign == SIGNED else '!Q' | |
return buf_unpack(buf, fmt) | |
def pack(*values : tuple): | |
""" API Method for packing variable """ | |
buf = Buffer(length=0, size=0) | |
for value in values: | |
_pack(buf, value) | |
return bytes(buf.data[:buf.length]) | |
def unpack(data : collections.Iterable): | |
""" API Method for unpacking variables from buffer """ | |
# get as bytes or bytearray format | |
if isinstance(data, str): | |
data = data.encode() | |
elif isinstance(data, list): | |
data = bytes(data) | |
if not isinstance(data, bytes) and not isinstance(data, bytearray): | |
raise BufferException("Unpacking only supports bytes and bytearray") | |
# read the results | |
buf = Buffer(data=data, length=0, size=len(data)) | |
results = [] | |
while True: | |
results.append(_unpack(buf)) | |
if buf.length == buf.size: | |
break | |
# return the unpack variable(s) | |
return results[0] if len(results) == 1 else results |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment