Skip to content

Instantly share code, notes, and snippets.

@trygveaa
Last active May 19, 2025 18:06
Show Gist options
  • Save trygveaa/e0c014725a34799238f3d511c7c0d7e7 to your computer and use it in GitHub Desktop.
Save trygveaa/e0c014725a34799238f3d511c7c0d7e7 to your computer and use it in GitHub Desktop.
Custom asyncio event loop for WeeChat
from __future__ import annotations
import asyncio
import asyncio.selector_events
import concurrent.futures
import math
import pickle
import selectors
import uuid
from collections import defaultdict, deque
from dataclasses import dataclass
from io import StringIO
from selectors import EVENT_READ, EVENT_WRITE, SelectorKey
from typing import Callable, TypeAlias
import aiohttp
import weechat
# Integer file descriptor; used as the key of the selector's hook tables.
FileDescriptor: TypeAlias = int
# Bitmask of selector events (EVENT_READ / EVENT_WRITE from `selectors`).
EventMask: TypeAlias = int
# Values a callback exposed to WeeChat through get_callback_name() may return.
WeeChatCallbackReturnType = int | str | dict[str, str] | None
# Signature of the callbacks WeeChatSelector invokes with (key, event mask).
WeeChatSelectorCallbackType = Callable[[SelectorKey, EventMask], None]
def _get_callback_id(callback: Callable[..., WeeChatCallbackReturnType]) -> str:
    """Return a stable string identifier for *callback*.

    Plain functions are identified by their name and ``id()``.

    Bound methods need special care: every ``obj.method`` attribute access
    creates a fresh bound-method object, so ``id(obj.method)`` differs between
    two accesses that are alive at the same time.  Keying on the bound-method
    object would make it impossible to look the same callback up again later
    (for example to remove it), so bound methods are identified by the
    instance they are bound to plus the underlying function instead.

    Args:
        callback: The function or bound method to identify.

    Returns:
        An identifier that is equal for repeated lookups of the same function
        or of the same method on the same instance.
    """
    bound_to = getattr(callback, "__self__", None)
    function = getattr(callback, "__func__", None)
    if bound_to is not None and function is not None:
        # Bound method: the instance and function outlive the transient
        # bound-method wrapper, so their ids are stable across accesses.
        return f"{callback.__name__}-{id(bound_to)}-{id(function)}"
    return f"{callback.__name__}-{id(callback)}"
def get_callback_name(callback: Callable[..., WeeChatCallbackReturnType]) -> str:
    """Expose *callback* as a module global and return the name it is stored under.

    WeeChat hook functions receive callbacks as strings, so the callable is
    published in this module's namespace under its generated identifier.
    """
    name = _get_callback_id(callback)
    module_namespace = globals()
    module_namespace[name] = callback
    return name
def remove_callback_name(callback: Callable[..., WeeChatCallbackReturnType]) -> None:
    """Remove the module global published for *callback*, if there is one.

    Removing a callback that was never published is a no-op.
    """
    globals().pop(_get_callback_id(callback), None)
@dataclass
class WeeChatExecutorTask:
    """A callable submitted to WeeChatExecutor together with its future."""

    # Future resolved with the callable's result or exception.
    future: concurrent.futures.Future
    # The callable to run.
    callback: Callable
    # Positional arguments passed to the callable.
    args: tuple
    # Keyword arguments passed to the callable.
    kwargs: dict
class WeeChatExecutor(concurrent.futures.Executor):
    """Executor that runs callables through ``weechat.hook_process``.

    Every submitted callable is stored under a random task id and handed to
    WeeChat as a ``func:`` process hook.  ``_run_executor_task`` runs the
    callable and returns its outcome as a hex-encoded pickle;
    ``_executor_process_cb`` collects that output and resolves the future.
    NOTE(review): per the WeeChat API docs a ``func:`` hook runs in a forked
    child, which is why the outcome travels as a string - confirm.
    """

    # Pending tasks keyed by task id.
    _tasks: dict[str, WeeChatExecutorTask]
    # Output received so far for each task id.
    _result_buffers: defaultdict[str, StringIO]

    def __init__(
        self,
    ):
        super().__init__()
        self._tasks = {}
        self._result_buffers = defaultdict(StringIO)

    def _run_executor_task(self, data: str) -> str:
        """Run the task identified by *data* and serialize its outcome.

        Args:
            data: The task id given to ``weechat.hook_process``.

        Returns:
            Hex-encoded pickle of ``[True, result]`` on success or
            ``[False, exception]`` on failure.
        """
        try:
            task = self._tasks[data]
            result = task.callback(*task.args, **task.kwargs)
            return pickle.dumps([True, result]).hex()
        except Exception as e:
            try:
                return pickle.dumps([False, e]).hex()
            except Exception:
                # The exception itself cannot be pickled (e.g. it carries a
                # lambda or a local object); report a picklable stand-in
                # rather than failing without any outcome at all.
                fallback = RuntimeError(
                    f"Executing callable raised unpicklable exception: {e!r}"
                )
                return pickle.dumps([False, fallback]).hex()

    def _executor_process_cb(
        self, data: str, command: str, return_code: int, out: str, err: str
    ) -> int:
        """Collect process output and resolve the task's future when done.

        Args:
            data: The task id given to ``weechat.hook_process``.
            command: The hooked command (unused).
            return_code: ``WEECHAT_HOOK_PROCESS_RUNNING`` while more output
                follows, otherwise the final return code.
            out: Chunk of standard output.
            err: Chunk of standard error (unused).

        Returns:
            ``weechat.WEECHAT_RC_OK``.
        """
        task_id = data
        self._result_buffers[task_id].write(out)
        if return_code == weechat.WEECHAT_HOOK_PROCESS_RUNNING:
            return weechat.WEECHAT_RC_OK

        task = self._tasks.pop(task_id)
        # Drop the buffer on every terminal path so failed runs do not leak it.
        payload = self._result_buffers.pop(task_id).getvalue()

        if return_code != 0:
            task.future.set_exception(
                RuntimeError(f"Executing callable failed, code: {return_code}")
            )
            return weechat.WEECHAT_RC_OK

        try:
            # NOTE: pickle is only safe here because the payload is produced
            # by _run_executor_task; never feed it data from another source.
            success, result = pickle.loads(bytes.fromhex(payload))
        except Exception as e:
            # A truncated or garbled payload must still resolve the future,
            # otherwise whoever awaits it would wait forever.
            task.future.set_exception(
                RuntimeError(f"Decoding result of callable failed: {e!r}")
            )
            return weechat.WEECHAT_RC_OK

        if success:
            task.future.set_result(result)
        else:
            task.future.set_exception(result)
        return weechat.WEECHAT_RC_OK

    def submit(self, fn, /, *args, **kwargs):
        """Schedule ``fn(*args, **kwargs)`` and return a future for its outcome."""
        future = concurrent.futures.Future()
        future.set_running_or_notify_cancel()
        task_id = str(uuid.uuid4())
        self._tasks[task_id] = WeeChatExecutorTask(future, fn, args, kwargs)
        weechat.hook_process(
            f"func:{get_callback_name(self._run_executor_task)}",
            0,
            get_callback_name(self._executor_process_cb),
            task_id,
        )
        return future

    def shutdown(self, wait=True, *, cancel_futures=False):
        """Shut the executor down and unpublish its WeeChat callbacks."""
        super().shutdown(wait, cancel_futures=cancel_futures)
        remove_callback_name(self._run_executor_task)
        remove_callback_name(self._executor_process_cb)
class WeeChatSelector(selectors._BaseSelectorImpl):
    """Selector whose readiness events are delivered by WeeChat fd hooks.

    Instead of blocking in ``select()``, each registered file object gets a
    ``weechat.hook_fd`` hook per requested event; when a hook fires, every
    callback added with ``add_callback`` is called with the selector key and
    the event mask.
    """

    _fd_to_key: dict[FileDescriptor, SelectorKey]
    _callbacks: list[WeeChatSelectorCallbackType]
    # WeeChat hook handles, keyed by the fd they watch.
    _reader_hooks: dict[FileDescriptor, str]
    _writer_hooks: dict[FileDescriptor, str]

    def __init__(self):
        super().__init__()
        self._callbacks = []
        self._reader_hooks = {}
        self._writer_hooks = {}

    def _dispatch_event(self, fd: int, mask: EventMask) -> int:
        """Forward *mask* for *fd* to all callbacks if the fd is still registered."""
        key = self._fd_to_key.get(fd)
        if not key:
            return weechat.WEECHAT_RC_OK
        for listener in self._callbacks:
            listener(key, mask)
        return weechat.WEECHAT_RC_OK

    def _reader_cb(self, data: str, fd: int) -> int:
        """WeeChat fd hook callback for read events on *fd*."""
        return self._dispatch_event(fd, EVENT_READ)

    def _writer_cb(self, data: str, fd: int) -> int:
        """WeeChat fd hook callback for write events on *fd*."""
        return self._dispatch_event(fd, EVENT_WRITE)

    def register(self, fileobj, events, data=None):
        """Register *fileobj* and install one WeeChat fd hook per requested event."""
        key = super().register(fileobj, events, data)
        fd = key.fd
        if events & EVENT_READ:
            # Flags passed to hook_fd here: read=1, write=0, third flag=0.
            reader_name = get_callback_name(self._reader_cb)
            self._reader_hooks[fd] = weechat.hook_fd(fd, 1, 0, 0, reader_name, "")
        if events & EVENT_WRITE:
            # Flags passed to hook_fd here: read=0, write=1, third flag=0.
            writer_name = get_callback_name(self._writer_cb)
            self._writer_hooks[fd] = weechat.hook_fd(
                fd, 0, 1, 0, writer_name, str(fd)
            )
        return key

    def unregister(self, fileobj):
        """Unregister *fileobj* and remove any hooks installed for its fd."""
        key = super().unregister(fileobj)
        for hook_table in (self._reader_hooks, self._writer_hooks):
            hook = hook_table.pop(key.fd, None)
            if hook:
                weechat.unhook(hook)
        return key

    def select(self, timeout=None):
        """Never report events here; they arrive through the fd hook callbacks."""
        return []

    def close(self):
        """Close the selector, remove every hook and unpublish the hook callbacks."""
        super().close()
        hook_tables = (self._reader_hooks, self._writer_hooks)
        for hook_table in hook_tables:
            for hook in hook_table.values():
                weechat.unhook(hook)
        for hook_table in hook_tables:
            hook_table.clear()
        remove_callback_name(self._reader_cb)
        remove_callback_name(self._writer_cb)

    def add_callback(self, callback: WeeChatSelectorCallbackType):
        """Add *callback* to the listeners notified of fd events."""
        self._callbacks.append(callback)

    def remove_callback(self, callback: WeeChatSelectorCallbackType):
        """Remove a listener previously added with ``add_callback``."""
        self._callbacks.remove(callback)
class WeeChatEventLoop(asyncio.selector_events.BaseSelectorEventLoop):
    """asyncio event loop driven by WeeChat hooks instead of a blocking loop.

    ``run_forever()`` returns after a single iteration.  Further iterations
    happen whenever a WeeChat timer hook (scheduled by ``call_soon`` /
    ``call_at``) or an fd hook (through WeeChatSelector) fires.
    """

    # Attributes and private methods of the asyncio base classes, declared
    # here only so that they are typed.
    _ready: deque[asyncio.Handle]
    _check_default_executor: Callable[[], None]
    _process_events: Callable[[list[tuple[SelectorKey, EventMask]]], None]
    _run_forever_setup: Callable[[], None]
    _run_forever_cleanup: Callable[[], None]
    _run_once: Callable[[], None]

    def __init__(self):
        weechat_selector = WeeChatSelector()
        weechat_selector.add_callback(self._selector_cb)
        super().__init__(weechat_selector)

    def _selector_cb(self, key: SelectorKey, mask: EventMask):
        """Process one fd event reported by the selector, then iterate the loop."""
        self._process_events([(key, mask)])
        self._run_once_if_is_running()

    def _schedule_run_once_cb(self, data: str, remaining_calls: int) -> int:
        """WeeChat timer callback: run one loop iteration if the loop is running."""
        self._run_once_if_is_running()
        return weechat.WEECHAT_RC_OK

    def _schedule_run_once(self, delay_ms: int = 0):
        """Install a timer hook that fires once after *delay_ms* (at least 1 ms)."""
        callback_name = get_callback_name(self._schedule_run_once_cb)
        weechat.hook_timer(max(1, delay_ms), 0, 1, callback_name, "")

    def _run_once_if_is_running(self):
        """Run a single loop iteration unless the loop is not running."""
        if not self.is_running():
            return
        self._run_once()

    def run_forever(self):
        """Mark the loop as running and execute a first iteration, then return."""
        self._run_forever_setup()
        self._run_once()

    def run_until_complete(self, future):
        """Unsupported; always raises RuntimeError."""
        raise RuntimeError("run_until_complete() is not supported")

    def stop(self):
        """Stop the loop after running iterations until no handles are ready."""
        super().stop()
        while self._ready:
            self._run_once()
        self._run_forever_cleanup()

    def close(self):
        """Close the loop and unpublish the timer callback."""
        super().close()
        remove_callback_name(self._schedule_run_once_cb)

    def call_at(self, when, callback, *args, context=None):
        """Schedule *callback* at loop time *when* and a timer hook to run it."""
        timer = super().call_at(when, callback, *args, context=context)
        seconds_until_due = timer.when() - self.time()
        self._schedule_run_once(math.ceil(seconds_until_due * 1000))
        return timer

    def call_soon(self, callback, *args, context=None):
        """Schedule *callback* and a timer hook for the next loop iteration."""
        handle = super().call_soon(callback, *args, context=context)
        self._schedule_run_once()
        return handle

    def run_in_executor(self, executor, func, *args):
        """Run *func* in *executor*, defaulting to a lazily created WeeChatExecutor."""
        if executor is None:
            executor = self._default_executor
            # Only check when the default executor is being used
            self._check_default_executor()
            if executor is None:
                executor = WeeChatExecutor()
                self._default_executor = executor
        return super().run_in_executor(executor, func, *args)

    def set_default_executor(self, executor):
        """Set the default executor; only WeeChatExecutor instances are accepted."""
        if isinstance(executor, WeeChatExecutor):
            self._default_executor = executor
            return
        raise TypeError("executor must be WeeChatExecutor instance")
def _cancel_all_tasks_start(loop):
    """Request cancellation of every unfinished task of *loop*.

    The tasks are also wrapped in a ``gather(..., return_exceptions=True)``
    whose result is discarded.

    Returns:
        The set of tasks that were asked to cancel, or ``None`` when the loop
        had no unfinished tasks.
    """
    pending = asyncio.tasks.all_tasks(loop)
    if pending:
        for task in pending:
            task.cancel()
        asyncio.tasks.gather(*pending, return_exceptions=True)
        return pending
    return None
def _cancel_all_tasks_complete(cancelled_tasks):
    """Report exceptions raised by tasks that were cancelled during shutdown.

    Tasks that ended up cancelled are ignored.  Tasks that are still pending
    are ignored as well, because asking a pending task for its exception
    raises ``InvalidStateError``.  Any other exception is passed to the
    exception handler of the loop that owns the task.

    Args:
        cancelled_tasks: Iterable of tasks previously asked to cancel.
    """
    for task in cancelled_tasks:
        if not task.done() or task.cancelled():
            continue
        exception = task.exception()
        if exception is None:
            continue
        # Use the task's own loop rather than relying on a module-level
        # `loop` variable happening to exist.
        task.get_loop().call_exception_handler(
            {
                "message": "unhandled exception during asyncio.run() shutdown",
                "exception": exception,
                "task": task,
            }
        )
def start_loop():
    """Create a WeeChatEventLoop, make it the current loop and start it.

    Returns:
        The started loop.
    """
    event_loop = WeeChatEventLoop()
    asyncio.set_event_loop(event_loop)
    event_loop.run_forever()
    return event_loop
def shutdown_loop(loop: WeeChatEventLoop):
    """Cancel all tasks of *loop*, then stop and close it.

    The stop/close steps live in a ``finally`` clause so they run even when
    starting the cancellation fails.
    """
    tasks_being_cancelled = None
    try:
        tasks_being_cancelled = _cancel_all_tasks_start(loop)
        asyncgen_shutdown = loop.shutdown_asyncgens()
        loop.create_task(asyncgen_shutdown)
    finally:
        asyncio.set_event_loop(None)
        # stop() runs the remaining ready handles before returning.
        loop.stop()
        if tasks_being_cancelled:
            _cancel_all_tasks_complete(tasks_being_cancelled)
        loop.close()
async def main():
    """Example coroutine: GET https://httpbin.org/get and print status and body."""
    async with aiohttp.ClientSession() as client:
        async with client.get("https://httpbin.org/get") as response:
            status = response.status
            print(status)
            body = await response.text()
            print(body)
# Strong references to tasks started by call_command_cb.  The event loop keeps
# only weak references to tasks, so a task nobody else references may be
# garbage-collected before it finishes.
_background_tasks: set[asyncio.Task] = set()


def call_command_cb(data: str, buffer: str, args: str) -> int:
    """WeeChat command callback: start ``main()`` as a task on the global loop.

    Args:
        data: Callback data given to ``weechat.hook_command`` (unused).
        buffer: Buffer the command was run in (unused).
        args: Arguments given to the command (unused).

    Returns:
        ``weechat.WEECHAT_RC_OK``.
    """
    task = loop.create_task(main())
    _background_tasks.add(task)
    # Drop the reference once the task has finished, whatever its outcome.
    task.add_done_callback(_background_tasks.discard)
    return weechat.WEECHAT_RC_OK
def shutdown_cb():
    """Script shutdown callback: tear down the global event loop.

    Returns:
        ``weechat.WEECHAT_RC_OK``.
    """
    shutdown_loop(loop)
    status = weechat.WEECHAT_RC_OK
    return status
# Register the script with WeeChat, passing shutdown_cb as its sixth argument.
# Only when registration succeeds is the event loop started and the "call"
# command hooked to call_command_cb.
if weechat.register(
    "loop",
    "trygveaa",
    "1.0",
    "MIT",
    "",
    get_callback_name(shutdown_cb),
    "",
):
    loop = start_loop()
    hook = weechat.hook_command(
        "call",
        "",
        "",
        "",
        "",
        get_callback_name(call_command_cb),
        "",
    )
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment