Last active
October 25, 2024 13:56
-
-
Save ryansturmer/604caf383ef853fdf1809b330dd9e69e to your computer and use it in GitHub Desktop.
Python Review - Sensor Code?
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import threading | |
import struct | |
import serial | |
import time | |
class OmegaSensor(object): | |
def __init__(self, comname): | |
self.comname = comname | |
self._stop = False | |
self.worker = None | |
self.readings = [] | |
self.snr = None | |
self.connected = False | |
def _read_worker(self, rate): | |
try: | |
def read_packet(port): | |
# Read bytes until a sync byte (0xAA) is encountered | |
while True: | |
byte = port.read() | |
if byte == b'\xAA': | |
break | |
# Now we're at the start of a packet, read the packet type | |
packet_type = port.read() | |
# Read and parse the float | |
# Reading until 4 bytes or until we get a non-stuff byte after a 0xAA | |
float_bytes = bytearray() | |
while len(float_bytes) < 4: | |
byte = port.read() | |
float_bytes.append(byte[0]) | |
# If this byte is a 0xAA, read the stuff byte | |
if byte == b'\xAA': | |
stuff_byte = port.read() | |
# If this is a true stuff byte, don't include it in the float bytes | |
if stuff_byte == b'\xAA': | |
continue | |
# If it's not a stuff byte, it's the start of the next packet, so break the loop | |
else: | |
break | |
# Convert the bytes to a float using little-endian IEEE 754 format | |
transducer_value = struct.unpack('<f', float_bytes)[0] | |
return packet_type, transducer_value | |
with serial.Serial(self.comname, 115200, timeout=3.0) as port: | |
port.flush() | |
port.flushInput() | |
port.write(b'PS\r') | |
port.write(b'SNR\r') | |
self.snr = int(port.readline().split(b'=')[1].strip()) | |
#print("Starting capture with Omega transducer SN: %s" % self.snr) | |
rates = { | |
5 : 0, | |
10 : 1, | |
20 : 2, | |
40 : 3, | |
80 : 4, | |
160 : 5, | |
320 : 6, | |
640 : 7, | |
1000 : 8 | |
} | |
try: | |
rateid = rates[rate] | |
except KeyError: | |
raise Exception('Invalid sample rate. Sample rate must be one of %s' % tuple(rates.keys())) | |
port.write(b'RATE %d\r' % rateid) | |
port.readline() | |
line = port.readline() | |
try: | |
rate = int(line.split(b'=')[1].strip()) | |
if rate != rateid: | |
raise | |
except: | |
print(line) | |
raise Exception('Could not set rate!') | |
# Kick off continuous reads | |
port.write(b'PC\r') | |
while not self._stop: | |
try: | |
t = time.monotonic() | |
type, pressure = read_packet(port) | |
self.readings.append((t, pressure)) | |
except Exception as e: | |
print(e) | |
port.write(b'PS\r') | |
finally: | |
self.connected = False | |
def stream(self, rate=80): | |
self.start(rate) | |
def start(self, rate=80): | |
self._stop = False | |
self.readings = [] | |
self.worker = threading.Thread(target=self._read_worker, args=(rate,)) | |
self.worker.start() | |
def stop(self): | |
self._stop = True | |
self.worker.join() | |
return self.readings | |
def get_reading(self): | |
return self.readings[-1] | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment