Created
November 5, 2013 14:41
-
-
Save davidscherer/7320007 to your computer and use it in GitHub Desktop.
newtypes prototype @601942430f40
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import struct, random | |
from bisect import bisect_left | |
class Types (object):
    """Registry that dispatches encoding and decoding to per-type codecs.

    `encoders` maps a Python type to the codec used to encode values of that
    type; `decoders` maps a type code (the leading character(s) of an encoded
    string) to the codec used to decode it.  Values whose type has no
    registered encoder may still be encoded if they provide the matching
    ``encode_*`` method themselves.
    """

    def __init__(self):
        self.encoders = {}
        self.decoders = {}

    def _encode_with(self, val, method):
        # Prefer a codec registered for the exact type of `val`; otherwise let
        # the value encode itself if it offers a method of the same name.
        codec = self.encoders.get(type(val))
        if codec:
            return getattr(codec, method)(val)
        if hasattr(val, method):
            return getattr(val, method)()
        raise TypeError("Unsupported data type: " + str(type(val)))

    def _decoder_at(self, encoded, pos, whole):
        # Look up the codec by the type code found at `pos`.  `whole` selects
        # whether the error message shows the entire input or only the tail
        # starting at `pos`.
        codec = self.decoders.get(encoded[pos])
        if not codec:
            shown = encoded if whole else encoded[pos:]
            raise ValueError("Unknown type code: " + repr(shown))
        return codec

    def encode_key(self, val):
        """Return the ordered, non-self-terminating encoding of `val`."""
        return self._encode_with(val, 'encode_ordered')

    def decode_key(self, encoded_key):
        """Decode a string produced by `encode_key`."""
        codec = self._decoder_at(encoded_key, 0, True)
        return codec.decode_ordered(encoded_key, 0)

    def encode_value(self, val):
        """Return the unordered, non-self-terminating encoding of `val`."""
        return self._encode_with(val, 'encode_unordered')

    def decode_value(self, encoded_key):
        """Decode a string produced by `encode_value`."""
        codec = self._decoder_at(encoded_key, 0, True)
        return codec.decode_unordered(encoded_key, 0)

    def encode_key_part(self, val):
        """Return the ordered, self-terminating encoding of `val`."""
        return self._encode_with(val, 'encode_ordered_selfterm')

    def decode_key_part(self, encoded_key, pos):
        """Decode the ordered self-terminating item starting at `pos`.

        Returns whatever the codec's ``decode_ordered_selfterm`` returns.
        """
        codec = self._decoder_at(encoded_key, pos, False)
        return codec.decode_ordered_selfterm(encoded_key, pos)

    def encode_value_part(self, val):
        """Return the unordered, self-terminating encoding of `val`."""
        return self._encode_with(val, 'encode_unordered_selfterm')

    def decode_value_part(self, encoded_key, pos):
        """Decode the unordered self-terminating item starting at `pos`.

        Returns whatever the codec's ``decode_unordered_selfterm`` returns.
        """
        codec = self._decoder_at(encoded_key, pos, False)
        return codec.decode_unordered_selfterm(encoded_key, pos)


# Default, module-wide type system that the codecs below register with.
types = Types()
class TypeCodec (object):
    """Base class for codecs that encode and decode one Python type.

    A codec provides four different encodings of a given type, and four
    corresponding decoders.  The four encodings are the combinations of
    [ordered, unordered] x [self-terminating, non-self-terminating].

    A minimal implementation only needs to override `type`, `typecode`,
    `encode_ordered_selfterm` and `decode_ordered_selfterm`; the other three
    encodings then default to the ordered self-terminating one.  To save
    space, more compact versions of the other encodings may be implemented.
    """
    typecode = None
    type = None

    def __init__(self, types=types):
        """Register this codec as encoder for `self.type` and decoder for
        `self.typecode` in the type system `types`."""
        self.typesystem = types
        types.encoders[self.type] = self
        types.decoders[self.typecode] = self

    def encode_ordered_selfterm(self, v):
        """Returns a string s representing v such that:
        s.startswith(self.typecode) and
        self.decode_ordered_selfterm(s,0)==(v,len(s)) and
        encode_ordered_selfterm(a)<encode_ordered_selfterm(b) == a<b

        Must be overridden by subclasses."""
        # NotImplemented is a comparison sentinel, not an exception, and is
        # not callable; NotImplementedError is the correct "abstract" signal.
        raise NotImplementedError()

    def decode_ordered_selfterm(self, v, pos):
        """Decode the item that starts at index `pos` of `v`.

        Returns a (value, end) pair where `end` is the index just past the
        item.  Must be overridden by subclasses."""
        raise NotImplementedError()

    def encode_ordered(self, v):
        """Returns a string s representing v such that:
        s.startswith(self.typecode) and
        self.decode_ordered(s,0) == v and
        encode_ordered(a)<encode_ordered(b) == a<b"""
        # This can be overridden for efficiency; the default is to use encode_ordered_selfterm
        return self.encode_ordered_selfterm(v)

    def decode_ordered(self, v, pos):
        """Decode a string produced by `encode_ordered`; returns the value only."""
        return self.decode_ordered_selfterm(v, pos)[0]

    def encode_unordered_selfterm(self, v):
        """Returns a string s representing v such that:
        s.startswith(self.typecode) and
        self.decode_unordered_selfterm(s,0)==(v,len(s))"""
        # This can be overridden for efficiency; the default is to use encode_ordered_selfterm
        return self.encode_ordered_selfterm(v)

    def decode_unordered_selfterm(self, v, pos):
        """Decode a string produced by `encode_unordered_selfterm`;
        returns a (value, end) pair."""
        return self.decode_ordered_selfterm(v, pos)

    def encode_unordered(self, v):
        """Returns a string s representing v such that:
        s.startswith(self.typecode) and
        self.decode_unordered(s,0) == v"""
        # This can be overridden for efficiency; the default is to use encode_ordered
        return self.encode_ordered(v)

    def decode_unordered(self, v, pos):
        """Decode a string produced by `encode_unordered`; returns the value only."""
        return self.decode_ordered(v, pos)
class NoneCodec (TypeCodec):
    """Codec for None: the encoding is just the one-character type code."""
    type = type(None)
    typecode = "\x00"

    def encode_ordered_selfterm(self, v):
        # None carries no payload, so the encoding is the bare type code.
        return "\x00"

    def decode_ordered_selfterm(self, v, pos):
        # Consume the single type-code character.
        after = pos + 1
        return None, after


# Instantiating the codec registers it with the default type system.
NoneCodec()
# TODO: Tweak? | |
def _encodeUint7bit(l): | |
s = "" | |
while l >= 128: | |
s += chr( (l & 0x7f) | 0x80 ) | |
l >>= 7 | |
return s + chr( l ) | |
def _decodeUint7bit(v,pos): | |
l = 0 | |
p = 0 | |
while ord(v[pos]) >= 128: | |
l += (ord(v[pos])&0x7f) << p | |
p += 7 | |
pos += 1 | |
return l + (ord(v[pos]) << p), pos | |
class ByteStrCodec (TypeCodec):
    """Codec for byte strings.

    Ordered self-terminating form: type code, the bytes with every "\\x00"
    escaped as "\\x00\\xFF", then a terminating "\\x00".
    Ordered non-self-terminating form: type code followed by the raw bytes.
    Unordered self-terminating form: type code, 7-bit-encoded length, raw bytes.
    """
    type = bytes
    typecode = "\x01"

    @staticmethod
    def _find_terminator( v, pos ):
        # Finds the start of the next terminator [\x00]![\xff] or the end of v
        size = len(v)
        nul = v.find('\x00', pos)
        while nul >= 0:
            follower = nul + 1
            if follower == size or v[follower] != '\xff':
                return nul
            # "\x00\xff" is an escaped NUL: skip both characters and go on.
            nul = v.find('\x00', nul + 2)
        return size

    def encode_ordered_selfterm(self, v):
        escaped = v.replace("\x00", "\x00\xFF")
        return self.typecode + escaped + "\x00"

    def decode_ordered_selfterm(self, v, pos):
        start = pos + 1
        end = ByteStrCodec._find_terminator( v, start )
        payload = v[start:end]
        return payload.replace("\x00\xff", "\x00"), end + 1

    def encode_ordered(self, v):
        return self.typecode + v

    def decode_ordered(self, v, pos):
        # Everything after the type code is payload.
        return v[pos + 1:]

    def encode_unordered_selfterm(self, v):
        length_prefix = _encodeUint7bit(len(v))
        return self.typecode + length_prefix + v

    def decode_unordered_selfterm(self, v, pos):
        # _decodeUint7bit returns the index of the last length character, so
        # the payload begins one past it.
        length, last = _decodeUint7bit(v, pos + 1)
        start = last + 1
        end = start + length
        return v[start:end], end


# Instantiating the codec registers it with the default type system.
ByteStrCodec()
class UnicodeCodec( ByteStrCodec ):
    """Codec for unicode strings: UTF-8 encode, then reuse the ByteStrCodec
    formats under a different type code."""
    type = unicode
    typecode = "\x02"

    def encode_ordered_selfterm(self, v):
        utf8 = v.encode("utf-8")
        return ByteStrCodec.encode_ordered_selfterm(self, utf8)

    def decode_ordered_selfterm(self, v, pos):
        raw, end = ByteStrCodec.decode_ordered_selfterm(self, v, pos)
        return raw.decode("utf-8"), end

    def encode_ordered(self, v):
        utf8 = v.encode("utf-8")
        return ByteStrCodec.encode_ordered(self, utf8)

    def decode_ordered(self, v, pos):
        raw = ByteStrCodec.decode_ordered(self, v, pos)
        return raw.decode("utf-8")

    # TODO: Use a different unordered selfterm encoding because UTF-8 will not normally have
    # lots of nulls?
    def encode_unordered_selfterm(self, v):
        utf8 = v.encode("utf-8")
        return ByteStrCodec.encode_unordered_selfterm(self, utf8)

    def decode_unordered_selfterm(self, v, pos):
        raw, end = ByteStrCodec.decode_unordered_selfterm(self, v, pos)
        return raw.decode("utf-8"), end


# Instantiating the codec registers it with the default type system.
UnicodeCodec()
class IntegerCodec( TypeCodec ):
    """Codec for int and long.

    The type code is chr(20 + n) for a positive value stored in n bytes,
    chr(20) for zero, and chr(20 - n) for a negative value stored in n bytes
    (n ranges over 1..8).  The payload is the big-endian magnitude for
    positive values; for negative values it is the big-endian value of
    (largest n-byte unsigned number + v).
    """
    types = [int, long]
    typecodes = range(12, 29)
    # _size_limits[n] is the largest unsigned value that fits in n bytes.
    _size_limits = tuple( (1 << (8 * width)) - 1 for width in range(9) )

    def __init__(self, types):
        # Registers under several Python types and a whole range of type
        # codes, so the single-type registration of the base class is not used.
        for handled in self.types:
            types.encoders[handled] = self
        for code in self.typecodes:
            types.decoders[chr(code)] = self

    def encode_ordered_selfterm(self, v):
        if v == 0:
            return chr(20)
        magnitude = v if v > 0 else -v
        # Smallest byte count whose limit is >= the magnitude.
        width = bisect_left( self._size_limits, magnitude )
        if v > 0:
            return chr(20 + width) + struct.pack(">Q", v)[-width:]
        offset = self._size_limits[width] + v
        return chr(20 - width) + struct.pack(">Q", offset)[-width:]

    def decode_ordered_selfterm(self, v, pos):
        width = ord(v[pos]) - 20
        negative = width < 0
        if negative:
            width = -width
        end = pos + 1 + width
        # Left-pad the payload with zero bytes to a full 8-byte big-endian word.
        padded = chr(0) * (8 - width) + v[pos + 1:end]
        stored = struct.unpack(">Q", padded)[0]
        if negative:
            return stored - self._size_limits[width], end
        return stored, end


# Instantiating the codec registers it with the default type system.
IntegerCodec(types)
class TupleCodec( TypeCodec ):
    """Codec for tuples: the type code followed by each element encoded in
    self-terminating form.  The self-terminating tuple forms additionally end
    with `endcode`; the non-self-terminating forms run to the end of the string.
    """
    type = tuple
    typecode = "\x03"
    endcode = "\x04" # Not *exactly* a type code, but has to be different from any valid type code and from 0xff

    def _join_parts(self, encode_part, items):
        # Type code followed by every element run through `encode_part`.
        return self.typecode + "".join( [encode_part(item) for item in items] )

    def _decode_until_endcode(self, decode_part, v, pos):
        # Decode elements after the type code until `endcode` is reached;
        # returns (tuple, index just past the endcode).
        items = []
        cursor = pos + 1
        while v[cursor] != self.endcode:
            item, cursor = decode_part(v, cursor)
            items.append(item)
        return tuple(items), cursor + 1

    def _decode_until_end(self, decode_part, v, pos):
        # Decode elements after the type code until the string is exhausted.
        items = []
        cursor = pos + 1
        stop = len(v)
        while cursor != stop:
            item, cursor = decode_part(v, cursor)
            items.append(item)
        return tuple(items)

    def encode_ordered_selfterm(self, v):
        return self._join_parts(self.typesystem.encode_key_part, v) + self.endcode

    def decode_ordered_selfterm(self, v, pos):
        return self._decode_until_endcode(self.typesystem.decode_key_part, v, pos)

    def encode_ordered(self, v):
        return self._join_parts(self.typesystem.encode_key_part, v)

    def decode_ordered(self, v, pos):
        return self._decode_until_end(self.typesystem.decode_key_part, v, pos)

    def encode_unordered_selfterm(self, v):
        return self._join_parts(self.typesystem.encode_value_part, v) + self.endcode

    def decode_unordered_selfterm(self, v, pos):
        return self._decode_until_endcode(self.typesystem.decode_value_part, v, pos)

    def encode_unordered(self, v):
        return self._join_parts(self.typesystem.encode_value_part, v)

    def decode_unordered(self, v, pos):
        return self._decode_until_end(self.typesystem.decode_value_part, v, pos)


# Instantiating the codec registers it with the default type system.
TupleCodec()
class ExtendedTypeCodec (TypeCodec):
    # This is really only half a codec- it handles dispatching of multibyte type codes
    # starting with 0x4f
    typecode = "\x4f"

    def _get_dec(self, v, pos):
        """Return the decoder registered for the multibyte type code at `pos`.

        The code is the character at `pos`, any following characters whose
        ordinal is >= 128, and the first character whose ordinal is < 128.
        """
        last = pos + 1
        while ord(v[last]) >= 128:
            last += 1
        full_code = v[pos:last + 1]
        decoder = self.typesystem.decoders.get(full_code)
        if not decoder:
            raise ValueError("Unknown type code: " + repr(v))
        return decoder

    def decode_ordered(self, v, pos):
        target = self._get_dec(v, pos)
        return target.decode_ordered(v, pos)

    def decode_unordered(self, v, pos):
        target = self._get_dec(v, pos)
        return target.decode_unordered(v, pos)

    def decode_ordered_selfterm(self, v, pos):
        target = self._get_dec(v, pos)
        return target.decode_ordered_selfterm(v, pos)

    def decode_unordered_selfterm(self, v, pos):
        target = self._get_dec(v, pos)
        return target.decode_unordered_selfterm(v, pos)


# Instantiating the codec registers it with the default type system.
ExtendedTypeCodec()
##class Subspace (object): | |
## def __init__(self, tuplePrefix, types=types): | |
## self.tuplePrefix = tuplePrefix | |
## self.types = types | |
## def __getitem__(self, name): | |
## return Subspace( self.tuplePrefix + (name,), self.types ) | |
## def key(self): | |
## return self.types.encode_key( self.tuplePrefix ) | |
## def encode_key(self, key): | |
## return self.types.encode_key( self.tuplePrefix + key ) | |
## def decode_key(self, key): | |
## k = self.types.decode_key( key ) | |
## assert k[:len(self.tuplePrefix)] == self.tuplePrefix | |
## return k[ len(self.tuplePrefix): ] | |
## def encode_value(self, value): | |
## return self.types.encode_value(value) | |
## def decode_value(self, value): | |
## return self.types.decode_value(value) | |
## | |
##class Subspace2 (object): | |
## def __init__(self, rawPrefix, types=types): | |
## self.rawPrefix = rawPrefix | |
## self.types = types | |
## def encode_key( self, key ): | |
## return self.rawPrefix + self.types.encode_key(key) | |
## def decode_key( self, key ): | |
## assert key.startswith(self.rawPrefix) | |
## return self.types.decode_key(key[len(self.rawPrefix):]) | |
## def encode_value(self, value): | |
## return self.types.encode_value(value) | |
## def decode_value(self, value): | |
## return self.types.decode_value(value) | |
class Subspace (object):
    """A key namespace identified by a raw encoded prefix.

    Keys are encoded as the raw prefix followed by the ordered
    self-terminating encoding of the key; values are encoded with the type
    system's plain value encoding and carry no prefix.
    """

    def __init__(self, tuplePrefix, types=types, rawPrefix=""):
        """Build a subspace.

        tuplePrefix -- value whose `types.encode_key` encoding is appended to
                       `rawPrefix`, or None to use `rawPrefix` alone.
        types       -- the type system used for all encoding and decoding.
        rawPrefix   -- already-encoded prefix string.
        """
        self.rawPrefix = rawPrefix
        # Identity test: only the None singleton means "no tuple prefix";
        # `!= None` would instead defer to the prefix object's own comparison.
        if tuplePrefix is not None:
            self.rawPrefix += types.encode_key(tuplePrefix)
        self.types = types

    def __getitem__(self, name):
        """Return the nested subspace whose prefix is this subspace's key for `name`."""
        return Subspace( None, self.types, self.encode_key( name ) )

    def key(self):
        """Return the raw encoded prefix of this subspace."""
        return self.rawPrefix

    def encode_key( self, key ):
        """Return the raw prefix followed by the self-terminating encoding of `key`."""
        return self.rawPrefix + self.types.encode_key_part(key)

    def decode_key( self, key ):
        """Decode the single key item that follows the raw prefix in `key`."""
        assert key.startswith(self.rawPrefix)
        # decode_key_part returns (value, end); only the value is wanted here.
        return self.types.decode_key_part( key, len(self.rawPrefix) )[0]

    def encode_value(self, value):
        """Encode `value` with the type system's value encoding."""
        return self.types.encode_value(value)

    def decode_value(self, value):
        """Decode a string produced by `encode_value`."""
        return self.types.decode_value(value)
def test_enc(v):
    """Print `v` with its key and value encodings, then assert that both
    encodings decode back to a value equal to `v`."""
    as_key = types.encode_key(v)
    as_value = types.encode_value(v)
    print(" ".join([repr(v), repr(as_key), repr(as_value)]))
    assert types.decode_key(as_key) == v
    assert types.decode_value(as_value) == v
def test_random_int():
    """Return a random integer whose magnitude is at most (1<<64)-1.

    That is the largest magnitude IntegerCodec's size table covers.  The
    lower bound of randrange is inclusive, so it is -(1<<64)+1 rather than
    -(1<<64), which the codec cannot encode.
    """
    return random.randrange( -(1<<64) + 1, 1<<64 )
def test_random_edge_int():
    """Return a random integer near a positive or negative power of two:
    +/- 2**k for k in 0..63, shifted by an offset in -5..4."""
    sign = random.choice((-1,+1))
    power = 1 << random.randrange(0,64)
    nudge = random.randrange(-5,5)
    return sign * power + nudge
def test_random_bytes():
    """Return a random byte string built from characters that stress the
    NUL-escaping of the byte-string codec.

    About 1% of the time the result is a long run of "\\x00\\xff" pairs;
    otherwise it is 0 to 5 characters drawn from a small alphabet.
    """
    if random.random() < 0.01:
        return "\x00\xff" * 200
    # The last entry is the single byte 0xff.  It was previously written
    # '\ff', which is a form feed followed by the letter 'f'.
    chars = ['\x00', '\x01', 'a', '7', '\xfe', '\xff']
    return ''.join([random.choice(chars) for c in range(random.randint(0, 5))])
def test_random_unicode():
    """Return a random unicode string of 0 to 5 characters drawn from a small
    alphabet of low, high and NUL code points."""
    # u'\xff' was previously written u'\ff', which is a form feed followed by
    # the letter 'f' rather than the single code point U+00FF.
    chars = [u'\x00', u'\x01', u'a', u'7', u'\xfe', u'\xff', u'\u0000', u'\u0001', u'\uffff', u'\uff00']
    return u''.join([random.choice(chars) for c in range(random.randint(0, 5))])
def test_random_tuple():
    """Return a tuple of 0 to 4 random values (which may themselves be tuples)."""
    length = random.randint(0,4)
    items = []
    for _ in range(length):
        items.append(test_random_value())
    return tuple(items)
def test_random_fruit():
    """Return a Fruit with a random type, color and weight (0..300)."""
    kind = random.choice(["apple","banana"])
    color = random.choice(["red","green","yellow"])
    grams = random.randint(0,300)
    return Fruit( kind, color, grams )
def test_random_value():
    """Return a random value of one of the supported kinds, each kind being
    equally likely."""
    generators = [
        lambda: None,
        test_random_edge_int,
        test_random_int,
        test_random_bytes,
        test_random_unicode,
        test_random_tuple,
        test_random_fruit,
    ]
    make = random.choice( generators )
    return make()
def test():
    """Round-trip one million random values through test_enc."""
    rounds = 1000000
    for _ in range(rounds):
        sample = test_random_value()
        test_enc(sample)
def user_typecode( v ):
    """Return the type-code string for user type number `v`."""
    # User type codes 0-14 come from reserved space 0x40-0x4e
    # User type codes 15+ are 7-bit encoded
    return chr(0x40 + v) if v < 15 else "\x4f" + _encodeUint7bit(v - 15)
class ExtendedType (object):
    """Mixin for values that encode themselves.

    Subclasses supply `encode_ordered_selfterm` and the classmethod
    `decode_ordered_selfterm`; the other three encodings and decoders
    default to those.
    """

    @classmethod
    def decode_ordered(cls, v, pos):
        # The self-terminating decoder returns a sequence whose first item is
        # the value; only the value is returned here.
        decoded = cls.decode_ordered_selfterm(v, pos)
        return decoded[0]

    @classmethod
    def decode_unordered(cls, v, pos):
        return cls.decode_ordered(v, pos)

    @classmethod
    def decode_unordered_selfterm(cls, v, pos):
        return cls.decode_ordered_selfterm(v, pos)

    def encode_ordered(self):
        return self.encode_ordered_selfterm()

    def encode_unordered(self):
        return self.encode_ordered()

    def encode_unordered_selfterm(self):
        return self.encode_ordered_selfterm()
class SimpleExtendedType (ExtendedType):
    """ExtendedType whose encoding is its type code followed by the encoded
    result of `to_constructor_parameters()`, and whose decoding passes the
    decoded parameters back to the constructor.

    Subclasses provide `typecode` and `to_constructor_parameters`.
    """

    def encode_ordered_selfterm(self):
        prefix = self.typecode
        payload = types.encode_key_part( self.to_constructor_parameters() )
        return prefix + payload

    @classmethod
    def decode_ordered_selfterm(cls,s,pos):
        # Skip the (possibly multibyte) type code, then decode the parameters.
        params, end = types.decode_key_part(s, pos + len(cls.typecode))
        return cls( *params ), end
class Fruit (ExtendedType):
    """Example user-defined type: a fruit with a type, a color and a weight.

    Encoded as the user type code followed by the self-terminating key
    encodings of fruitType, color and weight, in that order.
    """
    typecode = user_typecode(100)

    def __init__(self, fruitType, color, weight):
        self.fruitType = fruitType
        self.color = color
        self.weight = weight

    def __repr__(self):
        return "Fruit: %dg %s %s" % (self.weight, self.color, self.fruitType)

    def __eq__(self, other):
        # Let Python try the reflected comparison (and finally fall back to
        # "not equal") instead of raising AttributeError for non-Fruit operands.
        if not isinstance(other, Fruit):
            return NotImplemented
        return self.fruitType==other.fruitType and self.color==other.color and self.weight == other.weight

    def __ne__(self, other):
        # Python 2 does not derive != from __eq__, so define it explicitly to
        # keep the two operators consistent.
        equal = self.__eq__(other)
        if equal is NotImplemented:
            return equal
        return not equal

    def encode_ordered_selfterm(self):
        return self.typecode + types.encode_key_part(self.fruitType) + types.encode_key_part(self.color) + types.encode_key_part(self.weight)

    @classmethod
    def decode_ordered_selfterm(cls,s,pos):
        """Decode a Fruit starting at `pos`; returns (instance, end index)."""
        pos += len(cls.typecode)
        a = []
        # Three fields follow the type code; each decode advances `pos`.
        for i in range(3):
            v,pos = types.decode_key_part(s,pos)
            a.append(v)
        # Build via cls so that subclasses decode to their own type.
        return cls( *a ), pos
class UnitValue (SimpleExtendedType):
    """Example user-defined type: a value paired with its units."""
    typecode = user_typecode(100000)

    def __init__(self, value, units):
        self.value = value
        self.units = units

    def to_constructor_parameters(self):
        # Same order as the __init__ parameters.
        return (self.value, self.units)

    def __repr__(self):
        return "".join([str(self.value), str(self.units)])
# Register the self-decoding user types under their type codes.  No encoder
# entry is added here: Types falls back to a value's own encode_* methods.
types.decoders.update({
    UnitValue.typecode: UnitValue,
    Fruit.typecode: Fruit,
})
#types.encoders[Fruit.typecode] = SelfEncoder | |
# Other possible types: | |
# float, double, extended-double | |
# decimal (arbitrary-precision fixed point) | |
# bigger ints | |
# timestamp | |
# uuid | |
def set(subspace, key, value): | |
print "SET", types.decode_key(subspace.encode_key(key)), ":=", types.decode_value(subspace.encode_value(value)) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment