Created
February 2, 2023 16:01
-
-
Save benongithub/5ce06890f10c9187bfaa4ec30ad4ed72 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
from threading import Thread | |
import serial | |
import time | |
import collections | |
import matplotlib.pyplot as plt | |
import matplotlib.animation as animation | |
import struct | |
import copy | |
import pandas as pd | |
from matplotlib.widgets import Slider | |
class serialPlot: | |
def __init__(self, serialPort='COM7', serialBaud=115200, plotLength=100, dataNumBytes=8, numPlots=3): | |
# servo_one_slider, servo_two_slider, | |
self.port = serialPort | |
self.baud = serialBaud | |
self.plotMaxLength = plotLength | |
self.dataNumBytes = dataNumBytes | |
self.numPlots = numPlots | |
self.rawData = bytearray(numPlots * dataNumBytes) | |
self.dataType = None | |
self.dataType = 'd' | |
# if dataNumBytes == 2: | |
# self.dataType = 'h' # 2 byte integer | |
# elif dataNumBytes == 4: | |
# self.dataType = 'f' # 4 byte float | |
# elif dataNumBytes == 8: | |
# self.dataType = 'd' # 8 byte double | |
self.data = [] | |
for i in range(numPlots): # give an array for each type of data and store them in a list | |
self.data.append(collections.deque([0] * plotLength, maxlen=plotLength)) | |
self.isRun = True | |
self.isReceiving = False | |
self.thread = None | |
self.plotTimer = 0 | |
self.previousTimer = 0 | |
# self.csvData = [] | |
# self.servo_one_slider = servo_one_slider | |
# self.servo_two_slider = servo_two_slider | |
print('Trying to connect to: ' + str(serialPort) + ' at ' + str(serialBaud) + ' BAUD.') | |
try: | |
self.serialConnection = serial.Serial(serialPort, serialBaud, timeout=4) | |
print('Connected to ' + str(serialPort) + ' at ' + str(serialBaud) + ' BAUD.') | |
except: | |
print("Failed to connect with " + str(serialPort) + ' at ' + str(serialBaud) + ' BAUD.') | |
def readSerialStart(self): | |
if self.thread == None: | |
self.thread = Thread(target=self.backgroundThread) | |
self.thread.start() | |
# Block till we start receiving values | |
while self.isReceiving != True: | |
time.sleep(0.1) | |
def getSerialData(self, frame, lines, lineValueText, lineLabel, timeText): | |
currentTimer = time.perf_counter() | |
self.plotTimer = int((currentTimer - self.previousTimer) * 1000) # the first reading will be erroneous | |
self.previousTimer = currentTimer | |
timeText.set_text('Plot Interval = ' + str(self.plotTimer) + 'ms') | |
privateData = copy.deepcopy(self.rawData[:]) # so that the 3 values in our plots will be synchronized to the same sample time | |
for i in range(self.numPlots): | |
data = privateData[(i*self.dataNumBytes):(self.dataNumBytes + i*self.dataNumBytes)] | |
value, = struct.unpack(self.dataType, data) | |
self.data[i].append(value) # we get the latest data point and append it to our array | |
lines[i].set_data(range(self.plotMaxLength), self.data[i]) | |
lineValueText[i].set_text('[' + lineLabel[i] + '] = ' + str(value)) | |
# self.csvData.append([self.data[0][-1], self.data[1][-1], self.data[2][-1]]) | |
def backgroundThread(self): # retrieve data | |
time.sleep(1.0) # give some buffer time for retrieving data | |
self.serialConnection.reset_input_buffer() | |
while (self.isRun): | |
self.serialConnection.readinto(self.rawData) | |
self.isReceiving = True | |
# print(self.rawData) | |
def update_servos(self, val): | |
#print(b'%d,%d\n' % (self.servo_one_slider.val, self.servo_two_slider.val)) | |
pitch_angle = self.servo_one_slider.val | |
yaw_angle = self.servo_two_slider.val | |
# x = 20 + pitch_angle * 140 | |
# y = 160 - pitch_angle * 140 | |
x = 60 + pitch_angle * 120 | |
y = 60 + pitch_angle * 120 | |
print(x, y) | |
self.serialConnection.write( | |
b'%d,%d\n' % (x, y) | |
) | |
def close(self): | |
self.isRun = False | |
self.thread.join() | |
self.serialConnection.close() | |
print('Disconnected...') | |
# df = pd.DataFrame(self.csvData) | |
# df.to_csv('/home/rikisenia/Desktop/data.csv') | |
def main(): | |
# portName = 'COM5' | |
portName = 'COM7' | |
baudRate = 115200 | |
maxPlotLength = 100 # number of points in x-axis of real time plot | |
dataNumBytes = 8 # number of bytes of 1 data point | |
numPlots = 3 # number of plots in 1 graph | |
# plotting starts below | |
pltInterval = 50 # Period at which the plot animation updates [ms] | |
xmin = 0 | |
xmax = maxPlotLength | |
ymin = -(180) | |
ymax = 360 | |
fig = plt.figure(figsize=(10, 8)) | |
ax = plt.axes(xlim=(xmin, xmax), ylim=(float(ymin - (ymax - ymin) / 10), float(ymax + (ymax - ymin) / 10))) | |
ax.set_title('Angle') | |
ax.set_xlabel("Time") | |
ax.set_ylabel("Servo Angle Output") | |
lineLabel = ['X', 'Y', 'Z'] | |
style = ['r-', 'c-', 'b-'] # linestyles for the different plots | |
timeText = ax.text(0.70, 0.95, '', transform=ax.transAxes) | |
lines = [] | |
lineValueText = [] | |
for i in range(numPlots): | |
lines.append(ax.plot([], [], style[i], label=lineLabel[i])[0]) | |
lineValueText.append(ax.text(0.70, 0.90-i*0.05, '', transform=ax.transAxes)) | |
plt.legend(loc="upper left") | |
# fig.subplots_adjust(left=0.25, bottom=0.25) | |
# | |
# #servo_one_angle = 180 / 2 | |
# #servo_two_angle = 180 / 2 | |
# servo_one_angle = 0.5 | |
# servo_two_angle = 0.5 | |
# | |
# axservo_one = fig.add_axes([0.1, 0.25, 0.0225, 0.63]) | |
# servo_one_slider = Slider( | |
# ax=axservo_one, | |
# label="Pitch Angle", | |
# valmin=0, | |
# valmax=1, | |
# valinit=servo_one_angle, | |
# orientation="vertical" | |
# ) | |
# axservo_two = fig.add_axes([0, 0.25, 0.0225, 0.63]) | |
# servo_two_slider = Slider( | |
# ax=axservo_two, | |
# label="Yaw Angle", | |
# valmin=0, | |
# valmax=1, | |
# valinit=servo_two_angle, | |
# orientation="vertical" | |
# ) | |
# | |
# s = serialPlot(servo_one_slider, servo_two_slider, portName, baudRate, maxPlotLength, dataNumBytes, numPlots) # initializes all required variables | |
# | |
# servo_one_slider.on_changed(s.update_servos) | |
# servo_two_slider.on_changed(s.update_servos) | |
s = serialPlot(portName, baudRate, maxPlotLength, dataNumBytes, numPlots) # initializes all required variables | |
s.readSerialStart() | |
anim = animation.FuncAnimation(fig, s.getSerialData, fargs=(lines, lineValueText, lineLabel, timeText), interval=pltInterval) # fargs has to be a tuple | |
plt.show() | |
s.close() | |
if __name__ == '__main__': | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment