Last active
October 22, 2024 01:49
-
-
Save Moosems/82b89ff6a93c4132f9fb2ca6260ae125 to your computer and use it in GitHub Desktop.
Python Event based Loop Dispatcher
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"""Implements an event dispatch loop similar to the mainloop in tkinter. The EventDispatcher is NOT asyncronous but should be thread safe.""" | |
# TODO: async variant, test thread safety | |
from collections import deque | |
from logging import Logger, getLogger | |
from queue import PriorityQueue | |
from random import randint | |
from time import sleep, time_ns | |
from beartype.typing import Any, Callable, Optional | |
# Shape of a bound callback: it is called with the event name and, when the
# caller supplied any, the data sent along with the event.
binding_func = Callable[[str, Optional[Any]], Any]
def auto_terminate(ed: "EventDispatcher") -> None:
    """Default loop function: flag the dispatcher as terminated.

    Setting ``terminated`` makes ``EventDispatcher.run`` stop after the
    iteration that is currently in progress.
    """
    ed.terminated = True
class EstesioneError(Exception):
    """Raised by the EventDispatcher for unknown events or function ids."""
class EventDispatcher:
    """An event based loop that dispatches callbacks on events.

    The EventDispatcher class implements the following methods for its public API:

    - EventDispatcher.queue_is_empty()
    - EventDispatcher.bind(event: str, func: Callable[[str, Optional[Any]], Any]) -> int
    - EventDispatcher.unbind(event: str, func_id: int = 0)
    - EventDispatcher.broadcast(event: str, data: Any = <no data>, priority: bool = False)
    - EventDispatcher.immediate(event: str, data: Any = <no data>)
    - EventDispatcher.schedule(time_in_ms: int, event: str, data: Any = <no data>)
    - EventDispatcher.run()

    There are a few attributes that can be modified to alter behavior. Notably:

    - EventDispatcher.terminated (bool, defaults to False) - whether or not to end the loop
    - EventDispatcher.throttling (bool, defaults to True) - whether to sleep between
      cycles to save cpu usage
    - EventDispatcher.cycles_per_second (int, defaults to 120) - the number of cycles per
      second; only has an effect while throttling is on
    - EventDispatcher.func_id_max (int, defaults to 15_000) - the number of funcs that can
      be bound at once
    """

    # Used to determine if the user is sending data or not
    class __broadcast_data_default: ...

    def __init__(
        self,
        loop_func: Callable[["EventDispatcher"], None] = auto_terminate,
        throttling: bool = True,
        cycles_per_second: int = 120,
        func_id_max: int = 15_000,
    ) -> None:
        """Create a dispatcher.

        Args:
            loop_func: Called with the dispatcher at the start of every cycle of ``run``.
            throttling: Whether ``run`` sleeps ``sleep_period`` seconds each cycle.
            cycles_per_second: Target cycle rate used to derive ``sleep_period``.
            func_id_max: Upper bound (inclusive) for generated function ids.
        """
        # Local import keeps the block self-contained; count() hands out the
        # tie-breakers used to order scheduled entries (see ``schedule``).
        from itertools import count

        self._terminated: bool = False
        # Honor the constructor argument (it used to be ignored and forced to True).
        self._throttling: bool = throttling
        self._cycles_per_second: int = cycles_per_second
        self._loop_func: Callable[["EventDispatcher"], None] = loop_func
        self._event_queue: deque[tuple[str, Any]] = deque()
        # Entries are (due time in ms, insertion order, event, data).
        self._time_queue: PriorityQueue[tuple[int, int, str, Any]] = (
            PriorityQueue()
        )
        self._schedule_order = count()
        self._bindings: dict[str, list[tuple[int, binding_func]]] = {}
        self._func_ids: list[int] = []
        self._func_id_max: int = func_id_max
        self._logger: Logger = getLogger("EventDispatcher")
        self.sleep_period: float = 1 / self._cycles_per_second

    @property
    def throttling(self) -> bool:
        """Returns whether or not throttling is turned on"""
        return self._throttling

    @throttling.setter
    def throttling(self, throttle: bool) -> None:
        """Sets the throttling"""
        self._throttling = throttle

    @property
    def terminated(self) -> bool:
        """Returns whether or not the loop has been terminated"""
        return self._terminated

    @terminated.setter
    def terminated(self, terminate: bool) -> None:
        """Can revive or kill the loop"""
        self._terminated = terminate

    @property
    def cycles_per_second(self) -> int:
        """Returns how many cycles occur (when able) per second"""
        return self._cycles_per_second

    @cycles_per_second.setter
    def cycles_per_second(self, cycles: int) -> None:
        """Modifies the cycles per second and the derived sleep period"""
        self._cycles_per_second = cycles
        self.sleep_period = 1 / cycles

    @property
    def func_id_max(self) -> int:
        """Returns how many functions can be bound at once"""
        return self._func_id_max

    @func_id_max.setter
    def func_id_max(self, id_max: int) -> None:
        """Modifies the max function count"""
        self._func_id_max = id_max

    def queue_is_empty(self) -> bool:
        """Checks that the event and time queues are both empty - External API"""
        return not self._event_queue and self._time_queue.empty()

    def _get_func_id(self) -> int:
        """Returns a unique function id - Internal API

        Raises:
            EstesioneError: If as many ids as ``func_id_max`` allows are already in use.
        """
        # Without this guard the retry loop below would spin forever once
        # every id in 1..func_id_max has been handed out.
        if len(self._func_ids) >= self._func_id_max:
            self._logger.error(
                f"No free function ids left (func_id_max={self._func_id_max})"
            )
            raise EstesioneError(
                f"No free function ids left (func_id_max={self._func_id_max})"
            )
        # In cases where there are many many requests being sent it may be faster to choose a
        # random id than to iterate through the list of id's and find an unclaimed one
        # NOTE: 0 is reserved to mean "no specific id" (see ``unbind``)
        new_id: int = randint(1, self._func_id_max)
        while new_id in self._func_ids:
            new_id = randint(1, self._func_id_max)
        self._func_ids.append(new_id)
        return new_id

    def bind(self, event: str, func: binding_func) -> int:
        """Binds a function to an event string that can be called anywhere at any time - External API

        Returns:
            The id of the new binding, usable with ``unbind``.
        """
        new_id: int = self._get_func_id()
        self._bindings.setdefault(event, []).append((new_id, func))
        self._logger.debug(f"Event {event} has been bound")
        return new_id

    def unbind(self, event: str, func_id: int = 0) -> None:
        """Unbinds functions bound to the event string given - External API

        With the default ``func_id`` of 0 every function bound to ``event`` is
        removed together with the event itself. With a specific id only that
        function is removed; the event stays registered even if its list of
        functions becomes empty.

        Raises:
            EstesioneError: If the event is unknown or has no function with ``func_id``.
        """
        if event not in self._bindings:
            self._logger.error(f"Event str {event} has not yet been bound")
            raise EstesioneError(f"Event str {event} is not yet bound")

        if not func_id:
            for bound_id, _ in self._bindings.pop(event):
                self._func_ids.remove(bound_id)
            self._logger.debug(f"Event {event} has been unbound")
            return

        func_list: list[tuple[int, binding_func]] = self._bindings[event]
        for index, (bound_id, _) in enumerate(func_list):
            if bound_id == func_id:
                del func_list[index]
                self._func_ids.remove(func_id)
                break
        else:
            self._logger.error(f"Event {event} has no func with id {func_id}")
            raise EstesioneError(
                f"Event {event} has no func with id {func_id}"
            )
        self._logger.debug(f"Event of funcid {func_id} has been unbound")

    def broadcast(
        self,
        event: str,
        data: Any = __broadcast_data_default,
        priority: bool = False,
    ) -> None:
        """Queues the event with the data requested, if any - External API

        Events are consumed from the right end of the queue, so a normal
        broadcast goes to the back (left end) and a ``priority`` broadcast
        goes to the front (right end).

        Raises:
            EstesioneError: If the event has not been bound.
        """
        if event not in self._bindings:
            self._logger.error(f"Event {event} is not in bindings")
            raise EstesioneError(f"Event {event} is not in bindings")
        if priority:
            self._event_queue.append((event, data))
        else:
            self._event_queue.appendleft((event, data))
        self._logger.debug(f"Command {event} has been broadcasted")

    def immediate(
        self, event: str, data: Any = __broadcast_data_default
    ) -> None:
        """Calls every function bound to the event right now, with the data requested, if any - External API

        Raises:
            EstesioneError: If the event has not been bound.
        """
        if event not in self._bindings:
            self._logger.error(f"Event {event} is not in bindings")
            raise EstesioneError(f"Event {event} is not in bindings")
        has_data: bool = data is not EventDispatcher.__broadcast_data_default
        # Iterate over a snapshot so a callback that binds or unbinds on this
        # event does not make the loop skip or repeat entries.
        for _, func in tuple(self._bindings[event]):
            if has_data:
                func(event, data)  # type: ignore
            else:
                func(event)  # type: ignore
        self._logger.debug(
            f"Event {event} called. Data {data} sent to all bound functions"
        )

    def schedule(
        self,
        time_in_ms: int,
        event: str,
        data: Any = __broadcast_data_default,
    ) -> None:
        """Adds the event requested to the time queue. Will be called as close to the time requested as possible - External API

        Raises:
            EstesioneError: If the event has not been bound.
        """
        # TODO: after cancel, after id
        if event not in self._bindings:
            self._logger.error(f"Event {event} is not in bindings")
            raise EstesioneError(f"Event {event} is not in bindings")
        due_time: int = time_ns() // 1_000_000 + time_in_ms
        # The insertion counter breaks ties between entries due at the same
        # millisecond, so the queue never has to compare the (possibly
        # unorderable) data payloads.
        self._time_queue.put(
            (due_time, next(self._schedule_order), event, data)
        )
        self._logger.debug(
            f"Command {event} added to be run in {time_in_ms} ms"
        )

    def _check_after_queue(self) -> None:
        """Runs every scheduled event whose time has come - Internal API"""
        current_time: int = time_ns() // 1_000_000
        while not self._time_queue.empty():
            entry = self._time_queue.get()
            call_time, _, event, data = entry
            if current_time < call_time:
                # The queue is ordered by due time, so nothing behind this
                # entry is due either: put it back and stop.
                self._time_queue.put(entry)
                break
            self.immediate(event, data)

    def _empty_queue(self) -> None:
        """Runs items in the event loop - Internal API"""
        while self._event_queue:
            self.immediate(*self._event_queue.pop())

    def run(self) -> None:
        """Runs the main loop until ``terminated`` is set - External API"""
        while not self._terminated:
            self._loop_func(self)
            self._empty_queue()
            self._check_after_queue()
            if self._throttling:
                sleep(self.sleep_period)
            # The after queue check happens again because we want the sleep
            # in between when the command is broadcasted and checked. It makes
            # it more likely to hit a short time period call and is negligible
            # for long ones
            self._check_after_queue()
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from time import sleep | |
from beartype.typing import Any | |
from event_dispatcher import EstesioneError, EventDispatcher | |
def test_dispatcher():
    """End-to-end check of bind/unbind, broadcast, immediate, schedule and run."""
    calls_seen = 0

    def callback(bind: str, data: Any | None = None) -> None:
        # Calls arrive in the order: immediate, broadcast, 10 ms timer, 20 ms timer.
        nonlocal calls_seen
        if calls_seen in (0, 1):
            assert data is None
        elif calls_seen == 2:
            assert data == "1st"
        elif calls_seen == 3:
            assert data == "2nd"
        calls_seen += 1

    iterations = 0

    def loop_func(ed: EventDispatcher) -> None:
        nonlocal iterations
        iterations += 1
        if iterations != 10:
            return
        ed.broadcast("Test")
        ed.immediate("Test")
        ed.schedule(20, "Test", "2nd")
        ed.schedule(10, "Test", "1st")
        ed.terminated = True
        # Give both timers time to become due before the final cycle checks them.
        sleep(0.02)

    def raises_dispatch_error(action, *args, **kwargs) -> bool:
        """Return True when calling ``action`` raises EstesioneError."""
        try:
            action(*args, **kwargs)
        except EstesioneError:
            return True
        return False

    dispatcher = EventDispatcher(loop_func)
    dispatcher.bind("Test", callback)
    test2_id = dispatcher.bind("Test2", callback)

    # An id that belongs to another event must be rejected.
    assert raises_dispatch_error(dispatcher.unbind, "Test", func_id=test2_id)

    dispatcher.unbind("Test2", test2_id)
    assert dispatcher._bindings["Test2"] == []
    dispatcher.unbind("Test2")
    dispatcher.unbind("Test")
    assert dispatcher._bindings == {}

    # Broadcasting an event with no bindings must fail.
    assert raises_dispatch_error(dispatcher.broadcast, "Test")

    dispatcher.bind("Test", callback)
    dispatcher.run()
    assert calls_seen == 4
    assert dispatcher.queue_is_empty()
    assert iterations == 10
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment