Created
February 5, 2020 20:23
-
-
Save standarddeviant/ab17996bf8daa257703a48f6f49b6296 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import queue
import threading
import time

import serial
# Thread-safe queue carrying (port_name, raw_line) tuples from the serial
# reader threads to the data writer thread.
global_dataMsgQ = queue.Queue()
# Thread-safe queue used as a stop signal: every worker thread takes one item
# off it and then exits, so at least one item must be queued per thread.
global_stopQ = queue.Queue()
def serial_read_function(serial_port_obj, dataMsgQ, stopQ):
    """Forward lines read from a serial port to a queue until told to stop.

    Args:
        serial_port_obj: Open port-like object providing ``readline()``,
            ``close()`` and a ``name`` attribute.
        dataMsgQ: Queue that receives one ``(port_name, line)`` tuple for
            every non-empty line read.
        stopQ: Queue polled (non-blocking) once per loop iteration; the first
            item found on it ends the loop. Exactly one item is consumed.

    The port is closed on every exit path, including when ``readline()``
    raises; the exception itself is still propagated to the caller.
    """
    try:
        while True:
            # check if we should stop
            try:
                stopQ.get_nowait()
            except queue.Empty:
                pass
            else:
                break
            # read serial port
            aline = serial_port_obj.readline()
            # if there's no data before the serial object timeout,
            # then it's reasonable for aline to sometimes be empty
            if aline:
                dataMsgQ.put((serial_port_obj.name, aline))
    finally:
        # Always release the port, even if readline() failed mid-loop.
        serial_port_obj.close()
# end serial_read_function
def spawn_serial_thread(port_name='/dev/ttyUSB0', baud_rate=151200,
                        timeout=0.1):
    """Open a serial port and start a thread that reads lines from it.

    The thread runs ``serial_read_function`` with the module-level data and
    stop queues.

    Args:
        port_name: Name of the serial device to open.
        baud_rate: Baud rate passed to ``serial.Serial``.
        timeout: Read timeout passed to ``serial.Serial``.

    Returns:
        Tuple ``(serial_port_obj, thread_obj)``; the thread is already
        started.
    """
    # NOTE(review): the default baud_rate of 151200 looks like a digit
    # transposition of 115200 (the value main() passes) -- confirm.
    port = serial.Serial(port_name, baud_rate, timeout=timeout)
    reader_args = (port, global_dataMsgQ, global_stopQ)
    reader = threading.Thread(target=serial_read_function, args=reader_args)
    reader.start()
    return port, reader
# end spawn_serial_thread
def _to_bytes(data):
    """Return *data* as bytes, UTF-8 encoding it first when it is a str."""
    if isinstance(data, str):
        return data.encode("utf-8")
    return data


def data_writer_function(fpath, dataMsgQ, stopQ):
    """Write queued messages to a file until a stop signal arrives.

    Args:
        fpath: Path of the output file; it is created or truncated.
        dataMsgQ: Queue of ``(ser_name, data)`` tuples. ``data`` may be a
            bytes-like object or a ``str`` (encoded as UTF-8).
        stopQ: Queue polled (non-blocking) once per loop iteration; the first
            item found on it ends the loop. Exactly one item is consumed.

    The file is opened in binary mode so that bytes are written exactly as
    received. Only ``data`` is written; ``ser_name`` is currently not
    recorded. Messages still queued when the stop signal is seen are written
    before the function returns.
    """
    # Upper bound on how long one wait for data delays noticing a stop signal.
    poll_interval_s = 0.05
    with open(fpath, "wb") as f:
        while True:
            # check if we should stop
            try:
                stopQ.get_nowait()
            except queue.Empty:
                pass
            else:
                break
            # check for messages to write; block briefly instead of spinning
            try:
                _ser_name, data = dataMsgQ.get(timeout=poll_interval_s)
            except queue.Empty:
                continue
            f.write(_to_bytes(data))
        # end while(True)
        # Flush whatever is already queued so a stop request loses no data.
        while True:
            try:
                _ser_name, data = dataMsgQ.get_nowait()
            except queue.Empty:
                break
            f.write(_to_bytes(data))
    # end with open(fpath)
# end data_writer_function
def spawn_data_writer_thread(fpath):
    """Start a thread that writes queued messages to ``fpath``.

    The thread runs ``data_writer_function`` with the module-level data and
    stop queues.

    Args:
        fpath: Path of the file the writer thread writes to.

    Returns:
        The already-started thread object.
    """
    writer_args = (fpath, global_dataMsgQ, global_stopQ)
    writer = threading.Thread(target=data_writer_function, args=writer_args)
    writer.start()
    return writer
# end spawn_data_writer_thread
def main():
    """Log two serial ports to a file for ten seconds, then shut down.

    Starts one reader thread per serial port and one writer thread, sleeps,
    then signals every thread to stop and waits for all of them to exit.
    The stop signal is sent from a ``finally`` block so that threads already
    started are still shut down if a later step raises.
    """
    workers = []
    try:
        # make two serial ports and threads
        _s1, t1 = spawn_serial_thread('COM1', 115200, 0.1)
        workers.append(t1)
        _s2, t2 = spawn_serial_thread('COM2', 115200, 0.1)
        workers.append(t2)
        # make data writing thread
        workers.append(spawn_data_writer_thread('log_fpath_goes_here.csv'))
        # wait 10 seconds
        time.sleep(10.0)
    finally:
        # close up shop
        # put a bunch of things on the queue to make sure all threads stop
        # (each thread consumes one item, so 10 covers the 3 workers)
        for _ in range(10):
            global_stopQ.put(None)
        # wait for all threads to cleanly exit; join() replaces polling
        # Thread.isAlive(), which no longer exists on Python 3.9+
        for worker in workers:
            worker.join()
# end main
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment