Last active
August 29, 2015 14:22
-
-
Save ekampf/13916578123fedb60977 to your computer and use it in GitHub Desktop.
Python NotificationsManager and observer
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class Singleton(type): | |
_instances = {} | |
def __call__(cls, *args, **kwargs): | |
if cls not in cls._instances: | |
cls._instances[cls] = super(Singleton, cls).__call__(*args, **kwargs) | |
return cls._instances[cls] | |
class observes(object): | |
def __init__(self, name): | |
self.name = name | |
def __call__(self, f): | |
def wrapped_f(*args, **kwargs): | |
return f(*args, **kwargs) | |
NotificationsManager().register(self.name, wrapped_f) | |
return wrapped_f | |
class NotificationsManager(object): | |
__metaclass__ = Singleton | |
def __init__(self): | |
self.observers = dict() | |
def register(self, name, observer): | |
self.__verify_initialized(name) | |
if observer not in self.observers[name]: | |
self.observers[name].append(observer) | |
def unregister(self, name, observer): | |
self.__verify_initialized(name) | |
if observer in self.observers[name]: | |
self.observers[name].remove(observer) | |
def unregister_all(self): | |
self.observers.clear() | |
def notify(self, name, *args, **kwargs): | |
self.__verify_initialized(name) | |
for observer in self.observers[name]: | |
observer(*args, **kwargs) | |
def __verify_initialized(self, name): | |
if not self.observers.get(name): | |
self.observers[name] = [] | |
if __name__ == "__main__": | |
class Foo(object): | |
@staticmethod | |
@observes("my_event") | |
def onMyEvent(): | |
print("Foo.onMyEvent called!") | |
NotificationsManager().notify("my_event") |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment