import re
import time
import serial
import threading
import numpy as np
from pprint import pprint
from functools import partial
import matplotlib.pyplot as plt
import matplotlib.animation as animation
from collections import deque, defaultdict
# Keys of a parsed sample that each get their own subplot.
items_to_plot = ['PV', 'SP', 'cur', 'RAW']

# Flipped to True by read_from_port() once it starts consuming the port.
connected = False

# Rolling history per identifier: every missing key gets a fresh deque that
# keeps at most the 100 most recent entries.
dict_of_deques = defaultdict(partial(deque, maxlen=100))

# The port is opened at import time; reads give up after a 1 second timeout.
serial_port = serial.Serial(port='/dev/ttyACM0', baudrate=115200, timeout=1)
### Multithreaded Reading From Serial
# Signed integer or decimal number.  The sign is grouped with BOTH
# alternatives so that "-20265" keeps its sign (the ungrouped form only
# applied the sign to the decimal alternative).
_NUMBER_RE = re.compile(r"[-+]?(?:\d*\.\d+|\d+)")


def handle_serial_data(line):
    r"""Parse one status line and store the sample for plotting.

    Expected format:
        'M1: SP=20265, PV=0, pterm=20265, RAW=162, out=126, cur=-0.0A\r\n'

    The part before ': ' is the identifier; the remainder is a ', '
    separated list of ``key=value`` fields.  The first number found in each
    value is converted to float, so unit suffixes such as the trailing 'A'
    are ignored.

    Args:
        line: One decoded line of text from the serial port.

    Returns:
        None.  Blank lines are ignored; a line without the
        ``identifier: fields`` shape is reported on stdout and dropped; a
        malformed individual field is reported and skipped.
    """
    stripped = line.strip('\r\n')
    if not stripped:
        return

    try:
        identifier, data = stripped.split(': ')
    except ValueError:
        # Zero or more than one ': ' separator in the line.
        print(f'Failed to parse: "{stripped}"')
        return

    outdict = {}
    for item in data.split(', '):
        try:
            k, v = item.split('=')
        except ValueError:
            print(f'Failed to parse: "{stripped}"')
            continue
        # extract floating number from the string v
        match = _NUMBER_RE.search(v)
        if match is None:
            # No digits in the value at all; indexing the result of
            # findall() here would raise IndexError.
            print(f'Failed to parse: "{stripped}"')
            continue
        extracted_float = float(match.group())
        outdict[k] = extracted_float
        print(k, extracted_float)

    # Only keep samples that carry every plotted key, so the plotting code
    # can index each stored dict without checking.
    # NOTE(review): the statement under this condition was not visible in
    # the reviewed source; appending to dict_of_deques[identifier] is
    # inferred from how animate() consumes that structure -- confirm.
    if set(items_to_plot).issubset(outdict):
        dict_of_deques[identifier].append(outdict)
def read_from_port(ser):
    """Read lines from *ser* forever and pass each to handle_serial_data().

    Intended to be the target of a background thread.  If the module-level
    ``connected`` flag is already set the function returns immediately;
    otherwise it sets the flag and never returns.

    Args:
        ser: Object with a ``readline()`` method returning bytes
            (presumably the pySerial port opened at module level).
    """
    global connected
    if connected:
        return
    connected = True

    while True:
        reading = ser.readline()
        if not reading:
            # Nothing received (presumably the read timed out); poll again.
            continue
        # NOTE(review): the decode call and the body of the
        # UnicodeDecodeError handler were not visible in the reviewed
        # source.  Decoding with the default codec, then reporting and
        # skipping undecodable lines, is assumed -- confirm.
        try:
            line = reading.decode()
        except UnicodeDecodeError as e:
            print(f'Failed to decode: {reading!r} ({e})')
            continue
        handle_serial_data(line)
# Daemon thread: read_from_port() loops forever, so a non-daemon thread would
# keep the interpreter alive after the main thread finishes.
thread = threading.Thread(target=read_from_port, args=(serial_port,), daemon=True)
# Without start() the reader never runs and no samples are ever collected.
thread.start()
### Plotting
def create_subplots(fig, items_to_plot):
    """Add one subplot per item to *fig*, arranged in at most two rows.

    Args:
        fig: Figure-like object providing ``add_subplot(rows, cols, index)``.
        items_to_plot: Sequence of item names, one subplot each.

    Returns:
        List of ``(item_name, axes)`` tuples in the order of
        *items_to_plot*; an empty list when there is nothing to plot.
    """
    number_of_subplots = len(items_to_plot)
    if number_of_subplots == 0:
        return []

    # Pure integer arithmetic: add_subplot() needs integer grid dimensions,
    # which np.ceil() (a float) does not provide.
    number_of_columns = (number_of_subplots + 1) // 2
    # Ceiling division, so only as many rows as are needed get allocated.
    number_of_rows = -(-number_of_subplots // number_of_columns)

    # Subplot positions are 1-based.
    return [
        (item_name, fig.add_subplot(number_of_rows, number_of_columns, position))
        for position, item_name in enumerate(items_to_plot, start=1)
    ]
def animate(i, data, axes_list):
    """Redraw every subplot from the currently buffered samples.

    Create animation with one colour per identifier and one plot per key.

    Args:
        i: Frame number; only used in the progress print.
        data: Mapping of identifier -> iterable of sample dicts.  Every
            sample must contain each item name present in *axes_list*.
        axes_list: ``(item_name, axes)`` pairs as returned by
            create_subplots().
    """
    # Take a snapshot before drawing: another thread may be appending
    # samples, and iterating a deque while it is mutated raises RuntimeError.
    snapshot = {identifier: list(samples) for identifier, samples in dict(data).items()}

    for item_name, ax in axes_list:
        # Start from an empty axes each frame; otherwise every call stacks
        # another set of lines on top of the previous ones without bound.
        ax.clear()
        # Identifiers are drawn in the same order on every axes.
        for identifier, samples in snapshot.items():
            ys = [sample[item_name] for sample in samples]
            xs = np.arange(len(ys))
            ax.plot(xs, ys, label=identifier)
    print('animating', i)
fig = plt.figure(1)
axes_list = create_subplots(fig, items_to_plot)
# Set up plot to call animate() function periodically (every 50 ms).
# The animation object is kept in a variable so that it is not garbage
# collected while the window is open.
ani = animation.FuncAnimation(fig, animate, fargs=(dict_of_deques, axes_list), interval=50)
# Enter the GUI event loop; without this the figure is never displayed and
# the animation never runs when the file is executed as a script.
plt.show()
