Last active
December 14, 2024 02:20
-
-
Save ynkdir/bc68b4b639b0414a31c0388fc2b2a5ef to your computer and use it in GitHub Desktop.
Connecting Asyncio and Win32 event loops
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# /// script | |
# dependencies = ["win32more"] | |
# /// | |
import asyncio | |
import threading | |
from concurrent.futures import Future | |
from queue import Queue | |
from win32more import WinError | |
from win32more.Windows.Win32.System.LibraryLoader import GetModuleHandle | |
from win32more.Windows.Win32.UI.WindowsAndMessaging import ( | |
HWND_MESSAGE, | |
MSG, | |
WNDCLASS, | |
CreateWindowEx, | |
DefWindowProc, | |
DispatchMessage, | |
GetMessage, | |
PostMessage, | |
PostQuitMessage, | |
RegisterClass, | |
TranslateMessage, | |
) | |
class GuestEventLoop(asyncio.EventLoop): | |
def __init__(self, host): | |
super().__init__() | |
self._host = host | |
self._thread = threading.Thread(target=self._run_forever_thread) | |
def run_forever(self): | |
# Runner will call run_until_complete() while closing. | |
if self._thread is None: | |
return super().run_forever() | |
asyncio.events._set_running_loop(self) | |
self._thread.start() | |
self._host.mainloop() | |
self._thread.join() | |
self._thread = None | |
asyncio.events._set_running_loop(None) | |
def _run_forever_thread(self): | |
super().run_forever() | |
self._host.dispatch(self._host.quit) | |
def _dispatch_and_wait(self, callback, *args): | |
def wrapper(): | |
try: | |
future.set_result(callback(*args)) | |
except Exception as e: | |
future.set_exception(e) | |
future = Future() | |
self._host.dispatch(wrapper) | |
return future.result() | |
def stop(self): | |
super().stop() | |
self._write_to_self() # wakeup loop | |
# dispatch model of the following code | |
# | |
# UI thread Asyncio thread | |
# | |
# dispatch | |
# +-------+<--------------+------------+ | |
# | event | | poll | | |
# +-------+-------------->| | | |
# | | | |
# dispatch | | | |
# +-------+<--------------| | | |
# | event | | | | |
# +-------+-------------->+------------+ | |
def call_later(self, delay, callback, *args, context=None): | |
return super().call_later(delay, self._wrap(callback), *args, context=context) | |
def call_at(self, when, callback, *args, context=None): | |
return super().call_at(when, self._wrap(callback), *args, context=context) | |
def call_soon(self, callback, *args, context=None): | |
return super().call_soon(self._wrap(callback), *args, context=context) | |
def call_soon_threadsafe(self, callback, *args, context=None): | |
return super().call_soon_threadsafe(self._wrap(callback), *args, context=context) | |
def _wrap(self, callback): | |
# callback can be wrapped already. | |
if hasattr(callback, "wrapped"): | |
return callback | |
def wrapper(*args): | |
if self._thread is None: | |
return callback(*args) | |
return self._dispatch_and_wait(callback, *args) | |
wrapper.wrapped = True | |
return wrapper | |
# UI thread Asyncio thread | |
# | |
# +------------+ | |
# dispatch | wait | | |
# +-------+<--------------+------------+ | |
# |process| | |
# |events | | |
# +-------+-------------->+------------+ | |
# | wait | | |
# +------------+ | |
# | |
# # to implement this dispatch model, | |
# # copy whole code of _run_once() from asyncio/base_events.py | |
# # and extract __wait_events() and __process_events() | |
# def _run_once(self): | |
# if self._thread is None: | |
# return super()._run_once() | |
# self.__wait_events() | |
# self._dispatcher.dispatch(self.__process_events).result() | |
class Win32Dispatcher: | |
WM_CALLBACK = 0x8000 | |
def __init__(self): | |
self._hwnd = self._create_window() | |
self._q = Queue() | |
def _create_window(self): | |
CLASS_NAME = "Win32Dispatcher" | |
hInstance = GetModuleHandle(None) | |
wc = WNDCLASS() | |
wc.lpfnWndProc = self._wndproc | |
wc.hInstance = hInstance | |
wc.lpszClassName = CLASS_NAME | |
atom = RegisterClass(wc) | |
if atom == 0: | |
raise WinError() | |
hwnd = CreateWindowEx(0, CLASS_NAME, "", 0, 0, 0, 0, 0, HWND_MESSAGE, 0, hInstance, 0) | |
if not hwnd: | |
raise WinError() | |
return hwnd | |
def _wndproc(self, hwnd, uMsg, wParam, lParam): | |
if uMsg == self.WM_CALLBACK: | |
self._q.get()() | |
return DefWindowProc(hwnd, uMsg, wParam, lParam) | |
def dispatch(self, callback, *args): | |
self._q.put(lambda: callback(*args)) | |
PostMessage(self._hwnd, self.WM_CALLBACK, 0, 0) | |
class App: | |
def __init__(self): | |
self._dispatcher = Win32Dispatcher() | |
def mainloop(self): | |
msg = MSG() | |
while GetMessage(msg, 0, 0, 0) > 0: | |
TranslateMessage(msg) | |
DispatchMessage(msg) | |
def quit(self): | |
PostQuitMessage(0) | |
def dispatch(self, callback, *args): | |
self._dispatcher.dispatch(callback, *args) | |
# To close application safely, stop asyncio loop first. | |
def close(self): | |
asyncio.get_running_loop().stop() | |
async def heavytask(self): | |
print(f"heavytask ({threading.get_native_id()}): start") | |
for i in range(5): | |
await asyncio.sleep(0.5) | |
print(f"heavytask ({threading.get_native_id()}): {i}") | |
print(f"heavytask ({threading.get_native_id()}): end") | |
def main(): | |
app = App() | |
asyncio.run(app.heavytask(), loop_factory=lambda: GuestEventLoop(app)) | |
# or when using gui widget | |
# GuestEventLoop(app).run_forever() | |
if __name__ == "__main__": | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment