Skip to content

Instantly share code, notes, and snippets.

@Moosems
Last active October 22, 2024 01:49
Show Gist options
  • Save Moosems/82b89ff6a93c4132f9fb2ca6260ae125 to your computer and use it in GitHub Desktop.
Save Moosems/82b89ff6a93c4132f9fb2ca6260ae125 to your computer and use it in GitHub Desktop.
Python Event based Loop Dispatcher
"""Implements an event dispatch loop similar to the mainloop in tkinter. The EventDispatcher is NOT asyncronous but should be thread safe."""
# TODO: async variant, test thread safety
from collections import deque
from logging import Logger, getLogger
from queue import PriorityQueue
from random import randint
from time import sleep, time_ns
from beartype.typing import Any, Callable, Optional
binding_func = Callable[[str, Optional[Any]], Any]
def auto_terminate(ed: "EventDispatcher") -> None:
ed.terminated = True
class EstesioneError(Exception): ...
class EventDispatcher:
"""An event based loop that dispatches callbacks on events.
The EventDispatcher class implements the following methods for its public API:
- EventDispatcher.queue_is_empty()
- EventDispatcher.bind(command: str, func: Callable[[str, Optional[Any]], Any]) -> int
- EventDispatcher.unbind(command: str, func_id: int = 0)
- EventDispatcher.broadcast(command: str, data: Any = __broadcast_data_default)
- EventDispatcher.immediate(command: str, data: Any = __broadcast_data_default)
- EventDispatcher.schedule(time_in_ms: int, command: str, data: Any = __broadcast_data_default)
- EventDispatcher.run()
There are a few attributes that can be modified to alter behavior. Notably:
- EventDispatcher.terminated (bool (defaults to False) - Wehether or not to end the loop)
- EventDispatcher.throttling (bool (defaults to True) - Whether to throttle the loop to save cpu usage)
- EventDispatcher.cycles_per_second (int (defaults to 120) - The number of cycles per second; overriden by throttling)
- EventDispatcher.func_id_max (int (defaults to 15_000) - The number of funcs that can be bound to)
"""
# Used to determine if the user is sending data or not
class __broadcast_data_default: ...
def __init__(
self,
loop_func: Callable[["EventDispatcher"], None] = auto_terminate,
throttling: bool = True,
cycles_per_second: int = 120,
func_id_max: int = 15_000,
) -> None:
self._terminated: bool = False
self._throttling: bool = True
self._cycles_per_second: int = cycles_per_second
self._loop_func: Callable[["EventDispatcher"], None] = loop_func
self._event_queue: deque[tuple[str, Any]] = deque()
self._time_queue: PriorityQueue[tuple[int, str, Any]] = PriorityQueue()
self._bindings: dict[str, list[tuple[int, binding_func]]] = {}
self._func_ids: list[int] = []
self._func_id_max: int = func_id_max
self._logger: Logger = getLogger("EventDispatcher")
self.sleep_period: float = 1 / self._cycles_per_second
@property
def throttling(self) -> bool:
"""Returns wether or not throttling is turned on"""
return self._throttling
@throttling.setter
def throttling(self, throttle: bool) -> None:
"""Sets the throttling"""
self._throttling = throttle
@property
def terminated(self) -> bool:
"""Returns wether or not the loop has been terminated"""
return self._terminated
@terminated.setter
def terminated(self, terminate: bool) -> None:
"""Can revive or kill the loop"""
self._terminated = terminate
@property
def cycles_per_second(self) -> int:
"""Returns how many cycles occur (when able) per second"""
return self._cycles_per_second
@cycles_per_second.setter
def cycles_per_second(self, cycles: int) -> None:
"""Modifies the cycles per second"""
self._cycles_per_second = cycles
self.sleep_period = 1 / cycles
@property
def func_id_max(self) -> int:
"""Returns how many functions can be bound at once"""
return self._func_id_max
@func_id_max.setter
def func_id_max(self, id_max: int) -> None:
"""Modifies the max function count"""
self._func_id_max = id_max
def queue_is_empty(self) -> bool:
"""Checks that the event and time queues are both empty - External API"""
return not len(self._event_queue) and self._time_queue.empty()
def _get_func_id(self) -> int:
"""Returns a unique function id - Internal API"""
# In cases where there are many many requests being sent it may be faster to choose a
# random id than to iterate through the list of id's and find an unclaimed one
# NOTE: 0 is reserved for when there's no curent id's (self.current_ids)
id: int = randint(1, self._func_id_max)
while id in self._func_ids:
id = randint(1, self._func_id_max)
self._func_ids.append(id)
return id
def bind(self, event: str, func: binding_func) -> int:
"""Binds a function to an event string that can be called anywhere at any time - External API"""
new_id: int
if event in self._bindings:
new_id = self._get_func_id()
self._bindings[event].append((new_id, func))
else:
new_id = self._get_func_id()
self._bindings[event] = [(new_id, func)]
self._logger.debug(f"Event {event} has been bound")
return new_id
def unbind(self, event: str, func_id: int = 0) -> None:
"""Binds the function bound to the event string given - External API"""
if event not in self._bindings:
self._logger.exception(f"Event str {event} has not yet been bound")
raise EstesioneError(f"Event str {event} is not yet bound")
if not func_id:
event_funcs = self._bindings.pop(event)
for event_func_tuple in event_funcs:
self._func_ids.remove(event_func_tuple[0])
self._logger.debug(f"Event {event} has been unbound")
return
func_list: list[tuple[int, binding_func]] = self._bindings[event]
found: bool = False
for i, data in enumerate(func_list):
if data[0] == func_id:
_ = func_list.remove(data)
self._func_ids.remove(func_id)
found = True
break
if not found:
self._logger.exception(
f"Event {event} has no func with id {func_id}"
)
raise EstesioneError(
f"Event {event} has no func with id {func_id}"
)
self._logger.debug(f"Event of funcid {func_id} has been unbound")
def broadcast(
self,
event: str,
data: Any = __broadcast_data_default,
priority: bool = False,
) -> None:
"""Adds the event requested to the back of the queue with the data requested, if any - External API"""
if event not in self._bindings:
self._logger.exception(f"Event {event} is not in bindings")
raise EstesioneError(f"Event {event} is not in bindings")
if not priority:
self._event_queue.appendleft((event, data))
else:
self._event_queue.append((event, data))
self._logger.debug(f"Command {event} has been broadcasted")
def immediate(
self, event: str, data: Any = __broadcast_data_default
) -> None:
"""Calls the command requested to the front of the queue with the data requested, if any - External API"""
if event not in self._bindings:
self._logger.exception(f"Event {event} is not in bindings")
raise EstesioneError(f"Event {event} is not in bindings")
real_func: list[tuple[int, binding_func]] = self._bindings[event]
for id, func in real_func:
if data is not EventDispatcher.__broadcast_data_default:
func(event, data) # type: ignore
else:
func(event) # type: ignore
self._logger.debug(
f"Event {event} called. Data {data} sent to all bound functions"
)
def schedule(
self,
time_in_ms: int,
event: str,
data: Any = __broadcast_data_default,
) -> None:
"""Adds the command requested to the time queue. Will be called as close to the time requested as possible - External API"""
# TODO: after cancel, after id
if event not in self._bindings:
self._logger.exception(f"Event {event} is not in bindings")
raise EstesioneError(f"Event {event} is not in bindings")
current_time: int = int(time_ns() / 1_000_000)
final_time: int = current_time + time_in_ms
self._time_queue.put((final_time, event, data))
self._logger.debug(
f"Command {event} added to be run in {time_in_ms} ms"
)
def _check_after_queue(self) -> None:
"""Searches for items in the after loop to run - Internal API"""
current_time: int = int(time_ns() / 1_000_000)
# TODO: Find a way to theoretically stay thread safe but sort by time to save time
# reinserting values
unused_values: list[tuple[int, str, Any]] = []
while not self._time_queue.empty():
time_event: tuple[int, str, Any] = self._time_queue.get()
call_time, event, data = time_event
if current_time < call_time:
unused_values.append(time_event)
continue
self.immediate(event, data)
for unused_value in unused_values:
self._time_queue.put(unused_value)
def _empty_queue(self) -> None:
"""Runs items in the event loop - Internal API"""
while len(self._event_queue):
event: tuple[str, Any] = self._event_queue.pop()
self.immediate(*event)
def run(self) -> None:
"""Runs the main loop - External API"""
while not self._terminated:
self._loop_func(self)
self._empty_queue()
self._check_after_queue()
if self._throttling:
sleep(self.sleep_period)
# The after queue check happens again because we want the sleep
# in between when the command is broadcasted and checked. It makes
# it more likely to hit a short time period call and is negligible
# for long ones
self._check_after_queue()
from time import sleep
from beartype.typing import Any
from event_dispatcher import EstesioneError, EventDispatcher
def test_dispatcher():
# ============Tests============
callback_index = 0
def callback(bind: str, data: Any | None = None) -> None:
nonlocal callback_index
if callback_index == 0:
assert data is None
if callback_index == 1:
assert data is None
elif callback_index == 2:
assert data == "1st"
elif callback_index == 3:
assert data == "2nd"
callback_index += 1
i = 0
def loop_func(ed: EventDispatcher) -> None:
nonlocal i
i += 1
if i == 10:
ed.broadcast("Test")
ed.immediate("Test")
ed.schedule(20, "Test", "2nd")
ed.schedule(10, "Test", "1st")
ed.terminated = True
sleep(0.02)
ed = EventDispatcher(loop_func)
ed.bind("Test", callback)
id = ed.bind("Test2", callback)
failed = True
try:
ed.unbind("Test", func_id=id)
failed = False
except EstesioneError:
pass
assert failed
ed.unbind("Test2", id)
assert ed._bindings["Test2"] == []
ed.unbind("Test2")
ed.unbind("Test")
assert ed._bindings == {}
failed = True
try:
ed.broadcast("Test")
failed = False
except EstesioneError:
pass
assert failed
ed.bind("Test", callback)
ed.run()
assert callback_index == 4
assert ed.queue_is_empty()
assert i == 10
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment