Created
August 10, 2018 21:03
-
-
Save nistath/8c9eaa861f96a1c3ef2d6fc13208dadc to your computer and use it in GitHub Desktop.
This code is somehow non-deterministic...
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import copy | |
import dataclasses | |
import types | |
from dataclasses import dataclass, field | |
from functools import wraps | |
from inspect import isclass | |
from typing import ClassVar, Set, Mapping, Callable, Dict, T | |
@dataclass | |
class RuleSet: | |
rules: Mapping[str, Mapping[str, Callable]] | |
_supported = ('pre', 'post') | |
_application_fmt = """\ | |
old = receiver.{function} | |
@wraps(old) | |
def new(*args, **kwargs): | |
{pre} | |
old(*args, **kwargs) | |
{post} | |
receiver.{function} = new | |
""" | |
# TODO: Consider a better model for this. | |
# This ensures that the receiver (class or instance) gets passed to the callback. | |
_default_callback_fmtdict = {fn: '' for fn in _supported} | |
_instance_callback_fmt = '{}(receiver, *args, {} **kwargs)' | |
_class_callback_fmt = '{}(*args, {} **kwargs)' | |
def apply(self, receiver, metadata=None): | |
''' | |
Apply this ruleset to `receiver`. | |
The callback must take [self, metadata, *args, **kwargs]. | |
''' | |
# HACK: Applicable to class or instance with the same callback signature. | |
if isclass(receiver): | |
callback_fmt = self._class_callback_fmt | |
else: | |
callback_fmt = self._instance_callback_fmt | |
for fn in self.rules: | |
callbacks = self._default_callback_fmtdict.copy() | |
for rule in self.rules[fn]: | |
metadata_part = 'metadata,' if metadata is not None else '' | |
callbacks[rule] = callback_fmt.format(rule, metadata_part) | |
ruleset_application = self._application_fmt.format(function=fn, **callbacks) | |
namespace = dict( | |
__name__='RuleSet_apply_{}'.format(fn), | |
receiver=receiver, | |
wraps=wraps, | |
metadata=metadata, | |
**self.rules[fn], | |
) | |
exec(ruleset_application, namespace) | |
return receiver | |
class Plural(Dict[str, T]): | |
attributes: ClassVar[Set[str]] | |
main: str = field(default=None, repr=False, hash=False) | |
def __init__(self, init=None): | |
if not hasattr(self, 'attributes'): | |
raise AttributeError('attributes must be specified through make') | |
self._store = {attrnm: {} for attrnm in self.attributes} | |
if init: | |
self.extend(init) | |
@classmethod | |
def make(cls, name, attributes, main=None): | |
@classmethod | |
def _disable_make(cls, name, attributes, main=None): | |
raise TypeError('deep inheritance not necessary') | |
attributes = set(attributes) | |
if main is not None and main not in attributes: | |
raise ValueError('main argument is not in the attributes set') | |
nspace = dict(attributes=attributes, main=main, make=_disable_make) | |
return types.new_class(name, (cls,), {}, lambda ns: ns.update(nspace)) | |
def add(self, item: T, safe=False): | |
''' | |
Add `item` to the internal representation. | |
Will raise ValueError if `safe` and there exists an attribute conflict. | |
''' | |
removal = None | |
remattr = None | |
for attrnm in self.attributes: | |
attr = getattr(item, attrnm) | |
if attr in self._store[attrnm]: | |
removal = self._store[attrnm][attr] | |
remattr = attrnm | |
self.remove(removal) | |
if safe: | |
raise ValueError(f'{item} and {removal} have equal ' | |
f'{remattr!r} attributes') | |
self._store[attrnm][attr] = item | |
return None | |
def extend(self, iterable, **kwargs): | |
for val in iterable: | |
self.add(val, **kwargs) | |
def remove(self, item: T): | |
for attrnm in self.attributes: | |
attr = getattr(item, attrnm) | |
if attr in self._store[attrnm]: | |
del self._store[attrnm][attr] | |
def clean(self): | |
del self._store | |
self.__init__() | |
def keys(self, by=None): | |
if by is None and not self.main: | |
raise KeyError('by keyword argument nor self.main are defined') | |
by = by or self.main | |
return self._store[by].keys() | |
def values(self, by=None): | |
if by is None and not self.main: | |
raise KeyError('by keyword argument nor self.main are defined') | |
by = by or self.main | |
return self._store[by].values() | |
def items(self, by=None): | |
if by is None and not self.main: | |
raise KeyError('by keyword argument nor self.main are defined') | |
by = by or self.main | |
return self._store[by].items() | |
def __bool__(self): | |
return bool(self.values()) | |
def __iter__(self): | |
return iter(self.values()) | |
def __getitem__(self, attrnm: str): | |
if attrnm not in self.attributes: | |
raise KeyError(f'there is no mapping by {attrnm}') | |
return types.MappingProxyType(self._store[attrnm]) | |
def __len__(self): | |
return len(next(iter(self._store.values()))) | |
def __contains__(self, item): | |
''' | |
True if the exact instance of `item` is in `self`, | |
False otherwise. | |
''' | |
# Use an assignment expression and a condition on hasattr | |
# s.t. this also works in cases when item is None | |
return any(self._store[attrnm].get(getattr(item, attrnm), None) is item | |
for attrnm in self.attributes) | |
def __repr__(self): | |
name = type(self).__name__ | |
commasep = ', '.join(map(repr, self.values())) | |
args = f'[{commasep}]' if len(self) else '' | |
return f'{name}({args})' | |
class Unique(Plural[T]): | |
def add(self, *args, **kwargs): | |
super().add(*args, safe=True, **kwargs) | |
def values(self): | |
return next(iter(self._store.values())).values() | |
# def asdict(obj, *, dict_factory=dict): | |
# if dataclasses._is_dataclass_instance(obj): | |
# return asdict(obj, dict_factory) | |
# elif isinstance(obj, (list, tuple)): | |
# return type(obj)(asdict(v, dict_factory) for v in obj) | |
# elif isinstance(obj, dict): | |
# return type(obj)((asdict(k, dict_factory), asdict(v, dict_factory)) | |
# for k, v in obj.items()) | |
# else: | |
# return copy.deepcopy(obj) | |
if __name__ == '__main__': | |
@dataclass | |
class Enumeration: | |
name: str | |
value: int | |
ManyEnum = Unique[Enumeration].make('ManyEnum', ('name', 'value'), 'name') | |
a = Enumeration('w', 2) | |
b = Enumeration('1', 4) | |
c = Enumeration('q', 4) | |
print(ManyEnum.attributes) | |
container = ManyEnum([a]) | |
def post_add(instance, item, metadata=None, **kwargs): | |
print('Added {} to {} with {}!'.format(item, instance, kwargs)) | |
print('Metadata is {}.'.format(metadata)) | |
print('\n\n') | |
ruleset = RuleSet({'add': {'post': post_add}}) | |
# Apply to all Container_class instances | |
ruleset.apply(ManyEnum) | |
new_container = ManyEnum([a]) | |
# Apply once more to container instance | |
# Expect a dual printout for adds on container instance | |
ruleset.apply(container, metadata=34) | |
container.add(b) | |
try: | |
container.add(c) | |
except ValueError as e: | |
print('Error is:', e) | |
print('\n\n') | |
for k, v in container.items(): | |
print(k, v) | |
print(len(container)) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment