Skip to content

Instantly share code, notes, and snippets.

@James-E-A
Last active June 25, 2025 17:46
Show Gist options
  • Save James-E-A/ad3c746f8c12332dc26b428d611bd3bd to your computer and use it in GitHub Desktop.
Save James-E-A/ad3c746f8c12332dc26b428d611bd3bd to your computer and use it in GitHub Desktop.
simple tkinter multithreaded comunication
import queue
import logging
from tkinter import TclError
import weakref
__all__ = ['data_bind']
def data_bind(widget, callback, *, pass_tk_event=False, timeout=0.0, catastrophic_timeout=5.0, queueing_mode='auto'):
"""Foolproof, thread-safe workaround for https://github.com/python/cpython/issues/47655
https://tkdocs.com/tutorial/eventloop.html
> If you need to communicate from another thread to the thread running
> Tkinter, keep it as simple as possible. Use event_generate to post a
> virtual event to the Tkinter event queue, and then bind to that event
> in your code.
**NOTE:** does not work around https://github.com/pypy/pypy/issues/5066
# Example Usage
import tkinter as tk
from simple_tkinter_threaded_helper import data_bind
root = tk.Tk()
widget = root.SomeWidget(...)
# 1. The callback **WILL** only be called by the tk mainloop,
# so it's OK to access the widget from here!
def my_handler(sent_object, tk_event):
# (if you prefer to store the widget in a closure,
# omit `pass_tk_event` when calling `data_bind`)
tk_event.widget.do_stuff_with(sent_object)
# 2. This is an **EXAMPLE** of a decorator to convert a generator-based worker
# into a callback-based worker. Obviously, you could define whatever
# worker interface you want to use instead of using this dummy code.
import collections, functools, time
@(lambda g: lambda f: functools.wraps(f)(lambda *a, _yield, **k: g(map(_yield, f(*a, **k)))))(lambda it: collections.deque(it, 0) or None)
def my_worker(n=3):
for i in range(n):
time.sleep(1.0)
yield f"Sent Object #{i+1}"
# 3. It's OK to call `send(some_object)` from **ANY** thread,
# including background/daemon/worker threads!
send = data_bind(widget, my_handler, pass_tk_event=True)
worker_thread = threading.Thread(target=my_worker, kwargs={'_yield': send}, daemon=True)
root.after(1, lambda t: t.start(), worker_thread)
root.mainloop()
https://ko-fi.com/E_Administrator
"""
if not pass_tk_event:
callback = (lambda f: lambda x, _y: f(x))(callback)
if (catastrophic_timeout is not None) and (timeout is not None):
effective_catastrophic_timeout = max(catastrophic_timeout - timeout, 0.0)
else:
effective_catastrophic_timeout = catastrophic_timeout
if queueing_mode == 'auto':
queueing_mode = 'tk_first' if timeout > 0 else 'queue_first'
q = queue.SimpleQueue()
sequence = f"<<data_{id(q)}>>"
# e.g. <<data_12345678900000>> means that the queue with Python object ID 12345678900000 has data
# We do not allow passing any keyword arguments through to event_generate
# because that would inflict either a race condition (if multiple threads
# call send() at the same time, it might be unclear which kwarg set was
# supposed to be paired with which sent item), or a synchronization burden
# on the senders to avoid that race condition --- not to mention the burden
# of ensuring that our queue doesn't slip against the tkinter event loop
# if-or-when the caller passes parameters that cause event_generate to fail.
# If the caller wants to include extra data with their sent object, they can
# just INCLUDE IT in their own dadgum sent object interface, which is opaque to us.
if queueing_mode == 'queue_first':
# Prioritize not stuttering the event loop over letting other events outrace this one.
def send(item, /):
# in worker thread / any thread.
q.put(item)
widget.event_generate(sequence)
elif queueing_mode == 'tk_first':
# Prioritize not letting events outrace this one over not stuttering the event loop.
def send(item, /):
# in worker thread / any thread.
widget.event_generate(sequence)
q.put(item)
else:
raise ValueError(f"queueing_mode={queueing_mode!r}")
def tk_callback(event, _attempt=0):
# in tkinter mainloop thread.
if _attempt == 0:
try:
item = q.get(timeout=timeout)
except queue.Empty:
# it seems that widget.event_generate slightly outraced the propagation of q.put;
# by default, let's defer committing the tkinter mainloop to a blocking call to q.get,
# which could cause other pending performance-intensive events to stutter,
# till after the event loop exhausts.
logging.warning("propagation of %s.put lagging behind handling of %s.event_generate", q, event.widget)
event.widget.after_idle(tk_callback, event, _attempt + 1)
return
else:
# dawg theres just no way SimpleQueue gonna legitimately block for FIVE FREAKIN SECONDS,
# in the single-consumer case,
# if there's data that's ALREADY BEEN SENT on another thread...
# https://github.com/python/cpython/blob/v3.13.1/Lib/queue.py#L336
# https://github.com/python/cpython/blob/v3.13.1/Modules/_queuemodule.c#L312
# https://github.com/python/cpython/blob/v3.13.1/Lib/multiprocessing/queues.py#L93
# https://github.com/python/cpython/blob/v3.13.1/Modules/_collectionsmodule.c#L375
# https://github.com/pypy/pypy/blob/release-pypy3.10-v7.3.12/lib-python/3/queue.py#L9
# ...right??
item = q.get(timeout=effective_catastrophic_timeout)
callback(item, event)
widget.bind(sequence, tk_callback)
# widget -> {sequence binding} -> tk_callback -> q
# would create a memory leak if the caller, for some reason,
# creates many / unbounded bindings
# which it meant to have lifetimes much shorter than the widget itself
weakref.finalize(send, _finalize, widget, sequence, tk_callback, q)
return send
def _finalize(widget, sequence, to_unbind, q, _phase=0):
try:
if _phase == 0:
# potentially on GC thread, or on a worker thread running synchronously immediately after a call to send().
# to address both of these possibilities, transfer execution to tkinter mainloop thread and wait for the event loop to fully spin once.
widget.after_idle(_finalize, widget, sequence, to_unbind, q, _phase + 1)
elif _phase == 1:
# send will never be called again;
# event loop has fully exhausted;
# and we are executing on tkinter mainloop thread.
# OK to remove binding.
widget.unbind(sequence, to_unbind)
if not q.empty():
# The event loop is empty;
# the application is alive (since unbind didn't raise);
# yet our queue is not empty, mainloop didn't take ownership of all sent objects.
# I cannot imagine any way that this might actually occur;
# if this block is SOMEHOW reached, there is a bug in my code.
logging.warning("Dead queue %s somehow failed to drain after a complete event loop spin; unbinding callback %s from widget %s anyway. Please file a bug report at https://gist.github.com/ad3c746f8c12332dc26b428d611bd3bd#comments", sequence, to_unbind, widget)
except TclError:
# Application was already destroyed so binding no longer exists; no need to unbind
pass
def _demo():
# Simple Demo
import datetime, collections, functools, threading, time, tkinter as tk
@(lambda g: lambda f: functools.wraps(f)(lambda *a, _yield, **k: g(map(_yield, f(*a, **k)))))(lambda it: collections.deque(it, 0) or None)
def worker(flags=[True]):
# EXAMPLE: generate data, slowly, on another thread
while True:
time.sleep(1.0)
yield f"Hello from {threading.current_thread()} at {datetime.datetime.now()}!"
def handle(result, event):
# EXAMPLE: Render data in GUI, on main thread
event.widget.insert("1.0", f"{event}: {result}\n")
root = tk.Tk()
body = tk.Text(root)
body.pack(fill=tk.BOTH, expand=True)
send = data_bind(body, handle, pass_tk_event=True)
worker = threading.Thread(target=worker, kwargs={'_yield': send}, daemon=True)
root.after(1, lambda t: t.start(), worker)
root.mainloop()
if __name__ == '__main__':
_demo()
from collections.abc import Callable
from typing import Annotated, Literal, TypeVar, Union, overload
import tkinter
T = TypeVar("T")
Widget = Union[tkinter.Widget, tkinter.Tk]
@overload
def data_bind(
widget: Widget,
callback: Callable[[T]], *,
pass_tk_event: Literal[False] = ...,
timeout: Optional[float] = ...,
catastrophic_timeout: Optional[float] = ...,
queueing_mode: Literal['auto', 'tk_first', 'queue_first'] = ...
) -> Callable[[T]]:
...
@overload
def data_bind(
widget: Widget,
callback: Callable[[T, tkinter.Event]], *,
pass_tk_event: Literal[True],
timeout: Optional[float] = ...,
catastrophic_timeout: Optional[float] = ...,
queueing_mode: Literal['auto', 'tk_first', 'queue_first'] = ...
) -> Callable[[T]]:
...
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment