Created
July 18, 2020 06:02
-
-
Save clungzta/e080273cd0c2d039d2d04ea64996a973 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import re | |
import time | |
import serial | |
import threading | |
import numpy as np | |
from pprint import pprint | |
from functools import partial | |
import matplotlib.pyplot as plt | |
import matplotlib.animation as animation | |
from collections import deque, defaultdict | |
# Keys that a parsed sample must contain before it is stored; one subplot is drawn per key.
items_to_plot = ['PV', 'SP', 'cur', 'RAW']

# Serial link the samples are read from.
serial_port = serial.Serial('/dev/ttyACM0', 115200, timeout=1)

# Flag flipped by read_from_port() once its read loop has been entered.
connected = False

# Per-identifier history of parsed samples; each history keeps only the newest 100 entries.
_new_history = partial(deque, maxlen=100)
dict_of_deques = defaultdict(_new_history)
### Multithreaded Reading From Serial | |
# Optionally signed decimal or integer. The sign is grouped outside the
# alternation so that it also applies to plain integers such as "-5".
_NUMBER_PATTERN = re.compile(r'[-+]?(?:\d*\.\d+|\d+)')


def handle_serial_data(line):
    '''
    Parse one line of serial output and store it in dict_of_deques.

    Expected format 'M1: SP=20265, PV=0, pterm=20265, RAW=162, out=126, cur=-0.0A\r\n'

    The text before ': ' is the identifier; the rest is a ', '-separated list
    of key=value pairs. The first number found in each value is kept as a
    float (so a trailing unit such as the 'A' in '-0.0A' is ignored), and each
    parsed pair is printed.

    The resulting dict is appended to dict_of_deques[identifier] only when it
    contains every key listed in items_to_plot. Empty lines are ignored. A line
    that does not match the expected format is reported with a
    'Failed to parse' message and dropped.

    line: decoded text of a single line.
    Returns None in every case.
    '''
    stripped = line.strip('\r\n')
    if not stripped:
        return

    try:
        # Raises ValueError unless there is exactly one ': ' separator.
        identifier, data = stripped.split(': ')
    except ValueError:
        print(f'Failed to parse: "{stripped}"')
        return

    outdict = {}
    for item in data.split(', '):
        try:
            k, v = item.split('=')
        except ValueError:
            print(f'Failed to parse: "{stripped}"')
            return

        # extract floating number from the string v (https://stackoverflow.com/a/4703409)
        number = _NUMBER_PATTERN.search(v)
        if number is None:
            # A value without any digits is a malformed line, not a crash.
            print(f'Failed to parse: "{stripped}"')
            return

        extracted_float = float(number.group())
        outdict[k] = extracted_float
        print(k, extracted_float)

    if set(items_to_plot).issubset(outdict):
        dict_of_deques[identifier].append(outdict)
def read_from_port(ser):
    '''
    Feed lines read from ``ser`` into handle_serial_data, forever.

    On entry the module-level ``connected`` flag is checked: if it is already
    set the function returns at once, otherwise the flag is set and the read
    loop starts and never returns.

    ser: object whose readline() returns the next line as bytes.
    '''
    global connected
    if connected:
        return
    connected = True

    while True:
        raw_line = ser.readline()
        if not raw_line:
            # Empty read: nothing to process, ask for the next line.
            continue
        try:
            handle_serial_data(raw_line.decode("utf-8"))
        except UnicodeDecodeError as err:
            # Report undecodable bytes and keep reading.
            print(err)
# Read the serial port in the background while the main thread runs the plot.
# read_from_port() never returns, so the thread is marked as a daemon:
# a non-daemon thread would keep the process alive after the plot window is closed.
thread = threading.Thread(target=read_from_port, args=(serial_port,), daemon=True)
thread.start()
### Plotting | |
def create_subplots(fig, items_to_plot):
    '''
    Add one subplot per item to ``fig`` and return them paired with their names.

    The grid has ceil(n / 2) columns and just enough rows to hold all n items.
    Both counts are computed with integer arithmetic so that add_subplot
    receives plain ints.

    fig: object providing add_subplot(nrows, ncols, index).
    items_to_plot: sized iterable of item names.
    Returns a list of (item_name, axes) tuples in the order of items_to_plot;
    an empty list when there is nothing to plot.
    '''
    number_of_subplots = len(items_to_plot)
    if number_of_subplots == 0:
        return []

    # Integer ceiling divisions: ceil(n / 2) columns, ceil(n / columns) rows.
    number_of_columns = (number_of_subplots + 1) // 2
    number_of_rows = (number_of_subplots + number_of_columns - 1) // number_of_columns

    # Subplot positions are 1-based.
    return [
        (item_name, fig.add_subplot(number_of_rows, number_of_columns, position))
        for position, item_name in enumerate(items_to_plot, start=1)
    ]
def animate(i, data, axes_list):
    '''
    Create animation with one colour per identifier and one plot per key

    Redraws every axes from scratch: each axes is cleared once, titled with its
    item name, and then receives one line per identifier found in ``data``.

    i: frame number, only printed.
    data: mapping of identifier -> sequence of sample dicts; every sample is
        expected to contain each item name in axes_list.
    axes_list: list of (item_name, axes) tuples as returned by create_subplots.
    '''
    items_to_plot = tuple(item_name for item_name, _ in axes_list)
    print(items_to_plot)

    # Copy the histories before plotting: in this script they are appended to
    # by the serial reader thread, and iterating a deque or dict while it is
    # being modified raises RuntimeError.
    snapshot = [(identifier, list(samples)) for identifier, samples in list(data.items())]

    for item_name, ax in axes_list:
        # Clear once per axes (not once per identifier) so that every
        # identifier's line stays visible.
        ax.clear()
        ax.set_title(item_name)
        for identifier, samples in snapshot:
            ys = [sample[item_name] for sample in samples]
            xs = np.arange(len(ys))
            ax.plot(xs, ys, label=identifier)
        if snapshot:
            # Only draw a legend when at least one labelled line exists.
            ax.legend(loc='upper left')

    print('animating', i)
fig = plt.figure(num=1)
axes_list = create_subplots(fig, items_to_plot)

# Set up plot to call animate() function periodically (every 50 ms).
# The animation object is bound to a name so that it stays referenced while the window is open.
animate_extra_args = (dict_of_deques, axes_list)
ani = animation.FuncAnimation(fig, animate, fargs=animate_extra_args, interval=50)

# Hand control to the GUI event loop.
plt.show()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment