Last active
October 1, 2018 05:29
-
-
Save wallabra/ba2010f0242797d4a8ecd79535aec4ea to your computer and use it in GitHub Desktop.
High-level serialization interface.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
import os | |
import string | |
import random | |
try: | |
import simplejson as json | |
except ImportError: | |
import json | |
def make_id(
    id_length=150,
    characters=(string.digits + string.ascii_letters + "/_-,")
):
    """Build a random identifier string.

    Args:
        id_length: Number of characters to generate.
        characters: Sequence of characters each position is drawn from.

    Returns:
        A string of ``id_length`` characters, each picked independently
        with ``random.choice``.
    """
    picked = []
    for _ in range(id_length):
        picked.append(random.choice(characters))
    return ''.join(picked)
# Every Serializable instance created in this process, keyed by its id.
everything = dict()
# Classes created through the SerializableType metaclass, keyed by name.
types = dict()
class UnknownTypeError(Exception):
    """Raised when serialized data names a type that is not registered.

    This derives from ``Exception`` (not ``BaseException``) so that ordinary
    ``except Exception`` handlers see it; ``BaseException`` is meant for
    interpreter-level exits such as ``KeyboardInterrupt``.

    Attributes:
        name: The unrecognised type name found in the serialized data.
    """

    def __init__(self, name):
        super().__init__(name)
        self.name = name

    def __str__(self):
        return "Attempted to deserialize unknown type '{}'!".format(self.name)
class SerializableType(type):
    """Metaclass that records every class it creates in ``types``."""

    def __init__(cls, name, bases, clsdict):
        # Finish the normal class initialisation first, then publish the new
        # class under the name it was declared with.
        super(SerializableType, cls).__init__(name, bases, clsdict)
        types.update({name: cls})
class DiskSave(object):
    """A set of serializable objects mirrored to a JSON file.

    Objects made through :meth:`create` get an attribute listener that
    rewrites the file whenever one of their tracked attributes is assigned.
    Objects loaded from an existing file or passed to :meth:`add` are stored
    without such a listener; call :meth:`update` to persist them.

    Attributes:
        filename: Path of the backing JSON file.
        objects: Mapping of object id to object.
    """

    def __init__(self, filename):
        """Open the save at *filename*, loading its contents if it exists."""
        self.filename = filename
        if os.path.isfile(filename):
            with open(filename) as fp:
                self.objects = load_snapshot(json.load(fp))
        else:
            self.objects = {}

    def create(self, ser_type, *args, **kwargs):
        """Instantiate *ser_type*, store it, and write the save to disk.

        Args:
            ser_type: Class to instantiate; the instance must expose ``id``
                and ``register_attr_listener``.
            *args: Positional arguments forwarded to ``ser_type``.
            **kwargs: Keyword arguments forwarded to ``ser_type``.

        Returns:
            The newly created object.
        """
        res = ser_type(*args, **kwargs)
        self.objects[res.id] = res
        # Any later attribute assignment on the object re-saves everything.
        res.register_attr_listener(lambda _, _1, _2: self.update())
        self.update()
        return res

    def update(self):
        """Write a snapshot of all stored objects to the backing file."""
        # Serialize first: opening with 'w' truncates the file immediately,
        # so a failure while building the JSON must happen before that or
        # the previous save would be wiped.
        payload = json.dumps(make_snapshot(self.objects))
        with open(self.filename, 'w') as fp:
            fp.write(payload)

    def __getitem__(self, key):
        """Return the stored object whose id is *key*."""
        return self.objects[key]

    def add(self, serializable):
        """Store an existing object under its id without writing to disk.

        Returns:
            The object that was passed in, so the call can be chained or
            assigned.
        """
        self.objects[serializable.id] = serializable
        return serializable
class Serializable(object, metaclass=SerializableType):
    """Base class for objects that convert to and from JSON-friendly dicts.

    Every instance has an ``id`` and is recorded in the module-level
    ``everything`` mapping. Attributes assigned after the base initialiser
    has finished its bookkeeping are tracked in ``_attr`` and make up the
    serialized state; ``id`` itself is stored separately.

    Subclasses customise behaviour through the hooks ``on_init``,
    ``on_load``, ``on_set_attr`` and ``transient_data``.
    """

    def __init__(self, *args, id=None, _json_init=False, **kwargs):
        """Set up identity and attribute tracking, then run ``on_init``.

        Args:
            *args: Forwarded to ``on_init``.
            id: Identifier to use. A falsy value means "generate one".
            _json_init: True when the object is being rebuilt from
                serialized data, in which case ``on_init`` is skipped.
            **kwargs: Forwarded to ``on_init``.
        """
        if id:
            self.id = id
        else:
            # Generated ids are retried until they do not clash with an
            # object that already exists.
            self.id = make_id()
            while self.id in everything:
                self.id = make_id()
        everything[self.id] = self
        self._attr = {}
        self._attr_listeners = []
        # From this assignment on, __setattr__ starts tracking attributes.
        self._init_sentinel = None
        if not _json_init:
            self.on_init(*args, **kwargs)

    def register_attr_listener(self, f):
        """Register ``f(obj, name, value)`` to run on tracked assignments."""
        self._attr_listeners.append(f)

    def on_init(self, *args, **kwargs):
        """Hook run on normal construction (not when loading from data)."""
        pass

    def on_load(self, transient):
        """Hook run after loading; receives the stored transient data."""
        pass

    def __setattr__(self, name, value):
        """Store the attribute and, once tracking is active, record it."""
        super().__setattr__(name, value)
        # Until the sentinel exists the instance is still being set up, so
        # the bookkeeping containers may not be there yet.
        if hasattr(self, '_init_sentinel') and name != '_init_sentinel':
            self._attr[name] = value
            self.on_set_attr(name, value)
            for f in self._attr_listeners:
                f(self, name, value)

    def on_set_attr(self, name, value):
        """Hook run after each tracked attribute assignment."""
        pass

    def transient_data(self):
        """Return extra data to store under ``"transient"`` when saving."""
        return {}

    def _describe(self, encode):
        """Build the serialized dict, encoding Serializable values via *encode*."""
        return {
            "type": type(self).__name__,
            "id": self.id,
            "transient": self.transient_data(),
            "attr": {
                k: {
                    "serialized": is_serializable(v),
                    "content": (
                        encode(v) if is_serializable(v) else v
                    ),
                }
                for k, v in self._attr.items()
            },
        }

    def json_dict(self):
        """Return the serialized dict with nested objects embedded in full."""
        return self._describe(lambda child: child.json_dict())

    def json_dict_refs(self):
        """Return the serialized dict with nested objects replaced by ids."""
        return self._describe(lambda child: child.id)

    def json(self):
        """Return ``json_dict()`` encoded as a JSON string."""
        return json.dumps(self.json_dict())

    def dump(self, fp):
        """Write ``json_dict()`` as JSON to the file object *fp*."""
        return json.dump(self.json_dict(), fp)

    @classmethod
    def load_dict(cls, data):
        """Rebuild an object (and its nested objects) from ``json_dict`` data.

        Raises:
            UnknownTypeError: If ``data['type']`` is not a registered type.
        """
        if data['type'] not in types:
            raise UnknownTypeError(data['type'])
        self = types[data['type']](id=data['id'], _json_init=True)
        for k, v in data['attr'].items():
            if v['serialized']:
                res = cls.load_dict(v['content'])
            else:
                res = v['content']
            setattr(self, k, res)
        self.on_load(data['transient'])
        return self

    @classmethod
    def loads(cls, s):
        """Rebuild an object from a JSON string produced by ``json()``."""
        return cls.load_dict(json.loads(s))
def make_snapshot(domain=None):
    """Describe every object in *domain* using id references.

    Args:
        domain: Mapping of key to object; when omitted, the module-level
            ``everything`` mapping is used.

    Returns:
        A dict mapping each key to that object's ``json_dict_refs()``.
    """
    source = everything if domain is None else domain
    snapshot = {}
    for key, obj in source.items():
        snapshot[key] = obj.json_dict_refs()
    return snapshot
def load_snapshot(snapshot):
    """Rebuild the objects described by a reference-style snapshot.

    Entries are built in dependency order: an entry is constructed only
    once every object it references has been built, so each object is
    created exactly once and referenced objects are shared, not duplicated.

    Args:
        snapshot: Mapping of key to a dict with ``type``, ``id``,
            ``transient`` and ``attr`` entries, where referenced objects
            appear as ids.

    Returns:
        A dict mapping each snapshot key to the rebuilt object. An empty
        snapshot yields an empty dict.

    Raises:
        UnknownTypeError: If an entry names a type that is not registered.
        ValueError: If some references can never be satisfied.
    """
    # Reject unknown types up front, before any object is constructed.
    for data in snapshot.values():
        if data['type'] not in types:
            raise UnknownTypeError(data['type'])

    res = {}
    pending = dict(snapshot)
    while pending:
        built_this_pass = 0
        bad = []
        for key, data in list(pending.items()):
            unmet = [
                v['content'] for v in data['attr'].values()
                if v['serialized'] and v['content'] not in res
            ]
            if unmet:
                bad.extend(unmet)
                continue
            obj = types[data['type']](id=data['id'], _json_init=True)
            for k, v in data['attr'].items():
                if v['serialized']:
                    val = res[v['content']]
                else:
                    val = v['content']
                setattr(obj, k, val)
            obj.on_load(data['transient'])
            res[key] = obj
            del pending[key]
            built_this_pass += 1
        if not built_this_pass:
            # A full pass without progress means the remaining references
            # are absent from the snapshot or depend on each other.
            bad = list(dict.fromkeys(bad))
            raise ValueError(
                "In snapshot with {} objects, the following {} IDs "
                "were referenced but missing: {}".format(
                    len(snapshot), len(bad), ', '.join(bad)
                )
            )
    return res
def is_serializable(obj):
    """Tell whether *obj* is an instance of ``Serializable`` or a subclass."""
    verdict = isinstance(obj, Serializable)
    return verdict
def func_serializable(id=None):
    """Decorator factory that wraps a function in a serializable callable.

    Args:
        id: Namespace label used in the generated type name; defaults to
            this module's ``__name__``.

    Returns:
        A decorator. Applying it to a function returns an instance of a
        fresh ``Serializable`` subclass that calls the function when called.
    """
    label = __name__ if id is None else id

    def _wrapper(func):
        class _FuncCls(Serializable):
            def __init__(self, *args, **kwargs):
                # Assigned before the base initialiser runs, i.e. before
                # attribute tracking starts, so ``_f`` is not serialized.
                self._f = func
                super().__init__(*args, **kwargs)

            def __call__(self, *args, **kwargs):
                return self._f(*args, **kwargs)

            def __str__(self):
                return "[serializable function {}]".format(func.__name__)

        # Give the class a name unique to this function and register it
        # under that name so serialized data can be matched back to it.
        type_name = "[ID:{} FUNCTION:{}]".format(label, func.__name__)
        _FuncCls.__name__ = type_name
        types[type_name] = _FuncCls
        return _FuncCls()

    return _wrapper
if __name__ == "__main__":
    # Self-test: save a few objects to disk and load them back.
    import tempfile

    @func_serializable('SERIALIZATION_TEST')
    def test_func(x):
        return x * 100

    class SerializationTest(Serializable):
        def on_init(self):
            self.what = "Hurts"
            self.func = test_func

        def __str__(self):
            return "{} {} {}%".format(self.id, self.what, self.func(10))

    class DependencyTest(Serializable):
        def on_init(self, other):
            self.other = other

        def __str__(self):
            return "[dep: {}]".format(self.other)

    # Disk Saves
    # Work inside a throwaway directory so an existing ``test.json`` in the
    # current directory is never loaded or deleted by this demo.
    with tempfile.TemporaryDirectory() as scratch_dir:
        save_path = os.path.join(scratch_dir, 'test.json')

        d1 = DiskSave(save_path)
        d1.add(test_func)
        d1.create(SerializationTest, id="Love")
        o3 = d1.create(SerializationTest, id="This Nail")
        d1.create(DependencyTest, other=o3, id="I depend.")
        print(len(d1.objects))

        # Re-open the same file to check that everything loads back.
        d2 = DiskSave(save_path)
        print(len(d2.objects))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment