Skip to content

Instantly share code, notes, and snippets.

@spewil
Created February 27, 2020 13:09
Show Gist options
  • Save spewil/f1ee61b1eccc851109a650347a493099 to your computer and use it in GitHub Desktop.
Save spewil/f1ee61b1eccc851109a650347a493099 to your computer and use it in GitHub Desktop.
Pyqtgraph multiprocessing coordination example
import queue
from pyqtgraph.Qt import QtGui, QtCore
import numpy as np
import pyqtgraph as pg
import multiprocessing
import time
# This function is responsible for displaying the data
# it is run in its own process to liberate main process
def display(name, q, e, q_empty):
app = QtGui.QApplication([])
win2 = pg.GraphicsWindow(title="Basic plotting examples")
win2.resize(1000, 600)
win2.setWindowTitle('pyqtgraph example: Plotting')
p2 = win2.addPlot(title="Updating plot")
curve = p2.plot(pen='y')
x_np = []
y_np = []
def updateInProc(curve, q, x, y):
# check stop event
if e.is_set():
item = q.get()
x.append(item[0])
y.append(item[1])
curve.setData(x, y)
else:
i = 0
print("[DISPLAY] Emptying queue.")
while True:
try:
q.get(block=False)
i += 1
except queue.Empty:
print("[DISPLAY] Items that were left in queue: ", i)
q_empty.set()
break
print("[DISPLAY] App quitting.")
QtGui.QApplication.closeAllWindows()
timer = QtCore.QTimer()
timer.timeout.connect(lambda: updateInProc(curve, q, x_np, y_np))
timer.start()
app.exec_()
print("[DISPLAY] Exec returned.")
# This is function is responsible for reading some data (IO, serial port, etc)
# and forwarding it to the display
# it is run in a thread
def producer(running, q, q_empty):
t = 0
while True:
if running.is_set():
s = np.sin(2 * np.pi * t)
t += 0.01
q.put([t, s])
time.sleep(0.0001)
else:
print(
"[PRODUCER] Done putting on queue. Waiting for q_empty event.")
q_empty.wait()
print("[PRODUCER] Queue was emptied.")
break
if __name__ == '__main__':
ctx = multiprocessing.get_context("spawn")
q = ctx.Queue()
# Event for stopping the IO thread
run = ctx.Event()
run.set()
q_empty = ctx.Event()
# Run io function in a thread
t = ctx.Process(target=producer, args=(run, q, q_empty))
# Start display process
p = ctx.Process(target=display, args=('bob', q, run, q_empty))
t.daemon = False
p.daemon = False
t.start()
p.start()
input("Type any key to quit.\n")
run.clear()
print("Waiting for graph window process to join...")
p.join()
print("Waiting for scheduler thread to join...")
t.join()
print("Process joined successfully.")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment