Skip to content

Instantly share code, notes, and snippets.

@ynkdir
Last active December 20, 2024 15:16
Show Gist options
  • Save ynkdir/fec4195c0232a85051753ef44ad68c88 to your computer and use it in GitHub Desktop.
Save ynkdir/fec4195c0232a85051753ef44ad68c88 to your computer and use it in GitHub Desktop.
Connecting Asyncio and Tkinter event loops
# Asyncio guest mode
#
# [Using “guest mode” to run Trio on top of other event loops](https://trio.readthedocs.io/en/stable/reference-lowlevel.html#using-guest-mode-to-run-trio-on-top-of-other-event-loops)
#
#
# Trio's guest mode model
#
# UI thread Asyncio thread
#
# +------------+
# dispatch | wait |
# +-------+<--------------+------------+
# |process|
# |events |
# +-------+-------------->+------------+
# | wait |
# +------------+
import asyncio
import heapq
import threading
import tkinter as tk
from concurrent.futures import Future
def start_guest_run(async_fn=None, *args, run_sync_soon_threadsafe, done_callback):
    """Start an asyncio loop in "guest mode" alongside a host event loop.

    A ``GuestEventLoop`` is created, registered as the running loop of the
    calling thread, and then driven from a newly started worker thread.

    Args:
        async_fn: Optional coroutine function. When given, the loop runs
            ``async_fn(*args)`` to completion; otherwise it runs forever
            (until ``stop()`` is called on it).
        *args: Positional arguments forwarded to ``async_fn``.
        run_sync_soon_threadsafe: Callable taking a zero-argument callable;
            it is handed to ``GuestEventLoop`` as its dispatcher and is also
            used to schedule the final cleanup.
            NOTE(review): presumably must be safe to call from the worker
            thread and must run the callable on the host thread -- confirm
            for the host toolkit in use.
        done_callback: Zero-argument callable invoked once the loop has
            finished and been torn down.
    """
    loop = GuestEventLoop(run_sync_soon_threadsafe)

    def setup():
        # Make asyncio.get_running_loop() work in the calling (host) thread.
        asyncio.events._set_running_loop(loop)

    def cleanup():
        asyncio.events._set_running_loop(None)
        try:
            # The loop has already stopped by the time cleanup is dispatched,
            # so it can be closed to release its selector and wakeup sockets.
            loop.close()
        finally:
            done_callback()

    def run():
        try:
            if async_fn is not None:
                loop.run_until_complete(async_fn(*args))
            else:
                loop.run_forever()
        finally:
            # Always notify the host, even when the loop exits with an
            # exception; otherwise done_callback would never be called.
            run_sync_soon_threadsafe(cleanup)

    setup()
    threading.Thread(target=run).start()
class GuestEventLoop(asyncio.EventLoop):
def __init__(self, dispatch):
super().__init__()
self._dispatch = dispatch
def _dispatch_and_wait(self, callback, *args):
def wrapper():
try:
future.set_result(callback(*args))
except Exception as e:
future.set_exception(e)
future = Future()
self._dispatch(wrapper)
return future.result()
def stop(self):
super().stop()
self._write_to_self() # wakeup loop
# copied whole code from asyncio/base_events.py and extracted __wait_events() and __process_events()
def _run_once(self):
self.__wait_events()
self._dispatch_and_wait(self.__process_events)
def __wait_events(self):
"""Run one full iteration of the event loop.
This calls all currently ready callbacks, polls for I/O,
schedules the resulting callbacks, and finally schedules
'call_later' callbacks.
"""
sched_count = len(self._scheduled)
if (
sched_count > asyncio.base_events._MIN_SCHEDULED_TIMER_HANDLES
and self._timer_cancelled_count / sched_count > asyncio.base_events._MIN_CANCELLED_TIMER_HANDLES_FRACTION
):
# Remove delayed calls that were cancelled if their number
# is too high
new_scheduled = []
for handle in self._scheduled:
if handle._cancelled:
handle._scheduled = False
else:
new_scheduled.append(handle)
heapq.heapify(new_scheduled)
self._scheduled = new_scheduled
self._timer_cancelled_count = 0
else:
# Remove delayed calls that were cancelled from head of queue.
while self._scheduled and self._scheduled[0]._cancelled:
self._timer_cancelled_count -= 1
handle = heapq.heappop(self._scheduled)
handle._scheduled = False
timeout = None
if self._ready or self._stopping:
timeout = 0
elif self._scheduled:
# Compute the desired timeout.
timeout = self._scheduled[0]._when - self.time()
if timeout > asyncio.base_events.MAXIMUM_SELECT_TIMEOUT:
timeout = asyncio.base_events.MAXIMUM_SELECT_TIMEOUT
elif timeout < 0:
timeout = 0
event_list = self._selector.select(timeout)
self._process_events(event_list)
# Needed to break cycles when an exception occurs.
event_list = None
# Handle 'later' callbacks that are ready.
end_time = self.time() + self._clock_resolution
while self._scheduled:
handle = self._scheduled[0]
if handle._when >= end_time:
break
handle = heapq.heappop(self._scheduled)
handle._scheduled = False
self._ready.append(handle)
def __process_events(self):
# This is the only place where callbacks are actually *called*.
# All other places just add them to ready.
# Note: We run all currently scheduled callbacks, but not any
# callbacks scheduled by callbacks run this time around --
# they will be run the next time (after another I/O poll).
# Use an idiom that is thread-safe without using locks.
ntodo = len(self._ready)
for i in range(ntodo):
handle = self._ready.popleft()
if handle._cancelled:
continue
if self._debug:
try:
self._current_handle = handle
t0 = self.time()
handle._run()
dt = self.time() - t0
if dt >= self.slow_callback_duration:
asyncio.base_events.logger.warning(
"Executing %s took %.3f seconds", asyncio.base_events._format_handle(handle), dt
)
finally:
self._current_handle = None
else:
handle._run()
handle = None # Needed to break cycles when an exception occurs.
class App(tk.Tk):
    """Demo window with a button that launches ``heavytask`` on the asyncio loop."""

    def __init__(self):
        super().__init__()
        # Closing the window only asks the asyncio loop to stop; the
        # application is expected to shut down once the loop has finished.
        # Do not call destroy() or quit() directly here.
        self.protocol("WM_DELETE_WINDOW", self._request_shutdown)

        self.button = tk.Button(text="Run Heavy Task", command=self._start_heavytask)
        self.button.pack()

        self.label = tk.Label(text="heavy task")
        self.label.pack()

    def _request_shutdown(self):
        """Stop the asyncio loop registered as running in this thread."""
        return asyncio.get_running_loop().stop()

    def _start_heavytask(self):
        """Submit ``heavytask`` to the running asyncio loop."""
        running_loop = asyncio.get_running_loop()
        return asyncio.run_coroutine_threadsafe(self.heavytask(), running_loop)

    async def heavytask(self):
        """Show "start", then 0, 1, 2 at one-second intervals, then "end"."""
        print(f"heavytask ({threading.get_native_id()})")
        status = self.label
        status.configure(text="start")
        for step in range(3):
            await asyncio.sleep(1)
            status.configure(text=step)
        status.configure(text="end")
def main():
    """Create the window, start the guest asyncio loop and enter the Tk mainloop."""
    print(f"main ({threading.get_native_id()})")
    window = App()
    # Callbacks are dispatched through Tk's idle queue; Tk's mainloop is
    # quit once the asyncio loop is done.
    start_guest_run(
        done_callback=window.quit,
        run_sync_soon_threadsafe=window.after_idle,
    )
    window.mainloop()


if __name__ == "__main__":
    main()
# Connecting Asyncio and Tkinter event loops
#
# [Connecting Asyncio and Tkinter event loops?](https://discuss.python.org/t/connecting-asyncio-and-tkinter-event-loops/14722)
# [feat: add UV_LOOP_INTERRUPT_ON_IO_CHANGE option to uv_loop_configure #3308](https://github.com/libuv/libuv/pull/3308)
# [Integrate tkinter and asyncio (and async)](https://github.com/python/cpython/issues/71733)
# [Using “guest mode” to run Trio on top of other event loops](https://trio.readthedocs.io/en/stable/reference-lowlevel.html#using-guest-mode-to-run-trio-on-top-of-other-event-loops)
#
#
# Each event dispatch model
#
# UI thread Asyncio thread
#
# dispatch
# +-------+<--------------+------------+
# | event | | poll |
# +-------+-------------->| |
# | |
# dispatch | |
# +-------+<--------------| |
# | event | | |
# +-------+-------------->+------------+
#
#
# Manual dispatch model
#
# UI thread Asyncio thread
#
# dispatch
# +-------+<--------------+------------+
# |ui code| | coroutine |
# +-------+-------------->| |
# | |
# dispatch | |
# +-------+<--------------| |
# |ui code| | |
# +-------+-------------->+------------+
#
#
# Trio's guest mode model
#
# UI thread Asyncio thread
#
# +------------+
# dispatch | wait |
# +-------+<--------------+------------+
# |process|
# |events |
# +-------+-------------->+------------+
# | wait |
# +------------+
#
#
# This script implements the "each event dispatch" model. It is inefficient
# compared to Trio's guest mode model, but it should be enough for a GUI
# application.
#
# Most GUI frameworks have a dispatch method. We can dispatch UI code with it
# manually.
#
# Tkinter dispatches widget calls internally. Perhaps all we need is to just
# run asyncio in a new thread.
import asyncio
import threading
import tkinter as tk
from concurrent.futures import Future
class EventDispatchEventLoop(asyncio.EventLoop):
def __init__(self, host):
super().__init__()
self._host = host
self._thread = threading.Thread(target=self._run_forever_thread)
def run_forever(self):
# Runner will call run_until_complete() while closing.
if self._thread is None:
return super().run_forever()
asyncio.events._set_running_loop(self)
self._thread.start()
self._host.mainloop()
self._thread.join()
self._thread = None
asyncio.events._set_running_loop(None)
def _run_forever_thread(self):
super().run_forever()
self._host.after_idle(self._host.quit)
def _dispatch_and_wait(self, callback, *args):
def wrapper():
try:
future.set_result(callback(*args))
except Exception as e:
future.set_exception(e)
future = Future()
self._host.after_idle(wrapper)
return future.result()
def stop(self):
super().stop()
self._write_to_self() # wakeup loop
def call_later(self, delay, callback, *args, context=None):
return super().call_later(delay, self._wrap(callback), *args, context=context)
def call_at(self, when, callback, *args, context=None):
return super().call_at(when, self._wrap(callback), *args, context=context)
def call_soon(self, callback, *args, context=None):
return super().call_soon(self._wrap(callback), *args, context=context)
def call_soon_threadsafe(self, callback, *args, context=None):
return super().call_soon_threadsafe(self._wrap(callback), *args, context=context)
def _wrap(self, callback):
# callback can be wrapped already.
if hasattr(callback, "wrapped"):
return callback
def wrapper(*args):
if self._thread is None:
return callback(*args)
return self._dispatch_and_wait(callback, *args)
wrapper.wrapped = True
return wrapper
class App(tk.Tk):
    """Demo window whose button schedules ``heavytask`` on the running asyncio loop."""

    def __init__(self):
        super().__init__()

        def stop_loop():
            return asyncio.get_running_loop().stop()

        def launch_task():
            return asyncio.run_coroutine_threadsafe(self.heavytask(), asyncio.get_running_loop())

        # To close the application safely, stop the asyncio loop first
        # instead of calling destroy() or quit().
        self.protocol("WM_DELETE_WINDOW", stop_loop)

        button = tk.Button(text="Run Heavy Task", command=launch_task)
        button.pack()
        self.button = button

        label = tk.Label(text="heavy task")
        label.pack()
        self.label = label

    async def heavytask(self):
        """Update the label to "start", then 0, 1, 2 (one per second), then "end"."""
        print(f"heavytask ({threading.get_native_id()})")
        self.label["text"] = "start"
        for count in (0, 1, 2):
            await asyncio.sleep(1)
            self.label["text"] = count
        self.label["text"] = "end"
def main():
    """Run ``App.heavytask`` under an ``EventDispatchEventLoop`` bound to the window."""
    print(f"main ({threading.get_native_id()})")
    app = App()

    def make_loop():
        # The loop needs the window as its host for dispatching callbacks.
        return EventDispatchEventLoop(app)

    asyncio.run(app.heavytask(), loop_factory=make_loop)
    # or
    # EventDispatchEventLoop(app).run_forever()


if __name__ == "__main__":
    main()
# Timer version
#
# I think that polling with a timer is not so bad for a GUI application.
import asyncio
import tkinter as tk
def start_timer_run(interval_timer):
loop = asyncio.new_event_loop()
loop.stop()
loop._run_forever_setup()
interval_timer(loop._run_once)
class App(tk.Tk):
    """Demo window: a button that submits ``heavytask`` and a label showing progress."""

    def __init__(self):
        super().__init__()
        self.button = tk.Button(text="Run Heavy Task", command=self._on_click)
        self.button.pack()
        self.label = tk.Label(text="heavy task")
        self.label.pack()

    def _on_click(self):
        """Submit ``heavytask`` to the asyncio loop running in this thread."""
        target_loop = asyncio.get_running_loop()
        return asyncio.run_coroutine_threadsafe(self.heavytask(), target_loop)

    async def heavytask(self):
        """Set the label to "start", count 0..2 with one-second pauses, then "end"."""
        show = self.label.configure
        show(text="start")
        for tick in range(3):
            await asyncio.sleep(1)
            show(text=tick)
        show(text="end")
def main():
    """Create the window and drive the asyncio loop from a 100 ms Tk timer."""
    app = App()

    def interval_timer(callback):
        # Run one step now, then re-arm the timer for the next step.
        callback()
        app.after(100, interval_timer, callback)

    start_timer_run(interval_timer)
    app.mainloop()


if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment