Last active
March 28, 2024 21:46
-
-
Save minimalefforttech/877b4d5179e0f51e987a502080f26310 to your computer and use it in GitHub Desktop.
A simple example of running code on separate threads
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Copyright (C) 2024 Alex Telford
# http://minimaleffort.tech
# This work is licensed under the Creative Commons Attribution 4.0 International License.
# To view a copy of this license, visit http://creativecommons.org/licenses/by/4.0/ or send a letter to Creative Commons,
# PO Box 1866, Mountain View, CA 94042, USA.
# Distributed without any warranty or liability, use at your own risk
# This is an example of deferring code using Qt event loops
try: | |
from PySide6 import QtCore, QtWidgets | |
except ImportError: | |
from PySide2 import QtCore, QtWidgets | |
import traceback | |
class _Worker(QtCore.QRunnable, QtCore.QObject):
    """Runnable that executes one packaged call and reports the outcome by signal.

    ``finished`` carries the worker uid and the return value of the call;
    ``failed`` carries the worker uid, the exception text and the formatted
    traceback.

    Args:
        callable: Sequence of (function, args, kwargs) that run() unpacks
            and invokes as ``function(*args, **kwargs)``.
    """

    finished = QtCore.Signal(str, "QVariant", arguments=["uuid", "result"])
    failed = QtCore.Signal(str, str, str, arguments=["uuid", "error", "traceback"])

    def __init__(self, callable):
        # Each Qt base class is initialised explicitly.
        QtCore.QRunnable.__init__(self)
        QtCore.QObject.__init__(self)
        self._callable = callable
        self.setAutoDelete(True)
        # sender() will not work across the thread boundary, so every worker
        # is tracked by its own uuid instead.
        self._uid = QtCore.QUuid.createUuid()

    @QtCore.Slot(result=str)
    def uid(self):
        """Return the unique identifier of this worker as a string."""
        return self._uid.toString()

    def run(self):
        """Invoke the packaged call and emit ``finished`` or ``failed``.

        Any Exception raised while calling the function or emitting the
        result is printed and then forwarded through ``failed``, because
        exceptions will not make it over the thread boundary.
        """
        try:
            target, positional, named = self._callable
            outcome = target(*positional, **named)
            self.finished.emit(self.uid(), outcome)
        except Exception as exc:
            traceback.print_exc()
            identifier = self.uid()
            message = str(exc)
            formatted = traceback.format_exc()
            self.failed.emit(identifier, message, formatted)
class ThreadManager(QtCore.QObject):
    """A simple wrapper to dump functions to a thread pool for later retrieval.

    Every submitted callable is wrapped in a ``_Worker``; its outcome is
    stored in an internal dict keyed by the worker uid and can be fetched
    with wait(), info() or result().

    Signals:
        finished(success): emitted once the number of stored results equals
            the number of submitted callables.
        worker_finished(uuid, success): emitted each time a result is stored.

    Args:
        timeout (int): msecs handed to QThreadPool.waitForDone() when the
            ``with`` block exits (the default -1 is passed through as is).

    Usage:
        with ThreadManager() as threads:
            uid = threads.run(function, arg1, arg2)
            info = threads.info(uid)
            if info["success"]:
                value = info["result"]
            else:
                print(info["traceback"])
    """

    finished = QtCore.Signal(bool, arguments=["success"])
    worker_finished = QtCore.Signal(str, bool, arguments=["uuid", "success"])

    def __init__(self, timeout=-1):
        super(ThreadManager, self).__init__()
        self._pool = QtCore.QThreadPool(self)
        self._timeout = timeout
        self._count = 0  # number of callables submitted through run()
        self._results = {}  # uid -> info dict, filled in by the worker slots

    def run(self, callable, *args, **kwargs):
        """Add a callable to the stack and return a reference uid.

        Args:
            callable (Callable): function to call
            *args: args to pass to callable
            **kwargs: kwargs to pass to callable

        Returns:
            str: uuid
        """
        worker = _Worker([callable, args, kwargs])
        worker.finished.connect(self._worker_finished)
        worker.failed.connect(self._worker_failed)
        # Read the uid before the pool takes the (auto-deleting) runnable.
        uid = worker.uid()
        # Count the job before it can report back, so the "all done" check in
        # _store_result never compares against a stale count.
        self._count += 1
        if not self._pool.tryStart(worker):
            # Unlikely event the pool could not reserve a thread: run inline.
            worker.run()
        return uid

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, exc_traceback):
        """Wait for the pool to drain and raise if anything went wrong.

        Raises:
            RuntimeError: if the pool did not finish within the timeout, or
                if any stored result is marked as failed.
        """
        if not self._count:
            return
        if not self._pool.waitForDone(self._timeout):
            raise RuntimeError("Timed Out")
        # NOTE(review): only results already stored by the worker slots are
        # inspected here; if a slot has not been delivered yet its outcome is
        # not considered - confirm whether pending events should be flushed.
        success = all(v["success"] for v in self._results.values())
        if not success:
            raise RuntimeError("An exception occured")

    @QtCore.Slot(str, result=bool)
    @QtCore.Slot(str, int, result=bool)
    def wait(self, uuid, msecs=-1):
        """Wait for the uid to finish while keeping an event loop running.

        Args:
            uuid (str): uid returned by run()
            msecs (int): give up after this many msecs; values <= 0 wait
                without a time limit

        Returns:
            bool: True once a result is stored for uuid, False on timeout
        """
        if uuid in self._results:
            return True
        # The loop must exist before anything is connected to its quit slot
        # (the timer used to be wired up first, which raised for msecs > 0).
        loop = QtCore.QEventLoop()
        self.worker_finished.connect(loop.quit)
        timer = None
        if msecs > 0:
            timer = QtCore.QTimer()
            timer.setSingleShot(True)
            timer.timeout.connect(loop.quit)
            timer.start(msecs)
        # Use exec() where the binding provides it, exec_() otherwise.
        run_loop = loop.exec if hasattr(loop, "exec") else loop.exec_
        while True:
            # Check for the result first so that a result stored at the same
            # moment the timer expires is not reported as a timeout.
            if uuid in self._results:
                return True
            if timer is not None and not timer.isActive():
                return False
            # Returns whenever any worker finishes or the timer fires.
            run_loop()

    @QtCore.Slot(str, result="QVariant")
    @QtCore.Slot(str, int, result="QVariant")
    def info(self, uuid, msecs=-1):
        """Get the full result info for the uuid.

        Args:
            uuid (str): uuid to query
            msecs (int): optional timeout

        Raises:
            RuntimeError: if timed out

        Returns:
            dict{
                "success": completed without errors
                "result": return value (None on failure)
                "error": exception message (only present on failure)
                "traceback": formatted traceback (only present on failure)
            }
        """
        if not self.wait(uuid, msecs):
            raise RuntimeError("Timed out")
        return self._results[uuid]

    @QtCore.Slot(str, result="QVariant")
    @QtCore.Slot(str, int, result="QVariant")
    def result(self, uuid, msecs=-1):
        """Get the return value of the callable by its uid.

        Args:
            uuid (str): uuid to query
            msecs (int): optional timeout

        Returns:
            value
        """
        return self.info(uuid, msecs)["result"]

    def _store_result(self, uuid, info):
        # Record one outcome, announce it, and announce overall completion
        # once every submitted callable has an entry.
        self._results[uuid] = info
        self.worker_finished.emit(uuid, info["success"])
        if len(self._results) == self._count:
            self.finished.emit(all(v["success"] for v in self._results.values()))

    @QtCore.Slot(str, "QVariant")
    def _worker_finished(self, uuid, result):
        # Called when a worker finishes
        self._store_result(uuid, {
            "result": result,
            "success": True,
        })

    @QtCore.Slot(str, str, str)
    def _worker_failed(self, uuid, error, traceback):
        # Called when a worker errors
        self._store_result(uuid, {
            "result": None,
            "success": False,
            "error": error,
            "traceback": traceback,
        })
# An example usage demo | |
import time | |
class Dialog(QtWidgets.QDialog):
    """Small dialog showing a message above a row of Yes and No buttons.

    Args:
        title: text used as the window title
        message: text shown in the label
        parent: optional parent widget
    """

    def __init__(self, title, message, parent=None):
        super(Dialog, self).__init__(parent)
        self.setWindowTitle(title)

        root = QtWidgets.QVBoxLayout(self)
        root.addWidget(QtWidgets.QLabel(message))

        button_row = QtWidgets.QHBoxLayout()
        root.addLayout(button_row)

        # "Yes" accepts the dialog, "No" rejects it.
        for caption, handler in (("Yes", self.accept), ("No", self.reject)):
            button = QtWidgets.QPushButton(caption)
            button.clicked.connect(handler)
            button_row.addWidget(button)
class DemoWidget(QtWidgets.QWidget):
    """Demo window comparing blocking calls with event-loop based ones.

    A label shows the seconds elapsed since the widget was created and is
    refreshed by a 200 ms timer. Four buttons run the same slow jobs either
    directly or through ThreadManager, and show a question dialog either
    with its own exec call or with show() plus a local event loop.
    """

    # (a, b) argument pairs fed to a_longcommand by both "Run process" buttons.
    _JOBS = ((1, 2), (2, 4), (6, 8))

    def __init__(self, parent=None):
        super(DemoWidget, self).__init__(parent)
        self._time = time.time()

        layout = QtWidgets.QVBoxLayout(self)
        self._label = QtWidgets.QLabel("0")
        layout.addWidget(self._label)
        layout.addWidget(QtWidgets.QLabel("Output"))
        self._output = QtWidgets.QPlainTextEdit()
        layout.addWidget(self._output)

        # Periodically refresh the elapsed-time label.
        timer = QtCore.QTimer(self)
        timer.timeout.connect(self._update)
        timer.setInterval(200)
        timer.start()

        buttons = QtWidgets.QHBoxLayout()
        layout.addLayout(buttons)
        actions = (
            ("Run process Frozen", self._run_sync),
            ("Run process Async", self._run_async),
            ("Show Frozen Dialog", self._show_sync_dialog),
            ("Show Async Dialog", self._show_async_dialog),
        )
        for caption, handler in actions:
            button = QtWidgets.QPushButton(caption)
            buttons.addWidget(button)
            button.clicked.connect(handler)

    @staticmethod
    def _exec(target):
        """Run target's exec() when the binding provides it, else exec_().

        exec_() is deprecated in PySide6, while older bindings may only offer
        exec_(); this mirrors the check done in ThreadManager.wait().
        """
        runner = target.exec if hasattr(target, "exec") else target.exec_
        return runner()

    def _update(self):
        # Show the time elapsed since construction.
        self._label.setText("{:.2f}s".format(time.time()-self._time))

    @staticmethod
    def a_longcommand(a, b):
        """Sleep for one second, then return a*b (stands in for slow work)."""
        time.sleep(1)
        return a*b

    def _log_start(self):
        # Clear the output pane and note when the run started.
        self._output.clear()
        self._output.appendPlainText("Started at {:.2f}s".format(time.time()-self._time))

    def _run_sync(self):
        """Run every job one after another directly in this method."""
        self._log_start()
        for a, b in self._JOBS:
            result = self.a_longcommand(a, b)
            self._output.appendPlainText("{}*{} = {}".format(a, b, result))

    def _run_async(self):
        """Submit every job to a ThreadManager, then collect the results in order."""
        self._log_start()
        with ThreadManager(timeout=5000) as threaded:
            # Submit everything first so the jobs can overlap.
            uids = [threaded.run(self.a_longcommand, a, b) for a, b in self._JOBS]
            for (a, b), uid in zip(self._JOBS, uids):
                result = threaded.result(uid)
                self._output.appendPlainText("{}*{} = {}".format(a, b, result))

    def _show_sync_dialog(self):
        """Ask the question using the dialog's own exec call."""
        self._output.clear()
        box = Dialog("I'd like to ask a question", "Do you like Qt?", self)
        self._output.appendPlainText("Showing dialog box")
        answer = self._exec(box)
        self._output.appendPlainText("The answer was: {}".format("Yes" if answer else "No"))

    def _show_async_dialog(self):
        """Ask the question using show() and a local event loop."""
        self._output.clear()
        box = Dialog("I'd like to ask a question", "Do you like Qt?", self)
        self._output.appendPlainText("Showing dialog box")
        box.show()
        # Spin a local loop until the dialog reports that it finished.
        loop = QtCore.QEventLoop()
        box.finished.connect(loop.quit)
        self._exec(loop)
        answer = box.result()
        self._output.appendPlainText("The answer was: {}".format("Yes" if answer else "No"))
# Reuse the existing QApplication if there is one, otherwise create it.
app = QtWidgets.QApplication.instance() or QtWidgets.QApplication([])
widget = DemoWidget()
widget.show()
# exec_() is deprecated in PySide6; use exec() when the binding provides it
# and fall back to exec_() otherwise.
if hasattr(app, "exec"):
    app.exec()
else:
    app.exec_()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment