Skip to content

Instantly share code, notes, and snippets.

@mouldybread
Last active March 4, 2025 18:29
Show Gist options
  • Save mouldybread/b1bf8f6bcd047192b21fa78e08a9e6c5 to your computer and use it in GitHub Desktop.
Save mouldybread/b1bf8f6bcd047192b21fa78e08a9e6c5 to your computer and use it in GitHub Desktop.
A Python script that reads the serial output of a Bosean FS-5000 radiation detector and publishes the readings to an MQTT broker. Based on the fs5000.py script shared by Tim Brooks / brookst.
#!/usr/bin/env python3
# Original script by Tim Brooks
# https://gist.github.com/brookst/bdbede3a8d40eb8940a5b53e7ca1f6ce
# Disclaimer: I can't write python! I wanted to do this but couldn't find another way, so I asked an AI to modify the original script.
# It should run silently unless there is an error, and MQTT messages are only sent when a value changes.
# However I am not in a position to gauge the quality of the script.
import datetime
from enum import Enum, Flag
import logging
import serial
import serial.tools.list_ports as list_ports
import struct
import paho.mqtt.client as mqtt
# Only ERROR and above is emitted, so the script is silent in normal operation.
logging.basicConfig(
    level=logging.ERROR,
    format="%(levelname)s:%(name)s:%(funcName)s: %(message)s",
)

# Command identifiers: the first payload byte of every packet sent to the
# device (see FS5000.send / FS5000.packet).
COMMAND = {
    "set_time": b"\x01",
    "read_dose_curve": b"\x03",
    "set_rate_limit": b"\x04",
    "set_dose_limit": b"\x05",
    "get_version": b"\x06",
    "get_dose": b"\x07",
    "set_alert": b"\x08",
    "set_display": b"\x09",
    "set_mode": b"\x0a",
    "set_language": b"\x0b",
    "timer_duration": b"\x0c",
    "clear_dose": b"\x0d",
    "read": b"\x0e",
    "read_rate_curve": b"\x0f",
    "read_alarms": b"\x10",
}

# Byte sequences expected in replies from the device.
RESPONSE = {
    # Status byte following the echoed command id.
    "readback": b"\x04",
    "success": b"\x06",
    # Complete payloads acknowledging the start / stop of live streaming.
    "read_starting": b"\x0e\x06\x01",
    "read_stopping": b"\x0e\x06\x00",
}

# (vendor id, product id) pair that get_port() looks for among serial ports.
VID_PID = (0x1A86, 0x7523)
# MQTT settings (placeholders: replace with the details of your broker)
MQTT_BROKER = "your_mqtt_broker_address"
MQTT_PORT = 1883
MQTT_USERNAME = "your_username"
MQTT_PASSWORD = "your_password"
MQTT_TOPIC_PREFIX = "your/mqtt/topic/prefix"

# MQTT client
# paho-mqtt 2.x added a callback API version argument to Client(); the
# callbacks in this file use the version-1 signatures, so request VERSION1
# when the enum exists and fall back to the bare constructor on paho-mqtt 1.x.
_CALLBACK_API = getattr(mqtt, "CallbackAPIVersion", None)
if _CALLBACK_API is None:
    client = mqtt.Client()
else:
    client = mqtt.Client(_CALLBACK_API.VERSION1)
# Callback when the client connects to the broker
def on_connect(client, userdata, flags, rc):
    """Log an error when the broker connection attempt did not succeed.

    Nothing is logged for a return code of 0; any other code is reported
    through the root logger. The other arguments are accepted only because
    the callback signature requires them.
    """
    if rc == 0:
        return
    logging.error(f"Failed to connect to MQTT Broker, return code {rc}")
# Callback when a message is published
def on_publish(client, userdata, mid):
    """Record, at debug level, that the message with id *mid* was published."""
    note = f"Message {mid} published"
    logging.debug(note)
# NOTE: everything below runs at import time, so merely importing this module
# opens a connection to the broker.
# Register the callbacks before connecting so the connection result is logged
client.on_connect = on_connect
client.on_publish = on_publish
# Set the username and password for the MQTT client
client.username_pw_set(MQTT_USERNAME, MQTT_PASSWORD)
# Connect to the MQTT broker (60 is paho's keepalive argument, in seconds)
client.connect(MQTT_BROKER, MQTT_PORT, 60)
# Start the MQTT loop in a separate thread
client.loop_start()
class FS5000:
    """Serial driver for a Bosean FS-5000 radiation detector.

    Every packet on the wire is framed as::

        0xAA | length | payload ... | checksum | 0x55

    where ``length`` is ``len(payload) + 3`` and ``checksum`` is the sum of
    all preceding bytes (marker, length and payload) modulo 256.
    """

    # stop_read() gives up after this many consecutive reads that time out
    # (each one lasts the serial timeout) or that fail with an IOError.
    STOP_READ_MAX_TIMEOUTS = 3
    STOP_READ_MAX_ERRORS = 64

    def __init__(self, port):
        """Open the serial device *port* at 115200 baud with a 2 s read timeout."""
        self.port = serial.Serial(port, 115200, timeout=2)
        self.log = logging.getLogger("FS5000")
        # Last value successfully published for each MQTT topic.
        self.previous_data = {}

    def write(self, data):
        """Write raw bytes to the serial port and return the port's result."""
        return self.port.write(data)

    def read(self, length):
        """Read up to *length* raw bytes from the serial port."""
        return self.port.read(length)

    def checksum(self, data: bytes) -> bytes:
        """Return the one-byte checksum of *data*: the byte sum modulo 256."""
        return bytes([sum(data) % 256])

    def packet(self, payload: bytes):
        """Wrap *payload* in the start marker, length, checksum and trailer."""
        # The length byte counts the payload plus length, checksum and trailer.
        framed = b"\xaa" + bytes([len(payload) + 3]) + payload
        # The checksum covers everything built so far, so it has to be
        # computed after the marker, length and payload are in place.
        return framed + self.checksum(framed) + b"\x55"

    def send(self, command: bytes):
        """Frame *command* as a packet and write it to the device."""
        self.write(self.packet(command))

    def recv(self):
        """Read one packet and return its payload.

        Returns:
            The payload bytes, or None when nothing arrived before the read
            timed out.

        Raises:
            IOError: the packet is truncated, badly framed or fails its
                checksum.
        """
        header = self.read(2)
        if len(header) == 0:
            return None
        if header[0] != 0xAA:
            raise IOError(f"Read header 0x{header[0]:02x} not 0xaa")
        if len(header) < 2:
            raise IOError("Truncated header: no length byte received")
        length = header[1]
        # A valid packet holds at least the length, checksum and trailer bytes.
        if length < 3:
            raise IOError(f"Invalid packet length {length}")
        data = self.read(length - 1)
        if len(data) != length - 1:
            raise IOError(f"Truncated packet: expected {length - 1} bytes, got {len(data)}")
        if data[-1] != 0x55:
            raise IOError(f"Read trailer 0x{data[-1]:02x} not 0x55")
        checksum = self.checksum(header + data[:-2])[0]
        if checksum != data[-2]:
            raise IOError(f"Checksum failure {checksum:02x} != {data[-2]:02x}")
        return data[:-2]

    def _recv_or_raise(self):
        """Return the next payload, raising RuntimeError if the read timed out."""
        response = self.recv()
        if response is None:
            raise RuntimeError("No response received")
        return response

    def check_success(self, command):
        """Read a reply and verify that it acknowledges *command*.

        Returns:
            The reply payload following the command id and success byte.

        Raises:
            RuntimeError: no reply arrived, or it is not a success reply for
                *command*.
        """
        response = self._recv_or_raise()
        expectation = bytes([command[0]]) + RESPONSE["success"]
        if response[:2] != expectation:
            raise RuntimeError(f"Received {response=}, expected {expectation}")
        return response[2:]

    def set_time(self, time: datetime.datetime = None):
        """Set the device clock to *time* (defaults to the current local time)."""
        if time is None:
            time = datetime.datetime.now()
        # The year is sent as two digits.
        command = COMMAND["set_time"] + bytes(
            [time.year % 100, time.month, time.day, time.hour, time.minute, time.second]
        )
        self.send(command)
        self.check_success(command)

    def read_dose_log(self):
        """Download the raw dose curve; parsing is not implemented yet.

        Raises:
            RuntimeError: a packet is missing or arrives out of sequence.
            NotImplementedError: always, once the download has completed.
        """
        self.send(COMMAND["read_dose_curve"])
        response = self.check_success(COMMAND["read_dose_curve"])
        packets, records = struct.unpack("!BH", response)
        log = b""
        for packet in range(1, packets + 1):
            response = self._recv_or_raise()
            if response[0] != COMMAND["read_dose_curve"][0] or response[1] != packet:
                raise RuntimeError(f"Received unexpected packet {response=}")
            log += response[2:]
        raise NotImplementedError("TODO: Parse dose curve")

    # Characters allowed in a limit value string.
    DIGITS = ".0123456789"

    MSV_H = b"mSvh"
    USV_H = b"uSvh"
    # Wire encoding of a dose-rate unit -> display string.
    RATE_UNIT = {
        MSV_H: "mSv/h",
        USV_H: "μSv/h",
    }

    def set_rate_limit(self, value: str, unit: bytes = b"uSvh"):
        """Set the dose-rate limit.

        Args:
            value: exactly four characters drawn from digits and ``.``,
                e.g. ``"2.50"``.
            unit: ``b"uSvh"`` or ``b"mSvh"``.

        Raises:
            ValueError: *value* or *unit* is not in the accepted form.
        """
        if not isinstance(value, str) or len(value) != 4 or any(c not in self.DIGITS for c in value):
            raise ValueError("Rate limit must be 4 characters e.g. '2.50'")
        if unit not in self.RATE_UNIT:
            raise ValueError("Rate limit must have unit 'uSvh' or 'mSvh'")
        command = COMMAND["set_rate_limit"] + value.encode("ascii") + unit
        self.send(command)
        self.check_success(command)

    SV = b" Sv"
    MSV = b"mSv"
    USV = b"uSv"
    # Wire encoding of a dose unit -> display string.
    DOSE_UNIT = {
        SV: "Sv",
        MSV: "mSv",
        USV: "μSv",
    }

    def set_dose_limit(self, limit: str, unit: bytes = b"uSv"):
        """Set the accumulated-dose limit.

        Args:
            limit: exactly four characters drawn from digits and ``.``,
                e.g. ``"2.50"``.
            unit: ``b"uSv"``, ``b"mSv"`` or ``b" Sv"``.

        Raises:
            ValueError: *limit* or *unit* is not in the accepted form.
        """
        if not isinstance(limit, str) or len(limit) != 4 or any(c not in self.DIGITS for c in limit):
            raise ValueError("Dose limit must be 4 characters e.g. '2.50'")
        if unit not in self.DOSE_UNIT:
            raise ValueError("Dose limit must have unit 'uSv', 'mSv' or ' Sv'")
        command = COMMAND["set_dose_limit"] + limit.encode("ascii") + unit
        self.send(command)
        self.check_success(command)

    def get_version(self):
        """Request the version and return the raw reply payload (None on timeout)."""
        self.send(COMMAND["get_version"])
        return self.recv()

    def get_dose(self):
        """Return ``(dose, date)``: the reported dose and its timestamp."""
        self.send(COMMAND["get_dose"])
        response = self.check_success(COMMAND["get_dose"])
        _, dose, *date = struct.unpack("!II5B", response)
        dose *= 0.01  # Convert to μSv
        year, month, day, hour, minute = date
        year += 2000  # the device reports a two-digit year
        date = datetime.datetime(year, month, day, hour, minute)
        return dose, date

    class Notify(Flag):
        """Bit flags selecting the alert channels for set_alert()."""

        LAMP = 0x01
        SOUND = 0x02
        VIBE = 0x04
        CLICK = 0x08

    def set_alert(self, value: Notify):
        """Enable the alert channels given by the Notify flag combination *value*."""
        if not isinstance(value, self.Notify):
            raise ValueError("Alert setting must be of type Notify")
        command = COMMAND["set_alert"] + bytes([value.value])
        self.send(command)
        self.check_success(command)

    def set_display(self, brightness: int, timeout: int):
        """Set display *brightness* (0-5) and *timeout* (0-9999)."""
        if not (0 <= brightness <= 5):
            raise ValueError("Brightness must be in range [0-5]")
        if not (0 <= timeout <= 9999):
            raise ValueError("Timeout must be in range [0-9999]")
        # The timeout is sent as two bytes, most significant first.
        command = COMMAND["set_display"] + bytes([brightness, timeout // 256, timeout % 256])
        self.send(command)
        self.check_success(command)

    def set_mode(self, mode: bool):
        """Send the mode flag *mode* to the device as a single 0/1 byte."""
        command = COMMAND["set_mode"] + bytes([mode])
        self.send(command)
        self.check_success(command)

    class Language(Enum):
        """Languages selectable with set_language()."""

        CHINESE = 0x00
        ENGLISH = 0x01

    def set_language(self, value: Language):
        """Switch the device language to the Language member *value*."""
        if not isinstance(value, self.Language):
            raise ValueError("Language setting must be of type Language")
        command = COMMAND["set_language"] + bytes([value.value])
        self.send(command)
        self.check_success(command)

    def get_duration(self):
        """Return the timer duration in seconds.

        Raises:
            RuntimeError: no reply arrived, or it is not the expected readback.
        """
        command = COMMAND["timer_duration"] + b"\x01"
        self.send(command)
        response = self._recv_or_raise()
        expectation = COMMAND["timer_duration"] + RESPONSE["readback"] + b"\x00"
        if response[:3] != expectation:
            raise RuntimeError(f"Received {response=}, expected {expectation}")
        seconds = struct.unpack("!I", response[3:])[0]
        return seconds

    def set_duration(self, seconds: int):
        """Set the timer duration to *seconds*."""
        command = COMMAND["timer_duration"] + struct.pack("!BI", 0, seconds)
        self.send(command)
        self.check_success(command)

    def clear_dose(self):
        """Clear the accumulated dose and return the timestamp the device reports."""
        command = COMMAND["clear_dose"]
        self.send(command)
        response = self.check_success(command)
        # The reply carries six bytes. NOTE(review): the sixth is presumably
        # seconds, mirroring the six fields set_time() sends; confirm on a device.
        year, month, day, hour, minute, second = struct.unpack("!6B", response)
        year += 2000  # the device reports a two-digit year
        return datetime.datetime(year, month, day, hour, minute, second)

    def start_read(self):
        """Ask the device to start streaming live readings."""
        self.send(COMMAND["read"] + b"\x01")
        message = self.recv()
        if message != RESPONSE["read_starting"]:
            raise RuntimeError(f"Expected start of counts, got {message}")

    def stop_read(self):
        """Ask the device to stop streaming and wait for its acknowledgement.

        Packets still in flight are discarded. The wait is bounded by
        STOP_READ_MAX_TIMEOUTS and STOP_READ_MAX_ERRORS so that a silent or
        disconnected device cannot block shutdown forever.
        """
        self.send(COMMAND["read"] + b"\x00")
        timeouts = 0
        errors = 0
        while timeouts < self.STOP_READ_MAX_TIMEOUTS and errors < self.STOP_READ_MAX_ERRORS:
            try:
                message = self.recv()
            except IOError:
                # Damaged packets are tolerated while the stream winds down.
                errors += 1
                continue
            errors = 0
            if message is None:
                timeouts += 1
                continue
            timeouts = 0
            if message == RESPONSE["read_stopping"]:
                return
        self.log.warning("No stop acknowledgement received; giving up")

    def yield_data(self):
        """Yield live readings as ``"<ISO timestamp>;<device text>"`` strings.

        Streaming is started on entry and stopped again when the generator is
        closed or an exception propagates out of it.
        """
        self.start_read()
        try:
            while True:
                message = self.recv()
                if message is None:
                    continue
                now = datetime.datetime.now().isoformat(timespec="seconds")
                # The first payload byte is the command id; the rest is text.
                yield now + ";" + message[1:].decode()
        finally:
            self.stop_read()

    def read_out(self):
        """Publish live readings to MQTT until interrupted with Ctrl-C.

        Each reading is split on ``;`` and its first nine fields are mapped,
        in order, onto the topics below. A topic is only published when its
        value differs from the last successfully published one.
        """
        # Built once: the topic names do not change between readings.
        topics = tuple(
            f"{MQTT_TOPIC_PREFIX}/{name}"
            for name in (
                "datetime",
                "dose_rate",
                "total_dose",
                "cps",
                "cpm",
                "avg_dose_rate",
                "timer",
                "timed_dose",
                "alarm_status",
            )
        )
        try:
            for datum in self.yield_data():
                data_parts = datum.split(";")
                if len(data_parts) < 9:
                    continue
                for topic, value in zip(topics, data_parts):
                    if self.previous_data.get(topic) == value:
                        continue
                    result = client.publish(topic, value)
                    if result.rc == 0:
                        self.previous_data[topic] = value
                    else:
                        logging.error(f"Failed to send message to topic `{topic}`")
        except KeyboardInterrupt:
            pass

    def read_rate_log(self):
        """Download the raw dose-rate curve; parsing is not implemented yet.

        Raises:
            RuntimeError: a packet is missing or arrives out of sequence.
            NotImplementedError: always, once the download has completed.
        """
        self.send(COMMAND["read_rate_curve"])
        response = self.check_success(COMMAND["read_rate_curve"])
        packets, _, records = struct.unpack("!HBH", response)
        log = b""
        for packet in range(1, packets + 1):
            response = self._recv_or_raise()
            command, packet_id = struct.unpack("!BH", response[:3])
            if command != COMMAND["read_rate_curve"][0] or packet_id != packet:
                raise RuntimeError(f"Received unexpected packet {response=}")
            log += response[3:]
        raise NotImplementedError("TODO: Parse dose rate curve")

    def read_alarms(self):
        """Download the alarm log and emit each record at debug level.

        Raises:
            RuntimeError: a packet is missing or arrives out of sequence.
        """
        self.send(COMMAND["read_alarms"])
        response = self.check_success(COMMAND["read_alarms"])
        _, packets, _, records = struct.unpack("!BBBH", response)
        log = b""
        for packet in range(1, packets + 1):
            response = self._recv_or_raise()
            if response[0] != COMMAND["read_alarms"][0] or response[2] != packet:
                raise RuntimeError(f"Received unexpected packet {response=}")
            log += response[3:]
        for record in range(records):
            # 16-byte record: alarm type, 16-bit year, month, day, hour,
            # minute, second, 4-byte limit text, 4-byte unit.
            data = log[record * 16 : (record + 1) * 16]
            values = struct.unpack("!BH5B4s4s", data)
            alarm = values[0]
            date = datetime.datetime(*values[1:7])
            limit, unit = values[7:9]
            # Alarm type 0x01 uses the rate units; anything else the dose units.
            UNIT = self.RATE_UNIT if alarm == 0x01 else self.DOSE_UNIT
            if unit not in UNIT:
                logging.error(f"Unknown unit: {unit}")
                continue
            logging.debug(f"#{record+1} {date} >={limit.decode()} {UNIT[unit]}")
class MockFS5000(FS5000):
    """Stand-in for FS5000 that needs no hardware.

    Every command written is answered with a success reply for that command,
    handed back through read() in chunks of whatever size is requested.
    """

    def __init__(self, port):
        # FS5000.__init__ is deliberately not called: it would open a real
        # serial port. Every attribute the base class reads is set here.
        self.last = None
        self.log = logging.getLogger("MockFS5000")
        self.outbox = b""
        self.previous_data = {}

    def write(self, value):
        """Remember the command id of the packet and report it as fully written."""
        # Byte 0 is the start marker, byte 1 the length, byte 2 the command id.
        self.last = value[2]
        return len(value)

    def read(self, length):
        """Return up to *length* bytes of the pending reply.

        Returns b"" until a command has been written. Once the pending reply
        is used up, a fresh success reply for the last command is generated.
        """
        if self.last is None:
            return b""
        if not self.outbox:
            self.outbox = self.packet(bytes([self.last]) + RESPONSE["success"])
        chunk, self.outbox = self.outbox[:length], self.outbox[length:]
        return chunk
def get_port():
    """Return the device name of the first serial port matching VID_PID.

    Raises:
        FileNotFoundError: no connected port has the expected vendor and
            product ids.
    """
    wanted_vid, wanted_pid = VID_PID
    for candidate in list_ports.comports():
        if candidate.vid == wanted_vid and candidate.pid == wanted_pid:
            return candidate.device
    raise FileNotFoundError("Device not found")
def main():
    """Locate the detector and stream its readings to MQTT until interrupted.

    The serial port and the MQTT session are released on the way out, whether
    streaming ends normally or with an exception.
    """
    dev = FS5000(get_port())
    try:
        dev.read_out()
    finally:
        dev.port.close()
        # Disconnect before stopping the loop so the network thread is still
        # running while the disconnect is sent.
        client.disconnect()
        client.loop_stop()


if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment