Skip to content

Instantly share code, notes, and snippets.

@king1600
Created October 21, 2017 05:35
Show Gist options
  • Save king1600/401cf47f2654f0daec6112d5fb91b606 to your computer and use it in GitHub Desktop.
Save king1600/401cf47f2654f0daec6112d5fb91b606 to your computer and use it in GitHub Desktop.
JPack: The most of JSON and erlpack/etf
"""
JPack: The most of JSON and erlpack/etf
____.__________ _____ _________ ____ __.
| |\______ \/ _ \ \_ ___ \| |/ _|
| | | ___/ /_\ \/ \ \/| <
/\__| | | | / | \ \___| | \
\________| |____| \____|__ /\______ /____|__ \
\/ \/ \/
Copyright(c) 2017 king1600 (https://github.com/king1600)
Permission is hereby granted, free of charge, to any person obtaining a
copy of this software and associated documentation files (the "Software"),
to deal in the Software without restriction, including without limitation
the rights to use, copy, modify, merge, publish, distribute, sublicense,
and/or sell copies of the Software, and to permit persons to whom the Software
is furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED,
INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE
FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE,
ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
=====================================================================================================
Example:
from jpack import pack, unpack
byte_data = pack({'a': 1234, 'b': [3.14, None, True]}, False)
dict_and_bool = unpack(byte_data)
"""
import collections
import collections.abc
import struct
""" Integer Constants """
# Range limits consulted by the integer branch of _pack to pick the narrowest
# wire width. SHORT_MAX is also the threshold at which string, array and
# object length prefixes switch from a 2-byte to a 4-byte field.
INT_MAX = 0xFFFFFFFF  # largest value that fits an unsigned 32-bit field ('!I')
INT_MIN = -0x7FFFFFFF  # one above the true signed 32-bit minimum (-0x80000000)
SHORT_MAX = 0xFFFF  # largest value that fits an unsigned 16-bit field ('!H')
SHORT_MIN = -0x7FFF  # one above the true signed 16-bit minimum (-0x8000)
# FLOAT_MAX / FLOAT_MIN are not referenced by any code visible in this file.
FLOAT_MAX = 1e+37
FLOAT_MIN = 1e-37
""" JPack Types """
# Every encoded value starts with one tag byte laid out as (type << 1) | flag.
# The type codes below occupy the upper bits; the flag is the lowest bit.
NULL = 0  # tag only, no payload
OBJECT = 1  # pair count, then alternating packed keys and values
ARRAY = 2  # item count, then each packed item
STRING = 3  # byte length, then the encoded text
BYTE = 4  # 1-byte integer
SHORT = 5  # 2-byte integer
INT = 6  # 4-byte integer
LONG = 7  # 8-byte integer
FLOAT = 8  # 4-byte float ('!f'); understood by _unpack, never emitted by _pack
DOUBLE = 9  # 8-byte float ('!d')
BOOL = 10  # tag only; the flag bit carries the value
# Flag-bit values. For integers the flag selects the signed or unsigned struct
# format. For STRING / ARRAY / OBJECT it instead selects the width of the
# length prefix: SIGNED -> 4-byte '!I', UNSIGNED -> 2-byte '!H'.
SIGNED = 0
UNSIGNED = 1
""" Buffer Struct """
class Buffer:
    """Cursor over a block of bytes used by the pack and unpack helpers.

    Attributes:
        data: Backing storage. ``None`` until the first write allocates a
            ``bytearray``; when unpacking it is the caller-supplied bytes.
        length: Current offset into ``data`` -- bytes written so far when
            packing, bytes consumed so far when unpacking.
        size: Capacity of ``data`` when packing, total number of input
            bytes when unpacking.
    """

    __slots__ = ('data', 'length', 'size')

    def __init__(self, data=None, length=None, size=None):
        self.data = data
        self.length = length
        self.size = size

    def __repr__(self):
        # Report only the storage length so large payloads stay readable.
        data_len = None if self.data is None else len(self.data)
        return '{}(data_len={!r}, length={!r}, size={!r})'.format(
            type(self).__name__, data_len, self.length, self.size)
""" Buffer Error """
class BufferException(Exception):
    """Raised when a read would run past the end of a buffer, or when
    unpack() is handed input it cannot treat as bytes."""
def buf_grow_size(buf: Buffer, size: int):
    """Make sure ``buf`` can take ``size`` more bytes at its write offset.

    A buffer with no storage yet is allocated from scratch and its offset is
    reset to zero. A buffer that is too small is replaced by a larger
    zero-filled ``bytearray`` holding a copy of the bytes written so far.
    In both cases the new capacity is 1.5x the number of bytes required, so
    that a run of small writes does not reallocate every time.

    Args:
        buf: Buffer to prepare; ``data``, ``size`` and possibly ``length``
            are updated in place.
        size: Number of bytes about to be written.
    """
    if buf.data is None:
        buf.length = 0
        buf.size = (size * 3) // 2
        buf.data = bytearray(buf.size)
        return

    required = buf.length + size
    if required <= buf.size:
        return

    buf.size = (required * 3) // 2
    resized = bytearray(buf.size)
    # Copy only the written prefix. Assigning the whole old array into the
    # shorter slice would splice in its unused tail and leave the new
    # bytearray longer than buf.size.
    resized[:buf.length] = buf.data[:buf.length]
    buf.data = resized
def buf_assert_size(buf: Buffer, size: int):
    """Verify that ``size`` more bytes can be read from ``buf``.

    Raises:
        BufferException: If the read would pass ``buf.size``; the message
            reports how many bytes are missing.
    """
    shortfall = (buf.length + size) - buf.size
    if shortfall > 0:
        raise BufferException("Buffer underflow: {}".format(shortfall))
def buf_read(buf: Buffer, num_bytes: int):
    """Return the next ``num_bytes`` bytes of ``buf`` and advance its offset.

    Raises:
        BufferException: If fewer than ``num_bytes`` bytes remain.
    """
    buf_assert_size(buf, num_bytes)
    start = buf.length
    stop = start + num_bytes
    chunk = buf.data[start:stop]
    buf.length = stop
    return chunk
def buf_write(buf: Buffer, data: collections.abc.Iterable, num_bytes: int):
    """Copy ``data`` into ``buf`` at its write offset and advance the offset.

    Args:
        buf: Destination buffer; grown first if it lacks room.
        data: Byte values to store (``bytes``, ``bytearray`` or an iterable
            of ints). It must yield exactly ``num_bytes`` items, because the
            slice assignment below would otherwise change the length of the
            backing ``bytearray``.
        num_bytes: Number of bytes being written.
    """
    # NOTE: the annotation uses collections.abc.Iterable; the bare
    # collections.Iterable alias was removed in Python 3.10.
    buf_grow_size(buf, num_bytes)
    stop = buf.length + num_bytes
    buf.data[buf.length:stop] = data
    buf.length = stop
def buf_unpack(buf: Buffer, fmt: str):
    """Decode one value from ``buf`` with struct format ``fmt``.

    The format must describe exactly one field. The buffer offset moves
    forward by the format's packed size.

    Raises:
        BufferException: If the buffer holds fewer bytes than ``fmt`` needs.
    """
    width = struct.calcsize(fmt)
    buf_assert_size(buf, width)
    offset = buf.length
    (value,) = struct.unpack_from(fmt, buf.data, offset)
    buf.length = offset + width
    return value
def buf_pack(buf: Buffer, fmt: str, *values: object):
    """Encode ``values`` into ``buf`` with struct format ``fmt``.

    Args:
        buf: Destination buffer; grown first if it lacks room.
        fmt: ``struct`` format string describing every value in ``values``.
        *values: The values to encode, one per field in ``fmt``.
    """
    # calcsize(fmt) already covers every field in the format, so it is the
    # exact number of bytes pack_into writes. Multiplying it by len(values)
    # would over-advance the offset for multi-field formats.
    size = struct.calcsize(fmt)
    buf_grow_size(buf, size)
    struct.pack_into(fmt, buf.data, buf.length, *values)
    buf.length += size
def _pack_length(buf: Buffer, type_code: int, count: int):
    """Write a tag byte for ``type_code`` followed by ``count`` as a length prefix."""
    # Counts above SHORT_MAX need the 4-byte prefix, which the wire format
    # signals by putting SIGNED in the flag bit; smaller counts use 2 bytes.
    wide = count > SHORT_MAX
    flag = SIGNED if wide else UNSIGNED
    buf_write(buf, ((type_code << 1) | flag,), 1)
    buf_pack(buf, '!I' if wide else '!H', count)


def _pack_int(buf: Buffer, value: int):
    """Write an integer using the narrowest width that can represent it."""
    if value < 0:
        flag = SIGNED
        # A signed byte ('!b') only reaches -0x80; anything lower must be
        # widened or struct.pack_into raises struct.error.
        if value >= -0x80:
            type_code, fmt = BYTE, '!b'
        elif value >= SHORT_MIN:
            type_code, fmt = SHORT, '!h'
        elif value >= INT_MIN:
            type_code, fmt = INT, '!i'
        else:
            type_code, fmt = LONG, '!q'
    else:
        flag = UNSIGNED
        if value <= 0xFF:
            type_code, fmt = BYTE, '!B'
        elif value <= SHORT_MAX:
            type_code, fmt = SHORT, '!H'
        elif value <= INT_MAX:
            type_code, fmt = INT, '!I'
        else:
            type_code, fmt = LONG, '!Q'
    buf_write(buf, ((type_code << 1) | flag,), 1)
    buf_pack(buf, fmt, value)


def _pack(buf: Buffer, var: object):
    """Append the encoded form of ``var`` to ``buf``.

    Supported inputs are ``None``, ``bool``, ``str``, ``float``, ``dict``,
    any other iterable (written as an array) and ``int``. Containers are
    encoded recursively. Floats are always written as 8-byte doubles.

    Raises:
        TypeError: If ``var`` is of a type that cannot be encoded.
        struct.error: If an integer does not fit in 64 bits.
    """
    # null: tag only
    if var is None:
        buf_write(buf, (NULL << 1,), 1)
    # bool: tag only, value stored in the flag bit. Checked before int
    # because bool is a subclass of int.
    elif isinstance(var, bool):
        buf_write(buf, ((BOOL << 1) | int(var),), 1)
    # string: tag + length prefix + encoded bytes
    elif isinstance(var, str):
        encoded = var.encode()
        _pack_length(buf, STRING, len(encoded))
        buf_write(buf, encoded, len(encoded))
    # float: tag + 8 bytes. The FLOAT (4-byte) type is never emitted, since
    # nothing here checks whether a value survives a 32-bit round trip.
    elif isinstance(var, float):
        buf_write(buf, (DOUBLE << 1,), 1)
        buf_pack(buf, '!d', var)
    # object: tag + pair count + packed key and value for each pair
    elif isinstance(var, dict):
        _pack_length(buf, OBJECT, len(var))
        for key, value in var.items():
            _pack(buf, key)
            _pack(buf, value)
    # array: tag + item count + each packed item. The iterable is
    # materialized first so its length is known before writing.
    elif isinstance(var, collections.abc.Iterable):
        items = list(var)
        _pack_length(buf, ARRAY, len(items))
        for item in items:
            _pack(buf, item)
    # integer: tag + 1/2/4/8 bytes
    elif isinstance(var, int):
        _pack_int(buf, var)
    else:
        # Writing nothing here would desynchronize every value that follows.
        raise TypeError(
            "Cannot pack object of type {}".format(type(var).__name__))
def _unpack(buf: Buffer):
    """Read one encoded value from ``buf`` and return it as a Python object.

    The first byte is a tag: its lowest bit is a flag and the remaining bits
    are the type code. Strings, arrays and objects are followed by a length
    prefix and are decoded recursively where needed.

    Raises:
        BufferException: If the buffer ends before the value is complete,
            or if the tag carries a type code this module does not define.
    """
    tag = buf_read(buf, 1)[0]
    flag = tag & 1
    type_code = tag >> 1

    if type_code == NULL:
        return None
    if type_code == BOOL:
        return flag == 1
    if type_code == FLOAT:
        return buf_unpack(buf, '!f')
    if type_code == DOUBLE:
        return buf_unpack(buf, '!d')

    if type_code in (STRING, ARRAY, OBJECT):
        # For these types the flag selects the width of the length prefix.
        count = buf_unpack(buf, '!I' if flag == SIGNED else '!H')
        if type_code == STRING:
            return buf_read(buf, count).decode()
        if type_code == ARRAY:
            return [_unpack(buf) for _ in range(count)]
        obj = {}
        for _ in range(count):
            # The key is read before the value, matching the order written.
            key = _unpack(buf)
            obj[key] = _unpack(buf)
        return obj

    # (signed format, unsigned format) for each integer width
    int_formats = {
        BYTE: ('!b', '!B'),
        SHORT: ('!h', '!H'),
        INT: ('!i', '!I'),
        LONG: ('!q', '!Q'),
    }
    if type_code in int_formats:
        signed_fmt, unsigned_fmt = int_formats[type_code]
        return buf_unpack(buf, signed_fmt if flag == SIGNED else unsigned_fmt)

    # Returning None here would be indistinguishable from a packed null.
    raise BufferException("Unknown type tag: {}".format(type_code))
def pack(*values: object):
    """Encode each of ``values`` in order and return the result as ``bytes``.

    Calling it with no values returns ``b''``.
    """
    buf = Buffer(length=0, size=0)
    for value in values:
        _pack(buf, value)
    # Storage is only allocated by the first write, so it is still None when
    # nothing was packed; subscripting it would raise TypeError.
    if buf.data is None:
        return b''
    return bytes(buf.data[:buf.length])
def unpack(data: collections.abc.Iterable):
    """Decode every value stored in ``data``.

    Args:
        data: ``bytes`` or ``bytearray`` to decode. A ``str`` is encoded to
            bytes first and a ``list`` is converted with ``bytes(data)``.

    Returns:
        The decoded value itself when ``data`` holds exactly one value,
        otherwise a list of the decoded values in order (an empty list for
        empty input).

    Raises:
        BufferException: If ``data`` is of an unsupported type or ends in
            the middle of a value.
    """
    # NOTE: the annotation uses collections.abc.Iterable; the bare
    # collections.Iterable alias was removed in Python 3.10.
    if isinstance(data, str):
        data = data.encode()
    elif isinstance(data, list):
        data = bytes(data)
    if not isinstance(data, (bytes, bytearray)):
        raise BufferException("Unpacking only supports bytes and bytearray")

    buf = Buffer(data=data, length=0, size=len(data))
    results = []
    # Reads are bounds-checked, so the offset can reach but never pass size.
    while buf.length < buf.size:
        results.append(_unpack(buf))
    return results[0] if len(results) == 1 else results
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment