Created
April 18, 2023 13:09
-
-
Save mattwilliamson/d1f288828afac3ff28d36b526c766ffc to your computer and use it in GitHub Desktop.
camsense x1 lidar parser
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import serial | |
from struct import unpack | |
import matplotlib.pyplot as plt | |
import numpy as np | |
import queue | |
import threading | |
from matplotlib.animation import FuncAnimation | |
import time | |
import sys | |
from scipy.ndimage import median_filter, gaussian_filter1d | |
HEAD1 = 0x55 | |
HEAD2 = 0xaa | |
HEAD3 = 0x03 | |
HEAD4 = 0x08 | |
ST_HEAD1 = 0 | |
ST_HEAD2 = 1 | |
ST_HEAD3 = 2 | |
ST_HEAD4 = 3 | |
ST_SPEED = 4 | |
ST_START = 5 | |
ST_DATA = 6 | |
ST_END = 7 | |
ST_CRC = 8 | |
state = ST_HEAD1 | |
distance = np.ones(360) * 0x8000 | |
quality = np.zeros(360) | |
angle_index = np.array([]) | |
last_start = 0 | |
start = 0 | |
buffer = b"" | |
que = queue.Queue(20) | |
def parse_data(ser: serial.Serial): | |
global state | |
global distance | |
global quality | |
global last_start | |
global que | |
global buffer | |
global start_angle | |
if state == ST_HEAD1: | |
head = ser.read() | |
if int.from_bytes(head, "little") == HEAD1: | |
state = ST_HEAD2 | |
elif state == ST_HEAD2: | |
head = ser.read() | |
if int.from_bytes(head, "little") == HEAD2: | |
state = ST_HEAD3 | |
else: | |
state = ST_HEAD1 | |
elif state == ST_HEAD3: | |
head = ser.read() | |
if int.from_bytes(head, "little") == HEAD3: | |
state = ST_HEAD4 | |
else: | |
state = ST_HEAD1 | |
elif state == ST_HEAD4: | |
head = ser.read() | |
if int.from_bytes(head, "little") == HEAD4: | |
state = ST_SPEED | |
else: | |
state = ST_HEAD1 | |
elif state == ST_SPEED: | |
speed_bytes = ser.read(2) | |
# speed = int.from_bytes(speed_bytes, "little") | |
# print("speed: {}".format(speed)) | |
state = ST_START | |
elif state == ST_START: | |
start_bytes = ser.read(2) | |
start = int.from_bytes(start_bytes, "little") | |
start_angle = start / 64 - 640 | |
if last_start != 0: | |
if start < last_start: | |
## preprocess | |
# distance = median_filter(distance, 5) | |
## FIXME: kind of calibration for data rotation | |
shift = 10 | |
distance = np.roll(distance, shift) | |
quality = np.roll(quality, shift) | |
## | |
temp = distance | |
print("invalid ratio: {}".format(temp[temp==0x8000].size/temp.size)) # show ratio of invalid data | |
que.put((distance, quality)) | |
if 10 < que.qsize(): | |
que.get(5) ## FIXME: drop frame for "real-time" display | |
que.put((distance, quality)) | |
distance = np.ones(360) * 0x8000 | |
quality = np.zeros(360) | |
state = ST_DATA | |
else: # ignore the data when start | |
ser.read(28) | |
state = ST_HEAD1 | |
last_start = start | |
elif state == ST_DATA: | |
buffer = ser.read(24) | |
state = ST_END | |
elif state == ST_END: | |
end_bytes = ser.read(2) | |
end = int.from_bytes(end_bytes, "little") | |
end_angle = end / 64 - 640 | |
if end_angle < start_angle: | |
end_angle = end_angle + 360 | |
res = (end_angle - start_angle) / 8 | |
if res == 0: | |
state = ST_HEAD1 | |
else: | |
# print("start: {}, end: {}, res: {}".format(start_angle, end_angle, res)) | |
angles = np.arange(start_angle, end_angle, res) | |
indices = (np.round(angles) % 360).astype(int) | |
# print("indices: {}".format(indices)) | |
for i, j in zip(range(0, 24, 3), indices): | |
d, q = unpack("<HB", buffer[i:i+3]) | |
if distance[j] != 0x8000: | |
distance[j] = (d + distance[j]) / 2 | |
else: | |
distance[j] = d | |
if quality[j] != 0: | |
quality[j] = (q + quality[j]) / 2 | |
else: | |
quality[j] = q | |
state = ST_CRC | |
elif state == ST_CRC: # FIXME: CRC16??? | |
crc = int.from_bytes(ser.read(2), "little") | |
# print("crc: {}".format(hex(crc))) | |
state = ST_HEAD1 | |
distance_line = None | |
quality_line = None | |
def update_plot(data): | |
global distance_line | |
global quality_line | |
distance_line.set_ydata(data[0][::-1]) | |
quality_line.set_ydata(data[1][::-1]) | |
return distance_line, quality_line | |
def gen_lidar_data(): | |
global que | |
yield que.get() | |
def lidar_animation(): | |
global distance_line | |
global quality_line | |
fig = plt.figure() | |
ax = fig.add_subplot(111, projection="polar") | |
radian = np.arange(0, np.pi*2, np.deg2rad(1)) | |
ax.set_rlim(0, 8000) | |
distance_line = ax.plot(radian, np.zeros_like(radian), "r.")[0] | |
quality_line = ax.plot(radian, np.zeros_like(radian), "g.")[0] | |
ani = FuncAnimation(fig, update_plot, gen_lidar_data) | |
plt.show() | |
def main(): | |
ser = serial.Serial("/dev/ttyUSB0", 115200, timeout=1) | |
ser.flush() | |
t = threading.Thread(target=lidar_animation) | |
t.start() | |
try: | |
while True: | |
parse_data(ser) | |
except KeyboardInterrupt: | |
t.join() | |
ser.close() | |
print("Exit camsense") | |
if __name__ == "__main__": | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment