Last active
May 19, 2025 18:06
-
-
Save trygveaa/e0c014725a34799238f3d511c7c0d7e7 to your computer and use it in GitHub Desktop.
Custom asyncio event loop for WeeChat
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from __future__ import annotations | |
import asyncio | |
import asyncio.selector_events | |
import concurrent.futures | |
import math | |
import pickle | |
import selectors | |
import uuid | |
from collections import defaultdict, deque | |
from dataclasses import dataclass | |
from io import StringIO | |
from selectors import EVENT_READ, EVENT_WRITE, SelectorKey | |
from typing import Callable, TypeAlias | |
import aiohttp | |
import weechat | |
# Integer file descriptor; used as the key of the selector's hook tables.
FileDescriptor: TypeAlias = int
# Event flag passed to selector callbacks (EVENT_READ or EVENT_WRITE).
EventMask: TypeAlias = int
# Return types of the callables that get registered by name for WeeChat.
WeeChatCallbackReturnType: TypeAlias = int | str | dict[str, str] | None
# Signature of the listeners that WeeChatSelector notifies about a ready fd.
WeeChatSelectorCallbackType: TypeAlias = Callable[[SelectorKey, EventMask], None]
def _get_callback_id(callback: Callable[..., WeeChatCallbackReturnType]) -> str: | |
return f"{callback.__name__}-{id(callback)}" | |
def get_callback_name(callback: Callable[..., WeeChatCallbackReturnType]) -> str:
    """Expose *callback* as a module global and return the name it is stored under.

    The returned string is what gets passed to the ``weechat.hook_*`` and
    ``weechat.register`` calls in this file in place of the callable itself.
    """
    name = _get_callback_id(callback)
    globals().update({name: callback})
    return name
def remove_callback_name(callback: Callable[..., WeeChatCallbackReturnType]) -> None:
    """Drop the module global created for *callback*, if there is one.

    Calling this for a callback that was never registered is a no-op.
    """
    globals().pop(_get_callback_id(callback), None)
@dataclass
class WeeChatExecutorTask:
    """One unit of work queued on a WeeChatExecutor."""

    # Future that is resolved with the callable's result or exception.
    future: concurrent.futures.Future
    # The callable to run, invoked as callback(*args, **kwargs).
    callback: Callable
    # Positional arguments for the callable.
    args: tuple
    # Keyword arguments for the callable.
    kwargs: dict
class WeeChatExecutor(concurrent.futures.Executor):
    """Executor that hands callables to WeeChat's ``hook_process``.

    Each submitted callable is stored under a random task id and started
    through a ``func:`` process hook.  The hooked function returns the
    pickled, hex-encoded outcome as a string; WeeChat delivers that string
    (possibly in several chunks) to ``_executor_process_cb``, which resolves
    the task's future.
    """

    # Tasks that have been submitted but not yet completed, keyed by task id.
    _tasks: dict[str, WeeChatExecutorTask]
    # Output received so far for each running task, keyed by task id.
    _result_buffers: defaultdict[str, StringIO]

    def __init__(
        self,
    ):
        super().__init__()
        self._tasks = {}
        self._result_buffers = defaultdict(StringIO)

    def _run_executor_task(self, data: str) -> str:
        """Run the task whose id is *data* and return its serialized outcome.

        Returns:
            Hex of ``pickle.dumps([True, result])`` on success or
            ``pickle.dumps([False, exception])`` on failure.  If the exception
            itself cannot be pickled, a ``RuntimeError`` carrying its ``repr``
            is sent instead so the failure is still reported to the caller.
        """
        try:
            task = self._tasks[data]
            outcome = task.callback(*task.args, **task.kwargs)
            return pickle.dumps([True, outcome]).hex()
        except Exception as exc:
            try:
                return pickle.dumps([False, exc]).hex()
            except Exception:
                # The original exception is not picklable; keep its text.
                return pickle.dumps([False, RuntimeError(repr(exc))]).hex()

    def _executor_process_cb(
        self, data: str, command: str, return_code: int, out: str, err: str
    ) -> int:
        """Collect process output and resolve the future once the process ends.

        Args:
            data: The task id given to ``hook_process`` in ``submit``.
            command: The hooked command (unused).
            return_code: ``WEECHAT_HOOK_PROCESS_RUNNING`` while more output
                may follow, otherwise the final status of the process.
            out: Chunk of output to append to the task's buffer.
            err: Chunk of error output (unused).

        Returns:
            Always ``weechat.WEECHAT_RC_OK``.
        """
        task_id = data
        self._result_buffers[task_id].write(out)

        if return_code == weechat.WEECHAT_HOOK_PROCESS_RUNNING:
            return weechat.WEECHAT_RC_OK

        task = self._tasks.pop(task_id)
        # Release the buffer on every terminal path, not only on success.
        payload = self._result_buffers.pop(task_id).getvalue()

        if return_code != 0:
            task.future.set_exception(
                RuntimeError(f"Executing callable failed, code: {return_code}")
            )
            return weechat.WEECHAT_RC_OK

        try:
            # NOTE: pickle.loads executes arbitrary code; this is only
            # acceptable because the payload was produced by
            # _run_executor_task in this same script.
            success, result = pickle.loads(bytes.fromhex(payload))
        except Exception as exc:
            # Never leave the future pending because of a garbled payload.
            failure = RuntimeError("Executing callable returned an unreadable result")
            failure.__cause__ = exc
            task.future.set_exception(failure)
            return weechat.WEECHAT_RC_OK

        if not success:
            task.future.set_exception(result)
            return weechat.WEECHAT_RC_OK

        task.future.set_result(result)
        return weechat.WEECHAT_RC_OK

    def submit(self, fn, /, *args, **kwargs):
        """Schedule ``fn(*args, **kwargs)`` and return a future for its outcome.

        The future is marked as running immediately, so it cannot be
        cancelled afterwards.
        """
        future = concurrent.futures.Future()
        future.set_running_or_notify_cancel()
        task_id = str(uuid.uuid4())
        self._tasks[task_id] = WeeChatExecutorTask(future, fn, args, kwargs)
        weechat.hook_process(
            f"func:{get_callback_name(self._run_executor_task)}",
            0,
            get_callback_name(self._executor_process_cb),
            task_id,
        )
        return future

    def shutdown(self, wait=True, *, cancel_futures=False):
        """Shut the executor down and unregister its WeeChat callback names."""
        super().shutdown(wait, cancel_futures=cancel_futures)
        remove_callback_name(self._run_executor_task)
        remove_callback_name(self._executor_process_cb)
class WeeChatSelector(selectors._BaseSelectorImpl):
    """Selector that relies on WeeChat fd hooks instead of polling.

    ``select()`` never reports anything itself.  Readiness is instead pushed
    to the listeners added with ``add_callback`` whenever WeeChat invokes one
    of the fd hooks created in ``register``.
    """

    _fd_to_key: dict[FileDescriptor, SelectorKey]
    # Listeners called as listener(key, mask) when an fd hook fires.
    _callbacks: list[WeeChatSelectorCallbackType]
    # Hooks returned by weechat.hook_fd, keyed by file descriptor.
    _reader_hooks: dict[FileDescriptor, str]
    _writer_hooks: dict[FileDescriptor, str]

    def __init__(self):
        super().__init__()
        self._callbacks = []
        self._reader_hooks = {}
        self._writer_hooks = {}

    def _dispatch(self, fd: int, mask: EventMask) -> int:
        """Notify every listener about *mask* on *fd*, if *fd* is registered."""
        key = self._fd_to_key.get(fd)
        if key:
            for listener in self._callbacks:
                listener(key, mask)
        return weechat.WEECHAT_RC_OK

    def _reader_cb(self, data: str, fd: int) -> int:
        """WeeChat fd-hook callback for the read hooks."""
        return self._dispatch(fd, EVENT_READ)

    def _writer_cb(self, data: str, fd: int) -> int:
        """WeeChat fd-hook callback for the write hooks."""
        return self._dispatch(fd, EVENT_WRITE)

    def register(self, fileobj, events, data=None):
        """Register *fileobj* and create one WeeChat fd hook per requested event."""
        key = super().register(fileobj, events, data)
        fd = key.fd
        if events & EVENT_READ:
            reader_name = get_callback_name(self._reader_cb)
            self._reader_hooks[fd] = weechat.hook_fd(fd, 1, 0, 0, reader_name, "")
        if events & EVENT_WRITE:
            writer_name = get_callback_name(self._writer_cb)
            self._writer_hooks[fd] = weechat.hook_fd(
                fd, 0, 1, 0, writer_name, str(fd)
            )
        return key

    def unregister(self, fileobj):
        """Unregister *fileobj* and remove any fd hooks created for it."""
        key = super().unregister(fileobj)
        for hooks in (self._reader_hooks, self._writer_hooks):
            hook = hooks.pop(key.fd, None)
            if hook:
                weechat.unhook(hook)
        return key

    def select(self, timeout=None):
        """Return no events; readiness is delivered through the fd hooks."""
        return []

    def close(self):
        """Close the selector, removing all fd hooks and callback names."""
        super().close()
        for hooks in (self._reader_hooks, self._writer_hooks):
            for hook in hooks.values():
                weechat.unhook(hook)
        self._reader_hooks.clear()
        self._writer_hooks.clear()
        remove_callback_name(self._reader_cb)
        remove_callback_name(self._writer_cb)

    def add_callback(self, callback: WeeChatSelectorCallbackType):
        """Add a listener to be notified about ready file descriptors."""
        self._callbacks.append(callback)

    def remove_callback(self, callback: WeeChatSelectorCallbackType):
        """Remove a listener previously added with ``add_callback``."""
        self._callbacks.remove(callback)
class WeeChatEventLoop(asyncio.selector_events.BaseSelectorEventLoop):
    """asyncio event loop that is driven by WeeChat hooks.

    ``run_forever()`` returns after a single iteration instead of blocking.
    Further iterations happen when a WeeChat timer (armed by ``call_soon`` /
    ``call_at``) fires or when the selector reports a ready file descriptor.
    """

    # Attributes and methods of the base class used below, declared for
    # type checkers.
    _ready: deque[asyncio.Handle]
    _check_default_executor: Callable[[], None]
    _process_events: Callable[[list[tuple[SelectorKey, EventMask]]], None]
    _run_forever_setup: Callable[[], None]
    _run_forever_cleanup: Callable[[], None]
    _run_once: Callable[[], None]

    def __init__(self):
        weechat_selector = WeeChatSelector()
        weechat_selector.add_callback(self._selector_cb)
        super().__init__(weechat_selector)

    def _selector_cb(self, key: SelectorKey, mask: EventMask):
        """Process one ready fd reported by the selector, then iterate."""
        self._process_events([(key, mask)])
        self._run_once_if_is_running()

    def _schedule_run_once_cb(self, data: str, remaining_calls: int) -> int:
        """WeeChat timer callback: run one loop iteration if still running."""
        self._run_once_if_is_running()
        return weechat.WEECHAT_RC_OK

    def _schedule_run_once(self, delay_ms: int = 0):
        """Arm a WeeChat timer (at least 1 ms) that runs one loop iteration."""
        interval = delay_ms if delay_ms > 1 else 1
        callback_name = get_callback_name(self._schedule_run_once_cb)
        weechat.hook_timer(interval, 0, 1, callback_name, "")

    def _run_once_if_is_running(self):
        """Run one loop iteration unless the loop is not running."""
        if not self.is_running():
            return
        self._run_once()

    def run_forever(self):
        """Mark the loop as running and perform the first iteration."""
        self._run_forever_setup()
        self._run_once()

    def run_until_complete(self, future):
        """Always raise ``RuntimeError``; this loop does not support it."""
        raise RuntimeError("run_until_complete() is not supported")

    def stop(self):
        """Stop the loop after draining the callbacks that are already ready."""
        super().stop()
        while self._ready:
            self._run_once()
        self._run_forever_cleanup()

    def close(self):
        """Close the loop and unregister the timer callback name."""
        super().close()
        remove_callback_name(self._schedule_run_once_cb)

    def call_at(self, when, callback, *args, context=None):
        """Schedule *callback* at *when* and arm a timer for that moment."""
        timer = super().call_at(when, callback, *args, context=context)
        seconds_left = timer.when() - self.time()
        self._schedule_run_once(math.ceil(seconds_left * 1000))
        return timer

    def call_soon(self, callback, *args, context=None):
        """Schedule *callback* and arm a timer to run the loop shortly."""
        handle = super().call_soon(callback, *args, context=context)
        self._schedule_run_once()
        return handle

    def run_in_executor(self, executor, func, *args):
        """Run *func* in *executor*, defaulting to a ``WeeChatExecutor``."""
        if executor is None:
            executor = self._default_executor
            # Only check when the default executor is being used
            self._check_default_executor()
            if executor is None:
                # Create the default executor on first use.
                executor = self._default_executor = WeeChatExecutor()
        return super().run_in_executor(executor, func, *args)

    def set_default_executor(self, executor):
        """Set the default executor.

        Raises:
            TypeError: If *executor* is not a ``WeeChatExecutor``.
        """
        if isinstance(executor, WeeChatExecutor):
            self._default_executor = executor
            return
        raise TypeError("executor must be WeeChatExecutor instance")
def _cancel_all_tasks_start(loop): | |
to_cancel = asyncio.tasks.all_tasks(loop) | |
if not to_cancel: | |
return | |
for task in to_cancel: | |
task.cancel() | |
asyncio.tasks.gather(*to_cancel, return_exceptions=True) | |
return to_cancel | |
def _cancel_all_tasks_complete(cancelled_tasks): | |
for task in cancelled_tasks: | |
if task.cancelled(): | |
continue | |
if task.exception() is not None: | |
loop.call_exception_handler( | |
{ | |
"message": "unhandled exception during asyncio.run() shutdown", | |
"exception": task.exception(), | |
"task": task, | |
} | |
) | |
def start_loop():
    """Create a WeeChatEventLoop, install it as the current loop and start it.

    Returns:
        The started loop.
    """
    event_loop = WeeChatEventLoop()
    asyncio.set_event_loop(event_loop)
    event_loop.run_forever()
    return event_loop
def shutdown_loop(loop: WeeChatEventLoop):
    """Cancel the loop's tasks, then stop and close *loop*.

    The loop is unset as the current event loop, stopped and closed even if
    cancelling the tasks or scheduling the async-generator shutdown raises.
    """
    pending = None
    try:
        pending = _cancel_all_tasks_start(loop)
        asyncgen_shutdown = loop.shutdown_asyncgens()
        loop.create_task(asyncgen_shutdown)
    finally:
        asyncio.set_event_loop(None)
        loop.stop()
        if pending:
            # Report exceptions from the tasks that were asked to cancel.
            _cancel_all_tasks_complete(pending)
        loop.close()
async def main():
    """Fetch https://httpbin.org/get and print the status and body."""
    async with (
        aiohttp.ClientSession() as session,
        session.get("https://httpbin.org/get") as resp,
    ):
        print(resp.status)
        body = await resp.text()
        print(body)
def call_command_cb(data: str, buffer: str, args: str) -> int:
    """Command callback: schedule ``main()`` as a task on the module-level loop."""
    request = main()
    loop.create_task(request)
    return weechat.WEECHAT_RC_OK
def shutdown_cb():
    """Shut down the module-level event loop and report success to WeeChat."""
    status = weechat.WEECHAT_RC_OK
    shutdown_loop(loop)
    return status
# Register the script with WeeChat, passing shutdown_cb by its registered
# name. Everything below only runs if registration succeeds.
if weechat.register(
    "loop", "trygveaa", "1.0", "MIT", "", get_callback_name(shutdown_cb), ""
):
    # Module-level `loop` is what call_command_cb and shutdown_cb operate on.
    loop = start_loop()
    # Hook the "call" command; it is handled by call_command_cb.
    hook = weechat.hook_command(
        "call", "", "", "", "", get_callback_name(call_command_cb), ""
    )
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment