Skip to content

Instantly share code, notes, and snippets.

@clungzta
Created July 18, 2020 06:02
Show Gist options
  • Save clungzta/e080273cd0c2d039d2d04ea64996a973 to your computer and use it in GitHub Desktop.
Save clungzta/e080273cd0c2d039d2d04ea64996a973 to your computer and use it in GitHub Desktop.
import re
import time
import serial
import threading
import numpy as np
from pprint import pprint
from functools import partial
import matplotlib.pyplot as plt
import matplotlib.animation as animation
from collections import deque, defaultdict
dict_of_deques = defaultdict(partial(deque, maxlen=100))
connected = False
serial_port = serial.Serial('/dev/ttyACM0', 115200, timeout=1)
items_to_plot = ['PV', 'SP', 'cur', 'RAW']
### Multithreaded Reading From Serial
def handle_serial_data(line):
'''
Expected format 'M1: SP=20265, PV=0, pterm=20265, RAW=162, out=126, cur=-0.0A\r\n'
'''
global d
if line.strip('\r\n'):
try:
stripped = line.strip('\r\n')
identifier, data = stripped.split(': ')
split_str = data.split(', ')
except Exception as e:
print(f'Failed to parse: "{stripped}"')
return
# print(e)
outdict = {}
for item in split_str:
try:
k, v = item.split('=')
except ValueError:
print(f'Failed to parse: "{stripped}"')
return
# extract floating number from the string v (https://stackoverflow.com/a/4703409)
extracted_float = float(re.findall("[-+]?\d*\.\d+|\d+", v)[0])
outdict[k] = extracted_float
print(k, extracted_float)
if set(items_to_plot).issubset(set(outdict.keys())):
dict_of_deques[identifier].append(outdict)
def read_from_port(ser):
global connected
while not connected:
#serin = ser.read()
connected = True
while True:
# print("test")
reading = ser.readline()
if reading:
try:
handle_serial_data(reading.decode("utf-8"))
except UnicodeDecodeError as e:
print(e)
thread = threading.Thread(target=read_from_port, args=(serial_port,))
thread.start()
### Plotting
def create_subplots(fig, items_to_plot):
number_of_subplots = len(items_to_plot)
number_of_columns = np.ceil(number_of_subplots / 2.0)
number_of_rows = (number_of_subplots // number_of_columns) + (number_of_subplots % number_of_columns)
# Create a Position index
position = range(1, number_of_subplots + 1)
axes_list = []
for k, item_name in enumerate(items_to_plot):
ax = fig.add_subplot(number_of_rows, number_of_columns, position[k])
axes_list.append((item_name, ax))
return axes_list
def animate(i, data, axes_list):
'''
Create animation with one colour per identifier and one plot per key
'''
items_to_plot, _ = zip(*axes_list)
print(items_to_plot)
for identifier, list_of_dicts in dict(dict_of_deques).items():
dict_of_lists = {k: [dic[k] for dic in list_of_dicts] for k in items_to_plot}
# pprint(dict_of_lists)
for item_name, ax in axes_list:
ys = dict_of_lists[item_name]
xs = np.arange(len(ys))
ax.clear()
ax.set_title(item_name)
ax.plot(xs, ys, label="Experimental Probability")
# ax.plot(xs, rs, label="Theoretical Probability")
print('animating', i)
fig = plt.figure(1)
axes_list = create_subplots(fig, items_to_plot)
# Set up plot to call animate() function periodically
ani = animation.FuncAnimation(fig, animate, fargs=(dict_of_deques, axes_list), interval=50)
plt.show()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment