Skip to content

Instantly share code, notes, and snippets.

@ryansturmer
Last active October 25, 2024 13:56
Show Gist options
  • Save ryansturmer/604caf383ef853fdf1809b330dd9e69e to your computer and use it in GitHub Desktop.
Save ryansturmer/604caf383ef853fdf1809b330dd9e69e to your computer and use it in GitHub Desktop.
Python Review - Sensor Code?
import threading
import struct
import serial
import time
class OmegaSensor(object):
    """Background reader for an Omega transducer on a serial port.

    ``start()`` launches a worker thread that opens the port, queries the
    serial number, selects the sample rate and then parses the continuous
    packet stream into ``self.readings`` as ``(timestamp, value)`` tuples.
    ``stop()`` ends the capture and returns everything collected.

    Attributes:
        comname: Serial port name handed to ``serial.Serial``.
        readings: ``(time.monotonic() timestamp, float value)`` tuples, in
            arrival order, for the current/most recent capture.
        snr: Serial number reported by the device, or None before the
            worker has read it.
        connected: True while the worker is streaming packets.
    """

    # Byte that marks the start of a packet. A literal 0xAA inside the
    # payload is sent twice ("stuffed") so it cannot be mistaken for a sync.
    _SYNC = b'\xAA'

    # Supported sample rates mapped to the id sent with the RATE command.
    _RATE_IDS = {
        5: 0,
        10: 1,
        20: 2,
        40: 3,
        80: 4,
        160: 5,
        320: 6,
        640: 7,
        1000: 8,
    }

    def __init__(self, comname):
        """Remember the port name; nothing is opened until ``start()``."""
        self.comname = comname
        self._stop = False
        self.worker = None
        self.readings = []
        self.snr = None
        self.connected = False

    @classmethod
    def _rate_id(cls, rate):
        """Return the device rate id for *rate*.

        Raises:
            ValueError: If *rate* is not one of the supported sample rates.
        """
        try:
            return cls._RATE_IDS[rate]
        except KeyError:
            # The tuple is wrapped in a 1-tuple so it is formatted as ONE
            # value; passing it bare makes '%' raise TypeError instead.
            raise ValueError(
                'Invalid sample rate. Sample rate must be one of %s'
                % (tuple(sorted(cls._RATE_IDS)),)
            )

    @staticmethod
    def _read_byte(port):
        """Read exactly one byte from *port*.

        Raises:
            TimeoutError: If the read returned no data (port timeout/EOF).
        """
        byte = port.read(1)
        if not byte:
            raise TimeoutError('Timed out waiting for data from the sensor')
        return byte

    @classmethod
    def _read_packet(cls, port):
        """Read one packet from *port*.

        Bytes are discarded until a sync byte is seen; the next byte is the
        packet type and it is followed by a 4-byte little-endian IEEE 754
        float in which every 0xAA is doubled.

        Returns:
            ``(packet_type, value)`` where ``packet_type`` is a 1-byte
            ``bytes`` object and ``value`` is a float.

        Raises:
            TimeoutError: If the port stops delivering data.
            ValueError: If a new packet starts before the payload is
                complete.
        """
        # Resynchronise: skip everything up to and including the sync byte.
        while cls._read_byte(port) != cls._SYNC:
            pass
        packet_type = cls._read_byte(port)

        payload = bytearray()
        while len(payload) < 4:
            byte = cls._read_byte(port)
            if byte == cls._SYNC and cls._read_byte(port) != cls._SYNC:
                # An unstuffed 0xAA is the sync of the NEXT packet, so this
                # payload is incomplete and must not be decoded.
                raise ValueError(
                    'Packet truncated by the start of a new packet')
            payload += byte
        return packet_type, struct.unpack('<f', payload)[0]

    def _read_worker(self, rate):
        """Thread body: configure the device, then stream until stopped.

        Args:
            rate: Requested sample rate; must be a key of ``_RATE_IDS``.

        Raises:
            ValueError: If *rate* is not supported.
            Exception: If the device does not confirm the requested rate.
        """
        try:
            # Validate first so a bad rate fails before the port is touched.
            rate_id = self._rate_id(rate)
            with serial.Serial(self.comname, 115200, timeout=3.0) as port:
                port.flush()
                port.flushInput()
                # NOTE(review): 'PS' presumably stops any stream already in
                # progress so the text replies below are not interleaved
                # with binary packets - confirm against the device manual.
                port.write(b'PS\r')
                port.write(b'SNR\r')
                self.snr = int(port.readline().split(b'=')[1].strip())

                port.write(b'RATE %d\r' % rate_id)
                # The first line after RATE is discarded (presumably an
                # echo/ack); the second one carries 'name=value'.
                port.readline()
                line = port.readline()
                try:
                    confirmed = int(line.split(b'=')[1].strip())
                except (IndexError, ValueError):
                    confirmed = None
                if confirmed != rate_id:
                    print(line)
                    raise Exception('Could not set rate!')

                self.connected = True
                # Kick off continuous reads
                port.write(b'PC\r')
                while not self._stop:
                    try:
                        t = time.monotonic()
                        _, value = self._read_packet(port)
                        self.readings.append((t, value))
                    except Exception as e:
                        # Best effort: report the bad/missing packet and
                        # keep going so one glitch does not end the capture.
                        print(e)
                port.write(b'PS\r')
        finally:
            self.connected = False

    def stream(self, rate=80):
        """Alias for ``start()``."""
        self.start(rate)

    def start(self, rate=80):
        """Discard old readings and start capturing in a worker thread.

        Args:
            rate: Sample rate; must be one of the keys of ``_RATE_IDS``.
        """
        self._stop = False
        self.readings = []
        self.worker = threading.Thread(target=self._read_worker, args=(rate,))
        self.worker.start()

    def stop(self):
        """Stop the capture, wait for the worker, and return the readings.

        Safe to call when no capture was ever started.
        """
        self._stop = True
        if self.worker is not None:
            self.worker.join()
        return self.readings

    def get_reading(self):
        """Return the most recent ``(timestamp, value)`` reading.

        Raises:
            IndexError: If no reading has been captured yet.
        """
        if not self.readings:
            raise IndexError('No readings have been captured yet')
        return self.readings[-1]
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment