Created
October 31, 2011 10:12
-
-
Save progval/1327227 to your computer and use it in GitHub Desktop.
test references
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import collections, sys, types | |
# Placeholder for the lazily imported ``weakref`` module; WeakReference.ref()
# performs the real import the first time a weak reference is created.
weakref = None
# Hash function applied to referents by Reference.__init__ (identity-based).
generic_hash = id
# Possible additions: | |
# - Add callbacks to CallableReference | |
# - Reimplement RefValueList | |
def ref(obj, weak, _plain=0):
    """Build a reference object for *obj*.

    Args:
        obj: The object to reference.
        weak: True value for a WeakReference, false value for a
            StrongReference (or the matching flavour of CallableReference).
        _plain: Internal flag. When true, the callable check is skipped and
            a plain weak/strong reference is always returned.

    Returns:
        A CallableReference when *obj* passes is_callable() and *_plain* is
        false; otherwise a WeakReference or StrongReference.
    """
    # Keep the short-circuit: is_callable() is not consulted for plain refs.
    wants_callable = not _plain and is_callable(obj)
    if wants_callable:
        return CallableReference(obj, weak)
    reference_cls = WeakReference if weak else StrongReference
    return reference_cls(obj)
def mapping(init=None):
    """Return a new RefKeyDictionary populated from *init*.

    Args:
        init: Optional mapping of reference keys to values. Defaults to an
            empty mapping. ``None`` is used as the sentinel so that no
            mutable default object is shared between calls.

    Returns:
        A RefKeyDictionary.
    """
    if init is None:
        init = {}
    return RefKeyDictionary(init)
def is_callable(obj):
    """Tell whether *obj* should be wrapped in a CallableReference.

    *obj* qualifies when it is callable itself, or when it is an indexable
    object (typically an ``(instance, function)`` pair) whose element at
    index 1 is callable.

    Returns:
        A true value (``1`` or ``True``) when *obj* qualifies, otherwise a
        false value (``0`` or ``False``).
    """
    # The builtin callable() replaces ``collections.Callable``, which no
    # longer exists on Python 3.10+ and made this function raise.
    if callable(obj):
        return 1
    try:
        return callable(obj[1])
    except Exception:
        # Not indexable, too short, wrong key type, ...: not a callable pair.
        return 0
def ref_is(ref1, ref2):
    """Compare two references by the identity of their referents.

    Returns ``1`` when both arguments are the very same reference object,
    ``0`` when either referent resolves to ``None``, and otherwise the
    result of ``referent1 is referent2``.
    """
    if ref1 is ref2:
        return 1
    # A falsy reference (e.g. None) is not called; it stands for itself.
    first = ref1() if ref1 else ref1
    second = ref2() if ref2 else ref2
    if first is None or second is None:
        return 0
    return first is second
class Reference:
    """Base class for hashable reference objects compared by referent identity.

    Subclasses provide ``ref(obj, cb)``, which builds the stored handle, and
    ``deref(handle)``, which resolves the handle back to the referent.
    Calling a reference returns the result of ``deref``.
    """

    def __init__(self, obj):
        # self.callback is handed to ref() so the subclass can arrange for
        # it to be invoked later.
        handle = self.ref(obj, self.callback)
        self.obj = handle
        self.callbacks = []
        # The hash is computed once from the referent and stored.
        self.hash = generic_hash(obj)

    def callback(self, obj):
        """Invoke each registered callback as ``cb(obj, self)``, then clear them."""
        for listener in self.callbacks:
            listener(obj, self)
        self.callbacks = []

    def __call__(self):
        return self.deref(self.obj)

    def __hash__(self):
        return self.hash

    # Equality is delegated to ref_is(self, other).
    __eq__ = ref_is
class WeakReference(Reference):
    """Reference whose handle is a ``weakref.ref`` to the referent."""

    def ref(self, obj, cb):
        """Return ``weakref.ref(obj, cb)``, importing ``weakref`` on first use."""
        global weakref
        if not weakref:
            # Rebinds the module-level placeholder to the real module.
            import weakref
        make_weak = weakref.ref
        return make_weak(obj, cb)

    def deref(self, obj):
        """Call the stored handle and return its result."""
        return obj()
class StrongReference(Reference):
    """Reference that stores the referent itself as its handle."""

    def ref(self, obj, cb):
        """Return *obj* unchanged; *cb* is not used."""
        return obj

    def deref(self, obj):
        """Return the stored handle, which is the referent itself."""
        return obj
class CallableWrapper:
    """Callable that pairs a function with an optional first argument.

    When ``obj`` is ``None`` the function is called with the caller's
    arguments only; otherwise ``obj`` is prepended as the first positional
    argument.
    """

    def __init__(self, obj, func):
        self.obj, self.func = obj, func

    def __call__(self, *args, **kwds):
        target = self.func
        if self.obj is not None:
            return target(self.obj, *args, **kwds)
        return target(*args, **kwds)
def is_callable_instance(obj):
    """Return True when *obj* is a direct ``object`` instance with ``__call__``.

    NOTE(review): ``type(obj) is object`` only holds for bare ``object()``
    instances, which have no ``__call__``, so as written this check does not
    match instances of user-defined classes. Confirm whether that is the
    intended behaviour.
    """
    if type(obj) is not object:
        return False
    return hasattr(obj, '__call__')
def is_method(obj):
    """Return True when *obj* is a bound method that can be split apart.

    A bound method exposes both ``__self__`` (the instance) and ``__func__``
    (the underlying function). Both are required because callers read both
    attributes after this check. Built-in methods expose only ``__self__``
    and are therefore rejected.

    The Python 2 attribute ``im_self`` does not exist on Python 3 methods,
    so testing for it never matched anything.
    """
    return hasattr(obj, '__self__') and hasattr(obj, '__func__')
def unwrap(func):
    """Split a callable specification into an ``(obj, func)`` pair.

    *func* may be a two-element sequence ``(obj, func)`` or a single
    callable. Objects accepted by is_callable_instance() are replaced by
    their ``__call__`` attribute, and objects accepted by is_method() are
    split into the bound instance and the underlying function.

    Returns:
        ``(obj, func)`` where ``obj`` is ``None`` when no instance is
        involved.
    """
    try:
        obj, func = func
    except:
        # Not a pair: treat the argument as the callable itself.
        obj = None
    if is_callable_instance(func):
        func = func.__call__
    if is_method(func):
        owner = func.__self__
        if owner is not None:
            if obj is None:
                obj = owner
            else:
                # An explicitly supplied instance must be the bound one.
                assert obj is owner
        func = func.__func__
    return obj, func
class CallableReference:
    """Reference to a callable, held as separate instance and function refs.

    Calling the reference returns a CallableWrapper around the live
    instance and function, or ``None`` once either of them is gone.
    """

    def __init__(self, func, weak):
        obj, func = unwrap(func)
        # Hash is taken from the unwrapped pair before it is wrapped in refs.
        self.hash = hash((obj, func))
        self.obj = None if obj is None else ref(obj, weak, _plain=1)
        self.func = ref(func, weak, _plain=1)

    def is_dead(self):
        """Return True when the instance ref or the function ref resolves to None."""
        if self.obj is not None and self.obj() is None:
            return True
        return self.func() is None

    def __call__(self):
        if self.is_dead():
            return None
        owner_ref = self.obj
        owner = owner_ref() if owner_ref is not None else None
        target = self.func()
        return CallableWrapper(owner, target)

    def __eq__(self, other):
        if self.__class__ != other.__class__:
            return 0
        same_owner = ref_is(self.obj, other.obj)
        return same_owner and ref_is(self.func, other.func)

    def __hash__(self):
        return self.hash
class RefKeyDictionary(collections.UserDict):
    """Dictionary keyed by reference objects that hides dead keys.

    A key must be callable (returning its referent, or ``None`` once the
    referent is gone) and must expose a ``callbacks`` list. Each stored key
    gets this dictionary's ``callback`` appended to that list so the entry
    can be removed when the key's callbacks are run.
    """

    def __repr__(self):
        return "<RefKeyDictionary at %s>" % id(self)

    def callback(self, obj, key):
        """Remove *key* from the dictionary if it is still present.

        A missing key is tolerated: the callback is registered once per
        assignment, so it can run more than once for the same key, and the
        entry may already have been removed (for example by ``popitem``).
        """
        try:
            del self[key]
        except KeyError:
            pass

    def add_callback(self, key):
        """Register this dictionary's ``callback`` on *key*."""
        key.callbacks.append(self.callback)

    def __setitem__(self, key, value):
        """Store *value* under *key*; a key whose referent is None is ignored."""
        if key() is not None:
            self.add_callback(key)
            self.data[key] = value

    def copy(self):
        """Return a new RefKeyDictionary holding the live entries."""
        new = RefKeyDictionary()
        for key, value in list(self.data.items()):
            if key() is not None:
                # Re-insert under the reference key itself: __setitem__
                # calls its key, so the bare referent cannot be used.
                new[key] = value
        return new

    def items(self):
        """Return a list of ``(key, value)`` pairs whose key is still alive."""
        # Iterate over a snapshot so removals cannot disturb the loop.
        return [(key, value) for key, value in list(self.data.items())
                if key() is not None]

    def iteritems(self):
        return RefKeyedItemIterator(self)

    def iterkeys(self):
        return RefKeyedKeyIterator(self)

    __iter__ = iterkeys

    def itervalues(self):
        """Return an iterator over all stored values (dead keys included)."""
        return iter(self.data.values())

    def keys(self):
        """Return a list of the keys that are still alive."""
        return [key for key in list(self.data.keys()) if key() is not None]

    def popitem(self):
        """Pop entries until one with a live key is found and return it.

        Raises:
            KeyError: when the underlying dict runs out of entries.
        """
        while 1:
            key, value = self.data.popitem()
            if key() is not None:
                return key, value

    def update(self, dict):
        """Copy every item of the mapping *dict* in through ``__setitem__``."""
        for key, value in list(dict.items()):
            self[key] = value
class BaseIter:
    """Mixin that makes an iterator return itself from ``__iter__``.

    Subclasses are expected to supply ``__next__``.
    """

    def __iter__(self):
        return self
class RefKeyedKeyIterator(BaseIter):
    """Iterator over the keys of a RefKeyDictionary that skips dead keys."""

    def __init__(self, refdict):
        # Keep only the bound __next__ of the underlying key iterator.
        self._next = iter(refdict.data.keys()).__next__

    def __next__(self):
        # StopIteration from the underlying iterator ends this one too.
        while True:
            candidate = self._next()
            if candidate() is None:
                continue
            return candidate
class RefKeyedItemIterator(BaseIter):
    """Iterator over ``(key, value)`` pairs of a RefKeyDictionary, skipping dead keys."""

    def __init__(self, refdict):
        # Keep only the bound __next__ of the underlying item iterator.
        self._next = iter(refdict.data.items()).__next__

    def __next__(self):
        # StopIteration from the underlying iterator ends this one too.
        while True:
            pair = self._next()
            key, value = pair
            if key() is None:
                continue
            return key, value
class RefValueList(collections.UserList):
    """Placeholder list type; currently adds nothing to ``collections.UserList``."""
    # TBD
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
Setup | |
>>> from .References import * | |
>>> class Object: | |
... def __repr__(self): | |
... return '<Object>' | |
>>> import sys | |
>>> def gc(): | |
... pass | |
>>> if sys.platform[:4] == 'java': | |
... import java.lang | |
... gc = java.lang.System.gc | |
ref(), WeakReference and StrongReference: | |
>>> o1, o2 = Object(), Object() | |
>>> s = ref(o1, weak=0) | |
>>> w = ref(o2, weak=1) | |
>>> s() is o1 | |
True
>>> w() is o2 | |
True
>>> del o1, o2 | |
>>> s() | |
<Object> | |
>>> w() | |
>>> o = Object() | |
>>> s = ref(o, weak=0) | |
>>> w = ref(o, weak=1) | |
>>> w == s | |
True
>>> d = {} | |
>>> d[w] = 'test' | |
>>> d[s] | |
'test' | |
>>> del o, s; gc() | |
>>> w() | |
>>> d[w] | |
'test' | |
Reference callbacks: | |
>>> o = Object() | |
>>> def cb1(obj, ref): | |
... print('This is callback 1') | |
... | |
>>> def cb2(obj, ref): | |
... print('This is callback 2') | |
... | |
>>> r = ref(o, weak=1) | |
>>> r.callbacks.append(cb1) | |
>>> r.callbacks.append(cb2) | |
>>> del o; gc() | |
This is callback 1 | |
This is callback 2 | |
[Insert CallableReference callback tests] | |
CallableReference: | |
>>> def func1(x): | |
... print(x) | |
>>> class Test: | |
... def func2(self): | |
... print(self) | |
... def __repr__(self): | |
... return '<Test>' | |
... def __call__(self, x): | |
... print(x) | |
... | |
>>> test = Test() | |
>>> sc1 = ref(func1, weak=0) | |
>>> wc1 = ref(func1, weak=1) | |
>>> sc2 = ref(Test.func2, weak=0) | |
>>> wc2 = ref(Test.func2, weak=1) | |
>>> sc3 = ref(test.func2, weak=0) | |
>>> wc3 = ref(test.func2, weak=1) | |
>>> sc4 = ref(test, weak=0) | |
>>> wc4 = ref(test, weak=1) | |
>>> sc5 = ref((test, test.func2), weak=0) | |
>>> wc5 = ref((test, test.func2), weak=1) | |
>>> sc6 = ref((test, Test.func2), weak=0) | |
>>> wc6 = ref((test, Test.func2), weak=1) | |
>>> sc1==wc1, sc2==wc2, sc3==wc3, sc4==wc4, sc5==wc5, sc6==wc6 | |
(True, True, True, True, True, True) | |
>>> sc1()('Hello, world!') | |
Hello, world! | |
>>> wc1()('Hello, world!') | |
Hello, world! | |
>>> sc2()('Hello, world!') | |
Hello, world! | |
>>> wc2()('Hello, world!') | |
Hello, world! | |
>>> sc3()() | |
<Test> | |
>>> wc3()() | |
<Test> | |
>>> sc4()('Hello, world!') | |
Hello, world! | |
>>> wc4()('Hello, world!') | |
Hello, world! | |
>>> sc5()() | |
<Test> | |
>>> wc5()() | |
<Test> | |
>>> sc6()() | |
<Test> | |
>>> wc6()() | |
<Test> | |
>>> del func1, Test, test, sc1, sc2, sc3, sc4, sc5, sc6 | |
>>> wc2(), wc3(), wc4(), wc5(), wc6() | |
(None, None, None, None, None) | |
The global function will not die: | |
>>> from sys import getrefcount | |
>>> getrefcount(wc1()) > 0 | |
True
>>> r1 = ref(lambda x: x*x, weak=0) | |
>>> r2 = ref(lambda x: x*x, weak=1) | |
>>> r1()(2) | |
4 | |
>>> r2() | |
>>> | |
RefKeyDictionary: | |
>>> d = RefKeyDictionary() | |
>>> o1 = Object() | |
>>> o2 = Object() | |
>>> sr = ref(o1, weak=0) | |
>>> wr = ref(o2, weak=1) | |
>>> d[sr] = 1 | |
>>> d[wr] = 1 | |
>>> len(d) | |
2 | |
>>> del o1 | |
>>> len(d) | |
2 | |
>>> del o2 | |
>>> len(d) | |
1 | |
[Insert RefValueList tests] | |
""" | |
# Script entry point: run the doctests embedded in this module's docstring.
if __name__ == "__main__":
    print("If you want detailed output, use \"python test_references.py -v\".")
    print("No output after this line indicates success.")
    # The module imports itself by name so doctest can inspect its docstring.
    import doctest
    import test_references
    doctest.testmod(test_references)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment