Last active
June 25, 2025 17:46
-
-
Save James-E-A/ad3c746f8c12332dc26b428d611bd3bd to your computer and use it in GitHub Desktop.
simple tkinter multithreaded comunication
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import queue | |
import logging | |
from tkinter import TclError | |
import weakref | |
__all__ = ['data_bind'] | |
def data_bind(widget, callback, *, pass_tk_event=False, timeout=0.0, catastrophic_timeout=5.0, queueing_mode='auto'): | |
"""Foolproof, thread-safe workaround for https://github.com/python/cpython/issues/47655 | |
https://tkdocs.com/tutorial/eventloop.html | |
> If you need to communicate from another thread to the thread running | |
> Tkinter, keep it as simple as possible. Use event_generate to post a | |
> virtual event to the Tkinter event queue, and then bind to that event | |
> in your code. | |
**NOTE:** does not work around https://github.com/pypy/pypy/issues/5066 | |
# Example Usage | |
import tkinter as tk | |
from simple_tkinter_threaded_helper import data_bind | |
root = tk.Tk() | |
widget = root.SomeWidget(...) | |
# 1. The callback **WILL** only be called by the tk mainloop, | |
# so it's OK to access the widget from here! | |
def my_handler(sent_object, tk_event): | |
# (if you prefer to store the widget in a closure, | |
# omit `pass_tk_event` when calling `data_bind`) | |
tk_event.widget.do_stuff_with(sent_object) | |
# 2. This is an **EXAMPLE** of a decorator to convert a generator-based worker | |
# into a callback-based worker. Obviously, you could define whatever | |
# worker interface you want to use instead of using this dummy code. | |
import collections, functools, time | |
@(lambda g: lambda f: functools.wraps(f)(lambda *a, _yield, **k: g(map(_yield, f(*a, **k)))))(lambda it: collections.deque(it, 0) or None) | |
def my_worker(n=3): | |
for i in range(n): | |
time.sleep(1.0) | |
yield f"Sent Object #{i+1}" | |
# 3. It's OK to call `send(some_object)` from **ANY** thread, | |
# including background/daemon/worker threads! | |
send = data_bind(widget, my_handler, pass_tk_event=True) | |
worker_thread = threading.Thread(target=my_worker, kwargs={'_yield': send}, daemon=True) | |
root.after(1, lambda t: t.start(), worker_thread) | |
root.mainloop() | |
https://ko-fi.com/E_Administrator | |
""" | |
if not pass_tk_event: | |
callback = (lambda f: lambda x, _y: f(x))(callback) | |
if (catastrophic_timeout is not None) and (timeout is not None): | |
effective_catastrophic_timeout = max(catastrophic_timeout - timeout, 0.0) | |
else: | |
effective_catastrophic_timeout = catastrophic_timeout | |
if queueing_mode == 'auto': | |
queueing_mode = 'tk_first' if timeout > 0 else 'queue_first' | |
q = queue.SimpleQueue() | |
sequence = f"<<data_{id(q)}>>" | |
# e.g. <<data_12345678900000>> means that the queue with Python object ID 12345678900000 has data | |
# We do not allow passing any keyword arguments through to event_generate | |
# because that would inflict either a race condition (if multiple threads | |
# call send() at the same time, it might be unclear which kwarg set was | |
# supposed to be paired with which sent item), or a synchronization burden | |
# on the senders to avoid that race condition --- not to mention the burden | |
# of ensuring that our queue doesn't slip against the tkinter event loop | |
# if-or-when the caller passes parameters that cause event_generate to fail. | |
# If the caller wants to include extra data with their sent object, they can | |
# just INCLUDE IT in their own dadgum sent object interface, which is opaque to us. | |
if queueing_mode == 'queue_first': | |
# Prioritize not stuttering the event loop over letting other events outrace this one. | |
def send(item, /): | |
# in worker thread / any thread. | |
q.put(item) | |
widget.event_generate(sequence) | |
elif queueing_mode == 'tk_first': | |
# Prioritize not letting events outrace this one over not stuttering the event loop. | |
def send(item, /): | |
# in worker thread / any thread. | |
widget.event_generate(sequence) | |
q.put(item) | |
else: | |
raise ValueError(f"queueing_mode={queueing_mode!r}") | |
def tk_callback(event, _attempt=0): | |
# in tkinter mainloop thread. | |
if _attempt == 0: | |
try: | |
item = q.get(timeout=timeout) | |
except queue.Empty: | |
# it seems that widget.event_generate slightly outraced the propagation of q.put; | |
# by default, let's defer committing the tkinter mainloop to a blocking call to q.get, | |
# which could cause other pending performance-intensive events to stutter, | |
# till after the event loop exhausts. | |
logging.warning("propagation of %s.put lagging behind handling of %s.event_generate", q, event.widget) | |
event.widget.after_idle(tk_callback, event, _attempt + 1) | |
return | |
else: | |
# dawg theres just no way SimpleQueue gonna legitimately block for FIVE FREAKIN SECONDS, | |
# in the single-consumer case, | |
# if there's data that's ALREADY BEEN SENT on another thread... | |
# https://github.com/python/cpython/blob/v3.13.1/Lib/queue.py#L336 | |
# https://github.com/python/cpython/blob/v3.13.1/Modules/_queuemodule.c#L312 | |
# https://github.com/python/cpython/blob/v3.13.1/Lib/multiprocessing/queues.py#L93 | |
# https://github.com/python/cpython/blob/v3.13.1/Modules/_collectionsmodule.c#L375 | |
# https://github.com/pypy/pypy/blob/release-pypy3.10-v7.3.12/lib-python/3/queue.py#L9 | |
# ...right?? | |
item = q.get(timeout=effective_catastrophic_timeout) | |
callback(item, event) | |
widget.bind(sequence, tk_callback) | |
# widget -> {sequence binding} -> tk_callback -> q | |
# would create a memory leak if the caller, for some reason, | |
# creates many / unbounded bindings | |
# which it meant to have lifetimes much shorter than the widget itself | |
weakref.finalize(send, _finalize, widget, sequence, tk_callback, q) | |
return send | |
def _finalize(widget, sequence, to_unbind, q, _phase=0): | |
try: | |
if _phase == 0: | |
# potentially on GC thread, or on a worker thread running synchronously immediately after a call to send(). | |
# to address both of these possibilities, transfer execution to tkinter mainloop thread and wait for the event loop to fully spin once. | |
widget.after_idle(_finalize, widget, sequence, to_unbind, q, _phase + 1) | |
elif _phase == 1: | |
# send will never be called again; | |
# event loop has fully exhausted; | |
# and we are executing on tkinter mainloop thread. | |
# OK to remove binding. | |
widget.unbind(sequence, to_unbind) | |
if not q.empty(): | |
# The event loop is empty; | |
# the application is alive (since unbind didn't raise); | |
# yet our queue is not empty, mainloop didn't take ownership of all sent objects. | |
# I cannot imagine any way that this might actually occur; | |
# if this block is SOMEHOW reached, there is a bug in my code. | |
logging.warning("Dead queue %s somehow failed to drain after a complete event loop spin; unbinding callback %s from widget %s anyway. Please file a bug report at https://gist.github.com/ad3c746f8c12332dc26b428d611bd3bd#comments", sequence, to_unbind, widget) | |
except TclError: | |
# Application was already destroyed so binding no longer exists; no need to unbind | |
pass | |
def _demo(): | |
# Simple Demo | |
import datetime, collections, functools, threading, time, tkinter as tk | |
@(lambda g: lambda f: functools.wraps(f)(lambda *a, _yield, **k: g(map(_yield, f(*a, **k)))))(lambda it: collections.deque(it, 0) or None) | |
def worker(flags=[True]): | |
# EXAMPLE: generate data, slowly, on another thread | |
while True: | |
time.sleep(1.0) | |
yield f"Hello from {threading.current_thread()} at {datetime.datetime.now()}!" | |
def handle(result, event): | |
# EXAMPLE: Render data in GUI, on main thread | |
event.widget.insert("1.0", f"{event}: {result}\n") | |
root = tk.Tk() | |
body = tk.Text(root) | |
body.pack(fill=tk.BOTH, expand=True) | |
send = data_bind(body, handle, pass_tk_event=True) | |
worker = threading.Thread(target=worker, kwargs={'_yield': send}, daemon=True) | |
root.after(1, lambda t: t.start(), worker) | |
root.mainloop() | |
if __name__ == '__main__': | |
_demo() |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from collections.abc import Callable | |
from typing import Annotated, Literal, TypeVar, Union, overload | |
import tkinter | |
T = TypeVar("T") | |
Widget = Union[tkinter.Widget, tkinter.Tk] | |
@overload | |
def data_bind( | |
widget: Widget, | |
callback: Callable[[T]], *, | |
pass_tk_event: Literal[False] = ..., | |
timeout: Optional[float] = ..., | |
catastrophic_timeout: Optional[float] = ..., | |
queueing_mode: Literal['auto', 'tk_first', 'queue_first'] = ... | |
) -> Callable[[T]]: | |
... | |
@overload | |
def data_bind( | |
widget: Widget, | |
callback: Callable[[T, tkinter.Event]], *, | |
pass_tk_event: Literal[True], | |
timeout: Optional[float] = ..., | |
catastrophic_timeout: Optional[float] = ..., | |
queueing_mode: Literal['auto', 'tk_first', 'queue_first'] = ... | |
) -> Callable[[T]]: | |
... |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment