Last active
September 28, 2022 05:53
-
-
Save miholeus/8b2c2ad1e58692d02824940cfadcb0a6 to your computer and use it in GitHub Desktop.
event manager using python
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# coding: utf-8 | |
import copy | |
from collections import defaultdict | |
try: | |
import Queue as Q # ver. < 3.0 | |
except ImportError: | |
import queue as Q | |
class Event(object):
    """Named event carrying an optional target and a container of parameters.

    Parameters are kept in either a ``list`` (positional) or a ``dict``
    (named).  ``get_param``/``set_param`` work with both containers.

    Args:
        name: event name (may be set later through the ``name`` property).
        target: object the event relates to.
        params: initial parameters; ``None`` means a fresh empty list.
    """

    def __init__(self, name=None, target=None, params=None):
        if params is None:
            params = []
        self._name = name
        self._target = target
        self._params = params
        self._stop_propagation = False

    @property
    def name(self):
        """Name of the event."""
        return self._name

    @name.setter
    def name(self, value):
        self._name = value

    @property
    def target(self):
        """Object the event relates to."""
        return self._target

    @target.setter
    def target(self, value):
        self._target = value

    @property
    def params(self):
        """Parameter container (``list`` or ``dict``)."""
        return self._params

    @params.setter
    def params(self, value):
        # Explicit check instead of ``assert``: assertions vanish under ``-O``.
        # Dicts are accepted as well, since named access needs a mapping.
        if not isinstance(value, (list, dict)):
            raise TypeError(
                "params must be a list or a dict; received {}".format(type(value))
            )
        self._params = value

    def get_param(self, name, default=None):
        """Return a single parameter, or ``default`` when it is absent.

        Args:
            name: dict key, or integer index when params is a list.
            default: value returned when the parameter cannot be found.
        """
        params = self._params
        if isinstance(params, dict):
            return params.get(name, default)
        try:
            return params[name]
        except (KeyError, IndexError, TypeError):
            # Missing index, or a non-integer name looked up in a list.
            return default

    def set_param(self, name, value):
        """Store a single parameter under ``name``.

        Args:
            name: dict key, or integer index when params is a non-empty list.
            value: value to store.

        Raises:
            TypeError: params is a non-empty list and ``name`` is not an index.
            IndexError: params is a non-empty list and ``name`` is out of range.
        """
        if isinstance(self._params, list) and not self._params:
            # An empty list holds no positional data, so it can be swapped for
            # a dict; otherwise named parameters could never be stored on a
            # default-constructed event.
            self._params = {}
        self._params[name] = value

    def stop_propagation(self, flag=True):
        """Set (or clear, with ``flag=False``) the stop-propagation marker."""
        self._stop_propagation = flag

    def propagation_is_stopped(self):
        """Return the current stop-propagation marker."""
        return self._stop_propagation

    def copy(self):
        """Return a new ``Event`` with the same name, target and parameters.

        The parameter container itself is shallow-copied so that changing the
        copy's params does not alter this event (important when this event is
        used as a prototype).  The stop-propagation marker is not carried over.

        NOTE: the result is always a plain ``Event``; subclasses that need
        their own type should override this method.
        """
        params = self._params
        if isinstance(params, dict):
            params = params.copy()
        elif isinstance(params, list):
            params = params[:]
        return Event(self.name, self.target, params)

    __copy__ = copy
class EventManager(object):
    """Registry of prioritised listeners keyed by event name.

    Listeners are kept in one ``PriorityQueue`` per event name.  Every queue
    entry is a ``(priority, order, listener)`` tuple; ``order`` is an attach
    counter that breaks ties between equal priorities so the queue never has
    to compare the listener objects themselves (callables are not orderable
    on Python 3).
    """

    # Class-level default so the counter exists even if a subclass overrides
    # ``__init__`` without calling this one.
    _attach_order = 0

    def __init__(self):
        self.event_prototype = Event()
        self.events = defaultdict(Q.PriorityQueue)

    def set_event_prototype(self, event):
        """
        Sets event prototype

        Args:
            event (Event): event instance copied for every ``trigger`` call
        """
        self.event_prototype = event

    def trigger(self, event_name, target=None, args=None):
        """
        Builds an event from the prototype and hands it to ``trigger_listeners``

        Args:
            event_name (str): event's name
            target: optional target; any value other than None is applied
            args: optional params; an empty value leaves the prototype's params

        Returns:
            whatever ``trigger_listeners`` returns
        """
        event = copy.copy(self.event_prototype)
        event.name = event_name
        # ``is not None`` so that falsy targets (0, "", empty containers) are
        # not silently dropped.
        if target is not None:
            event.target = target
        if args:
            event.params = args
        return self.trigger_listeners(event)

    def attach(self, event_name, listener, priority=1):
        """
        Attaches new listener to event

        Args:
            event_name (str): event's name
            listener: callback listener for event
            priority (int): priority for listener

        Raises:
            RuntimeError: if event_name is not a string
        """
        if not isinstance(event_name, str):
            raise RuntimeError("{} expects a string for event; received {}".format(
                self.attach.__name__, type(event_name)
            ))
        order = self._attach_order
        self._attach_order = order + 1
        self.events[event_name].put((priority, order, listener))

    def detach(self, listener, event_name=None, force=False):
        """
        Detaches listener

        Args:
            listener: callback listener for event
            event_name (str): event's name; when empty (or '*' without force)
                the listener is detached from every event
            force (bool): if set to True, '*' is treated as a literal event name

        Raises:
            RuntimeError: if a non-empty event_name is not a string
        """
        # Wildcard: detach from every known event, then stop.
        if not event_name or ('*' == event_name and not force):
            # Snapshot the keys: the recursive calls may delete entries.
            for name in list(self.events):
                self.detach(listener, name, True)
            return

        if not isinstance(event_name, str):
            raise RuntimeError("{} expects a string for event; received {}".format(
                self.detach.__name__, type(event_name)
            ))

        # ``get`` avoids creating an empty queue for an unknown event name.
        current = self.events.get(event_name)
        if current is None:
            return

        # A PriorityQueue cannot drop an arbitrary item, so rebuild it without
        # the entries that belong to ``listener``.
        remaining = Q.PriorityQueue()
        while not current.empty():
            entry = current.get_nowait()
            if entry[-1] != listener:
                remaining.put(entry)

        if remaining.empty():
            del self.events[event_name]
        else:
            # Store the rebuilt queue; the old one has been drained.
            self.events[event_name] = remaining

    def trigger_listeners(self, event):
        """
        Triggers listeners of selected event

        NOTE: listener dispatch is not implemented yet; after validating the
        event name this method currently returns None.

        Args:
            event (Event): event instance

        Raises:
            RuntimeError: if the event has no name
        """
        name = event.name
        if not name:
            raise RuntimeError("Event is missing a name; cannot trigger")
        # TODO: invoke the listeners registered for ``name``

    def clear_listeners(self, event_name):
        """
        Clears all listeners for given event

        Args:
            event_name (str): event's name
        """
        self.events.pop(event_name, None)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment