Created: July 17, 2020 20:52
Gist: snarkmaster/e98f86c73f59e3070f8be01c4e53248b — save it to your computer, or use it in GitHub Desktop.
Note: this file may contain hidden or bidirectional Unicode text that could be interpreted or compiled differently from what appears below. To review it, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode characters.
'''
`deepfrozen`: Recursively immutable types that support inheritance.

 - Unlike frozen `dataclass`es or `NamedTuple`s, this lets you make
   types recursively immutable.
 - Unlike `dataclass`, the immutability guarantee is strong: you cannot
   bypass it via `setattr`.
 - Unlike `NamedTuple`, this supports inheritance.
 - Unlike `PyRecord`, this is immutable and hashable.

To get a sense of the API, read the "Demo session" below, and the tests.

== ORPHAN CODE ==

This is an old "on the side" hack of mine. I built it because I wanted to
use immutable types in my projects. I gave up because productionizing it
was harder than just being disciplined about using `NamedTuple`s.

The things that are missing to make this real:
 - Design / API review by experienced people who are not me.
 - More thorough tests.
 - Integration with `mypy` or `Pyre`. I assessed feasibility, and it
   was not too hard for `mypy`, but involved maintaining a plugin. For
   Pyre, it probably involves contributing code into the core, which
   would require getting some core devs to buy into this. There's mild
   interest if you want to do the work.

You can reuse this code under the terms of the MIT license:
https://opensource.org/licenses/MIT

A few informal requests -- there are no repercussions if you ignore them,
but I would be grateful if you honor them:
 - If you want to make this real, and maintain a public project (e.g. on
   GitHub) under the name `deepfrozen`, please e-mail me to discuss the
   design and project setup. You'll make the rules -- of course -- but I
   may dredge a few useful ideas from my memory.
 - Please include in your code a link back to this gist if you use a
   substantial part of it.
 - I'm glad to hear from you if you find it interesting or useful.

In the unlikely event that I find time to revive this project, I will
comment on this gist. Likewise, if you want to be notified of any "real"
project using this, or if you start one, feel free to comment there as well.

== Related work ==

https://github.com/tobgu/pyrsistent

== Demo session ==

In [1]: from collections import OrderedDict
   ...: import deepfrozen
   ...: deepfreeze = deepfrozen.deepfreeze
   ...: x = deepfreeze({deepfreeze({3: 5}): {deepfreeze(OrderedDict(a=6)), 5, ('a', 'b')}})
   ...: x
Out[1]: deepfreeze({deepfreeze({3: 5}): {deepfreeze(OrderedDict([('a', 6)])), deepfreeze(('a', 'b')), 5}})

In [4]: iter(x)
Out[4]: <dict_keyiterator at 0x7fc9f1108908>

In [5]: a, b, c = x[deepfreeze({3: 5})]

In [6]: a, b, c
Out[6]: (deepfreeze(OrderedDict([('a', 6)])), deepfreeze(('a', 'b')), 5)

In [7]: a['a']
Out[7]: 6

In [10]: class my_fields_are_frozen(metaclass=deepfrozen.frozentype):
    ...:     a: 'Mapping' = {}
    ...:     b: 'Sequence'
    ...: d = my_fields_are_frozen(a=x, b=[5, 6, 7])

In [11]: d.b
Out[11]: [5, 6, 7]

In [12]: d.b = []
---------------------------------------------------------------------------
AttributeError                            Traceback (most recent call last)
<ipython-input-12-afc26968365e> in <module>()
----> 1 d.b = []
AttributeError: can't set attribute

In [13]: d.b.append(6)

In [14]: dd = deepfreeze(d)

In [15]: dd
Out[15]: deepfreeze(my_fields_are_frozen)(a={deepfreeze({3: 5}): {deepfreeze(('a', 'b')), deepfreeze(OrderedDict([('a', 6)])), 5}}, b=(5, 6, 7, 6))

In [16]: d
Out[16]: my_fields_are_frozen(a=deepfreeze({deepfreeze({3: 5}): {deepfreeze(OrderedDict([('a', 6)])), deepfreeze(('a', 'b')), 5}}), b=[5, 6, 7, 6])

In [17]: dd.b = []
---------------------------------------------------------------------------
AttributeError                            Traceback (most recent call last)
<ipython-input-17-b7e66d849043> in <module>()
----> 1 dd.b = []
AttributeError: can't set attribute

In [18]: dd.b.append(7)
---------------------------------------------------------------------------
AttributeError                            Traceback (most recent call last)
<ipython-input-18-2f58384a92e5> in <module>()
----> 1 dd.b.append(7)
AttributeError: 'deepfrozentuple' object has no attribute 'append'
'''
### ==> ./.flake8 <== ###
# Flake8 configuration, kept here as a string so the gist stays one file.
'''
[flake8]
max-line-length = 80
# E123 / E126 / E127 / E131 force illegible formatting on multi-line
# comprehensions.
# E266 (no doubled # in comments) removes a useful emphasis mechanism.
# W503 is silly because leading with operators is more legible.
ignore = E123,E126,E127,E131,E266,W503
select = B,C,E,F,P,T4,W,B9
'''
### ==> ./BUCK <== ###
# Buck build targets: the library, and a unit test that must fully cover it.
python_library(
    name = "deepfrozen",
    srcs = [
        "__init__.py",
        "_deepfrozen.py",
        "_ensure_new_called.py",
        "_frozendict.py",
        "_frozentype_util.py",
        "_util.py",
    ],
)

python_unittest(
    name = "test-deepfrozen",
    srcs = [
        "tests/test_frozentype.py",
        "tests/test_ensure_new_called.py",
    ],
    # 100% coverage is not enough, we should actually exercise the
    # semantics.
    needed_coverage = [(
        100.0,
        ":deepfrozen",
    )],
    deps = [":deepfrozen"],
)
### ==> ./__init__.py <== ###
#!/usr/bin/env python3
'Public API of the `deepfrozen` package -- see `_deepfrozen.py` for docs.'
from ._deepfrozen import (  # noqa: F401
    # Use on POD structures & frozentypes
    deepfreeze,
    # For `issubclass(type(cls), deepfrozen)`, not usable otherwise
    deepfrozen,
    # Metaclass, makes immutable records with a "Data class"-style syntax
    frozentype,
    # Just like `frozentype`, but `deepfreeze`s its fields
    deepfrozentype,
    # Shallow-frozen mapping analog of `frozenset`
    frozendict, frozenordereddict,
    # Containers that `deepfreeze` their items
    deepfrozentuple, deepfrozenset, deepfrozendict, deepfrozenordereddict,
)
### ==> ./_deepfrozen.py <== ###
#!/usr/bin/env python3
'''
Provides the core `frozentype` and `deepfrozen` features.

This file is pretty large, but everything in its scope relies on free access
to `deepfreeze`, `deepfrozen`, `frozentype`, or some of their descendants.
The easiest way to break it up further would be to create a few more focused
helpers, and stash them in `_frozentype_util` or `_util` or similar.  Check
out `_merge_fields_across_bases` for an example of dependency breaking.
'''
import itertools

from abc import ABCMeta
from collections import OrderedDict
from types import MappingProxyType

from ._ensure_new_called import (
    _ensure_superclass_new_is_called, _SuperclassNotCalled,
)
from ._frozendict import frozendict, frozenordereddict
from ._frozentype_util import (
    RequiredField, _normalize_and_validate_frozentype_fields,
    _merge_fields_across_bases, FrozentypeDict, _unpickle, _unpickle_deep,
)
from ._util import (
    _check_new_called, _check_reserved_attributes,
    _have_one_argument_raise_if_more,
)
def _check_new_deepfrozen_metaclasses(
    supermetacls, metacls, actual, expected=_SuperclassNotCalled()
):
    'Ensure that only a whitelist of deepfrozen subclasses can create classes'
    _check_new_called(supermetacls, metacls, actual, expected)
    # Only a whitelist of `deepfrozen` sub-metaclasses may instantiate
    # classes.  Allowing user variants of `deepfrozen` would likely lead to
    # gross violations of immutability.  The current whitelist-only approach
    # works because all of the "blessed" metaclasses ensure an immutable
    # storage base, guarantee `__slots__ = ()` throughout the MRO, and take
    # care to deep-freeze their sub-items.
    #
    # This function already checks `__slots__` and the base storage class.
    #
    # Thus, to allow user variants of `deepfrozen`, we would want to wrap
    # `cls.__new__` for all created classes, and verify that its contents
    # are deep-frozen.  This has a clear runtime cost, and would be
    # redundant for the built-in classes.  Search for "Future: Runtime wrap"
    # for other instances where this sort of runtime validation is relevant.
    #
    # NB: `_deepfrozen_storage.__new__` also relies on this whitelist, since
    # it needs to be sure that these `_deepfrozen_*_storage` are its only
    # subclasses.
    assert supermetacls is deepfrozen
    if metacls not in (
        deepfrozentype, _deepfrozen_tuple_storage, _deepfrozen_set_storage,
        _deepfrozen_dict_storage,
    ):
        raise TypeError(f'{metacls} is not a valid subclass of deepfrozen')
    # The next two checks are mostly redundant with `frozentype` and
    # `_deepfrozen_storage`, except the latter does not check `__slots__`.
    # This check does, in fact, handle deep-frozen dicts, since they are
    # modeled as a 1-element tuple containing a mappingproxy.
    if actual.__mro__[-2:] not in [(tuple, object), (frozenset, object)]:
        raise TypeError(
            f'{metacls.__name__} made a class {actual} with a mutable storage '
            f'base. Here is its MRO: {actual.__mro__}'
        )
    # Check that all custom classes in the MRO have empty __slots__.
    for c in actual.__mro__[:-2]:
        if c.__dict__.get('__slots__') != ():
            raise TypeError(
                f'{metacls.__name__} made the class {actual}, but {c} in its '
                f'MRO did not set `__slots__ = ()`'
            )
    # Paranoia: in case somebody changes a class's `__slots__` after
    # declaration, this will still catch the error.  This will **not** catch
    # the case when somebody creates a class with a non-empty `__slots__`,
    # and then overwrites it with an empty `__slots__`, thus retaining a
    # hidden mutable attribute.  Python forbids non-empty slots for `tuple`,
    # so the error can only affect a `frozenset` base -- but the workaround
    # of wrapping it in a tuple like a frozendict is not worth it.
    fake_instance = actual.__mro__[-2].__new__(actual)
    bad_dict = getattr(fake_instance, '__dict__', None)
    if (
        bad_dict is not None and
        # Work around the fact that frozentype provides a __dict__ property.
        # Future: make this more robust if this ends up breaking user code.
        # Or, if this is too loathsome, then move the `__slots__` check into
        # `_deepfrozen_storage`, and update the docs above.
        bad_dict is not
            actual.__mro__[-3].__dict__.get('__dict__').fget(fake_instance)
    ):
        raise TypeError(
            f'{metacls.__name__} made the class {actual}, and it has a '
            '__dict__ attribute due to an incorrect __slots__ setup.'
        )
class deepfrozen(type):
    'This tag metaclass guarantees that its classes are recursively immutable'

    def __init_subclass__(submetacls, **kwargs):  # noqa: E902
        super().__init_subclass__(**kwargs)
        # This wraps the `__new__` of every submetaclass, and runs crucial
        # checks after the outermost `__new__` returns.  Wrapping burns some
        # CPU at class creation time, but it brings safety -- it is easy to
        # misuse or forget `super().__new__`.  This is not a defense against
        # deliberate abuse (impossible in Python), but a sanity check.
        #
        # A nice side effect of wrapping every `__new__` is that we are able
        # to verify the final content of the class, after post-processing by
        # our submetaclasses.
        _ensure_superclass_new_is_called(
            __class__, submetacls, _check_new_deepfrozen_metaclasses
        )

    def __new__(metacls, class_name, bases, attr_dict):
        _check_reserved_attributes('deepfrozen', class_name, attr_dict)
        # This implicitly (and intentionally) prevents `deepfrozen` itself
        # from instantiating classes -- it is not wrapped by
        # `__init_subclass__`, so this will fail with AttributeError:
        return metacls._deepfrozen_internal_new_result[__class__].set_result(
            super().__new__(metacls, class_name, bases, attr_dict)
        )
def deepfreeze(x):
    '''
    Recursively convert a datastructure into its deepfrozen analog.

    NB This also supports promoting `frozentype` classes (not just
    instances) to their `deepfrozentype` analogs, but (for now), I
    have omitted support for other freezable classes.
    '''
    # No change: instances of immutable scalars -- this does NOT permit
    # subclasses of built-ins, since those may have mutable attributes.
    # Future: if it is demonstrably useful to allow such subclasses, we can
    # follow the pattern set by containers, and copy v to the base class.
    # We prohibit it for now, since the copy incurs extra complexity & cost.
    if x is None or type(x) in (str, int, float, bytes):
        return x
    # No change: a class that is already deep-frozen.
    elif issubclass(type(x), deepfrozen):
        return x
    # Grab the deep-frozen class corresponding to a `frozentype` class.
    elif issubclass(type(x), frozentype):
        deep_x = getattr(x, '_deepfrozen_internal_deep_subclass', None)
        if deep_x is None:
            raise TypeError(
                f'Cannot deep-freeze {type(x)} because we did not know how '
                'to automatically define its deep-frozen analog.'
            )
        return deep_x
    # No change: an instance of a deepfrozen type.
    elif issubclass(type(type(x)), deepfrozen):
        return x
    # Deep-frozen copy: a `frozentype` instance.  Must precede `tuple` check.
    elif issubclass(type(type(x)), frozentype):
        return tuple.__new__(
            deepfreeze(type(x)),
            # We cannot freeze the class used for the incomparability hack,
            # but it's not really part of our mutable state anyway.  NOTE:
            # the class remains `type(x)`, NOT `deepfreeze(type(x))`, which
            # means that frozentype & deepfrozentype are comparable.
            ((v if i == 0 else deepfreeze(v)) for i, v in enumerate(x)),
        )
    # Deep-frozen copy: built-in containers -- note that if `x` is actually
    # a custom subclass of a container, this will discard its attributes.
    elif isinstance(x, (tuple, list)):
        return deepfrozentuple(deepfreeze(i) for i in x)
    elif isinstance(x, (set, frozenset)):
        return deepfrozenset(deepfreeze(i) for i in x)
    elif isinstance(x, dict):
        return (
            deepfrozenordereddict if isinstance(x, OrderedDict)
            else deepfrozendict
        )(
            (deepfreeze(k), deepfreeze(v)) for k, v in x.items()
        )
    elif isinstance(x, type):
        raise TypeError(f'Cannot freeze {x} types')
    else:
        raise TypeError(f'Cannot freeze {type(x)} instances')
def _check_new_frozentype_metaclasses(
    supermetacls, metacls, actual, expected=_SuperclassNotCalled()
):
    'Ensure that only a whitelist of frozentype subclasses can create classes'
    _check_new_called(supermetacls, metacls, actual, expected)
    # This whitelist exists because the code currently does not enforce the
    # constraints for correct custom sub-metaclasses of `frozentype`.  These
    # would include, at least:
    #  - Checking that their class instances have empty `__slots__`
    #    throughout the MRO, and ultimately derive from `tuple`.
    #  - Validating that the tuple instance returned by `cls.__new__` is the
    #    one that was created by `_frozentype_base.__new__`.
    assert supermetacls is frozentype
    if metacls is not deepfrozentype:  # `frozentype.__new__` is not wrapped
        raise TypeError(f'{metacls} is not a valid subclass of frozentype')
class frozentype(type):
    'A metaclass for defining frozentype classes.'

    def __init_subclass__(submetacls, **kwargs):  # noqa: E902
        super().__init_subclass__(**kwargs)
        # `deepfrozen.__init_subclass__` has a quick explanation.
        _ensure_superclass_new_is_called(
            __class__, submetacls, _check_new_frozentype_metaclasses,
        )

    def __new__(
        metacls, class_name, bases, attr_dict, *,
        _frozen_internal_stop_recursion=False,
    ):
        if super().__new__ is not type.__new__:
            # Without this assertion, inserting code between
            # `frozentype.__new__` would break `frozentype.mro()`, which
            # relies on `frozentype_base` being last in
            # `new_frozentype.__bases__`.  Such inheritance would also very
            # likely break pickling or deep-freezing.  It would also
            # break the MRO simulation of `_merge_fields_across_bases`.
            raise TypeError(
                'When your metaclass inherits from `frozentype`, it should '
                'make sure to leave `frozentype` at the next-to-last place '
                f'in the MRO: {metacls.__mro__}'
            )
        # _merge_fields_across_bases calls us recursively
        if _frozen_internal_stop_recursion:
            return super().__new__(metacls, class_name, bases, attr_dict)
        # May be redundant with the `deepfrozen` check. Do it anyway.
        _check_reserved_attributes('frozentype', class_name, attr_dict)
        field_to_default = MappingProxyType(_merge_fields_across_bases(
            __class__, class_name, bases, attr_dict,
        ))

        class frozentype_base(tuple):
            __slots__ = ()  # Forbid adding new attributes

            # Forbid positional arguments
            def __new__(_cls, **field_to_value):  # noqa: B902
                # The metaclass is supposed to edit the MRO so that this
                # tuple initialization sequence ends up dead last, see the
                # comment on `super().__new__()` below.
                if super().__new__ is not tuple.__new__:
                    raise AssertionError(
                        f'`_frozentype_base_{class_name}` must be followed by '
                        f'`tuple` in the MRO, got {_cls.__mro__}'
                    )
                # MUTATES field_to_value, OK since ** gave us our own dict.
                _normalize_and_validate_frozentype_fields(
                    _cls, field_to_value, field_to_default
                )
                # To benchmark: it might be faster not to construct the
                # intermediate tuple, and to pass a generator instead.
                return tuple.__new__(_cls, (
                    # A hack to make different frozentypes with the same
                    # data content compare (eq, ne, lt, ge, gt, ge) & hash
                    # differently.  Future: consider changing the comparator
                    # semantics to match @dataclass behavior?
                    getattr(
                        _cls,
                        # Allow comparisons with the non-frozen variant.
                        '_deepfrozen_internal_shallow_superclass',
                        _cls
                    ),
                    *(
                        deepfreeze(field_to_value[k])
                        if issubclass(metacls, deepfrozen)
                        else field_to_value[k]
                        for k in field_to_default
                    )
                ))

            def __repr__(self, *, omit_deepfreeze=False):
                if issubclass(type(self.__class__), deepfrozen):
                    shallow_superclass = getattr(
                        self.__class__,
                        '_deepfrozen_internal_shallow_superclass',
                        None,
                    )
                    if shallow_superclass is None:
                        typename = type(self).__name__
                    else:
                        typename = shallow_superclass.__name__
                    if not omit_deepfreeze:
                        typename = f'deepfreeze({typename})'
                    return f'{typename}(' + ', '.join(
                        f'{k}={deepfrozen_repr(getattr(self, k))}'
                        for k in field_to_default
                    ) + ')'
                return f'{type(self).__name__}(' + ', '.join(
                    f'{k}={repr(getattr(self, k))}' for k in field_to_default
                ) + ')'

            @property
            def __dict__(self, _memoized=[]):  # noqa: B006
                '''
                This is much slower than regular `.__dict__`, use sparingly.

                NB: `mappingproxy.__repr__` seems to sort the fields, but
                `iter(o.__dict__)` **ought** to return them in the C3 order.
                '''
                if not _memoized:
                    # Besides speeding up repeated calls, this lets us use
                    # `is` to work around this fake `__dict__` in
                    # `_check_new_deepfrozen_metaclasses`.
                    _memoized.append(MappingProxyType(
                        FrozentypeDict(self, field_to_default)
                    ))
                return _memoized[0]

            def __dir__(self):
                'See help(frozentype.__dir__).'
                return dir(self.__class__)

            def __reduce__(self):
                'Support copy & pickle operations'
                it = tuple.__iter__(self)
                tag = next(it)
                # The comparison tag might be our shallow cousin, see __new__
                return (_unpickle, (type(self), *it)) if tag is type(self) \
                    else (_unpickle_deep, (tag, *it))

        frozentype_base.__name__ = f'_frozentype_base_{class_name}'
        frozentype_base.__qualname__ = frozentype_base.__name__
        # This flag is required by our `mro()` override.
        frozentype_base._deepfrozen_internal_is_base = True

        # This is a function to give each captured `idx` its own scope.
        def field_accessor(idx):
            return property(lambda self: tuple.__getitem__(self, idx + 1))

        # Provide field accessors as class members of the new type.  Move
        # any declared field defaults into `frozentype_base`.  Mutates
        # attr_dict, but we own `attr_dict` per the metaclass contract.
        for idx, field in enumerate(field_to_default):
            default = attr_dict.get(field, RequiredField)
            if default is not RequiredField:
                setattr(frozentype_base, field, default)
            attr_dict[field] = field_accessor(idx)

        # Since we're inheriting from `tuple`, __slots__ must be empty if set.
        #
        # frozentypes are supposed to be immutable, so we should not allow
        # people to add attributes after construction.  Without __slots__,
        # instances of this class would end up with a `__dict__` attribute,
        # which would allow setting arbitrary data on a created object --
        # and worse yet, the data would be excluded from ==, `hash`
        # computations, serialization, etc.  In other words, such data would
        # be "semantically invisible".
        if '__slots__' in attr_dict:
            raise TypeError(
                'Do not set __slots__ on frozentypes. '
                f'Got {attr_dict["__slots__"]} for class {class_name}.'
            )
        attr_dict['__slots__'] = ()

        new_frozentype = super().__new__(
            metacls,
            class_name,
            # We want to put our `__new__` last in the MRO, which allows
            # user `__new__` overloads to customize its keyword arguments,
            # while letting us ensure that the object we construct always
            # follows the frozentype contract.
            #
            # The tuple here is only half of the solution, because if the
            # current class subclasses another `frozentype`, both
            # parent & child would expect their `__new__` to run last.  We
            # only want to run the child's `__new__`, since its
            # functionality is a strict superset of the parent.  To achieve
            # this, we edit the MRO, see our `mro()` overload.
            (*bases, frozentype_base),
            attr_dict,
        )

        # Automatically create a deepfrozentype for any vanilla frozentype.
        if not issubclass(metacls, deepfrozen):
            # Sanity check: if this is a custom subclass of the `frozentype`
            # metaclass, we can no longer automatically create its
            # deepfrozen variant.
            if issubclass(
                metacls._deepfrozen_internal_deepfrozen_metaclass, metacls
            ):
                new_deepfrozentype = \
                    metacls._deepfrozen_internal_deepfrozen_metaclass(
                        '_deepfrozen__' + new_frozentype.__name__,
                        (new_frozentype,),
                        {},
                    )
                # This is used for ensuring that `deepfrozentype` classes
                # are transparently comparable with their `frozentype`
                # counterparts.  It also lets us `repr()` `deepfrozentype`
                # classes that were automatically created from user-declared
                # `frozentype`s as `deepfreeze(UserClass)` instead of
                # displaying a machine-generated and unresolvable name.
                type.__setattr__(
                    new_deepfrozentype,
                    '_deepfrozen_internal_shallow_superclass',
                    new_frozentype,
                )
                # `deepfreeze` needs to know how to map a `frozentype` class
                # to its associated `deepfrozentype` class.
                type.__setattr__(
                    new_frozentype,
                    '_deepfrozen_internal_deep_subclass',
                    new_deepfrozentype,
                )
            # else:
            #     No deep-frozen type created, so `deepfreeze(new_frozentype)`
            #     will not work.

        # Future: Runtime wrap of `new_frozentype.__new__` to enforce:
        #  - That its return value comes from `frozentype_base.__new__`.
        #  - That each of the tuple's items is also deep-frozen if
        #    `issubclass(type(cls), deepfrozen)`.
        # This handles the needs of shallow `frozentype`s, and of both
        # explicitly and automatically created `deepfrozentype`s.

        if metacls is frozentype:  # `frozentype.__new__` is not wrapped
            return new_frozentype
        return metacls._deepfrozen_internal_new_result[__class__].set_result(
            new_frozentype
        )

    # Since we use `frozentype_base.__dict__` as the source of truth for
    # new instance defaults, let's discourage people from changing it.
    #
    # Future: it may be OK to relax this restriction if a good reason comes
    # up, because making the defaults mutable will not change existing
    # object instances.  Allowing it now seems like a bad idea.
    def __delattr__(cls, attr):
        raise TypeError('Cannot delete attributes of frozentypes')

    def __setattr__(cls, attr, val):
        raise TypeError('Cannot set attributes of frozentypes')

    def __dir__(cls):
        '''
        Hide tuple-provided attributes and methods from frozentypes, on the
        premise that people should not generally be using them as tuples.
        Read the "Bugs" section of the frozentype docblock for more details.
        '''
        return sorted(set(itertools.chain.from_iterable(
            c.__dict__.keys()
            for c in cls.__mro__
            if c is not tuple
        )))

    def mro(cls):
        '''
        For any frozentype_base that did not originate from `cls`, but was
        generated for a parent frozentype, remove that base from the MRO.
        We need to construct a tuple just once, and `cls`, the leaf
        subclass, had already incorporated the fields of superclasses via
        `_merge_fields_across_bases` above.
        '''
        base_under_review = None
        for c in super().mro():
            if base_under_review is not None:
                # Discard all but the last `frozentype_base`, which by C3
                # should belong to `cls`.  We have to check that this is the
                # last base for `cls`, because otherwise
                # `_merge_fields_across_bases` would get the wrong base in
                # the MRO of its fake class (When in fact it should get no
                # bases).
                if c is tuple:
                    if cls.__bases__[-1] is base_under_review:
                        yield base_under_review
                    # We should only get here from `merge_fields_across_bases`,
                    # so fail if we are skipping a `frozentype_base`.
                    elif cls.__bases__[-1].__dict__.get(
                        '_deepfrozen_internal_is_base'
                    ):
                        raise AssertionError(
                            f'Expected {cls.__bases__} not to end with a '
                            f'`frozentype_base`. Something is broken.'
                        )
            # `__dict__` instead of `hasattr` to avoid traversing the MRO.
            if c.__dict__.get('_deepfrozen_internal_is_base'):
                # Hold on to this `frozentype_base` until we see the next class
                base_under_review = c
            else:
                yield c
class deepfrozentype(deepfrozen, frozentype):
    'A `frozentype` whose field values are also `deepfreeze`d on construction.'
    pass
    # There is nothing to do in `__new__`.
    #
    # Due to `_check_new_frozentype_metaclasses`, we know our MRO:
    #   - `deepfrozen.__new__`, just validates the created class, and
    #   - `frozentype.__new__`, creates the frozentype storage base
    #     and field accessors, etc.
    #
    # Without addressing the idea on "Future: Runtime wrap" in
    # `frozentype.__new__`, our best hope is that subclasses of
    # `deepfrozentype` classes will ultimately call `frozentype.__new__`,
    # and return the resulting object.  That will freeze any mutable data
    # injected into `cls.__new__`'s keyword arguments by the user-defined
    # `__new__`s, and honor our contract.  If the user-defined `__new__`
    # fails to call & use `frozentype.__new__`, all bets are off.
# We will promote the `frozentype` metaclass to `deepfrozentype` to
# automatically create a deep-frozen variant with every `frozentype` that is
# defined.  This is only marked internal because I do not know of a
# legitimate use-case for subclassing these metaclasses.
frozentype._deepfrozen_internal_deepfrozen_metaclass = deepfrozentype
def deepfrozen_repr(v, *, must_be_hashable=False):
    '''
    For readability, we want to avoid redundantly nesting `deepfreeze()`
    calls inside the representation of an already-deepfrozen structure.

    We also want to avoid putting unhashable literals in syntactic positions
    that expect hashables (pass `must_be_hashable=True`), since the
    resulting string would otherwise not be `eval`-able, e.g.

        deepfreeze({{3: 5}: {5, 6, {3: ()}}})  # Bad: dicts in sets & dict keys
        deepfreeze({deepfreeze({3: 5}): {5, 6, deepfreeze({3: ()})}})  # Good
    '''
    metacls = type(type(v))
    if issubclass(metacls, deepfrozen):
        return v.__repr__(omit_deepfreeze=not must_be_hashable)
    return repr(v)
class _deepfrozen_storage(deepfrozen): | |
''' | |
For each storage type (tuple, set, dict), we inherit from this to | |
produce a separate metaclass. The purpose of this scheme is to ensure | |
that user-exposed types with incompatible storage like `deepfrozenset` | |
and `deepfrozentuple` cannot occur in the same inheritance hierarchy | |
(due to a metaclass conflict). This seems cleaner than `deepfrozen` | |
walking the MRO manually to determine the storage compatibility of its | |
classes. | |
Note that `frozenset` and `tuple` are also layout-incompatible as base | |
classes. We have to do this extra shenanigan because `mappingproxy` is | |
not subclassable, and so our workaround for `frozendict` is to store a | |
tuple with a single `mappingproxy` inside. That would, unfortunately, | |
not have a layout conflict with `deepfrozentuple`, so the metaclass | |
conflict trick is necessary. | |
''' | |
def __new__(metacls, class_name, bases, attr_dict): # noqa: B902 | |
if super().__new__ is not deepfrozen.__new__: | |
raise TypeError(f'Unexpected MRO: {metacls.__mro__}') | |
# Goals: | |
# - Ensure exactly ONE `_deepfrozen_storage_base` is installed, | |
# last in the MRO. | |
# - Allow using plain tuple mixins with `deepfrozentuple`s, etc. | |
# In other words, not all bases must inherit from the deepfrozen | |
# storage, so long as their `__slots__` are recursively empty. | |
# Cases: | |
# - No bases, and no singleton => now making `deepfrozenX`, so | |
# create a `_deepfrozen_storage_base`. | |
# - No bases, have singleton => error, redefining `deepfrozenX`. | |
# - Have bases => ensure at least one base inherits from singleton | |
# (otherwise we might not even have a `_deepfrozen_storage_base`). | |
if bases: | |
# Since we don't have a metaclass conflict, it must be that all | |
# of the existing bases are compatible with the same | |
# `_deepfrozen_X_storage` metaclass (and not a subclass, see | |
# `_check_new_deepfrozen_metaclasses`). The assertion on | |
# `_deepfrozen_internal_class_singleton` guarantees that this | |
# metaclass is instantiated only once. It follows that every | |
# class in `bases` inherits from the same | |
# `_deepfrozen_storage_base_X`, or from one of its superclasses | |
# (e.g. `tuple` or `object`). | |
# | |
# So, we just need to check that at least one of the bases | |
# actually provides a `_deepfrozen_storage_base`. | |
singleton = metacls.__dict__.get( # Nothing subclasses NoneType | |
'_deepfrozen_internal_class_singleton', type(None), | |
) | |
if not any(issubclass(base, singleton) for base in bases): | |
raise TypeError( | |
f'None of the bases {bases} of {class_name} subclass the ' | |
f'{metacls} storage class of {singleton}' | |
) | |
# NB: Logically, we should check here that all of the bases have | |
# empty `__slots__`, and none bring a mutable `__dict__`, but | |
# for now, we let `_check_new_deepfrozen_metaclasses` do this. | |
# | |
# Future: Runtime wrap `new_cls.__new__` to ensure that its | |
# return value comes from `_deepfrozen_storage_base.__new__`, | |
# which should guarantee that (i) we have the right storage | |
# base, (ii) all the items in the container are deep-frozen. | |
# We would also need to do some extra checking for `frozendict`. | |
return super().__new__(metacls, class_name, bases, attr_dict) | |
# No bases: we are creating a singleton, `deepfrozentuple` or similar. | |
if '_deepfrozen_internal_class_singleton' in metacls.__dict__: | |
# The `if bases:` case relies on each storage metaclass being | |
# instantiated exactly once. | |
raise TypeError( | |
f'Metaclass {metacls} was already used for ' | |
f'{metacls._deepfrozen_internal_class_singleton}' | |
) | |
# else: Proceed to create the deepfrozen storage base & class. | |
storage_type = metacls._deepfrozen_internal_storage | |
if storage_type in (tuple, frozenset): | |
repr_delims = '()' if storage_type is tuple else '{}' | |
hashable_items = storage_type is frozenset | |
class storage_base(storage_type):
    # Storage base shared by the deep-frozen `tuple` / `frozenset`
    # wrappers.  NOTE(review): this class is defined inside the metaclass
    # `__new__`, so `storage_type`, `class_name`, `repr_delims`, and
    # `hashable_items` are closure variables from the enclosing scope.
    __slots__ = ()  # no instance `__dict__` => no writable state
    def __new__(cls, *args):
        # Refuse to construct if something other than the plain storage
        # type would receive our `super().__new__` call -- that could
        # bypass the deep-freezing below.
        if super().__new__ is not storage_type.__new__:
            raise TypeError(f'Unexpected MRO: {cls.__mro__}')
        if _have_one_argument_raise_if_more(class_name, args):
            # Deep-freeze every item on the way in, preserving the
            # recursive-immutability invariant of the container.
            return super().__new__(cls, (
                deepfreeze(i) for i in args[0]
            ))
        # Zero-argument form: an empty container.
        return super().__new__(cls)
    def __repr__(self, *, omit_deepfreeze=False):
        # Render as `deepfreeze((...))` / `deepfreeze({...})`;
        # `omit_deepfreeze` lets an enclosing repr nest the bare literal.
        s = repr_delims[0] + ', '.join(
            deepfrozen_repr(v, must_be_hashable=hashable_items)
            for v in storage_type.__iter__(self)
        ) + repr_delims[1]
        return s if omit_deepfreeze else f'deepfreeze({s})'
elif storage_type is frozendict: | |
class storage_base(storage_type):
    # Storage base for deep-frozen dicts.  NOTE(review): defined inside
    # the metaclass `__new__`, so `storage_type` (== `frozendict`) and
    # `class_name` are closure variables from the enclosing scope.
    __slots__ = ()
    def __new__(cls, *args, **kwargs):
        # Sanity-check the MRO: the next `__new__` must be one of ours.
        if super().__new__ not in (
            frozendict.__new__, frozenordereddict.__new__
        ):
            raise TypeError(f'Unexpected MRO: {cls.__mro__}')
        # BUGFIX: iterating a dict yields keys only -- the original
        # `for k, v in kwargs` raised ValueError (or mis-unpacked
        # 2-character keys) whenever keyword arguments were passed.
        kwargs = {
            deepfreeze(k): deepfreeze(v) for k, v in kwargs.items()
        }
        if _have_one_argument_raise_if_more(class_name, args):
            # NOTE(review): `args[0]` is assumed to be an iterable of
            # (key, value) pairs; a plain mapping argument would iterate
            # keys only -- confirm against callers.
            return super().__new__(cls, (
                (deepfreeze(k), deepfreeze(v))
                for k, v in args[0]
            ), **kwargs)
        return super().__new__(cls, **kwargs)
    def __repr__(self, *, omit_deepfreeze=False):
        # `frozenordereddict` reprs as `OrderedDict([...])` to make the
        # ordering explicit; plain dicts use `{k: v}` literal syntax.
        if isinstance(self, frozenordereddict):
            s = 'OrderedDict([' + ', '.join(
                (
                    f'({deepfrozen_repr(k, must_be_hashable=True)}'
                    f', {deepfrozen_repr(v)})'
                ) for k, v in storage_type.items(self)
            ) + '])'
        else:
            s = '{' + ', '.join(
                (
                    deepfrozen_repr(k, must_be_hashable=True) +
                    ': ' + deepfrozen_repr(v)
                ) for k, v in storage_type.items(self)
            ) + '}'
        return s if omit_deepfreeze else f'deepfreeze({s})'
else: | |
raise TypeError(f'{class_name} has unknown storage {storage_type}') | |
storage_base.__name__ = f'_deepfrozen_storage_base__{class_name}' | |
storage_base.__qualname__ = storage_base.__name__ | |
new_deepfrozen = super().__new__( | |
metacls, | |
class_name, | |
(storage_base,), # We checked `bases` was empty. | |
attr_dict, | |
) | |
metacls._deepfrozen_internal_class_singleton = new_deepfrozen | |
# NB; We do not need to wrap `new_deepfrozen.__new__` since it's ours. | |
return new_deepfrozen | |
class _deepfrozen_tuple_storage(_deepfrozen_storage):
    # Metaclass for `deepfrozentuple`: tells `_deepfrozen_storage.__new__`
    # to back the singleton with `tuple`.
    # Prefixed, as `deepfrozentuple` class attribute accesses can reach these:
    _deepfrozen_internal_storage = tuple
class _deepfrozen_set_storage(_deepfrozen_storage):
    # Metaclass for `deepfrozenset`: backed by `frozenset`.
    _deepfrozen_internal_storage = frozenset
class _deepfrozen_dict_storage(_deepfrozen_storage, ABCMeta):
    # Metaclass for `deepfrozendict`: backed by `frozendict`.  Mixes in
    # `ABCMeta` -- presumably because `frozendict` inherits from
    # `abc.Mapping`, whose metaclass is `ABCMeta` (metaclasses of a class
    # and its bases must be compatible) -- TODO confirm.
    _deepfrozen_internal_storage = frozendict
class deepfrozentuple(metaclass=_deepfrozen_tuple_storage):
    # Public deep-frozen tuple type.  The metaclass synthesizes and
    # injects the tuple-backed `_deepfrozen_storage_base__*` base class.
    __slots__ = ()
class deepfrozenset(metaclass=_deepfrozen_set_storage):
    # Public deep-frozen set type, backed by `frozenset` via the metaclass.
    __slots__ = ()
class deepfrozendict(metaclass=_deepfrozen_dict_storage):
    # Public deep-frozen dict type, backed by `frozendict` via the metaclass.
    __slots__ = ()
# This is our most complicated container type, so here's an inheritance | |
# diagram to make its structure clearer. | |
# | |
# frozendict | |
# | | | |
# _deepfrozen_storage_base__deepfrozendict | | |
# | frozenordereddict | |
# deepfrozendict | | |
# | | | |
# deepfrozenordereddict | |
class deepfrozenordereddict(deepfrozendict, frozenordereddict):
    # Deep-frozen counterpart of `OrderedDict`: reuses `deepfrozendict`'s
    # storage base while also inheriting `frozenordereddict` behavior.
    __slots__ = ()
### ==> ./_ensure_new_called.py <== ### | |
#!/usr/bin/env python3 | |
'Utilities that are not required to be in the main _deepfrozen scope.' | |
import functools | |
from collections import OrderedDict | |
class _SuperclassNotCalled: | |
''' | |
Instantiate this tag class as the default expected_result in your | |
validator. NB: expected_result=object() would do, but gives less | |
readable errors. | |
''' | |
pass | |
def _ensure_superclass_new_is_called(supercls, subcls, check_result):
    '''
    Guards against the common error of forgetting to invoke a
    superclass's `__new__` -- in `deepfrozen`, that can break the
    deep-frozen invariant, so we err on the side of caution.

    Usage:
    1) Write a module-level validator (locals are inefficient, see
       `_WrapperReturnValidator._result_checks`):
           def validator(supercls, cls, actual, expected=_SuperclassNotCalled()):
               # `supercls`: the class whose `__new__` must get called.
               # `actual`: what `cls.__new__` just returned.
               # `expected`: what `supercls.__new__` returned during this
               #   invocation of `cls.__new__`, or a `_SuperclassNotCalled`
               #   instance if it never ran.
    2) Wrap it around the class's `__new__`, either right after a
       metaclass `__new__` builds the class object:
           _ensure_superclass_new_is_called(SuperToCheck, new_cls, validator)
       or from the base class's `__init_subclass__(cls)`:
           _ensure_superclass_new_is_called(__class__, cls, validator)
    3) Have `SuperToCheck.__new__` register its result by returning:
           return cls._deepfrozen_internal_new_result[__class__].set_result(
               super().__new__(...)
           )
    '''
    assert issubclass(subcls, supercls)
    # Storage layout: {class_whose_new_must_be_called: validator, ...}.
    #
    # With C -> B -> A, the validator ensuring `A.__new__` runs must wrap
    # both `B.__new__` and `C.__new__`, and those wrappers must share one
    # result slot, since A calls `set_result` only once.  Conversely, a
    # validator B installs for `B.__new__` needs its own slot -- otherwise
    # we could not tell which of A/B actually ran.  Keying the dict by
    # `supercls` satisfies both needs.
    #
    # Validator objects live for the lifetime of the program, but their
    # results are emptied as soon as validation finishes, so no leak.
    if not hasattr(subcls, '_deepfrozen_internal_new_result'):
        subcls._deepfrozen_internal_new_result = {}
    validator = subcls._deepfrozen_internal_new_result.setdefault(
        supercls, _WrapperReturnValidator(),
    )
    validator._result_checks[check_result] = None
    inner_new = subcls.__new__
    @functools.wraps(inner_new)
    def checking_new(cls, *args, **kwargs):
        assert issubclass(cls, subcls)
        assert subcls._deepfrozen_internal_new_result[supercls] is validator
        validator._subscribe_to_result()
        result = inner_new(cls, *args, **kwargs)
        assert subcls._deepfrozen_internal_new_result[supercls] is validator
        try:
            validator._validate_result_if_last(supercls, cls, result)
        except Exception:
            # If a check raises while some superclasses remain unchecked,
            # their pending results would leak -- clean them all up.  See
            # test_no_leak_when_validator_throws for an example.
            for pending in subcls._deepfrozen_internal_new_result.values():
                pending._clear_on_exception()
            raise
        return result
    subcls.__new__ = checking_new
class _WrapperReturnValidator: | |
'See _ensure_superclass_new_is_called for usage.' | |
def __init__(self): | |
# If C subclasses B subclasses A, and A's validator wraps both C & | |
# B's __new__, we only want to run the outermost of these, i.e. B's | |
# validator when constructing a B instance, and C's validator for | |
# C's instances. The subscriber counting accomplishes that. | |
self._num_subscribed = 0 | |
# Whether the validator is added by `__init_subclass__` or by a | |
# metaclass, it will always be added for all subclasses, and not | |
# just for the subclass of interest. Storing validators as dict | |
# keys will eliminate the resulting copies -- this will not work if | |
# the validator is created as a local object every time we wrap. | |
# The dict is ordered for ease of testing. | |
self._result_checks = OrderedDict() | |
def set_result(self, result): | |
# If this validator is triggered only by a wrap for a subclass's | |
# __new__, but we are constructing a superclass, setting the result | |
# with no subscriptions would leak the result -- there would never | |
# be a `_validate_result_if_last` call to clear it. | |
if self._num_subscribed > 0: | |
# `None` is a kind-of-valid result, so we use "attribute | |
# deleted" to represent "no result" was set. | |
assert not hasattr(self, '_result') | |
self._result = result | |
return result | |
def _subscribe_to_result(self): | |
self._num_subscribed += 1 | |
assert not hasattr(self, '_result') | |
def _validate_result_if_last(self, supercls, cls, actual_result): | |
assert self._num_subscribed > 0 | |
self._num_subscribed -= 1 | |
if self._num_subscribed: | |
return | |
if hasattr(self, '_result'): | |
expected_result = self._result | |
del self._result # delete first in case validation throws | |
for check in self._result_checks: | |
check(supercls, cls, actual_result, expected_result) | |
else: | |
# 3-argument call to let the validator use any default they want | |
for check in self._result_checks: | |
check(supercls, cls, actual_result) | |
def _clear_on_exception(self): | |
has_result = hasattr(self, '_result') | |
assert self._num_subscribed > 0 or not has_result | |
self._num_subscribed = 0 | |
if has_result: # If the result was populated, we must not leak. | |
del self._result | |
### ==> ./_frozendict.py <== ### | |
#!/usr/bin/env python3 | |
from collections import abc, OrderedDict | |
from types import MappingProxyType | |
class frozendict(abc.Mapping, tuple):
    '''
    An immutable, hashable mapping.

    Implementation: the payload dict is wrapped in a read-only
    `MappingProxyType` and stored as the single element of a `tuple`
    subclass.  Together with `__slots__ = ()` this leaves the instance
    with no writable state at all.
    '''
    __slots__ = ()
    def __new__(cls, *args, **kwargs):
        # Accepts exactly the same constructor arguments as `dict`.
        proxy = MappingProxyType(dict(*args, **kwargs))
        return tuple.__new__(cls, (proxy,))
    def __contains__(self, key):
        return key in tuple.__getitem__(self, 0)
    def __getitem__(self, key):
        inner = tuple.__getitem__(self, 0)
        return inner[key]
    def __len__(self):
        return len(tuple.__getitem__(self, 0))
    def __iter__(self):
        return iter(tuple.__getitem__(self, 0))
    def keys(self):
        return tuple.__getitem__(self, 0).keys()
    def values(self):
        return tuple.__getitem__(self, 0).values()
    def items(self):
        return tuple.__getitem__(self, 0).items()
    def get(self, key, default=None):
        return tuple.__getitem__(self, 0).get(key, default)
    def __eq__(self, other):
        # Unwrap the other side when it is also a frozendict, then let
        # the inner mapping's `__eq__` decide (it yields NotImplemented
        # for non-mapping operands, preserving normal comparison rules).
        mine = tuple.__getitem__(self, 0)
        if isinstance(other, __class__):
            other = tuple.__getitem__(other, 0)
        return mine.__eq__(other)
    def __ne__(self, other):
        return not self == other
    def __repr__(self):
        inner = tuple.__getitem__(self, 0)
        return f'{type(self).__name__}({repr(inner)})'
    def __hash__(self):
        # Order-insensitive, matching dict equality semantics.
        return hash(frozenset(self.items()))  # Future: more efficient hash?
# Mirror the fact that OrderedDict subclasses dict. | |
class frozenordereddict(frozendict):
    '''
    Order-preserving `frozendict`, mirroring the fact that `OrderedDict`
    subclasses `dict`.  Unlike `frozendict`, its hash is order-sensitive.
    '''
    __slots__ = ()
    def __new__(cls, *args, **kwargs):
        # Same storage trick as `frozendict`, but over an `OrderedDict`.
        return tuple.__new__(cls, (MappingProxyType(
            OrderedDict(*args, **kwargs)
        ),))
    def __repr__(self):
        # BUGFIX: the stored object is a `MappingProxyType`, whose repr is
        # 'mappingproxy(OrderedDict([...]))' -- the original asserted that
        # it starts with 'OrderedDict(' and therefore always raised
        # AssertionError.  Strip the proxy wrapper before unwrapping the
        # 'OrderedDict(' layer.
        inner_repr = repr(tuple.__getitem__(self, 0))
        proxy_prefix = 'mappingproxy('
        if inner_repr.startswith(proxy_prefix) and inner_repr.endswith(')'):
            inner_repr = inner_repr[len(proxy_prefix):-1]
        assert inner_repr[:12] == 'OrderedDict(', inner_repr
        return f'{type(self).__name__}({inner_repr[12:-1]})'
    def __hash__(self):
        # Order-sensitive, unlike `frozendict.__hash__`.
        return hash(tuple(self.items()))  # Future: more efficient hash?
### ==> ./_frozentype_util.py <== ### | |
#!/usr/bin/env python3 | |
'`frozentype`-specific helpers that do not HAVE to be in _deepfrozen.py' | |
from collections import abc, OrderedDict | |
class RequiredField:
    '''
    Sentinel: `frozentype` marks required fields by using this tag class
    itself (not an instance) as their default value, so construction can
    detect fields the caller failed to supply.
    '''
def _normalize_and_validate_frozentype_fields( | |
cls, field_to_value, field_to_default | |
): | |
''' | |
To construct a frozentype instance, the caller passes a number of | |
keyword arguments to populate the frozentype's fields, which may be | |
further modified by __new__ overloads of the class or its bases. | |
This helper takes the __new__-processed keyword arguments as the | |
dictionary `field_to_value`, and: | |
- validates that all the keys are fields of this frozentype, | |
- populates defaults for any keys that the user did not specify, | |
- errors when a field is required, but the user did not supply a key, | |
DANGER: This **MUTATES** field_to_value. | |
''' | |
# Make sure all arguments are known. | |
for field, _value in field_to_value.items(): | |
if field not in field_to_default: | |
raise TypeError(f'Constructing {cls} with unknown field {field}') | |
# Check we have required args, and back-fill optional ones. | |
for field, default in field_to_default.items(): | |
if field not in field_to_value: | |
if default is RequiredField: | |
raise TypeError(f'{cls} requires the field {field}') | |
field_to_value[field] = default | |
def _merge_fields_across_bases(frozentype, class_name, bases, attr_dict):
    '''
    Computes the `{field: default}` mapping for a new frozentype from the
    annotations of the class body and of all frozentype bases, ordered by
    reverse C3 MRO.  Raises `TypeError` when two classes declare the same
    field, or when a base is neither a frozentype nor `tuple`/`object`.
    '''
    field_to_default = OrderedDict()
    field_to_base = {}
    def add(cls, field, default):
        # NOTE(review): despite its name, `prev_name` holds a *class*
        # object (see `field_to_base[field] = cls` below), so the error
        # message mixes a bare class name with a full class repr --
        # `prev_name.__name__` was probably intended.
        prev_name = field_to_base.get(field)
        if prev_name is not None:
            raise TypeError(
                f'Both {cls.__name__} and {prev_name} specify field {field}'
            )
        field_to_base[field] = cls
        field_to_default[field] = default
    # Make a fake version of the class we are instantiating just so we can
    # order the fields according to the MRO.
    #
    # The ordering matters because for programmer sanity we should combine
    # fields into the underlying tuple in a deterministic way, and the
    # reverse of the C3 MRO ordering is the most intuitive of those
    # orders.  It puts the base class fields first in the tuple, which
    # makes them more significant for < and > comparisons.  This is a nice
    # mental model: an instance of a subclass is just an instance of the
    # superclass with some extra fields appended.
    #
    # The reverse-C3 ordering gives us the option to add special syntax
    # for a child class to "shadow" a parent's field class.  However, we
    # should probably never actually do this, because the resulting
    # semantics seem worse.
    #
    # NB: This is `frozentype` instead of `metacls` because our arguments
    # have already been pre-processed by our subclasses, so we could not
    # possibly use a subclass here.  We have to assume that subclasses do
    # NOT modify the MRO in any way that would affect this field
    # resolution.  Future: we could explicitly verify this, but it seems
    # non-critical since such crazy subclasses would just end up with a
    # set of fields that is different from what they would expect.
    fake_cls = frozentype(
        # Without _frozen_internal_stop_recursion, this would recurse
        # forever.  Also, setting up an frozentype_base for a fake is
        # wasteful.
        class_name, bases, attr_dict, _frozen_internal_stop_recursion=True,
    )
    for cls in reversed(fake_cls.__mro__):
        if issubclass(type(cls), frozentype):
            # The fields of an frozentype are exactly those variables
            # that have an annotation, either in the class, or in one of
            # its bases.
            #
            # Use `cls.__dict__`, since regular attribute resolution
            # would also look in bases of `cls`.
            for field in cls.__dict__.get('__annotations__', ()):
                # Defaults come from the class attribute of the same
                # name; absent one, fall back to the RequiredField tag.
                add(cls, field, getattr((
                    cls if cls is fake_cls else cls.__bases__[-1]
                ), field, RequiredField))
        elif cls not in (tuple, object):
            # Allowing non-`frozentype`s as bases would have a bunch of
            # bad consequences:
            #  - The non-frozentype storage could inject mutability, and
            #    vetting for this would mean that we are effectively
            #    forcing it to be a `frozentype` or a `deepfrozen`.
            #  - The base's class variables would shadow fields.
            #    Mechanistically, this isn't different from a
            #    `frozentype` declaring a property, but in appearance,
            #    the non-`frozentype` class would appear to have
            #    completely different semantics than its sibling
            #    `frozentype`s.
            #  - The annotation semantics would feel broken.  Option 1:
            #    the annotations on any such class would be treated as
            #    fields, which would violate the original design intent
            #    of the non-`frozentype` class.  Option 2: the
            #    non-`frozentype` base would add no fields, and no
            #    storage, in which case it's no better than `frozentype`
            #    without fields.
            #  - `frozentype.__new__` would need to check all bases for
            #    non-empty `__slots__`.
            raise TypeError(f'Base {cls} of {class_name} must be a frozentype')
    return field_to_default
class FrozentypeDict(abc.Mapping):
    '''
    Read-only mapping view over a frozentype instance: the keys are the
    frozentype's fields, and each value is looked up live via `getattr`
    on the underlying object.
    '''
    def __init__(self, obj, field_to_default):
        self.obj = obj
        # This is the actual dict backing the frozentype class.  Holding
        # it here is not risky: a FrozentypeDict is only ever exposed to
        # users from behind a MappingProxyType, never directly.
        self.field_to_default = field_to_default
    def __len__(self):
        return len(self.field_to_default)
    def __iter__(self):
        return iter(self.field_to_default)
    def __contains__(self, key):
        return key in self.field_to_default
    def __getitem__(self, key):
        # Field values live as attributes of the frozentype instance.
        return getattr(self.obj, key)
# Important: shorter function names => smaller pickles. | |
# | |
# These helpers avoid storing the frozentype class twice, saving a few dozen | |
# bytes even for trivial frozentypes. Without the helper, we'd have the | |
# class once as the first argument to `tuple.__new__`, and a second time as | |
# the 0th entry of the tuple. Logically, this is a clone of what | |
# frozentype_base.__new__ does to create the instance. | |
# For shallow `frozentype`s, or explicit `deepfrozentype`s. | |
def _unpickle(*tup): | |
return tuple.__new__(tup[0], tup) | |
# For implicit `deepfrozentype`s obtained by `deepfreeze`ing shallow ones. | |
# Here, we set the comparability tag to the shallow type, but construct the | |
# deep one. See also `frozentype_base.__new__`. | |
def _unpickle_deep(*tup): | |
return tuple.__new__(tup[0]._deepfrozen_internal_deep_subclass, tup) | |
### ==> ./_util.py <== ### | |
#!/usr/bin/env python3 | |
'Helpers that do not HAVE to be in _deepfrozen.py' | |
from ._ensure_new_called import _SuperclassNotCalled | |
def _check_new_called(supercls, cls, actual, expected=_SuperclassNotCalled()):
    '''
    Validator for `_ensure_superclass_new_is_called`: detects
    bait-and-switch, i.e. `cls.__new__` returning something other than
    what `supercls.__new__` produced -- including the case where
    `supercls.__new__` never ran at all, leaving `expected` as the
    shared `_SuperclassNotCalled` sentinel instance.
    '''
    if actual is not expected:
        raise TypeError(
            f'{cls}.__new__ returned {actual}, which was not {expected} '
            f'returned by superclass {supercls}.__new__'
        )
    # Also fail if we ended up constructing an instance of the wrong type.
    assert type(actual) is cls
def _check_reserved_attributes(metaclass_name, class_name, attr_dict): | |
# Reserve attribute names in the class dictionary for any features | |
# that require keyword args, or per-class state. | |
for attr_name in attr_dict: | |
if attr_name.startswith('_deepfrozen_internal'): | |
raise ValueError( | |
f'{metaclass_name} {class_name} must not define reserved ' | |
f'attribute {attr_name}.' | |
) | |
def _have_one_argument_raise_if_more(class_name, args): | |
'Raises on more than 1 argument like tuple/frozenset/dict do.' | |
if len(args) > 1: | |
raise TypeError( | |
f'{class_name}() takes at most 1 argument ({len(args)} given)' | |
) | |
return len(args) == 1 | |
### ==> ./tests/test_ensure_new_called.py <== ### | |
#!/usr/bin/env python3 | |
import unittest | |
from .._ensure_new_called import ( | |
_ensure_superclass_new_is_called, _SuperclassNotCalled, | |
) | |
class ValidationError(Exception):
    'Raised by the test validators, so assertRaises can target it exactly.'
class EnsureSuperclassNewIsCalledTestCase(unittest.TestCase):
    '''
    Exercises `_ensure_superclass_new_is_called` on a 3-deep hierarchy
    C -> B -> A, with validators installed both from `__init_subclass__`
    hooks and from a metaclass `__new__`.
    '''
    def setUp(self):
        # Every wrap / `__new__` / validator call appends a tuple here;
        # tests consume them via `_checkEvents`.
        self.events = []
        self.maxDiff = 10e6  # The default is impractically small
    def tearDown(self):
        # `_checkEvents` pops what it verified, so anything left over
        # means a test failed to account for an event.
        self.assertEqual([], self.events)
    def _makeABC(self, break_B=False, break_C=False, break_validator=False):
        # Builds the hierarchy fresh for each test.  `break_B`/`break_C`
        # make that class's `__new__` skip its superclass (the very bug
        # being detected); `break_validator` swaps in a check that
        # always raises.
        validator1 = self._validator_raises if break_validator \
            else self._validator1
        class Meta(type):
            def __new__(metacls, class_name, bases, attr_dict):
                cls = super().__new__(metacls, class_name, bases, attr_dict)
                self.events.append(('Meta wraps', cls))
                _ensure_superclass_new_is_called(B, cls, self._validator2)
                return cls
        class A:
            def __init_subclass__(cls, **kwargs):
                super().__init_subclass__(**kwargs)
                self.events.append(('A wraps', cls))
                _ensure_superclass_new_is_called(__class__, cls, validator1)
            def __new__(cls, *args, **kwargs):
                self.events.append(('A new', cls, args, kwargs))
                self.assertIs(object.__new__, super().__new__)
                # The next class is `object`, so drop args & kwargs
                return cls._deepfrozen_internal_new_result[__class__] \
                    .set_result(super().__new__(cls))
        class B(A):
            def __init_subclass__(cls, **kwargs):
                super().__init_subclass__(**kwargs)
                self.events.append(('B wraps', cls))
                _ensure_superclass_new_is_called(__class__, cls, validator1)
            def __new__(cls, *args, **kwargs):
                self.events.append(('B new', cls, args, kwargs))
                if break_B:
                    # Deliberately bypass A.__new__ to trigger detection.
                    return object.__new__(cls)
                return cls._deepfrozen_internal_new_result[__class__] \
                    .set_result(super().__new__(cls, *args, **kwargs))
        class C(B, metaclass=Meta):
            def __new__(cls, *args, **kwargs):
                self.events.append(('C new', cls, args, kwargs))
                if break_C:
                    # Deliberately bypass B.__new__ to trigger detection.
                    return object.__new__(cls)
                return super().__new__(cls, *args, **kwargs)
        self._checkEvents([
            ('A wraps', B),
            ('A wraps', C),
            ('B wraps', C),
            ('Meta wraps', C),
        ])
        return A, B, C
    def _validator1(
        self, supercls, cls, actual, expected=_SuperclassNotCalled(), *,
        name='validator1',
    ):
        # Records the call, then mimics `_check_new_called`'s behavior.
        args = (supercls, cls, actual, expected)
        self.events.append((name, *args))
        if actual is not expected:
            raise ValidationError(*args)
    def _validator2(
        self, supercls, cls, actual, expected=_SuperclassNotCalled(),
    ):
        # A second, distinct check -- registered via the metaclass.
        self._validator1(supercls, cls, actual, expected, name='validator2')
    def _validator_raises(
        self, supercls, cls, actual, expected=_SuperclassNotCalled(),
    ):
        # Always fails; exercises cleanup when a validator itself throws.
        args = (supercls, cls, actual, expected)
        self.events.append(('validator raises', *args))
        raise ValidationError(*args)
    def _checkEvents(self, expected):
        '''
        Validate and discard the first len(expected) events -- tearDown()
        ensures no events go unchecked.
        '''
        actual = self.events[:len(expected)]
        # Discard before checking so tearDown doesn't complain redundantly
        self.events[:len(expected)] = []
        self.assertEqual(expected, actual)
    def _checkNoResultsLeaked(self, unused_classes, results_classes):
        # `unused_classes` must have no result storage at all;
        # `results_classes` may have storage, but every slot must have
        # been cleared (no lingering `_result`).
        for cls in unused_classes:
            self.assertFalse(hasattr(cls, '_deepfrozen_internal_new_result'))
        for cls in results_classes:
            for supercls, wrap in cls._deepfrozen_internal_new_result.items():
                if hasattr(wrap, '_result'):
                    raise AssertionError(
                        f'{cls} had {wrap._result} for superclass {supercls}'
                    )
    def test_normal(self):
        A, B, C = self._makeABC()
        # It is important that this precedes the `c =` test, since that
        # exercises the code path where we decline to set the `supercls` B
        # result due to lack of subscribers.
        b = B('pos', kw=5)
        self._checkEvents([
            ('B new', B, ('pos',), {'kw': 5}),
            ('A new', B, ('pos',), {'kw': 5}),
            ('validator1', A, B, b, b),
        ])
        self._checkNoResultsLeaked([A], [B, C])
        c = C('arg', kwarg=7)
        self._checkEvents([
            ('C new', C, ('arg',), {'kwarg': 7}),
            ('B new', C, ('arg',), {'kwarg': 7}),
            ('A new', C, ('arg',), {'kwarg': 7}),
            # The constructed object is returned in this order:
            #  - A.__new__, not wrapped,
            #  - B.__new__, wrapped by A.__init_subclass__ to verify
            #    A.__new__,
            #    => will not run _validator1(supercls=A) because the
            #       C.__new__ wrapper is the one that will hit 0
            #       subscriptions.
            #  - C.__new__, wrapped by:
            #    * A.__init_subclass__ to verify A.__new__, first, since
            #      B.__init_subclass__ defers to super() before wrapping.
            #      => Runs _validator1(supercls=A) first, since it is the
            #         first (and innermost) wrapper.
            #    * B.__init_subclass__ to verify B.__new__
            #      => Runs validator1(supercls=B) next.
            #    * Meta.__new__ to verify B.__new__
            #      => Runs validator2(supercls=B) immediately after
            #         validator1(supercls=B), since validators for the
            #         same `supercls` are executed in the same loop in the
            #         order they were registered.
            ('validator1', A, C, c, c),
            ('validator1', B, C, c, c),
            ('validator2', B, C, c, c),
        ])
        self._checkNoResultsLeaked([A], [B, C])
    def test_break_B(self):
        A, B, C = self._makeABC(break_B=True)
        with self.assertRaises(ValidationError) as ctx:
            B('pos', kw=5)
        self.assertEqual((A, B), ctx.exception.args[:2])
        almost_b, not_called_tag, = ctx.exception.args[2:]
        self.assertIsInstance(almost_b, B)
        self.assertIsInstance(not_called_tag, _SuperclassNotCalled)
        self._checkEvents([
            ('B new', B, ('pos',), {'kw': 5}),
            ('validator1', A, B, almost_b, not_called_tag),
        ])
        self._checkNoResultsLeaked([A], [B, C])
        with self.assertRaises(ValidationError) as ctx:
            C('arg', kwarg=7)
        self.assertEqual((A, C), ctx.exception.args[:2])
        almost_c, not_called_tag, = ctx.exception.args[2:]
        self.assertIsInstance(almost_c, C)
        self.assertIsInstance(not_called_tag, _SuperclassNotCalled)
        self._checkEvents([
            ('C new', C, ('arg',), {'kwarg': 7}),
            ('B new', C, ('arg',), {'kwarg': 7}),
            ('validator1', A, C, almost_c, not_called_tag),
        ])
        self._checkNoResultsLeaked([A], [B, C])
    def test_break_C(self):
        A, B, C = self._makeABC(break_C=True)
        b = B('pos', kw=5)
        self._checkEvents([
            ('B new', B, ('pos',), {'kw': 5}),
            ('A new', B, ('pos',), {'kw': 5}),
            ('validator1', A, B, b, b),
        ])
        self._checkNoResultsLeaked([A], [B, C])
        with self.assertRaises(ValidationError) as ctx:
            C('arg', kwarg=7)
        self.assertEqual((A, C), ctx.exception.args[:2])
        almost_c, not_called_tag, = ctx.exception.args[2:]
        self.assertIsInstance(almost_c, C)
        self.assertIsInstance(not_called_tag, _SuperclassNotCalled)
        self._checkEvents([
            ('C new', C, ('arg',), {'kwarg': 7}),
            ('validator1', A, C, almost_c, not_called_tag),
        ])
        self._checkNoResultsLeaked([A], [B, C])
    def test_no_leak_when_validator_throws(self):
        A, B, C = self._makeABC(break_validator=True)
        with self.assertRaises(ValidationError) as ctx:
            C('arg', kwarg=7)
        self.assertEqual((A, C), ctx.exception.args[:2])
        actual_c, expected_c = ctx.exception.args[2:]
        self.assertIs(actual_c, expected_c)
        self.assertIsInstance(actual_c, C)
        self._checkEvents([
            ('C new', C, ('arg',), {'kwarg': 7}),
            ('B new', C, ('arg',), {'kwarg': 7}),
            ('A new', C, ('arg',), {'kwarg': 7}),
            ('validator raises', A, C, actual_c, actual_c),
        ])
        self._checkNoResultsLeaked([A], [B, C])
### ==> ./tests/test_frozentype.py <== ### | |
#!/usr/bin/env python3 | |
import unittest | |
from .. import frozentype | |
class PlantType(metaclass=frozentype):
    '''Base frozentype: attributes / fields shared by all plants.'''
    # Annotated class variables are the frozentype's fields; one without
    # an assigned default (like `has_roots`) is required at construction.
    has_roots: bool
    color: str = 'green'
    grows_in: str = 'soil'
class Algae(PlantType):
    # Adds one field and re-defaults two inherited ones via `__new__`.
    is_saltwater: bool
    def __new__(
        _cls, *, has_roots=False, grows_in='water',  # noqa: B902
        **field_to_value
    ):
        'While PlantType requires has_roots, Algae defaults it to False.'
        return super().__new__(
            _cls, has_roots=has_roots, grows_in=grows_in, **field_to_value
        )
class Kelp(Algae):  # Inheritance two levels deep.
    length: int
    def __new__(
        _cls, *, has_roots=True, is_saltwater=True,  # noqa: B902
        **field_to_value
    ):
        'I have not heard of freshwater kelp, and it does have roots'
        # Leaves `grows_in` for Algae's `__new__` to default to 'water'.
        return super().__new__(
            _cls, has_roots=has_roots, is_saltwater=is_saltwater,
            **field_to_value,
        )
class NoExtraFields(Kelp):  # Inheriting without adding fields is allowed
    def fun(self):
        # Plain method: frozentypes may carry behavior, not just fields.
        return 3
# Both classes have identical fields/defaults; tests loop over each.
KELP_WORKALIKES = (Kelp, NoExtraFields)
class FloweringPlant(metaclass=frozentype):
    '''
    This is a mix-in -- it does not inherit from PlantType, but classes like
    Grain can inherit from it to get extra fields and functions.
    '''
    flower_color: str
    num_petals: int = 3
class Grain(PlantType, FloweringPlant):
    # Multiple-inheritance frozentype: merges the fields of both bases.
    grain_size_mm: float
    is_edible: bool
class FrozentypeTestCase(unittest.TestCase):
    '''
    End-to-end tests of the `frozentype` metaclass, via the plant-themed
    demo classes defined above.
    '''
    def _check_values(self, ent, field_to_value):
        'Assert that `ent` has exactly the fields/values of `field_to_value`.'
        self.assertEqual(set(ent.__dict__), set(field_to_value.keys()))
        # The 0th slot of the backing tuple is the comparability tag:
        # the instance's own class.
        self.assertEqual(tuple.__getitem__(ent, 0), ent.__class__)
        self.assertEqual(
            {f: getattr(ent, f) for f in field_to_value.keys()},
            field_to_value,
        )
    def test_values(self):
        # An explicit `has_roots` overrides our `__new__`.
        self._check_values(Algae(has_roots=True, is_saltwater=True), {
            'color': 'green', 'has_roots': True, 'grows_in': 'water',
            'is_saltwater': True,
        })
        # Use the `has_roots` default from `__new__`.
        self._check_values(Algae(color='red', is_saltwater=True), {
            'color': 'red', 'has_roots': False, 'grows_in': 'water',
            'is_saltwater': True,
        })
        for c in KELP_WORKALIKES:
            # Kelp's `__new__` sets `has_roots` & `is_saltwater`, but leaves
            # `grows_in` to be set by Algae's `__new__`.
            self._check_values(c(length=7), {
                'color': 'green', 'has_roots': True, 'grows_in': 'water',
                'is_saltwater': True, 'length': 7,
            })
            self._check_values(
                # Override some of the __new__ and annotation-based defaults.
                c(length=2, color='red', is_saltwater=False, grows_in='air'),
                {
                    # BUGFIX: the expected color used to read 'c', which
                    # could never match the explicit color='red' argument
                    # just above.
                    'color': 'red', 'has_roots': True, 'grows_in': 'air',
                    'is_saltwater': False, 'length': 2,
                },
            )
        self._check_values(
            Grain(
                has_roots=True,
                flower_color='yellow',
                num_petals=0,
                grain_size_mm=5,
                is_edible=False,
            ),
            {
                'color': 'green',
                'has_roots': True,
                'grows_in': 'soil',
                'flower_color': 'yellow',
                'num_petals': 0,
                'grain_size_mm': 5,
                'is_edible': False,
            },
        )
    def test_field_value_errors(self):
        'Construction rejects missing required fields and unknown fields.'
        for c in KELP_WORKALIKES:
            with self.assertRaisesRegex(
                TypeError, '^.* requires the field length$'
            ):
                c(has_roots=False)
        with self.assertRaisesRegex(
            TypeError, '^.* requires the field is_saltwater$'
        ):
            Algae(has_roots=False)
        with self.assertRaisesRegex(TypeError, '^.* unknown field foo$'):
            Algae(has_roots=False, is_saltwater=False, foo='cat')
        with self.assertRaisesRegex(
            TypeError, '^.* requires the field flower_color$'
        ):
            # BUGFIX: dropped a stray trailing comma that used to wrap
            # this (raising) call in a pointless 1-tuple expression.
            Grain(has_roots=True, grain_size_mm=5, is_edible=False)
    def test_field_declaration_errors(self):
        'Two classes in one MRO may not declare the same field.'
        for redundant_field in ['color', 'flower_color']:
            with self.assertRaisesRegex(
                TypeError, f'^Both .* specify field {redundant_field}$'
            ):
                type(
                    'BadGrain',
                    (PlantType, FloweringPlant),
                    {'__annotations__': {redundant_field: bool}},
                )
    def test_new_overload(self):
        'An overloaded __new__ may rewrite field values before storage.'
        class GrowsN(metaclass=frozentype):
            n: int
            def __new__(_cls, *, n, **field_to_value):  # noqa: B902
                return super().__new__(_cls, n=n + 1, **field_to_value)
        self.assertEqual(GrowsN(n=3).n, 4)
    def test_new_overload_errors(self):
        'Dropping fields in an overloaded __new__ surfaces as "required".'
        class DiscardsFields(metaclass=frozentype):
            n: int
            def __new__(_cls, **field_to_value):  # noqa: B902
                return super().__new__(_cls)  # Don't do this in your code!
        with self.assertRaisesRegex(TypeError, '^.* requires the field n$'):
            DiscardsFields(n=3)
    def test_slots_errors(self):
        'Non-frozentype bases and explicit __slots__ are rejected.'
        class BadBase:
            pass
        with self.assertRaisesRegex(
            TypeError,
            r'^Base .*BadBase.* of APlant must be a frozentype$',
        ):
            class APlant(BadBase, metaclass=frozentype):
                pass
        with self.assertRaisesRegex(
            TypeError, '^Do not set __slots__ on frozentype.*$'
        ):
            class AnotherPlant(metaclass=frozentype):
                __slots__ = ()
        # Instances must reject attribute assignment (immutability).
        g = Grain(
            has_roots=True, grain_size_mm=3, is_edible=True,
            flower_color='red', num_petals=0,
        )
        with self.assertRaises(AttributeError):
            g.boof = 3
    def test_repr(self):
        self.assertEqual(
            "PlantType(has_roots=True, color='green', grows_in='soil')",
            repr(PlantType(has_roots=True)),
        )
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment