Created
February 19, 2016 15:32
-
-
Save Diggsey/02b7ba4cefc00a2c7fde to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import sqlalchemy | |
from sqlalchemy.ext.declarative import declarative_base, DeclarativeMeta | |
class OwningContext(object):
    """Pair an owning instance with the attribute prefix of one composite.

    Every operation on an embedded field ``key`` is forwarded to the
    attribute named ``prefix + key`` on ``instance``.
    """

    __slots__ = ('instance', 'prefix')

    def __init__(self, instance, prefix):
        self.prefix = prefix
        self.instance = instance

    def field_name(self, key):
        """Return the attribute name used on the instance for ``key``."""
        return self.prefix + key

    def set_(self, key, value):
        """Assign ``value`` to the prefixed attribute of the instance."""
        target = self.field_name(key)
        setattr(self.instance, target, value)

    def get_(self, key):
        """Return the prefixed attribute of the instance."""
        target = self.field_name(key)
        return getattr(self.instance, target)

    def del_(self, key):
        """Delete the prefixed attribute from the instance."""
        target = self.field_name(key)
        delattr(self.instance, target)
class CompositeColumn(object):
    """Declare a group of columns described by an embedded class.

    ``embedded`` is the class whose ``_fields`` describe the grouped columns;
    ``separator`` is the string placed between the composite's own attribute
    name and the name of each field.
    """

    def __init__(self, embedded, separator='_'):
        self.separator = separator
        self.embedded = embedded

    def get_prefix(self, key):
        """Return ``key`` followed by the separator."""
        prefix = key + self.separator
        return prefix
class EmbeddedAttribute(object):
    """Data descriptor that exposes a ``CompositeColumn`` on its owner.

    Accessed on an instance, it hands out an object of the embedded class
    (cached in the instance ``__dict__``) whose attribute operations are
    redirected to prefixed attributes of that instance.  Accessed on the
    class, it returns itself; attribute access on the descriptor is then
    translated into a prefixed attribute lookup on the outermost owner.
    """

    def __init__(self, owner, key, column):
        """Remember the owner, the attribute name and the composite column.

        ``owner`` is the object the descriptor is attached to: a class, or
        the parent ``EmbeddedAttribute`` when this one is nested.
        """
        self._owner = owner
        self._key = key
        self._column = column
        # Embedded classes receive their descriptors from EmbeddedMeta.  For
        # any other owner, attach attributes for the nested composites to this
        # object so that chained class-level access keeps resolving.
        if not isinstance(owner, EmbeddedMeta):
            EmbeddedAttribute.annotate_obj(self, column.embedded._fields)

    def _resolve(self, k):
        """Look up ``k`` as a fully prefixed attribute of the outermost owner."""
        suffix = self._column.get_prefix(self._key) + k
        if isinstance(self._owner, EmbeddedAttribute):
            # Nested composite: let the parent prepend its own prefix.
            return self._owner._resolve(suffix)
        return getattr(self._owner, suffix)

    def __get__(self, instance, owner=None):
        """Return the embedded view for ``instance`` (or self on the class)."""
        if instance is None:
            return self
        state = instance.__dict__
        if self._key not in state:
            # Create the view lazily and cache it so that repeated access
            # returns the same object.
            owning_context = OwningContext(
                instance, self._column.get_prefix(self._key))
            state[self._key] = self._column.embedded(
                _owning_context=owning_context)
        return state[self._key]

    def __set__(self, instance, value):
        """Copy every declared field of ``value`` into the embedded view."""
        assignee = self.__get__(instance)
        for field in self._column.embedded._fields:
            setattr(assignee, field, getattr(value, field))

    def __getattr__(self, key):
        """Resolve unknown attributes as prefixed attributes of the owner.

        Raises ``AttributeError`` for the descriptor's own state names: they
        only reach this hook when ``__init__`` has not populated them, and
        resolving them would re-enter ``_resolve`` without end.
        """
        if key in ('_owner', '_key', '_column'):
            raise AttributeError(key)
        return self._resolve(key)

    @staticmethod
    def annotate_obj(obj, dict_):
        """Attach an ``EmbeddedAttribute`` to ``obj`` for each composite.

        ``dict_`` maps attribute names to declarations; entries that are not
        ``CompositeColumn`` instances are ignored.
        """
        # items() instead of iteritems(): works on Python 2 and Python 3.
        for name, declaration in dict_.items():
            if isinstance(declaration, CompositeColumn):
                setattr(obj, name, EmbeddedAttribute(obj, name, declaration))
class EmbeddedMeta(type):
    """Metaclass that collects the column declarations of an embedded class.

    Declarations are removed from the class body and recorded instead:

    * ``_fields`` maps each declared name to its ``sqlalchemy.Column`` or
      ``CompositeColumn``;
    * ``_flat_fields`` is filled by the class's ``_flatten`` method.

    NOTE(review): only declarations found in the class body itself are
    recorded; ``_fields`` of base classes are not merged in.
    """

    def __new__(mcs, classname, bases, dict_):
        # items() instead of iteritems(): works on Python 2 and Python 3.
        fields = {
            name: value
            for name, value in dict_.items()
            if isinstance(value, (sqlalchemy.Column, CompositeColumn))
        }
        # Strip the declarations so they do not become class attributes.
        for name in fields:
            del dict_[name]

        cls = type.__new__(mcs, classname, bases, dict_)
        cls._fields = fields
        cls._flat_fields = {}
        cls._flatten(cls._flat_fields)
        # Composite declarations come back as EmbeddedAttribute descriptors.
        EmbeddedAttribute.annotate_obj(cls, fields)
        return cls
class EmbeddedBase(object):
    """Base class for objects whose fields are embedded in another object.

    An instance is either free-standing (``_owning_context`` is ``None``) and
    keeps its values itself, or it is a view created with an owning context,
    in which case every attribute read, write and delete is forwarded to
    that context.
    """

    # NOTE: Python 2 metaclass hook; Python 3 ignores this attribute.
    __metaclass__ = EmbeddedMeta

    def __init__(self, _owning_context=None, **kwargs):
        """Create the object and assign each keyword argument as a field."""
        # Written through __dict__ because __setattr__ itself consults
        # _owning_context.
        self.__dict__['_owning_context'] = _owning_context
        object.__init__(self)
        # items() instead of iteritems(): works on Python 2 and Python 3.
        for key, value in kwargs.items():
            setattr(self, key, value)

    def __setattr__(self, key, value):
        """Route an assignment to a descriptor, the owner, or this object.

        Raises ``AttributeError`` when a free-standing object is assigned a
        name that is not one of its flattened fields.
        """
        descriptor = getattr(type(self), key, None)
        if hasattr(descriptor, '__set__'):
            descriptor.__set__(self, value)
        elif self._owning_context is not None:
            self._owning_context.set_(key, value)
        elif key in self._flat_fields:
            object.__setattr__(self, key, value)
        else:
            raise AttributeError(key)

    def __getattr__(self, key):
        """Fallback lookup for names not found by normal attribute access.

        Views forward the lookup to their owning context.  Free-standing
        objects return ``None`` for a flattened field without a value and
        raise ``AttributeError`` for anything else.
        """
        if key == '_owning_context':
            # Only reached when __init__ has not run (for example an object
            # made with __new__).  Fail cleanly: reading the attribute below
            # would re-enter this hook without end.
            raise AttributeError(key)
        context = self._owning_context
        if context is not None:
            return context.get_(key)
        if key in self._flat_fields:
            return None
        raise AttributeError(key)

    def __delattr__(self, key):
        """Delete ``key`` locally, or through the owning context for views."""
        if self._owning_context is None:
            object.__delattr__(self, key)
        else:
            self._owning_context.del_(key)

    @classmethod
    def __dir__(cls):
        """Return the names of the declared fields as a list."""
        return list(cls._fields)

    @classmethod
    def _flatten(cls, dict_, prefix=''):
        """Add a copy of every leaf column to ``dict_`` under its full name.

        Nested composites are expanded recursively, each level contributing
        its own prefix to the names.
        """
        for name, declaration in cls._fields.items():
            field = prefix + name
            if isinstance(declaration, CompositeColumn):
                declaration.embedded._flatten(
                    dict_, declaration.get_prefix(field))
            else:
                # Each target receives its own copy of the declared column.
                dict_[field] = declaration.copy()
class CustomMeta(DeclarativeMeta):
    """Declarative metaclass that understands ``CompositeColumn`` attributes.

    Each composite found in the class body is replaced, before the class is
    created, by the flattened columns of its embedded class.  After creation
    the composite name is bound to an ``EmbeddedAttribute`` on the class.
    """

    def __new__(mcs, classname, bases, dict_):
        # items() instead of iteritems(): works on Python 2 and Python 3.
        composites = {
            name: value
            for name, value in dict_.items()
            if isinstance(value, CompositeColumn)
        }
        for name, composite in composites.items():
            # Swap the composite for its prefixed leaf columns, in place, so
            # they are part of the class body handed to DeclarativeMeta.
            del dict_[name]
            composite.embedded._flatten(dict_, composite.get_prefix(name))

        cls = DeclarativeMeta.__new__(mcs, classname, bases, dict_)
        EmbeddedAttribute.annotate_obj(cls, composites)
        return cls
# Declarative base whose subclasses are built by CustomMeta.
Base = declarative_base(metaclass=CustomMeta)
class EmbeddedName(EmbeddedBase):
    """Embeddable group of two string columns, ``first`` and ``last``."""

    first = sqlalchemy.Column(sqlalchemy.String)
    last = sqlalchemy.Column(sqlalchemy.String)
class EmbeddedPerson(EmbeddedBase):
    """Embeddable group made of a nested name and an integer ``age``."""

    # The nested composite uses '-' rather than the default '_' separator.
    name = CompositeColumn(EmbeddedName, separator='-')
    age = sqlalchemy.Column(sqlalchemy.Integer)
class Test(Base):
    """Example mapped class that embeds two ``EmbeddedPerson`` composites."""

    __tablename__ = 'test'

    id = sqlalchemy.Column(sqlalchemy.Integer, primary_key=True)

    # Both composites use the default '_' separator.
    person_a = CompositeColumn(EmbeddedPerson)
    person_b = CompositeColumn(EmbeddedPerson)
# Usage example: build a free-standing person, give it a name, then copy it
# into the ``person_a`` composite of a mapped object.
x = Test()

y = EmbeddedPerson(age=2)
y.name = EmbeddedName(first="hello")

x.person_a = y
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment