Last active
March 4, 2025 18:29
-
-
Save mouldybread/b1bf8f6bcd047192b21fa78e08a9e6c5 to your computer and use it in GitHub Desktop.
A python script to read the serial output from a Bosean FS-5000 Radiation Detector and send that information to an MQTT server. Based on the fs5000.py script shared by Tim Brooks / brookst.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
# Original script by Tim Brooks | |
# https://gist.github.com/brookst/bdbede3a8d40eb8940a5b53e7ca1f6ce | |
# Disclaimer: I can't write python! I wanted to do this but couldn't find another way, so I asked an AI to modify the original script. | |
# It should run silently unless there is an error, and MQTT messages are only sent when a value changes. | |
# However I am not in a position to gauge the quality of the script. | |
import datetime | |
from enum import Enum, Flag | |
import logging | |
import serial | |
import serial.tools.list_ports as list_ports | |
import struct | |
import paho.mqtt.client as mqtt | |
logging.basicConfig(level=logging.ERROR, format="%(levelname)s:%(name)s:%(funcName)s: %(message)s") | |
COMMAND = { | |
"set_time": b"\x01", | |
"read_dose_curve": b"\x03", | |
"set_rate_limit": b"\x04", | |
"set_dose_limit": b"\x05", | |
"get_version": b"\x06", | |
"get_dose": b"\x07", | |
"set_alert": b"\x08", | |
"set_display": b"\x09", | |
"set_mode": b"\x0a", | |
"set_language": b"\x0b", | |
"timer_duration": b"\x0c", | |
"clear_dose": b"\x0d", | |
"read": b"\x0e", | |
"read_rate_curve": b"\x0f", | |
"read_alarms": b"\x10", | |
} | |
RESPONSE = { | |
"readback": b"\x04", | |
"success": b"\x06", | |
"read_starting": b"\x0e\x06\x01", | |
"read_stopping": b"\x0e\x06\x00", | |
} | |
VID_PID = (0x1A86, 0x7523) | |
# MQTT settings | |
MQTT_BROKER = "your_mqtt_broker_address" | |
MQTT_PORT = 1883 | |
MQTT_USERNAME = "your_username" | |
MQTT_PASSWORD = "your_password" | |
MQTT_TOPIC_PREFIX = "your/mqtt/topic/prefix" | |
# MQTT client | |
client = mqtt.Client() | |
# Callback when the client connects to the broker | |
def on_connect(client, userdata, flags, rc): | |
if rc != 0: | |
logging.error(f"Failed to connect to MQTT Broker, return code {rc}") | |
# Callback when a message is published | |
def on_publish(client, userdata, mid): | |
logging.debug(f"Message {mid} published") | |
client.on_connect = on_connect | |
client.on_publish = on_publish | |
# Set the username and password for the MQTT client | |
client.username_pw_set(MQTT_USERNAME, MQTT_PASSWORD) | |
# Connect to the MQTT broker | |
client.connect(MQTT_BROKER, MQTT_PORT, 60) | |
# Start the MQTT loop in a separate thread | |
client.loop_start() | |
class FS5000: | |
def __init__(self, port): | |
self.port = serial.Serial(port, 115200, timeout=2) | |
self.log = logging.getLogger("FS5000") | |
self.previous_data = {} | |
def write(self, data): | |
return self.port.write(data) | |
def read(self, length): | |
return self.port.read(length) | |
def checksum(self, data: bytes) -> bytes: | |
return bytes([sum(data) % 256]) | |
def packet(self, payload: bytes): | |
data = b"\xaa" + bytes([len(payload) + 3]) + payload + self.checksum(data) + b"\x55" | |
return data | |
def send(self, command: bytes): | |
data = self.packet(command) | |
self.write(data) | |
def recv(self): | |
header = self.read(2) | |
if len(header) == 0: | |
return None | |
if header[0] != 0xAA: | |
raise IOError(f"Read header 0x{header[0]:02x} not 0xaa") | |
length = header[1] | |
data = self.read(length - 1) | |
if data[-1] != 0x55: | |
raise IOError(f"Read trailer 0x{data[-1]:02x} not 0x55") | |
checksum = self.checksum(header + data[:-2])[0] | |
if checksum != data[-2]: | |
raise IOError(f"Checksum failure {checksum:02x} != {data[-2]:02x}") | |
return data[:-2] | |
def check_success(self, command): | |
response = self.recv() | |
if response is None: | |
raise RuntimeError("No response received") | |
expectation = bytes([command[0]]) + RESPONSE["success"] | |
if response[:2] != expectation: | |
raise RuntimeError(f"Received {response=}, expected {expectation}") | |
return response[2:] | |
def set_time(self, time: datetime.datetime = None): | |
if time is None: | |
time = datetime.datetime.now() | |
command = COMMAND["set_time"] + bytes([time.year % 100, time.month, time.day, time.hour, time.minute, time.second]) | |
self.send(command) | |
self.check_success(command) | |
def read_dose_log(self): | |
self.send(COMMAND["read_dose_curve"]) | |
response = self.check_success(COMMAND["read_dose_curve"]) | |
packets, records = struct.unpack("!BH", response) | |
log = b"" | |
for packet in range(1, packets + 1): | |
response = self.recv() | |
if response[0] != COMMAND["read_dose_curve"][0] or response[1] != packet: | |
raise RuntimeError(f"Received unexpected packet {response=}") | |
log += response[2:] | |
raise NotImplementedError("TODO: Parse dose curve") | |
DIGITS = ".0123456789" | |
MSV_H = b"mSvh" | |
USV_H = b"uSvh" | |
RATE_UNIT = { | |
MSV_H: "mSv/h", | |
USV_H: "μSv/h", | |
} | |
def set_rate_limit(self, value: str, unit: bytes = b"uSvh"): | |
if not isinstance(value, str) or len(value) != 4 or any(c not in self.DIGITS for c in value): | |
raise ValueError("Rate limit must be 4 characters e.g. '2.50'") | |
if unit not in self.RATE_UNIT: | |
raise ValueError("Rate limit must have unit 'uSvh' or 'mSvh'") | |
command = COMMAND["set_rate_limit"] + value.encode("ascii") + unit | |
self.send(command) | |
self.check_success(command) | |
SV = b" Sv" | |
MSV = b"mSv" | |
USV = b"uSv" | |
DOSE_UNIT = { | |
SV: "Sv", | |
MSV: "mSv", | |
USV: "μSv", | |
} | |
def set_dose_limit(self, limit: str, unit: bytes = b"uSv"): | |
if not isinstance(limit, str) or len(limit) != 4 or any(c not in self.DIGITS for c in limit): | |
raise ValueError("Dose limit must be 4 characters e.g. '2.50'") | |
if unit not in self.DOSE_UNIT: | |
raise ValueError("Dose limit must have unit 'uSv', 'mSv' or ' Sv'") | |
command = COMMAND["set_dose_limit"] + limit.encode("ascii") + unit | |
self.send(command) | |
self.check_success(command) | |
def get_version(self): | |
self.send(COMMAND["get_version"]) | |
return self.recv() | |
def get_dose(self): | |
self.send(COMMAND["get_dose"]) | |
response = self.check_success(COMMAND["get_dose"]) | |
_, dose, *date = struct.unpack("!II5B", response) | |
dose *= 0.01 # Convert to μSv | |
year, month, day, hour, minute = date | |
year += 2000 | |
date = datetime.datetime(year, month, day, hour, minute) | |
return dose, date | |
class Notify(Flag): | |
LAMP = 0x01 | |
SOUND = 0x02 | |
VIBE = 0x04 | |
CLICK = 0x08 | |
def set_alert(self, value: Notify): | |
if not isinstance(value, self.Notify): | |
raise ValueError("Alert setting must be of type Notify") | |
command = COMMAND["set_alert"] + bytes([value.value]) | |
self.send(command) | |
self.check_success(command) | |
def set_display(self, brightness: int, timeout: int): | |
if not (0 <= brightness <= 5): | |
raise ValueError("Brightness must be in range [0-5]") | |
if not (0 <= timeout <= 9999): | |
raise ValueError("Timeout must be in range [0-9999]") | |
command = COMMAND["set_display"] + bytes([brightness, timeout // 256, timeout % 256]) | |
self.send(command) | |
self.check_success(command) | |
def set_mode(self, mode: bool): | |
command = COMMAND["set_mode"] + bytes([mode]) | |
self.send(command) | |
self.check_success(command) | |
class Language(Enum): | |
CHINESE = 0x00 | |
ENGLISH = 0x01 | |
def set_language(self, value: Language): | |
if not isinstance(value, self.Language): | |
raise ValueError("Language setting must be of type Language") | |
command = COMMAND["set_language"] + bytes([value.value]) | |
self.send(command) | |
self.check_success(command) | |
def get_duration(self): | |
command = COMMAND["timer_duration"] + b"\x01" | |
self.send(command) | |
response = self.recv() | |
expectation = COMMAND["timer_duration"] + RESPONSE["readback"] + b"\x00" | |
if response[:3] != expectation: | |
raise RuntimeError(f"Received {response=}, expected {expectation}") | |
seconds = struct.unpack("!I", response[3:])[0] | |
return seconds | |
def set_duration(self, seconds: int): | |
command = COMMAND["timer_duration"] + struct.pack("!BI", 0, seconds) | |
self.send(command) | |
self.check_success(command) | |
def clear_dose(self): | |
command = COMMAND["clear_dose"] | |
self.send(command) | |
response = self.check_success(command) | |
date = struct.unpack("!6B", response) | |
year, month, day, hour, minute = date | |
year += 2000 | |
date = datetime.datetime(year, month, day, hour, minute) | |
return date | |
def start_read(self): | |
self.send(COMMAND["read"] + b"\x01") | |
message = self.recv() | |
if message != RESPONSE["read_starting"]: | |
raise RuntimeError(f"Expected start of counts, got {message}") | |
def stop_read(self): | |
self.send(COMMAND["read"] + b"\x00") | |
while True: | |
try: | |
message = self.recv() | |
if message == RESPONSE["read_stopping"]: | |
break | |
except IOError: | |
pass | |
def yield_data(self): | |
self.start_read() | |
try: | |
while True: | |
message = self.recv() | |
if message is None: | |
continue | |
now = datetime.datetime.now().isoformat(timespec="seconds") | |
yield now + ";" + message[1:].decode() | |
finally: | |
self.stop_read() | |
def read_out(self): | |
try: | |
for datum in self.yield_data(): | |
data_parts = datum.split(";") | |
if len(data_parts) >= 9: | |
topics = [ | |
f"{MQTT_TOPIC_PREFIX}/datetime", | |
f"{MQTT_TOPIC_PREFIX}/dose_rate", | |
f"{MQTT_TOPIC_PREFIX}/total_dose", | |
f"{MQTT_TOPIC_PREFIX}/cps", | |
f"{MQTT_TOPIC_PREFIX}/cpm", | |
f"{MQTT_TOPIC_PREFIX}/avg_dose_rate", | |
f"{MQTT_TOPIC_PREFIX}/timer", | |
f"{MQTT_TOPIC_PREFIX}/timed_dose", | |
f"{MQTT_TOPIC_PREFIX}/alarm_status" | |
] | |
for topic, value in zip(topics, data_parts): | |
if topic not in self.previous_data or self.previous_data[topic] != value: | |
result = client.publish(topic, value) | |
if result.rc == 0: | |
self.previous_data[topic] = value | |
else: | |
logging.error(f"Failed to send message to topic `{topic}`") | |
except KeyboardInterrupt: | |
pass | |
def read_rate_log(self): | |
self.send(COMMAND["read_rate_curve"]) | |
response = self.check_success(COMMAND["read_rate_curve"]) | |
packets, _, records = struct.unpack("!HBH", response) | |
log = b"" | |
for packet in range(1, packets + 1): | |
response = self.recv() | |
command, packet_id = struct.unpack("!BH", response[:3]) | |
if command != COMMAND["read_rate_curve"][0] or packet_id != packet: | |
raise RuntimeError(f"Received unexpected packet {response=}") | |
log += response[3:] | |
raise NotImplementedError("TODO: Parse dose rate curve") | |
def read_alarms(self): | |
self.send(COMMAND["read_alarms"]) | |
response = self.check_success(COMMAND["read_alarms"]) | |
_, packets, _, records = struct.unpack("!BBBH", response) | |
log = b"" | |
for packet in range(1, packets + 1): | |
response = self.recv() | |
if response[0] != COMMAND["read_alarms"][0] or response[2] != packet: | |
raise RuntimeError(f"Received unexpected packet {response=}") | |
log += response[3:] | |
for record in range(records): | |
data = log[record * 16 : (record + 1) * 16] | |
values = struct.unpack("!BH5B4s4s", data) | |
alarm = values[0] | |
date = datetime.datetime(*values[1:7]) | |
limit, unit = values[7:9] | |
UNIT = self.RATE_UNIT if alarm == 0x01 else self.DOSE_UNIT | |
if unit not in UNIT: | |
logging.error(f"Unknown unit: {unit}") | |
continue | |
logging.debug(f"#{record+1} {date} >={limit.decode()} {UNIT[unit]}") | |
class MockFS5000(FS5000): | |
def __init__(self, port): | |
self.last = None | |
self.log = logging.getLogger("MockFS5000") | |
self.outbox = b"" | |
def write(self, value): | |
length = value[1] | |
self.last = command = value[2] | |
return len(value) | |
def read(self, length): | |
if self.last is None: | |
return b"" | |
outbox = self.outbox | |
if len(outbox): | |
self.outbox = outbox[length:] | |
return outbox[:length] | |
response = self.packet(bytes([self.last]) + RESPONSE["success"]) | |
self.outbox = response[length:] | |
return response[:length] | |
def get_port(): | |
ports = list_ports.comports() | |
for port in ports: | |
if (port.vid, port.pid) == VID_PID: | |
return port.device | |
raise FileNotFoundError("Device not found") | |
def main(): | |
port = get_port() | |
dev = FS5000(port) | |
dev.read_out() | |
if __name__ == "__main__": | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment