Skip to content

Instantly share code, notes, and snippets.

@ynkdir
Last active December 14, 2024 02:20
Show Gist options
  • Save ynkdir/bc68b4b639b0414a31c0388fc2b2a5ef to your computer and use it in GitHub Desktop.
Save ynkdir/bc68b4b639b0414a31c0388fc2b2a5ef to your computer and use it in GitHub Desktop.
Connecting Asyncio and Win32 event loops
# /// script
# dependencies = ["win32more"]
# ///
import asyncio
import threading
from concurrent.futures import Future
from queue import Queue
from win32more import WinError
from win32more.Windows.Win32.System.LibraryLoader import GetModuleHandle
from win32more.Windows.Win32.UI.WindowsAndMessaging import (
HWND_MESSAGE,
MSG,
WNDCLASS,
CreateWindowEx,
DefWindowProc,
DispatchMessage,
GetMessage,
PostMessage,
PostQuitMessage,
RegisterClass,
TranslateMessage,
)
class GuestEventLoop(asyncio.EventLoop):
def __init__(self, host):
super().__init__()
self._host = host
self._thread = threading.Thread(target=self._run_forever_thread)
def run_forever(self):
# Runner will call run_until_complete() while closing.
if self._thread is None:
return super().run_forever()
asyncio.events._set_running_loop(self)
self._thread.start()
self._host.mainloop()
self._thread.join()
self._thread = None
asyncio.events._set_running_loop(None)
def _run_forever_thread(self):
super().run_forever()
self._host.dispatch(self._host.quit)
def _dispatch_and_wait(self, callback, *args):
def wrapper():
try:
future.set_result(callback(*args))
except Exception as e:
future.set_exception(e)
future = Future()
self._host.dispatch(wrapper)
return future.result()
def stop(self):
super().stop()
self._write_to_self() # wakeup loop
# dispatch model of the following code
#
# UI thread Asyncio thread
#
# dispatch
# +-------+<--------------+------------+
# | event | | poll |
# +-------+-------------->| |
# | |
# dispatch | |
# +-------+<--------------| |
# | event | | |
# +-------+-------------->+------------+
def call_later(self, delay, callback, *args, context=None):
return super().call_later(delay, self._wrap(callback), *args, context=context)
def call_at(self, when, callback, *args, context=None):
return super().call_at(when, self._wrap(callback), *args, context=context)
def call_soon(self, callback, *args, context=None):
return super().call_soon(self._wrap(callback), *args, context=context)
def call_soon_threadsafe(self, callback, *args, context=None):
return super().call_soon_threadsafe(self._wrap(callback), *args, context=context)
def _wrap(self, callback):
# callback can be wrapped already.
if hasattr(callback, "wrapped"):
return callback
def wrapper(*args):
if self._thread is None:
return callback(*args)
return self._dispatch_and_wait(callback, *args)
wrapper.wrapped = True
return wrapper
# UI thread Asyncio thread
#
# +------------+
# dispatch | wait |
# +-------+<--------------+------------+
# |process|
# |events |
# +-------+-------------->+------------+
# | wait |
# +------------+
#
# # to implement this dispatch model,
# # copy whole code of _run_once() from asyncio/base_events.py
# # and extract __wait_events() and __process_events()
# def _run_once(self):
# if self._thread is None:
# return super()._run_once()
# self.__wait_events()
# self._dispatcher.dispatch(self.__process_events).result()
class Win32Dispatcher:
    """Queue callbacks and run them from a hidden window's window procedure.

    ``dispatch()`` stores the call in a queue and posts ``WM_CALLBACK`` to a
    window created with ``HWND_MESSAGE`` as parent; the window procedure pops
    one queued call per ``WM_CALLBACK`` message and executes it.
    """

    # Message id posted for every queued callback (0x8000 is the WM_APP
    # value in the Win32 headers).
    WM_CALLBACK = 0x8000

    def __init__(self):
        """Create the hidden window and the queue of pending calls."""
        self._hwnd = self._create_window()
        self._q = Queue()

    def _create_window(self):
        """Register the window class and create the window.

        Returns the window handle.  Raises ``WinError`` when registering the
        class or creating the window fails.
        """
        class_name = "Win32Dispatcher"
        module = GetModuleHandle(None)

        window_class = WNDCLASS()
        window_class.lpfnWndProc = self._wndproc
        window_class.hInstance = module
        window_class.lpszClassName = class_name
        if RegisterClass(window_class) == 0:
            raise WinError()

        handle = CreateWindowEx(0, class_name, "", 0, 0, 0, 0, 0, HWND_MESSAGE, 0, module, 0)
        if not handle:
            raise WinError()
        return handle

    def _wndproc(self, hwnd, uMsg, wParam, lParam):
        """Window procedure: run one queued call per WM_CALLBACK message."""
        if uMsg == self.WM_CALLBACK:
            pending = self._q.get()
            pending()
        # Every message, WM_CALLBACK included, still gets default handling.
        return DefWindowProc(hwnd, uMsg, wParam, lParam)

    def dispatch(self, callback, *args):
        """Queue ``callback(*args)`` and notify the window with WM_CALLBACK."""

        def pending():
            return callback(*args)

        self._q.put(pending)
        PostMessage(self._hwnd, self.WM_CALLBACK, 0, 0)
class App:
    """Demo host application: owns the dispatcher and pumps Win32 messages."""

    def __init__(self):
        """Create the dispatcher used to run callbacks from the message loop."""
        self._dispatcher = Win32Dispatcher()

    def mainloop(self):
        """Pump messages until ``GetMessage()`` returns zero or a negative value."""
        message = MSG()
        while True:
            if GetMessage(message, 0, 0, 0) <= 0:
                break
            TranslateMessage(message)
            DispatchMessage(message)

    def quit(self):
        """Post a quit message with exit code 0."""
        PostQuitMessage(0)

    def dispatch(self, callback, *args):
        """Forward ``callback(*args)`` to the dispatcher."""
        dispatcher = self._dispatcher
        dispatcher.dispatch(callback, *args)

    # To close application safely, stop asyncio loop first.
    def close(self):
        """Stop the asyncio loop that is currently running."""
        running_loop = asyncio.get_running_loop()
        running_loop.stop()

    async def heavytask(self):
        """Demo coroutine: print five ticks, sleeping 0.5 s before each one."""
        print(f"heavytask ({threading.get_native_id()}): start")
        for i in range(5):
            await asyncio.sleep(0.5)
            print(f"heavytask ({threading.get_native_id()}): {i}")
        print(f"heavytask ({threading.get_native_id()}): end")
def main():
    """Create the host application and run the demo coroutine on a guest loop."""
    app = App()

    def make_loop():
        # Loop factory handed to asyncio.run(): a guest loop bound to app.
        return GuestEventLoop(app)

    asyncio.run(app.heavytask(), loop_factory=make_loop)
    # or when using gui widget
    # GuestEventLoop(app).run_forever()
# Run the demo only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment