Skip to content

Instantly share code, notes, and snippets.

@JokerMartini
Forked from minimalefforttech/async_callable.py
Created March 3, 2024 13:08
Show Gist options
  • Save JokerMartini/a1dd384865ffd7ac8ce5e1fcee3bc7a4 to your computer and use it in GitHub Desktop.
Save JokerMartini/a1dd384865ffd7ac8ce5e1fcee3bc7a4 to your computer and use it in GitHub Desktop.
A simple example of running code on separate threads
# Copyright (C) 2024 Alex Telford
# http://minimaleffort.tech
# This work is licensed under the Creative Commons Attribution 4.0 International License.
# To view a copy of this license, visit http://creativecommons.org/licenses/by/4.0/ or send a letter to Creative Commons,
# PO Box 1866, Mountain View, CA 94042, USA.
# Distributed without any warranty or liability, use at your own risk
# This is an example of deferring code using Qt event loops
try:
from PySide6 import QtCore, QtWidgets
except ImportError:
from PySide2 import QtCore, QtWidgets
import traceback
class _Worker(QtCore.QRunnable, QtCore.QObject):
    """Runnable that executes a single callable and reports the outcome by signal.

    Args:
        callable: sequence of (function, args, kwargs)

    Signals:
        finished(uuid, result): emitted with the return value when the function returns
        failed(uuid, error, traceback): emitted with the exception text and the
            formatted traceback when the function raises an Exception
    """
    finished = QtCore.Signal(str, "QVariant", arguments=["uuid", "result"])
    failed = QtCore.Signal(str, str, str, arguments=["uuid", "error", "traceback"])

    def __init__(self, callable):
        QtCore.QRunnable.__init__(self)
        QtCore.QObject.__init__(self)
        # Receivers cannot rely on sender() across the thread boundary,
        # so every worker is identified by its own uuid instead
        self._uid = QtCore.QUuid.createUuid()
        self._callable = callable
        self.setAutoDelete(True)

    @QtCore.Slot(result=str)
    def uid(self):
        """Return the unique identifier of this worker as a string."""
        identifier = self._uid.toString()
        return identifier

    def run(self):
        """Call the stored function and emit either ``finished`` or ``failed``.

        Exceptions are reported through ``failed`` (and printed) because they
        would not make it across the thread boundary on their own.
        """
        try:
            target, positional, named = self._callable
            outcome = target(*positional, **named)
            self.finished.emit(self.uid(), outcome)
        except Exception as error:
            traceback.print_exc()
            identifier = self.uid()
            message = str(error)
            self.failed.emit(identifier, message, traceback.format_exc())
class ThreadManager(QtCore.QObject):
    """A simple wrapper to dump functions to a thread pool for later retrieval.

    Args:
        timeout (int): msecs handed to ``QThreadPool.waitForDone`` when the
            ``with`` block is left (default -1)

    Signals:
        worker_finished(uuid, success): emitted each time a worker reports back
        finished(success): emitted once every submitted worker has reported back

    Usage:
        with ThreadManager() as threads:
            uid = threads.run(function, arg1, arg2)
            info = threads.info(uid)
            if info["success"]:
                value = info["result"]
            else:
                print(info["traceback"])
    """
    finished = QtCore.Signal(bool, arguments=["success"])
    worker_finished = QtCore.Signal(str, bool, arguments=["uuid", "success"])

    def __init__(self, timeout=-1):
        super(ThreadManager, self).__init__()
        self._pool = QtCore.QThreadPool(self)
        self._timeout = timeout
        self._count = 0  # number of workers submitted through run()
        self._results = {}  # uuid -> info dict, filled in by the worker slots

    def run(self, callable, *args, **kwargs):
        """Add a callable to the stack and return a reference uid.

        Args:
            callable (Callable): function to call
            *args: args to pass to callable
            **kwargs: kwargs to pass to callable

        Returns:
            str: uuid
        """
        worker = _Worker([callable, args, kwargs])
        worker.finished.connect(self._worker_finished)
        worker.failed.connect(self._worker_failed)
        # Count the job before it can report back, so the "all done"
        # comparison in _store_result always sees the full total
        self._count += 1
        if not self._pool.tryStart(worker):
            # Unlikely event the pool could not reserve a thread:
            # run the job synchronously on the calling thread instead
            worker.run()
        return worker.uid()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, exc_traceback):
        """Wait for the pool to drain when leaving the ``with`` block.

        Raises:
            RuntimeError: if the pool did not finish within the timeout, or if
                any recorded result is a failure
        """
        # NOTE(review): only results already recorded in self._results are
        # inspected here; workers nobody waited on may not have been recorded
        # yet if their signals are still pending delivery - confirm.
        if not self._count:
            return
        if not self._pool.waitForDone(self._timeout):
            raise RuntimeError("Timed Out")
        success = all(v["success"] for v in self._results.values())
        if not success:
            raise RuntimeError("An exception occured")

    @QtCore.Slot(str, result=bool)
    @QtCore.Slot(str, int, result=bool)
    def wait(self, uuid, msecs=-1):
        """Wait for the uid to finish, spinning a local event loop meanwhile.

        Args:
            uuid (str): uuid to wait for
            msecs (int): timeout; a timer is only armed when this is > 0,
                otherwise the call waits until the result arrives

        Returns:
            bool: True once a result for uuid is recorded, False on timeout
        """
        if uuid in self._results:
            return True
        # The loop has to exist before the timeout timer can be wired to it
        loop = QtCore.QEventLoop()
        self.worker_finished.connect(loop.quit)
        timer = None
        if msecs > 0:
            timer = QtCore.QTimer()
            timer.setSingleShot(True)
            timer.timeout.connect(loop.quit)
            timer.start(msecs)
        # exec_ is the older spelling; prefer exec when the binding offers it
        run_loop = loop.exec if hasattr(loop, "exec") else loop.exec_
        while True:
            # Check for the result first so one that arrived together with
            # the timeout is still reported as a success
            if uuid in self._results:
                return True
            if timer is not None and not timer.isActive():
                return False
            # Returns whenever any worker finishes or the timer fires
            run_loop()

    @QtCore.Slot(str, result="QVariant")
    @QtCore.Slot(str, int, result="QVariant")
    def info(self, uuid, msecs=-1):
        """Get the full result info for the uuid.

        Args:
            uuid (str): uuid to query
            msecs (int): optional timeout

        Raises:
            RuntimeError: if timed out

        Returns:
            dict{
                "success": completed without errors
                "result": return value (None on failure)
                "error": exception message (failure only)
                "traceback": formatted traceback (failure only)
            }
        """
        if not self.wait(uuid, msecs):
            raise RuntimeError("Timed out")
        return self._results[uuid]

    @QtCore.Slot(str, result="QVariant")
    @QtCore.Slot(str, int, result="QVariant")
    def result(self, uuid, msecs=-1):
        """Get the return value of the callable by its uid.

        Args:
            uuid (str): uuid to query
            msecs (int): optional timeout

        Returns:
            value
        """
        return self.info(uuid, msecs)["result"]

    def _store_result(self, uuid, info):
        # Record one worker's outcome and announce it; once every submitted
        # worker has reported, announce the overall outcome as well
        self._results[uuid] = info
        self.worker_finished.emit(uuid, info["success"])
        if len(self._results) == self._count:
            self.finished.emit(all(v["success"] for v in self._results.values()))

    @QtCore.Slot(str, "QVariant")
    def _worker_finished(self, uuid, result):
        # Called when a worker finishes
        self._store_result(uuid, {
            "result": result,
            "success": True,
        })

    @QtCore.Slot(str, str, str)
    def _worker_failed(self, uuid, error, traceback):
        # Called when a worker errors
        self._store_result(uuid, {
            "result": None,
            "success": False,
            "error": error,
            "traceback": traceback,
        })
# An example usage demo
import time
class Dialog(QtWidgets.QDialog):
    """Minimal question dialog: a message label above a Yes / No button row.

    "Yes" accepts the dialog and "No" rejects it.

    Args:
        title: window title
        message: text shown in the label
        parent: optional parent widget
    """

    def __init__(self, title, message, parent=None):
        super(Dialog, self).__init__(parent)
        self.setWindowTitle(title)

        root = QtWidgets.QVBoxLayout(self)
        root.addWidget(QtWidgets.QLabel(message))

        button_row = QtWidgets.QHBoxLayout()
        root.addLayout(button_row)

        # (caption, slot) pairs, in display order
        for caption, handler in (("Yes", self.accept), ("No", self.reject)):
            button = QtWidgets.QPushButton(caption)
            button.clicked.connect(handler)
            button_row.addWidget(button)
class DemoWidget(QtWidgets.QWidget):
    """Demo window comparing blocking calls with their event-loop friendly versions.

    A label refreshed every 200ms shows the elapsed time, which makes it
    visible whenever the event loop is blocked. Four buttons run the same
    work or dialog either synchronously or through ThreadManager / a local
    QEventLoop.
    """

    def __init__(self, parent=None):
        super(DemoWidget, self).__init__(parent)
        self._time = time.time()  # reference point for all displayed times
        layout = QtWidgets.QVBoxLayout(self)
        self._label = QtWidgets.QLabel("0")
        layout.addWidget(self._label)
        output_label = QtWidgets.QLabel("Output")
        layout.addWidget(output_label)
        self._output = QtWidgets.QPlainTextEdit()
        layout.addWidget(self._output)

        # Periodic refresh of the elapsed-time label
        timer = QtCore.QTimer(self)
        timer.timeout.connect(self._update)
        timer.setInterval(200)
        timer.start()

        buttons = QtWidgets.QHBoxLayout()
        layout.addLayout(buttons)
        frozen_button = QtWidgets.QPushButton("Run process Frozen")
        unfrozen_button = QtWidgets.QPushButton("Run process Async")
        frozen_dialog_button = QtWidgets.QPushButton("Show Frozen Dialog")
        unfrozen_dialog_button = QtWidgets.QPushButton("Show Async Dialog")
        buttons.addWidget(frozen_button)
        buttons.addWidget(unfrozen_button)
        buttons.addWidget(frozen_dialog_button)
        buttons.addWidget(unfrozen_dialog_button)
        frozen_button.clicked.connect(self._run_sync)
        unfrozen_button.clicked.connect(self._run_async)
        frozen_dialog_button.clicked.connect(self._show_sync_dialog)
        unfrozen_dialog_button.clicked.connect(self._show_async_dialog)

    @staticmethod
    def _run_event_loop(target):
        """Call target.exec() when the binding provides it, else target.exec_().

        Mirrors the compatibility check used by ThreadManager.wait.
        """
        if hasattr(target, "exec"):
            return target.exec()
        return target.exec_()

    def _update(self):
        """Show the seconds elapsed since the widget was created."""
        self._label.setText("{:.2f}s".format(time.time()-self._time))

    @staticmethod
    def a_longcommand(a, b):
        """Sleep for one second, then return a*b (stand-in for slow work)."""
        time.sleep(1)
        return a*b

    def _run_sync(self):
        """Run the three slow calls one after another on the calling thread."""
        self._output.clear()
        self._output.appendPlainText("Started at {:.2f}s".format(time.time()-self._time))
        result = self.a_longcommand(1, 2)
        self._output.appendPlainText("1*2 = {}".format(result))
        result = self.a_longcommand(2, 4)
        self._output.appendPlainText("2*4 = {}".format(result))
        result = self.a_longcommand(6, 8)
        self._output.appendPlainText("6*8 = {}".format(result))

    def _run_async(self):
        """Submit the three slow calls to a ThreadManager and print each result."""
        self._output.clear()
        self._output.appendPlainText("Started at {:.2f}s".format(time.time()-self._time))
        with ThreadManager(timeout=5000) as threaded:
            # Submit everything first, then collect the results in order
            uid_1 = threaded.run(self.a_longcommand, 1, 2)
            uid_2 = threaded.run(self.a_longcommand, 2, 4)
            uid_3 = threaded.run(self.a_longcommand, 6, 8)
            result = threaded.result(uid_1)
            self._output.appendPlainText("1*2 = {}".format(result))
            result = threaded.result(uid_2)
            self._output.appendPlainText("2*4 = {}".format(result))
            result = threaded.result(uid_3)
            self._output.appendPlainText("6*8 = {}".format(result))

    def _show_sync_dialog(self):
        """Ask the question through the dialog's own exec call."""
        self._output.clear()
        box = Dialog("I'd like to ask a question", "Do you like Qt?", self)
        self._output.appendPlainText("Showing dialog box")
        answer = self._run_event_loop(box)
        self._output.appendPlainText("The answer was: {}".format("Yes" if answer else "No"))

    def _show_async_dialog(self):
        """Ask the question with show() plus a local event loop that ends when the dialog finishes."""
        self._output.clear()
        box = Dialog("I'd like to ask a question", "Do you like Qt?", self)
        self._output.appendPlainText("Showing dialog box")
        box.show()
        loop = QtCore.QEventLoop()
        box.finished.connect(loop.quit)
        self._run_event_loop(loop)
        answer = box.result()
        self._output.appendPlainText("The answer was: {}".format("Yes" if answer else "No"))
# Launch the demo: reuse the existing QApplication if there is one, else create it
app = QtWidgets.QApplication.instance() or QtWidgets.QApplication([])
widget = DemoWidget()
widget.show()
# Prefer exec() when the binding provides it and fall back to exec_(),
# the same compatibility check ThreadManager.wait uses
if hasattr(app, "exec"):
    app.exec()
else:
    app.exec_()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment