Last active
August 29, 2024 14:23
-
-
Save James-E-A/bc10dd0f4e848fe7a2eb2a4810a95de2 to your computer and use it in GitHub Desktop.
tkinter module send data with virtual event / event_generate
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
__all__ = ['check_for_bug', 'patch_bug', 'auto_patch_bug'] | |
import logging | |
import os | |
import pdb | |
import platform | |
import sys | |
import threading | |
import tkinter | |
FIXED_IN_CPYTHON_VERSION = (4,) # FIXME update this when they actually fix it | |
# https://tkdocs.com/tutorial/eventloop.html | |
# "If you need to communicate from another thread to the thread running | |
# Tkinter, keep it as simple as possible. Use event_generate to post a | |
# virtual event to the Tkinter event queue, and then bind to that event | |
# in your code." | |
def auto_patch_bug(module=tkinter): | |
# NOTE: check_for_bug and auto_patch_bug MUST be called from the | |
# main thread because the tkinter module refuses to allow mainloop | |
# on another thread. | |
if platform.python_implementation() == 'CPython': | |
if sys.version_info >= FIXED_IN_CPYTHON_VERSION: | |
logging.debug("No need to polyfill tkinter issue 47655 on this Python runtime.") | |
elif sys.version_info >= (3, 8): # FIXME figure out what the actual lower bound this changeset applies seamlessly on is | |
try: | |
needs_patch = check_for_bug(module) | |
except Exception: | |
raise Exception("internal error checking for bug") | |
if needs_patch: | |
patch_bug(module) | |
assert not check_for_bug(module), "Patch failed to apply!" | |
else: | |
logging.debug("No need to polyfill tkinter issue 47655 on this Python runtime.") | |
else: | |
raise Exception(f"patch not available on old CPython version {platform.python_version()}") | |
else: | |
# FIXME support other Python implementations | |
raise Exception(f"patch not available yet on {platform.python_implementation()}") | |
def check_for_bug(module=tkinter): | |
data = os.urandom(12).hex() # NOTE: the associated data is coerced to str! | |
root = module.Tk() | |
root.withdraw() | |
result = None | |
def handle_ev(event): | |
if hasattr(event, 'user_data'): | |
nonlocal result | |
result = event.user_data | |
event.widget.quit() | |
root.bind('<<test>>', handle_ev) | |
t = threading.Thread(target=root.event_generate, args=('<<test>>',), kwargs={'data': data}) | |
t.start() | |
root.mainloop() | |
if result is None: | |
# event never arrived, or was missing the .user_data property | |
return True | |
else: | |
if result != data: | |
# event arrived, had the .user_data property, but that property had an unexpected value | |
raise RuntimeError(f"Could not validate presence of CPython Bug #47655; unexpected data arrived. (Expected {data}, got {result})") | |
# event arrived, had the .user_data property, and that data was valid | |
return False | |
def patch_bug(module=tkinter): | |
def _substitute(self, *args): | |
"""https://github.com/python/cpython/pull/7142""" | |
if len(args) != len(self._subst_format): return args | |
getboolean = self.tk.getboolean | |
getint = self.tk.getint | |
def getint_event(s): | |
"""Tk changed behavior in 8.4.2, returning "??" rather more often.""" | |
try: | |
return getint(s) | |
except (ValueError, module.TclError): | |
return s | |
nsign, b, d, f, h, k, s, t, w, x, y, A, E, K, N, W, T, X, Y, D = args | |
# Missing: (a, c, m, o, v, B, R) | |
e = module.Event() | |
# serial field: valid for all events | |
# number of button: ButtonPress and ButtonRelease events only | |
# detail: for Enter, Leave, FocusIn, FocusOut and ConfigureRequest | |
# events certain fixed strings (see tcl/tk documentation) | |
# user_data: data string from a virtual event or an empty string | |
# height field: Configure, ConfigureRequest, Create, | |
# ResizeRequest, and Expose events only | |
# keycode field: KeyPress and KeyRelease events only | |
# time field: "valid for events that contain a time field" | |
# width field: Configure, ConfigureRequest, Create, ResizeRequest, | |
# and Expose events only | |
# x field: "valid for events that contain an x field" | |
# y field: "valid for events that contain a y field" | |
# keysym as decimal: KeyPress and KeyRelease events only | |
# x_root, y_root fields: ButtonPress, ButtonRelease, KeyPress, | |
# KeyRelease, and Motion events | |
e.serial = getint(nsign) | |
e.num = getint_event(b) | |
e.user_data = d | |
e.detail = d | |
try: e.focus = getboolean(f) | |
except module.TclError: pass | |
e.height = getint_event(h) | |
e.keycode = getint_event(k) | |
e.state = getint_event(s) | |
e.time = getint_event(t) | |
e.width = getint_event(w) | |
e.x = getint_event(x) | |
e.y = getint_event(y) | |
e.char = A | |
try: e.send_event = getboolean(E) | |
except module.TclError: pass | |
e.keysym = K | |
e.keysym_num = getint_event(N) | |
try: | |
e.type = module.EventType(T) | |
except ValueError: | |
e.type = T | |
try: | |
e.widget = self._nametowidget(W) | |
except KeyError: | |
e.widget = W | |
e.x_root = getint_event(X) | |
e.y_root = getint_event(Y) | |
try: | |
e.delta = getint(D) | |
except (ValueError, module.TclError): | |
e.delta = 0 | |
return (e,) | |
_subst_format = ('%#', '%b', '%d', '%f', '%h', '%k', | |
'%s', '%t', '%w', '%x', '%y', | |
'%A', '%E', '%K', '%N', '%W', '%T', '%X', '%Y', '%D') | |
_subst_format_str = " ".join(_subst_format) | |
# FIXME this seems less elegant than just assigning module.Misc, | |
# but I can't figure out how to make such an assignment propagate | |
# "retroactively" to all the subclasses like Tk and the widgets | |
module.Misc._substitute = _substitute | |
module.Misc._subst_format = _subst_format | |
module.Misc._subst_format_str = _subst_format_str | |
logging.debug(f"Patched {module.__name__}.Misc to fix CPython Bug #47655.") | |
if __name__ == '__main__': | |
auto_patch_bug() |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
__all__ = ['receive_object', 'post_object'] | |
import logging | |
import platform | |
import threading | |
import tkinter | |
"""USAGE: | |
# on main thread | |
from tkinter_iss47655_polyfill import auto_patch_bug | |
auto_patch_bug(tkinter) | |
def handle(event): | |
foo = receive_object(event) | |
logging.info("") | |
root.bind('<<arbitrary_name>>', handle) | |
# on worker thread | |
foo = object() # could be literally any object | |
post_object(root, '<<arbitrary_name>>', foo) | |
""" | |
INFLIGHT_OBJECTS = dict() | |
INFLIGHT_OBJECTS_LOCK = threading.Lock() | |
def receive_object(event): | |
if not hasattr(event, 'user_data'): | |
if platform.python_implementation() == 'CPython' and '%d' not in event.widget._subst_format: | |
raise RuntimeError("Can't recieve data due to CPython Bug #47655. Patch this before using receive_object().") | |
raise ValueError("No data found on this event.") | |
obj_id = int(event.user_data) | |
with INFLIGHT_OBJECTS_LOCK: | |
result = INFLIGHT_OBJECTS.pop(obj_id) | |
return result | |
def post_object(widget, sequence, obj, **k): | |
obj_id = id(obj) | |
with INFLIGHT_OBJECTS_LOCK: | |
INFLIGHT_OBJECTS[obj_id] = obj | |
widget.event_generate(sequence, data=obj_id, **k) | |
widget.after(1000, _sanity_check(widget, obj_id, obj.__class__.__name__)) | |
def _sanity_check(widget, obj_id, obj_type): | |
def callback(): | |
with INFLIGHT_OBJECTS_LOCK: | |
problem = (obj_id in INFLIGHT_OBJECTS) | |
if problem: | |
logging.warning("Object %s has been inflight to %s for over a second! Is there a memory leak, or is the system just slow?", f"#{obj_id}/{obj_type}", repr(widget)) | |
widget.after(29000, _last_resort(widget, obj_id, obj_type)) | |
return callback | |
def _last_resort(widget, obj_id, obj_type): | |
def callback(): | |
try: | |
with INFLIGHT_OBJECTS_LOCK: | |
del INFLIGHT_OBJECTS[obj_id] | |
except KeyError: | |
pass | |
else: | |
logging.error("Object %s was inflight to %s for over 30 seconds! Aborted that inflight object as a last resort to stop leaking memory.", f"#{obj_id}/{obj_type}", repr(widget)) | |
return callback |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment