@tav
Created October 25, 2009 18:49
# -*- coding: utf-8 -*-
"""
============================
Plexnet Serialisation Format
============================
The PSF (Plexnet Serialisation Format) aims to be a safe, efficient and portable
method of exchanging structured data across the Plexnet. It also doubles up as
the persistent format for POD (Plexnet Object Datastore).
Unlike data formats like XML, JSON, S-Expressions or YAML, PSF is a binary
serialisation format. It is geared towards efficient machine readability rather
than human readability.
It is language neutral and supports a rich set of data types. Its design
goals are:

* It must self-describe the serialised types, i.e. not require external
  schemas or interface definitions.
* It must be language-independent, including support for scripting languages.
* It must be readable and writable in a single pass.
* It must be as compact as possible.
* It must be simple enough to be effectively tested and implemented.
* It must be as fast as possible.
* It must support Unicode strings.
* It must support 8-bit binary data without escaping or using attachments.
* It must support encryption, compression, signature, and transaction
  context envelopes.
Python developers who wonder why a standard module like ``pickle`` isn't
being used only need to consider the trade-offs. For starters, we don't have
to worry about malicious code being executed -- which can happen with the
standard ``marshal`` and ``pickle`` modules. It also means that we can focus
on being relatively efficient in our usage of memory/space as well as have a
*relatively* language neutral persistence format.
Anyways, a simple serialisation looks like:
>>> serialise_plex_type('a')
'\\xa0a'
The first byte indicates the type -- we are limited to a maximum of 128/256
builtin types ;p
As suggested, the serialisation process is relatively efficient. For example,
using the standard ``pickle`` module, the above would have taken up between 6
and 9 bytes depending on whether you used protocol 0, 1 or 2. But, here it's
just 2 bytes:
>>> len(serialise_plex_type('a'))
2
>>> deserialise_plex_type('\xa0a')
'a'
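The two-byte encoding above follows from the embedded type-byte scheme used
for short bytestreams. A minimal standalone sketch (the constants here mirror
the ``EMBEDDED_*`` values defined later in this module):

```python
# Sketch of the embedded type-byte scheme for short bytestreams.
# Codes 0x80..0x9f embed the integers 0..31 directly; the 32 codes
# starting at 0xa0 embed bytestream lengths 1..32.
EMBEDDED_INTEGER_START = 0x80
EMBEDDED_INTEGER_RANGE = 32
EMBEDDED_BYTESTREAM_START = EMBEDDED_INTEGER_START + EMBEDDED_INTEGER_RANGE

def embed_bytestream(data):
    """Encode a short byte string as <type-byte><raw bytes>."""
    assert 1 <= len(data) <= 32
    return bytes([EMBEDDED_BYTESTREAM_START - 1 + len(data)]) + data

print(embed_bytestream(b'a'))  # b'\xa0a' -- matches the doctest above
```

So the type byte itself carries the length, and a one-byte string costs only
one byte of overhead.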
We define a simple function to help with the testing:
>>> def serialise_and_deserialise(value):
...     '''Return the value after it has been serialised and deserialised.'''
...     return deserialise_plex_type(serialise_plex_type(value))
>>> serialise_and_deserialise('hello')
'hello'
>>> serialise_and_deserialise(True)
True
>>> serialise_and_deserialise(False)
False
>>> serialise_and_deserialise(Maybe(50))
Maybe(50)
>>> serialise_and_deserialise(2342)
2342
>>> serialise_and_deserialise(decimal('1.5'))
Decimal("1.5")
>>> serialise_and_deserialise(1.5)
1.5
>>> complex_life = 3 + 4.2j
>>> serialise_and_deserialise(complex_life) == complex_life
True
>>> serialise_and_deserialise(None)
Null
>>> a_uuid = UUID()
>>> serialise_and_deserialise(a_uuid) == a_uuid
True
>>> serialise_and_deserialise(StorageItem('md5', 'abcd'))
<StorageItem: md5:abcd>
>>> serialise_and_deserialise((1, '2', Decimal('3.0')))
(1, '2', Decimal("3.0"))
>>> a_set = set(['2', 1, 3.0])
>>> serialise_and_deserialise(a_set) == a_set
True
>>> singers = {'Ms. Dynamite': True, 'Britney': False}
>>> new_singers = serialise_and_deserialise(singers)
>>> type(new_singers)
<type 'dict'>
>>> singers == new_singers
True
>>> text = u'açcént'
>>> serialise_plex_type(text)
'\\x02\\x00\\x08a\\xc3\\xa7c\\xc3\\xa9nt'
   ^    ^    ^   ^
   |    |    |   `-- the actual (utf-8 encoded) string itself
   |    |    `------ string length
   |    `----------- encoding (\\x00 == utf-8)
   `---------------- type
>>> out_text = deserialise_plex_type(serialise_plex_type(text))
>>> out_text
u'a\\xe7c\\xe9nt'
>>> type(out_text).__name__
'Text'
>>> serialise_and_deserialise(date(2005, 06, 02))
datetime.date(2005, 6, 2)
>>> serialise_and_deserialise(datetime(2005, 06, 01, 3, 0, 59, 1803))
datetime.datetime(2005, 6, 1, 3, 0, 59, 1803)
>>> sec_ago = timedelta(seconds=-1)
>>> sec_ago
datetime.timedelta(-1, 86399)
>>> store = StringIO()
>>> serialise_plex_type(sec_ago, store)
>>> store.getvalue()
'\\x13\\xc2\\x05\\x01\\x04\\xff\\xa2\\x05\\x80'
>>> store.reset(); deserialise_plex_type(store) == sec_ago
True
We have a simple integer storage mechanism. Whilst inefficient for long integers
-- it works out to be relatively efficient overall.
>>> pack_integer(2342)
'\\xa6\\x12'
>>> unpack_integer('\xa6\x12')
2342
>>> store = StringIO()
>>> pack_integer(7, store)
>>> pack_integer(2342, store)
>>> pack_integer(18, store)
>>> store.reset()
>>> unpack_integer(store)
7
>>> unpack_integer(store)
2342
>>> unpack_integer(store)
18
>>> import sys
>>> packed_maxint_square = pack_integer(sys.maxint ** 2)
>>> unpack_integer(packed_maxint_square) == sys.maxint ** 2
True
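The packing scheme is a little-endian base-128 varint (the same idea as
LEB128 and protobuf varints): each byte carries seven payload bits and the
high bit flags a continuation. A standalone Python 3 sketch of the idea:

```python
def pack_uint(n):
    """Pack a non-negative integer, 7 bits per byte, high bit = continue."""
    if n < 0:
        raise ValueError("negative integers cannot be packed")
    out = bytearray()
    while True:
        byte = n & 0x7f
        n >>= 7
        if n:
            byte |= 0x80  # more bytes follow
        out.append(byte)
        if not n:
            return bytes(out)

def unpack_uint(data):
    """Inverse of pack_uint: accumulate 7-bit groups, low group first."""
    result = shift = 0
    for byte in data:
        result |= (byte & 0x7f) << shift
        shift += 7
        if not byte & 0x80:
            break
    return result

print(pack_uint(2342))  # b'\xa6\x12' -- as in the doctest above
```

Small integers cost a single byte, and arbitrarily large integers are still
representable, which is why it works out "relatively efficient overall".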
# "fixed-point" types -- from draft-frystyk-httpng-arch-00.txt
# A fixed-point type is defined with three parameters: a denominator, a
# maximum numerator value, and a minimum numerator value. These define a
# series of rational values which make up the allowable values of that
# fixed-point type. The numerator and denominator are integer values; the
# denominator is either a positive integer value greater than zero, or the
# reciprocal of a positive integer value greater than zero. Each value of a
# fixed-point type abstractly exists as a rational number with a numerator
# in the range specified for numerators, and a denominator of the specified
# denominator value. For example, one could define a fixed-point type which
# would cover the 16-bit unsigned integer space with a denominator of one
# (all integer types have denominators of one), a maximum numerator of 65535
# and a minimum numerator of zero. One could define a fixed-point type
# `dollars' for business purposes with a denominator of 100 (two decimal
# places for `cents'), a maximum numerator of 100000000 (1 million dollars)
# and a minimum numerator of -100000000 (1 million dollars). There are no
# limits on the sizes of denominators, maximum numerators, or minimum
# numerators.
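The `dollars' example from the quoted draft can be modelled directly with
rationals. A sketch (the bounds and the range-check helper are just the
draft's worked example, not part of PSF itself):

```python
from fractions import Fraction

# The `dollars' fixed-point type from the draft: denominator 100 (two
# decimal places for cents), numerator limited to +/- 100,000,000,
# i.e. values between minus one million and one million dollars.
DENOMINATOR = 100
MAX_NUMERATOR = 100000000
MIN_NUMERATOR = -100000000

def dollars(numerator):
    """Return the rational value of a `dollars' numerator, range-checked."""
    if not MIN_NUMERATOR <= numerator <= MAX_NUMERATOR:
        raise ValueError("numerator out of range for the dollars type")
    return Fraction(numerator, DENOMINATOR)

print(repr(dollars(150)))  # Fraction(3, 2) -- i.e. $1.50
```

Only the integer numerator would need to be serialised; the denominator and
bounds live in the type definition.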
We also do some fuzz testing with randomly generated values -- see
``generate_test_of_size`` at the bottom of this module.
"""
from .builtins import (
    date, datetime, decimal, fraction, Maybe, text, time, timedelta, Blank,
    Null
    )

from ..util.io import StringIO
from ..util.optimise import optimise

from itertools import repeat # used by the set/list/mapping deserialisers

# from .index.base import Index
# from .index.builtin import Plexname, Unit, StorageItem
# from pickle import encode_long

# pimp('builtin/dict', 'AttrDict', 'CachingDict')
# pimp('builtin/uuid', 'UUID', 'uuid2string')
# pimp('datetime/time', 'TimeStamp', 'timestamp')
# pimp('functional', 'invert_dict')

def invert_dict(dict):
    d = {}
    for key, value in dict.iteritems():
        d[value] = key
    return d

def freeze(obj):
    # @/@ minimal stand-in for the real ``freeze`` -- converts the mutable
    # @/@ builtins into hashable counterparts for use as set members/dict keys
    if isinstance(obj, list):
        return tuple(freeze(element) for element in obj)
    if isinstance(obj, (set, frozenset)):
        return frozenset(obj)
    if isinstance(obj, dict):
        return tuple(sorted(obj.iteritems()))
    return obj

class CachingDict(dict):

    def __init__(self, size=0):
        return dict.__init__(self)

class Unit(str):
    pass

class UUID(str):
    pass

class Text(unicode):
    pass

StorageItem = object()
mapping = object()
TimeStamp = object()
Index = object()
Plexname = object()
# ------------------------------------------------------------------------------
# some konstants
# ------------------------------------------------------------------------------
# - booleans can be split into 3 types
__all__ = [
    'pack_integer', 'unpack_integer', 'get_plex_type', 'serialise_plex_type',
    'deserialise_plex_type',
    'GroupedDict', 'between', 'for_type', 'for_objects_like', 'sorted_for',
    'filtered_by',
    ]

__metaclass__ = type
# we are limiting the number of builtin types to 2 ** 8, i.e. 256. if for some
# reason this turns out to not be enough -- kall the a-team!
# standard builtin type code assignments (reserved ranges)
# first last count purpose
# 0 127 128 reserved for standard plex builtins
# 128 191 64 reserved for espian extensions
# 192 239 48 reserved for 3rd parties
# 240 255 16 reserved for private use (will never be assigned)
# 256 Inf Inf reserved for future assignment
# these codes are assigned by esp
type_id2string_map = {
    '\x00': 'unit',
    # unitid
    # URI
    #
    '\x01': 'bytestream',
    '\x02': 'text',
    # '\x03': '',
    '\x04': 'integer',
    '\x05': 'integer -- negative',
    '\x06': 'decimal',
    '\x07': 'float',
    '\x08': 'complex',
    # fraction
    # '\x09': 'exception',
    '\x0a': 'null',
    '\x0b': 'sequence', # tuple
    '\x0c': 'set',
    # '\x0d': 'bag',
    '\x0e': 'list',
    '\x0f': 'mapping',
    '\x10': 'datetime',
    '\x11': 'date',
    # '\x12': 'time',
    '\x13': 'timedelta',
    # '\x14': 'timezone',
    '\x15': 'timestamp',
    # '\x16': 'base16',
    # '\x17': 'base64',
    # '\x18': 'struct',
    '\x19': 'uuid',
    # '\x1a': 'node',
    # '\x1b': 'entity',
    # '\x1c': 'event',
    # '\x1d': 'service',
    '\x1e': 'capability',
    '\x1f': 'capability proxy',
    '\x20': 'index',
    # '\x21': '', # item
    '\x22': 'storage',
    '\x23': 'plexname',
    '\x24': 'context', # domain / topic
    # '\x25': 'translation context',
    '\x26': 'rating',
    '\x27': 'trust',
    # '\x28': 'reputation',
    # '\x29': 'smart contract',
    # '\x2a': 'currency',
    # signature
    # key
    '\x30': 'boolean -- false',
    '\x31': 'boolean -- true',
    '\x3f': 'boolean -- maybe',
    # enumerated / choice
    # shell pattern
    # regex
    # 'sensor'
    # 'transaction'
    # 'function'
    # '\x10': 'adapter',
    # 'view',
    # '\x16': 'metatype',
    # '\x17': 'property',
    # '\x80' onwards are length embedded primitives
    '\xfe': 'third party extension',
    '\xff': 'internal reference',
    }
FIXTURE = 128
EMBEDDED_INTEGER_START = FIXTURE
EMBEDDED_INTEGER_RANGE = 32
for i in xrange(EMBEDDED_INTEGER_RANGE):
    type_id2string_map[chr(FIXTURE)] = 'integer -- %i' % i
    FIXTURE += 1

EMBEDDED_BYTESTREAM_START = FIXTURE
EMBEDDED_BYTESTREAM_RANGE = 32

for i in xrange(1, EMBEDDED_BYTESTREAM_RANGE + 1):
    type_id2string_map[chr(FIXTURE)] = 'bytestream -- %i' % i
    FIXTURE += 1

EMBEDDED_SEQUENCE_START = FIXTURE
EMBEDDED_SEQUENCE_RANGE = 16

for i in xrange(1, EMBEDDED_SEQUENCE_RANGE + 1):
    type_id2string_map[chr(FIXTURE)] = 'sequence -- %i' % i
    FIXTURE += 1

EMBEDDED_MAPPING_START = FIXTURE
EMBEDDED_MAPPING_RANGE = 16

for i in xrange(1, EMBEDDED_MAPPING_RANGE + 1):
    type_id2string_map[chr(FIXTURE)] = 'mapping -- %i' % i
    FIXTURE += 1

EMBEDDED_SET_START = FIXTURE
EMBEDDED_SET_RANGE = 30

for i in xrange(1, EMBEDDED_SET_RANGE + 1):
    type_id2string_map[chr(FIXTURE)] = 'set -- %i' % i
    FIXTURE += 1
type_string2id_map = invert_dict(type_id2string_map)
NullType = Null.__class__
NoneType = None.__class__
plex_type_map = serialisation_map = {}
deserialisation_map = {}
# version identifier
# ------------------------------------------------------------------------------
# error klasses
# ------------------------------------------------------------------------------
class SerialisationError(Exception):
    """Error raised when it is not possible to serialise a given value."""

class DeserialisationError(Exception):
    """Error raised when it is not possible to deserialise a given value."""
# ------------------------------------------------------------------------------
# kache it babeh!
# ------------------------------------------------------------------------------
# kache could take akount of length of serialised output ...
serialisation_cache = {}
class NullCache:

    def __init__(self, *args, **kwargs):
        pass

    def setdefault(self, key, default=None):
        return default

def clear_all_cache():
    for cache in serialisation_cache.values():
        cache.clear()

def print_cache_usage():
    print
    for name, cache in serialisation_cache.iteritems():
        print "%15s : %s" % (name, cache.get_cache_byte_size())
# ------------------------------------------------------------------------------
# some utility funktions
# ------------------------------------------------------------------------------
def register_serialiser(plex_type, type, cache=True, cache_size=1000):
    def _register_serialiser(serialiser):
        serialisation_map[type] = (plex_type, serialiser, cache)
        if plex_type not in serialisation_cache:
            serialisation_cache[plex_type] = CachingDict(cache_size)
        return optimise()(serialiser)
    return _register_serialiser

def register_deserialiser(plex_type):
    def _register_deserialiser(deserialiser):
        deserialisation_map[type_string2id_map[plex_type]] = deserialiser
        return optimise()(deserialiser)
    return _register_deserialiser
# ------------------------------------------------------------------------------
# type finder
# ------------------------------------------------------------------------------
@optimise()
def get_plex_type(object):
    object_type = type(object)
    # @/@ whilst during s11n, it is treated as 2 distinct types, in reality
    # @/@ there is only one ``integer`` type ?
    # if object_type in baseinteger:
    #     if object >= 0:
    #         return 'integer'
    #     else:
    #         return 'negative integer'
    try:
        return plex_type_map[object_type][0]
    except KeyError:
        raise ValueError("Object type %r not supported." % object_type)
# ------------------------------------------------------------------------------
# grouped type mappings -- used in ``GroupedDict``
# ------------------------------------------------------------------------------
similar_types = {}

for type_string in type_string2id_map:
    similar_types[type_string] = type_string.split('--')[0].strip()
# ------------------------------------------------------------------------------
# kould be optimised further -- esp. wrt long integers
# ------------------------------------------------------------------------------
_packed_integer_cache = CachingDict(10000)

@optimise()
def pack_integer(integer, stream=Blank, retval=False):
    if integer < 0:
        raise SerialisationError("Negative integers cannot be packed.")
    if integer in _packed_integer_cache:
        if stream is Blank:
            return _packed_integer_cache[integer]
        else:
            return stream.write(_packed_integer_cache[integer])
    if stream is Blank:
        stream = StringIO()
        retval = True
    write = stream.write
    original_integer = integer
    while 1:
        left_bits = integer & 127
        integer >>= 7
        if integer:
            left_bits += 128
        write(chr(left_bits))
        if not integer:
            break
    if retval:
        return _packed_integer_cache.setdefault(
            original_integer, stream.getvalue()
            )

def pack_integer1(integer):
    # @/@ simpler benchmarking variant -- renamed so it doesn't shadow the
    # @/@ caching ``pack_integer`` above, which the rest of the module uses
    from cStringIO import StringIO
    stream = StringIO()
    write = stream.write
    while 1:
        left_bits = integer & 127
        integer >>= 7
        if integer:
            left_bits += 128
        write(chr(left_bits))
        if not integer:
            break
    return stream.getvalue()

def pack_integer2(integer):
    # @/@ abandoned 8-bits-per-byte experiment -- broken as it stands, since
    # @/@ ``left_bits`` can exceed 255 and ``chr`` will blow up
    from cStringIO import StringIO
    stream = StringIO()
    write = stream.write
    while 1:
        shift = integer >> 8
        left_bits = integer & 256
        integer >>= 8
        if integer:
            left_bits += 256
        write(chr(left_bits))
        if not integer:
            break
    return stream.getvalue()

@optimise()
def unpack_integer(stream):
    if isinstance(stream, basestring):
        stream = StringIO(stream)
    read = stream.read
    result = bit_shift = 0
    lowest_byte = 1
    while lowest_byte:
        value = ord(read(1)) # @/@ b0rks on no input
        lowest_byte = value & 128
        result += (value & 127) << bit_shift
        bit_shift += 7
    return result
# ------------------------------------------------------------------------------
# our serialisers
# ------------------------------------------------------------------------------
@register_serialiser('unit', Unit)
def serialise_unit(object, context):
    # units
    return context.write('\x00' + object)

@register_serialiser('uuid', UUID)
def serialise_uuid(object, context):
    return context.write('\x19' + object)

@register_serialiser('bytestream', str)
def serialise_bytestream(object, context):
    # bytes -- raw strings
    stream_length = len(object)
    if 0 < stream_length <= EMBEDDED_BYTESTREAM_RANGE:
        return context.write(
            chr(EMBEDDED_BYTESTREAM_START - 1 + stream_length) + object
            )
    return context.write('\x01' + pack_integer(stream_length) + object)

@register_serialiser('text', unicode)
def serialise_text_via_unicode(object, context):
    # texts -- encoded strings
    object = object.encode('utf-8')
    # object = object.encode('raw-unicode-escape')
    return context.write('\x02\x00' + pack_integer(len(object)) + object)

@register_serialiser('text', Text)
def serialise_text_via_text(object, context):
    # texts -- encoded strings part deux
    # object = object.encode('raw-unicode-escape')
    # return '\x02\x00' + pack_integer(len(object)) + object
    encoding = object.encoding
    object = object.value.encode(encoding)
    if encoding.lower() == 'utf-8':
        encoding_header = '\x02\x00'
    else:
        encoding_header = '\x02' + pack_integer(len(encoding)) + encoding
    return context.write(encoding_header + pack_integer(len(object)) + object)

@register_serialiser('boolean', bool)
def serialise_boolean_via_bool(object, context):
    # booleans -- True/False
    if object:
        return context.write('\x31')
    return context.write('\x30')

@register_serialiser('boolean', Maybe)
def serialise_boolean_via_maybe(object, context):
    # booleans -- Maybe
    return context.write('\x3f' + pack_integer(object))

@register_serialiser('integer', int, cache_size=10000)
@register_serialiser('integer', long, cache_size=10000)
def serialise_integer(object, context):
    # integers -- ints/longs
    # positive ones
    if object >= 0:
        if object < EMBEDDED_INTEGER_RANGE:
            return context.write(chr(EMBEDDED_INTEGER_START + object))
        return context.write('\x04' + pack_integer(object))
    # negative ones
    return context.write('\x05' + pack_integer(abs(object)))
    # object = encode_long(object)
    # return '\x04' + pack_integer(len(object)) + object

@register_serialiser('decimal', decimal)
def serialise_decimal(object, context):
    # decimals -- via actual decimals
    object = str(object)
    return context.write('\x06' + pack_integer(len(object)) + object)

@register_serialiser('float', float)
def serialise_float(object, context):
    # floats -- via floats
    object = repr(object)
    return context.write('\x07' + pack_integer(len(object)) + object)

@register_serialiser('complex', complex)
def serialise_complex(object, context):
    # complex numbers
    return (
        context.write('\x08') +
        serialise_object((object.real, object.imag), context)
        )

@register_serialiser('null', NullType)
@register_serialiser('null', NoneType)
def serialise_null(object, context):
    # null -- None/Null
    return context.write('\x0a')

@register_serialiser('storage', StorageItem)
def serialise_storage(object, context):
    # storage items -- these are tuples so need to be done before sequences
    return (
        context.write('\x22') +
        serialise_object(object[0], context) +
        serialise_object(object[1], context)
        )

@register_serialiser('sequence', tuple, cache=False)
def serialise_sequence(object, context):
    # sequences -- ordered multisets (tuples)
    # @/@ following the baach-palmer law, ignoring rekursive tuples for now
    sequence_length = len(object)
    if 0 < sequence_length <= EMBEDDED_SEQUENCE_RANGE:
        return context.write(
            chr(EMBEDDED_SEQUENCE_START - 1 + sequence_length)
            ) + ''.join(
            serialise_object(element, context) for element in object
            )
    return context.write('\x0b' + pack_integer(sequence_length)) + ''.join(
        serialise_object(element, context) for element in object
        )

@register_serialiser('set', set, cache=False)
@register_serialiser('set', frozenset)
def serialise_set(object, context):
    # sets
    sequence_length = len(object)
    if 0 < sequence_length <= EMBEDDED_SET_RANGE:
        return context.write(
            chr(EMBEDDED_SET_START - 1 + sequence_length)
            ) + ''.join(
            serialise_object(element, context) for element in object
            )
    return context.write('\x0c' + pack_integer(sequence_length)) + ''.join(
        serialise_object(element, context) for element in object
        )

@register_serialiser('list', list, cache=False)
def serialise_list(object, context):
    # lists -- ordered mutable multisets (lists)
    return context.write('\x0e' + pack_integer(len(object))) + ''.join(
        serialise_object(element, context) for element in object
        )

@register_serialiser('mapping', dict, cache=False)
@register_serialiser('mapping', mapping)
def serialise_mapping(object, context):
    # mappings -- dicts
    sequence_length = len(object)
    if 0 < sequence_length <= EMBEDDED_MAPPING_RANGE:
        return context.write(
            chr(EMBEDDED_MAPPING_START - 1 + sequence_length)
            ) + ''.join(
            serialise_object(key, context) + serialise_object(value, context)
            for key, value in object.iteritems()
            )
    return context.write('\x0f' + pack_integer(sequence_length)) + ''.join(
        serialise_object(key, context) + serialise_object(value, context)
        for key, value in object.iteritems()
        )

@register_serialiser('datetime', datetime)
def serialise_datetime(object, context):
    # datetime
    if object.tzname():
        # @/@ handle timezone info
        pass
    timezone = '\x00' # assume UTC
    return context.write(
        '\x10' +
        pack_integer(object.year) +
        pack_integer(object.month) +
        pack_integer(object.day) +
        pack_integer(object.hour) +
        pack_integer(object.minute) +
        pack_integer(object.second) +
        pack_integer(object.microsecond) +
        timezone
        )

@register_serialiser('date', date)
def serialise_date(object, context):
    # naive dates
    return context.write('\x11' + pack_integer(object.toordinal()))

@register_serialiser('timedelta', timedelta)
def serialise_timedelta(object, context):
    # time deltas
    return (
        context.write('\x13') +
        # we do it this way since we might get negatives and easier like this
        serialise_object(
            (object.days, object.seconds, object.microseconds),
            context
            )
        )

@register_serialiser('timestamp', TimeStamp)
def serialise_timestamp(object, context):
    # 64-bit timestamps
    return context.write('\x15' + object.raw())

@register_serialiser('index', Index)
def serialise_index(object, context):
    # indexes
    return (
        context.write('\x20' + uuid2string(object.id)) +
        serialise_object(object.property, context) +
        serialise_object(object.value, context)
        )

@register_serialiser('plexname', Plexname)
def serialise_plexname(object, context):
    # plexnames
    object = object.value.encode('utf-8')
    return context.write('\x23' + pack_integer(len(object)) + object)
# ------------------------------------------------------------------------------
# our main serialiser which delegates to the others
# ------------------------------------------------------------------------------
class SerialisationContext:

    __slots__ = (
        'initial_position', 'current_position', 'object_cache', 'dirty',
        'stack'
        )

    def __init__(self, stream):
        self.current_position = self.initial_position = stream.tell()
        self.object_cache = {}
        self.stack = []
        self.dirty = []

    def write(self, value):
        self.current_position += len(value)
        return value

    def append_to_dirty_stack(self, value):
        self.stack.append(value)
        if value:
            self.dirty.append(True)

    def pop_dirty_stack(self):
        last = self.stack.pop()
        if last:
            self.dirty.pop()

UNCACHED_TYPES = (int, NoneType, NullType)

@optimise()
def serialise_object(object, context):
    object_type = type(object)
    if object_type not in serialisation_map:
        del context.object_cache # does this help ?
        raise SerialisationError("Object type %r not supported." % object_type)
    # use dat objekt kache!
    object_id = id(object)
    object_cache = context.object_cache
    # we adapt this helpful trik from the piklers
    # unfortunately repeated Nulls take up an extra byte and is similarly not
    # worthwhile for small integers
    # if (object_type not in UNCACHED_TYPES) and (object_id in object_cache):
    # but, it doesn't seem worthwhile to ignore them ...
    if object_id in object_cache:
        context.append_to_dirty_stack(True)
        return context.write('\xff' + pack_integer(object_cache[object_id]))
    context.append_to_dirty_stack(False)
    object_cache[object_id] = (
        context.current_position - context.initial_position
        # object strong ref not kept -- wise ?
        )
    del object_id, object_cache # does this help either ?
    # get info on the objekt type
    plex_type, serialiser, cachable = serialisation_map[object_type]
    cache = serialisation_cache[plex_type]
    # do we have a kached serialisation ?
    if cachable and object in cache:
        return context.write(cache[object])
    output = serialiser(object, context)
    if cachable and not context.dirty:
        cache[object] = output
    context.pop_dirty_stack()
    return output

@optimise()
def serialise_plex_type(object, stream=Blank, retval=False):
    # set us up some params
    if stream is Blank:
        stream = StringIO()
        retval = True
    stream.write(serialise_object(object, SerialisationContext(stream)))
    stream.flush()
    if retval:
        return stream.getvalue()
# ------------------------------------------------------------------------------
# our deserialisers
# ------------------------------------------------------------------------------
@register_deserialiser('internal reference')
def deserialise_internal_reference(context):
    # @/@ note: '\xff' is actually handled inline in ``deserialise_object``
    return context.object_cache[context.read()]

@register_deserialiser('unit')
def deserialise_unit(context):
    return Unit(context.read(16))

@register_deserialiser('uuid')
def deserialise_uuid(context):
    return UUID(context.read(16))

@register_deserialiser('bytestream')
def deserialise_bytestream(context):
    return context.read(unpack_integer(context))

def create_embedded_bytestream_deserialisers():
    def _deserialiser(i):
        def deserialise_embedded_bytestream(context):
            return context.read(i)
        return deserialise_embedded_bytestream
    for i in xrange(EMBEDDED_BYTESTREAM_RANGE):
        deserialisation_map[
            chr(EMBEDDED_BYTESTREAM_START + i)
            ] = _deserialiser(i + 1)

create_embedded_bytestream_deserialisers()

@register_deserialiser('text')
def deserialise_text(context):
    encoding_bytesize = unpack_integer(context)
    if encoding_bytesize:
        encoding = context.read(encoding_bytesize)
    else:
        encoding = 'utf-8' # 'raw-unicode-escape'
    bytesize = unpack_integer(context)
    return Text(context.read(bytesize), encoding) # unicode

@register_deserialiser('boolean -- true')
def deserialise_boolean_true(context):
    return True

@register_deserialiser('boolean -- false')
def deserialise_boolean_false(context):
    return False

@register_deserialiser('boolean -- maybe')
def deserialise_boolean_maybe(context):
    return Maybe(unpack_integer(context))

@register_deserialiser('integer')
def deserialise_integer(context):
    return unpack_integer(context)
    # bytesize = unpack_integer(context)
    # if bytesize == 0:
    #     return 0
    # data = context.read(bytesize)
    # n = long(hexlify(data[::-1]), 16)
    # if data[-1] >= '\x80':
    #     n -= 1L << (bytesize * 8)
    # return n

def create_embedded_integer_deserialisers():
    def _deserialiser(i):
        def deserialise_embedded_integer(context):
            return i
        return deserialise_embedded_integer
    for i in xrange(EMBEDDED_INTEGER_RANGE):
        deserialisation_map[chr(EMBEDDED_INTEGER_START + i)] = _deserialiser(i)

create_embedded_integer_deserialisers()

@register_deserialiser('integer -- negative')
def deserialise_negative_integer(context):
    return -unpack_integer(context)

@register_deserialiser('decimal')
def deserialise_decimal(context):
    return decimal(context.read(unpack_integer(context)))

@register_deserialiser('float')
def deserialise_float(context):
    return float(context.read(unpack_integer(context)))

@register_deserialiser('complex')
def deserialise_complex(context):
    return complex(*deserialise_object(context))

@register_deserialiser('null')
def deserialise_null(context):
    return Null

@register_deserialiser('storage')
def deserialise_storage(context):
    return StorageItem(
        deserialise_object(context), deserialise_object(context)
        )

@register_deserialiser('set')
def deserialise_set(context, size=Blank):
    output = set()
    add_to_output = output.add
    if size is Blank:
        size = unpack_integer(context)
    for i in repeat(0, size):
        obj = deserialise_object(context)
        try:
            add_to_output(obj)
        except TypeError:
            add_to_output(freeze(obj))
    return output # frozenset(output)

def create_embedded_set_deserialisers():
    def _deserialiser(size):
        def deserialiser(context):
            return deserialise_set(context, size)
        return deserialiser
    for i in xrange(EMBEDDED_SET_RANGE):
        deserialisation_map[chr(EMBEDDED_SET_START + i)] = _deserialiser(i + 1)

create_embedded_set_deserialisers()

@register_deserialiser('list')
def deserialise_list(context, size=Blank):
    output = context.put([])
    append_to_output = output.append
    if size is Blank:
        size = unpack_integer(context)
    for i in repeat(0, size):
        append_to_output(deserialise_object(context))
    return output

@register_deserialiser('sequence')
def deserialise_sequence(context):
    return tuple(deserialise_list(context))

def create_embedded_sequence_deserialisers():
    def _deserialiser(size):
        def deserialiser(context):
            return tuple(deserialise_list(context, size))
        return deserialiser
    for i in xrange(EMBEDDED_SEQUENCE_RANGE):
        deserialisation_map[
            chr(EMBEDDED_SEQUENCE_START + i)
            ] = _deserialiser(i + 1)

create_embedded_sequence_deserialisers()

@register_deserialiser('mapping')
def deserialise_mapping(context, size=Blank):
    output = context.put({})
    if size is Blank:
        size = unpack_integer(context)
    for i in repeat(0, size):
        key = deserialise_object(context)
        value = deserialise_object(context)
        try:
            output[key] = value
        except TypeError:
            output[freeze(key)] = value
    return output # mapping(output)

def create_embedded_mapping_deserialisers():
    def _deserialiser(size):
        def deserialiser(context):
            return deserialise_mapping(context, size)
        return deserialiser
    for i in xrange(EMBEDDED_MAPPING_RANGE):
        deserialisation_map[
            chr(EMBEDDED_MAPPING_START + i)
            ] = _deserialiser(i + 1)

create_embedded_mapping_deserialisers()

@register_deserialiser('datetime')
def deserialise_datetime(context):
    new_date = datetime(
        unpack_integer(context), unpack_integer(context),
        unpack_integer(context), unpack_integer(context),
        unpack_integer(context), unpack_integer(context),
        unpack_integer(context)
        )
    timezone = unpack_integer(context)
    if not timezone:
        timezone = 'UTC'
    return new_date

@register_deserialiser('date')
def deserialise_date(context):
    ordinal = unpack_integer(context)
    return date.fromordinal(ordinal)

@register_deserialiser('timedelta')
def deserialise_timedelta(context):
    return timedelta(*deserialise_object(context))

@register_deserialiser('timestamp')
def deserialise_timestamp(context):
    return timestamp(context.read(8))

@register_deserialiser('index')
def deserialise_index(context):
    index_id = context.read(16)
    # note: you should set ``index.entity`` on the returned object
    return Index(
        property=deserialise_object(context),
        value=deserialise_object(context),
        id=index_id,
        type=index_id
        )

@register_deserialiser('plexname')
def deserialise_plexname(context):
    return unicode(context.read(unpack_integer(context)), 'utf-8')
# ------------------------------------------------------------------------------
# and the main deserialiser which also delegates ...
# ------------------------------------------------------------------------------
class DeserialisationContext:
    """
    >>> s = StringIO('test')
    >>> d = DeserialisationContext(s)
    """

    __slots__ = 'initial_position', 'object_cache', 'read', 'tell'

    def __init__(self, stream):
        self.initial_position = stream.tell()
        self.object_cache = {}
        self.read = stream.read
        self.tell = stream.tell

    def put(self, value):
        return self.object_cache.setdefault(self.tell() - 1, value)

@optimise()
def deserialise_object(context, **kwargs): # @/@ why do i have **kwargs here?
    type_id = context.read(1)
    if (not type_id) or type_id not in deserialisation_map:
        raise DeserialisationError(
            "Unsupported type identifier %r found." % type_id
            )
    if type_id == '\xff':
        return context.object_cache[
            context.initial_position + unpack_integer(context)
            ]
    deserialiser = deserialisation_map[type_id]
    start_position = context.tell() - 1
    output = context.object_cache[start_position] = deserialiser(context)
    return output

@optimise()
def deserialise_plex_type(stream):
    if isinstance(stream, basestring):
        stream = StringIO(stream)
    context = DeserialisationContext(stream)
    return deserialise_object(context)
# ------------------------------------------------------------------------------
# fuzz test data
# ------------------------------------------------------------------------------
def generate_test_of_size(n, only_safe=False):
    """Creates evil test sequences."""

    import sys
    from random import randint, shuffle, choice, random

    types = {
        'unit': Unit,
        'uuid': UUID,
        'boolean': lambda : bool(randint(0, 1)),
        'boolean-maybe': lambda : Maybe(randint(0, 100)),
        'integer': lambda : randint(0, sys.maxint),
        'negative integer': lambda : -randint(0, sys.maxint),
        'float': lambda : randint(0, sys.maxint) + random(),
        'decimal': lambda : decimal(str(randint(0, sys.maxint) + random())),
        'bytestream': lambda : ''.join(
            chr(randint(0, 255)) for i in xrange(randint(1, 100))
            ),
        'text': lambda : u''.join(
            # 65500
            unichr(randint(0, 3000)) for i in xrange(randint(1, 1000))
            ),
        'null': lambda : Null,
        'sequence': lambda : tuple(generate_test_of_size(n / 10)),
        # 'list': lambda : list(generate_test_of_size(n / 10)),
        'set': lambda : frozenset(generate_test_of_size(n / 10, True)),
        'mapping': lambda : mapping((
            (list(generate_test_of_size(1, True))[0],
             list(generate_test_of_size(1))[0])
            for i in xrange(randint(0, n / 10))
            ))
        }

    type_keys = types.keys()
    safe_type_keys = type_keys[:]
    # safe_type_keys.remove('list')
    # safe_type_keys.remove('mapping')

    if only_safe:
        type_keys = safe_type_keys

    for i in xrange(n):
        current_choice = choice(type_keys)
        yield types[current_choice]()
if __name__ == '__main__':

    TEST_VALUE = (
        1000, -1000, 34444444444444444444444444444444444444444,
        -33333333333333333333333333333334444444444444, 3.14, "test string",
        -22.7332342, {1: 2, 3: "x"}, (1, 2, [3, 4]),
        )

    TEST_VALUE2 = (
        1000, -1000, 34410, UUID(), Unit(), True,
        -3701, 3.14, "test string", False, decimal("2.1"), UUID(),
        -22.7332342, set([2, Unit()]), {1: 2, 3: "x"}, (1, 2, [3, 4]), Null,
        Unit(),
        )

    TEST_VALUE100 = TEST_VALUE * 100
    TEST_VALUE2_100 = TEST_VALUE2 * 100

    import pickle

    def p_dumps(obj):
        file = StringIO()
        pickle.Pickler(file, protocol=2).dump(obj)
        return file.getvalue()

    def timeit(
        dumps=serialise_plex_type, loads=deserialise_plex_type,
        value=TEST_VALUE100, force=True
        ):
        from time import clock
        t1 = clock()
        data = dumps(value)
        t2 = clock()
        try:
            new_value = loads(data)
        except:
            print "ERRRROR"
            return data
        t3 = clock()
        if force:
            assert value == new_value
        print "Dump:", t2 - t1
        print "Load:", t3 - t2
        print "Size:", len(data)
        if force:
            return t2 - t1, t3 - t2, len(data)
        return new_value
# cPickle
# Dump: 0.05
# Load: 0.04
# Size: 15560
# pickle
# Dump: 0.23
# Load: 0.13
# Size: 15563
# 0.36
# plex -- after initial optimisation
# Dump: 0.91
# Load: 0.66
# Size: 10403
# 1.57
# new
# Dump: 0.37
# Load: 0.60
# Size: 10403
# 0.97
# new2
# Dump: 0.32
# Load: 0.16
# Size: 1911
# 0.48
# ------------------------------------------------------------------------------
# with 36 random runs with generate_test_of_size(100, True)
# In [15]: [(sum(x[i] for x in results) / 36.) for i in range(3)]
# with plex
# [1.0263888888888881, 0.33250000000000163, 30251.138888888891]
# with pickle
# [0.55250000000000432, 0.40888888888888492, 80048.277777777781]
# [0.53972222222222166, 0.41027777777777624, 76638.833333333328]
# with pickle -- proto 2
# dies not knowing how to revive ``mapping``
# ------------------------------------------------------------------------------
# 100 repeats with TEST_VALUE
# [dump, load, size]
# In [28]: [(sum(x[i] for x in plex)) for i in range(3)]
# Out[28]: [2.53, 1.03, 19100]
# Out[4]: [2.670000000000007, 1.0199999999999907, 19100]
# Out[11]: [0.83999999999999275, 0.77000000000000313, 12800]
# In [29]: [(sum(x[i] for x in pick)) for i in range(3)]
# Out[29]: [3.12, 1.78, 66800]
# In [30]: [(sum(x[i] for x in pick2)) for i in range(3)]
# Out[30]: [3.75, 1.53, 38100]
# ------------------------------------------------------------------------------
# transaction system
# - gather up all changes to an index during the course of a transaction
# - when transaction is "committed", then, commit changes to pod
# - if transaction is "aborted", then, revert changes to indexes
# - in a multi-transaction system, simply "retry" whenever a transaction
# overlaps with another. automate retry at the end of conflicting txns
# MRO and event order ?
# get_indexes_for_property('aliased_to', between())
# setattr()
# <1-byte-transaction-status><8-byte-transaction-id>
# C<index.uuid><property><value>
# A<index.uuid><number-of-units><unit[s]
# R<index.uuid><number-of-units><unit[s]>
# D<index.uuid>
# yield conflict / retry
# data structure to minimise conflict == ?