-
-
Save brookst/bdbede3a8d40eb8940a5b53e7ca1f6ce to your computer and use it in GitHub Desktop.
#!/usr/bin/env python3 | |
__author__ = "Tim Brooks" | |
__email__ = "[email protected]" | |
__date__ = "2024-04-23" | |
import datetime
import logging
import struct
from enum import Enum, Flag

import serial
import serial.tools.list_ports as list_ports
# Register a custom TRACE level below DEBUG; it is used for raw byte dumps.
logging.addLevelName(5, "TRACE")
logging.TRACE = 5

# Command identifiers: the first payload byte of every request sent to the
# device.
COMMAND = {
    "set_time": b"\x01",
    "read_dose_curve": b"\x03",
    "set_rate_limit": b"\x04",
    "set_dose_limit": b"\x05",
    "get_version": b"\x06",
    "get_dose": b"\x07",
    "set_alert": b"\x08",
    "set_display": b"\x09",
    "set_mode": b"\x0a",
    "set_language": b"\x0b",
    "timer_duration": b"\x0c",
    "clear_dose": b"\x0d",
    "read": b"\x0e",
    "read_rate_curve": b"\x0f",
    "read_alarms": b"\x10",
}

# Byte sequences the device sends back. "success" and "readback" follow the
# echoed command byte; the "read_*" entries are complete payloads.
RESPONSE = {
    "readback": b"\x04",
    "success": b"\x06",
    "read_starting": b"\x0e\x06\x01",
    "read_stopping": b"\x0e\x06\x00",
}

# USB (vendor ID, product ID) pair that get_port() looks for.
VID_PID = (0x1A86, 0x7523)
class FS5000:
    """Driver for an FS5000 dose meter attached through a serial port.

    Every message on the wire is framed as::

        0xAA | length | payload | checksum | 0x55

    ``length`` counts the length byte itself, the payload, the checksum and
    the trailer. ``checksum`` is the sum of all preceding bytes modulo 256.
    The first payload byte is a command identifier from ``COMMAND``.
    """

    def __init__(self, port):
        """Open serial device *port* at 115200 baud with a 2 second timeout."""
        self.port = serial.Serial(port, 115200, timeout=2)
        self.log = logging.getLogger("FS5000")

    def close(self):
        """Close the serial port if this instance owns one."""
        port = getattr(self, "port", None)
        if port is not None:
            port.close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()

    # ------------------------------------------------------------------
    # Low-level transport
    # ------------------------------------------------------------------

    def log_bytes(self, data, purpose, level=logging.DEBUG):
        """Log *data* as space separated hex bytes.

        Short dumps go on a single line after the title; longer ones are
        split into lines of 16 bytes below the title.
        """
        if not self.log.isEnabledFor(level):
            return
        # stacklevel=2 makes funcName in the log record refer to our caller.
        title = f"{len(data)} bytes {purpose}: "
        if len(title) + len(data) * 3 < 80:
            line = title + "".join(f"{b:02x} " for b in data)
            self.log.log(level, line, stacklevel=2)
            return
        self.log.log(level, title, stacklevel=2)
        for start in range(0, len(data), 16):
            chunk = data[start:start + 16]
            line = "".join(f"{b:02x} " for b in chunk)
            self.log.log(level, line, stacklevel=2)

    def write(self, data):
        """Write raw bytes to the port; return the port's write() result."""
        self.log_bytes(data, "written", logging.TRACE)
        return self.port.write(data)

    def read(self, length):
        """Read up to *length* raw bytes from the port."""
        response = self.port.read(length)
        self.log_bytes(response, "read", logging.TRACE)
        return response

    def checksum(self, data: bytes) -> bytes:
        """Return the one-byte checksum: sum of *data* modulo 256."""
        return bytes([sum(data) % 256])

    def packet(self, payload: bytes):
        """Wrap *payload* in header, length, checksum and trailer bytes."""
        self.log.log(logging.TRACE, f"{len(payload)=}")
        # Length byte + checksum and trailer byte to follow the payload.
        data = b"\xaa" + bytes([len(payload) + 3]) + payload
        data += self.checksum(data)
        data += b"\x55"
        return data

    def send(self, command: bytes):
        """Frame *command* and write it to the device."""
        self.write(self.packet(command))

    def recv(self):
        """Read one framed message and return its payload.

        Returns:
            The payload bytes, or ``None`` if nothing arrived (timeout).

        Raises:
            IOError: on a bad header, truncated message, bad trailer or
                checksum mismatch.
        """
        header = self.read(2)
        if not header:
            return None
        if header[0] != 0xAA:
            raise IOError(f"Read header 0x{header[0]:02x} not 0xaa")
        if len(header) < 2:
            raise IOError("Truncated header: length byte missing")
        length = header[1]
        if length < 3:
            # Even an empty payload needs length + checksum + trailer.
            raise IOError(f"Invalid packet length {length}")
        data = self.read(length - 1)
        if len(data) != length - 1:
            raise IOError(f"Truncated packet: got {len(data)} of {length - 1} bytes")
        if data[-1] != 0x55:
            raise IOError(f"Read trailer 0x{data[-1]:02x} not 0x55")
        checksum = self.checksum(header + data[:-2])[0]
        msg_checksum = data[-2]
        if checksum != msg_checksum:
            self.log_bytes(data, "failed to verify")
            raise IOError(f"Checksum failure {checksum:02x} != {msg_checksum:02x}")
        return data[:-2]

    def _recv_required(self):
        """Like recv(), but raise IOError instead of returning None."""
        response = self.recv()
        if response is None:
            raise IOError("Timed out waiting for a response")
        return response

    def check_success(self, command):
        """Read a reply and verify it acknowledges *command*.

        Returns:
            Whatever follows the two-byte acknowledgement in the payload.

        Raises:
            IOError: if no reply arrives.
            RuntimeError: if the reply is not command byte + success byte.
        """
        response = self._recv_required()
        expectation = bytes([command[0]]) + RESPONSE["success"]
        if response[:2] != expectation:
            raise RuntimeError(f"Received {response=}, expected {expectation}")
        return response[2:]

    # ------------------------------------------------------------------
    # Commands
    # ------------------------------------------------------------------

    def set_time(self, time: datetime.datetime = None):
        """Set the device clock to *time* (defaults to the current time)."""
        if time is None:
            time = datetime.datetime.now()
        command = COMMAND["set_time"]
        # Only the last two digits of the year are sent.
        command += bytes([time.year % 100, time.month, time.day])
        command += bytes([time.hour, time.minute, time.second])
        self.send(command)
        self.check_success(command)

    def read_dose_log(self):
        """Fetch log of total dose"""
        self.send(COMMAND["read_dose_curve"])
        response = self.check_success(COMMAND["read_dose_curve"])
        packets, records = struct.unpack("!BH", response)
        marker = COMMAND["read_dose_curve"][0]
        chunks = []
        for packet in range(1, packets + 1):
            response = self._recv_required()
            if response[0] != marker:
                raise RuntimeError(f"Received {response[0]=} not {marker}")
            if response[1] != packet:
                raise RuntimeError(f"Received {response[1]=} not {packet=}")
            chunks.append(response[2:])
        log = b"".join(chunks)
        self.log_bytes(log, "logged")
        raise NotImplementedError("TODO: Parse dose curve")

    DIGITS = ".0123456789"

    MSV_H = b"mSvh"
    USV_H = b"uSvh"
    RATE_UNIT = {
        MSV_H: "mSv/h",
        USV_H: "μSv/h",
    }

    def _validate_limit(self, value, unit, units, label, unit_hint):
        """Validate a 4-character limit and its unit; return (bytes, bytes).

        *label* and *unit_hint* only feed the error messages. A ``str`` unit
        is accepted and encoded as ASCII.
        """
        if not isinstance(value, str):
            raise TypeError(f"{label} limit must be 4 characters e.g. '2.50'")
        if len(value) != 4:
            raise ValueError(f"{label} limit must be 4 characters e.g. '2.50'")
        if any(c not in self.DIGITS for c in value):
            raise ValueError(f"{label} limit must be of characters {self.DIGITS}")
        if isinstance(unit, str):
            unit = unit.encode("ascii")
        if unit not in units:
            raise ValueError(f"{label} limit must have unit {unit_hint}")
        return value.encode("ascii"), unit

    def set_rate_limit(self, value: str, unit: bytes = b"uSvh"):
        """Set the dose rate alarm limit, e.g. ``set_rate_limit("2.50")``.

        Raises:
            TypeError: if *value* is not a string.
            ValueError: if *value* is not 4 characters of ``DIGITS`` or
                *unit* is not a key of ``RATE_UNIT``.
        """
        encoded, unit = self._validate_limit(
            value, unit, self.RATE_UNIT, "Rate", "'uSvh' or 'mSvh'"
        )
        self.log.debug(f"{value=} {self.RATE_UNIT[unit]}")
        command = COMMAND["set_rate_limit"] + encoded + unit
        self.send(command)
        self.check_success(command)

    SV = b" Sv"
    MSV = b"mSv"
    USV = b"uSv"
    DOSE_UNIT = {
        SV: "Sv",
        MSV: "mSv",
        USV: "μSv",
    }

    def set_dose_limit(self, limit: str, unit: bytes = b"uSv"):
        """Set the accumulated dose alarm limit, e.g. ``set_dose_limit("2.50")``.

        Raises:
            TypeError: if *limit* is not a string.
            ValueError: if *limit* is not 4 characters of ``DIGITS`` or
                *unit* is not a key of ``DOSE_UNIT``.
        """
        encoded, unit = self._validate_limit(
            limit, unit, self.DOSE_UNIT, "Dose", "'uSv', 'mSv' or ' Sv'"
        )
        self.log.debug(encoded + unit)
        command = COMMAND["set_dose_limit"] + encoded + unit
        self.send(command)
        self.check_success(command)

    def get_version(self):
        """Request version information; return the raw reply payload.

        Returns ``None`` if the device does not answer.
        """
        self.send(COMMAND["get_version"])
        return self.recv()

    def get_dose(self):
        """Fetch current total dose"""
        self.send(COMMAND["get_dose"])
        response = self.check_success(COMMAND["get_dose"])
        _, dose, *date = struct.unpack("!II5B", response)
        dose *= 0.01  # Convert to μSv
        year, month, day, hour, minute = date
        year += 2000  # The device only stores a two digit year
        date = datetime.datetime(year, month, day, hour, minute)
        self.log.info(f"{dose:.2f} μSv starting {date}")
        return (dose, date)

    class Notify(Flag):
        """Alert channels; combine with ``|``."""

        LAMP = 0x01
        SOUND = 0x02
        VIBE = 0x04
        CLICK = 0x08

    def set_alert(self, value: Notify):
        """Select which alert channels are active."""
        if not isinstance(value, self.Notify):
            raise ValueError("Alert setting must be of type Notify")
        command = COMMAND["set_alert"] + bytes([value.value])
        self.send(command)
        self.check_success(command)

    def set_display(self, brightness: int, timeout: int):
        """Set display brightness (0-5) and timeout (0-9999)."""
        if brightness < 0 or brightness > 5:
            raise ValueError("Brightness must be in range [0-5]")
        if timeout < 0 or timeout > 9999:
            raise ValueError("Timeout must be in range [0-9999]")
        # Timeout is sent as a big-endian 16 bit value.
        command = COMMAND["set_display"]
        command += bytes([brightness, timeout // 256, timeout % 256])
        self.send(command)
        self.check_success(command)

    def set_mode(self, mode: bool):
        """Set True to enable long endurance mode"""
        command = COMMAND["set_mode"] + bytes([int(bool(mode))])
        self.send(command)
        self.check_success(command)

    class Language(Enum):
        """Interface languages accepted by set_language()."""

        CHINESE = 0x00
        ENGLISH = 0x01

    def set_language(self, value: Language):
        """Select the device interface language."""
        if not isinstance(value, self.Language):
            raise ValueError("Language setting must be of type Language")
        command = COMMAND["set_language"] + bytes([value.value])
        self.send(command)
        self.check_success(command)

    def get_duration(self):
        """Get the period of a 'timed dose' measurement"""
        # This seems like a rejected command, but the current value gets read back
        command = COMMAND["timer_duration"] + b"\x01"
        self.send(command)
        response = self._recv_required()
        expectation = COMMAND["timer_duration"] + RESPONSE["readback"] + b"\x00"
        if response[:3] != expectation:
            raise RuntimeError(f"Received {response=}, expected {expectation}")
        seconds = struct.unpack("!I", response[3:])[0]
        self.log.info(f"Got timed duration {seconds} s")
        return seconds

    def set_duration(self, seconds):
        """Set the period of a 'timed dose' measurement in seconds"""
        command = COMMAND["timer_duration"]
        command += struct.pack("!BI", 0, seconds)  # 0 to set value, non-zero gets it
        self.send(command)
        self.check_success(command)

    def clear_dose(self):
        """Clear the accumulated dose total, returns date-time of reset"""
        command = COMMAND["clear_dose"]
        self.send(command)
        response = self.check_success(command)
        # NOTE(review): the six bytes are assumed to be year, month, day,
        # hour, minute, second (the layout set_time sends) - confirm on device.
        year, month, day, hour, minute, second = struct.unpack("!6B", response)
        year += 2000  # The device only stores a two digit year
        return datetime.datetime(year, month, day, hour, minute, second)

    def start_read(self):
        """Command start of continuous data readout"""
        self.send(COMMAND["read"] + b"\x01")
        message = self.recv()
        if message != RESPONSE["read_starting"]:
            raise RuntimeError(f"Expected start of counts, got {message}")

    def stop_read(self):
        """Wait until the stop response is read back"""
        self.send(COMMAND["read"] + b"\x00")
        while True:
            try:
                message = self.recv()
            except IOError:
                # Corrupt or partial frame: keep reading until a clean one.
                continue
            if message != RESPONSE["read_stopping"]:
                raise RuntimeError(f"Expected stopping of counts, got {message}")
            break

    def yield_data(self):
        """Continuously yield a semicolon separated record of date-time, instantaneous dose rate, total dose,
        counts per second, counts per minute, average dose rate, timer, timed dose and alarm status"""
        self.start_read()
        marker = COMMAND["read"][0]
        try:
            while True:
                message = self.recv()
                if message is None:
                    # Read timed out; keep waiting for the next record.
                    continue
                now = datetime.datetime.now()
                if message[0] != marker:
                    raise RuntimeError(f"Unexpected datum marker: {message[0]=} != {marker}")
                yield now.isoformat(timespec="seconds") + ";" + message[1:].decode()
        finally:
            # Runs when the consumer stops iterating, so readout always ends.
            self.stop_read()

    def read_out(self):
        """Read out data continuously"""
        try:
            for datum in self.yield_data():
                self.log.info(datum)
        except KeyboardInterrupt:
            pass

    def read_rate_log(self):
        """Fetch log of dose rate"""
        self.send(COMMAND["read_rate_curve"])
        response = self.check_success(COMMAND["read_rate_curve"])
        # No idea why there's a null in the centre here:
        packets, _, records = struct.unpack("!HBH", response)
        marker = COMMAND["read_rate_curve"][0]
        chunks = []
        for packet in range(1, packets + 1):
            response = self._recv_required()
            command, packet_id = struct.unpack("!BH", response[:3])
            if command != marker:
                raise RuntimeError(f"Received {command=} not {marker}")
            if packet_id != packet:
                raise RuntimeError(f"Received {packet_id=} not {packet=}")
            chunks.append(response[3:])
        log = b"".join(chunks)
        self.log_bytes(log, "logged")
        raise NotImplementedError("TODO: Parse dose rate curve")

    def read_alarms(self):
        """Fetch the alarm log and report each record at INFO level."""
        self.send(COMMAND["read_alarms"])
        response = self.check_success(COMMAND["read_alarms"])
        _, packets, _, records = struct.unpack("!BBBH", response)
        marker = COMMAND["read_alarms"][0]
        chunks = []
        for packet in range(1, packets + 1):
            response = self._recv_required()
            if response[0] != marker:
                raise RuntimeError(f"Received {response[0]=} not {marker}")
            if response[2] != packet:
                raise RuntimeError(f"Received {response[2]=} not {packet=}")
            chunks.append(response[3:])
        log = b"".join(chunks)
        self.log_bytes(log, "logged")
        for record in range(records):
            data = log[record * 16:(record + 1) * 16]
            if len(data) < 16:
                self.log.error(f"Alarm log truncated at record #{record+1}")
                break
            # alarm type, year, month/day/hour/minute/second, limit, unit
            values = struct.unpack("!BH5B4s4s", data)
            alarm = values[0]
            date = datetime.datetime(*values[1:7])
            limit, unit = values[7:9]
            if alarm == 0x01:
                units = self.RATE_UNIT
            elif alarm == 0x02:
                unit = unit[1:]  # The log has an extra space in this case
                units = self.DOSE_UNIT
            else:
                self.log.error(f"Unknown alarm type: {alarm}")
                continue
            if unit not in units:
                self.log.error(f"Unknown unit: {unit}")
                continue
            self.log.log(logging.TRACE, f"{limit=} {unit=}")
            self.log.info(f"#{record+1} {date} >={limit.decode()} {units[unit]}")
class MockFS5000(FS5000):
    """Stand-in for FS5000 that needs no hardware.

    It remembers the command byte of the last packet written and answers
    every read with a framed "success" reply for that command.
    """

    def __init__(self, port):
        # Deliberately skip FS5000.__init__ so no serial port is opened;
        # *port* is accepted only to keep the constructor signature.
        self.port = None
        self.last = None
        self.log = logging.getLogger("MockFS5000")
        self.outbox = b""

    def write(self, value):
        """Record the command byte of *value*; return the byte count."""
        self.log_bytes(value, "written", logging.TRACE)
        # Framed packet layout: header, length, then the command byte.
        self.last = value[2]
        return len(value)

    def read(self, length):
        """Just report that the previous command succeeded"""
        if self.last is None:
            return b""
        if not self.outbox:
            # Nothing pending: craft a new message to return to the reader.
            self.outbox = self.packet(bytes([self.last]) + RESPONSE["success"])
        # Hand out at most *length* bytes; keep the rest for the next read.
        response, self.outbox = self.outbox[:length], self.outbox[length:]
        self.log_bytes(response, "read", logging.TRACE)
        return response
def get_port():
    """Return the device name of the first serial port matching VID_PID.

    Raises:
        FileNotFoundError: if no attached port has the expected USB IDs.
    """
    for port in list_ports.comports():
        if (port.vid, port.pid) == VID_PID:
            return port.device
    raise FileNotFoundError(
        f"No serial port with USB ID {VID_PID[0]:04x}:{VID_PID[1]:04x} found"
    )
def main():
    """Find the device, stream its readings to the log, then close the port."""
    form = "%(levelname)s:%(name)s:%(funcName)s: %(message)s"
    logging.basicConfig(level=logging.INFO, format=form)
    # logging.basicConfig(level=logging.TRACE, format=form)
    port = get_port()
    dev = FS5000(port)
    # dev = MockFS5000('/dev/null')
    try:
        # Usage examples for the other commands:
        # dev.set_time()
        # dev.read_dose_log()
        # dev.set_rate_limit("0.50", b"uSvh")
        # dev.set_rate_limit("0.50", FS5000.USV_H)
        # dev.set_dose_limit("0.01", b" Sv")
        # dev.set_dose_limit("2.50", FS5000.USV)
        # version = dev.get_version()
        # version = version.split(b'\x00')
        # logging.info(f"{version=}")
        # dev.get_dose()
        # dev.set_alert(FS5000.Notify.LAMP | FS5000.Notify.VIBE)
        # dev.set_display(0, 60)
        # dev.set_mode(False)
        # dev.set_language(FS5000.Language.ENGLISH)
        # dev.get_duration()
        # dev.set_duration(2 * 60 * 60)
        # dev.clear_dose()
        # now = datetime.datetime.now().isoformat(timespec="minutes").translate({58:45})
        # with open(f"fs5000_{now}.log", "w") as file:
        #     try:
        #         for datum in dev.yield_data():
        #             print(datum, file=file, flush=True)
        #     except KeyboardInterrupt:
        #         pass
        dev.read_out()
        # dev.read_rate_log()
        # dev.read_alarms()
        # dev.send(COMMAND['set_dose_limit'])
        # dev.send(b'\x0c\x02')
        # while True:
        #     response = dev.recv()
        #     if response is None:
        #         break
        #     if len(response) == 0:
        #         break
    finally:
        # Release the serial port even if the readout raised.
        serial_port = getattr(dev, "port", None)
        if serial_port is not None:
            serial_port.close()


if __name__ == "__main__":
    main()
Thank you again, it works just great for me (macOS, FS-5000). BTW, is RadPro better? I'm thinking of switching too, but I will need to order an STM programmer.
The RadPro firmware is nicer but it doesn't really add any new functionality. I got a cheap programmer, soldered a 4 pin header to the board and the flashing was really easy. There's a python tool that will pull logs and stream live data over USB with a much nicer protocol that actually has documentation: https://github.com/Gissio/radpro/blob/main/docs/comm.md
Thank you again :) I ordered an STM32 programmer on Ali and will flash it once it arrives. The RadPro protocol does seem to be much easier, yes.
My solution, which I needed in Java ->
in case it helps others.
Thanks for the support.
I wanted to send the output from an FS-5000 to MQTT/Home Assistant for monitoring but I can't write python. I haven't been able to find a solution so as an experiment I asked an AI to modify this script to suit my needs. Much to my surprise it worked. I connected the FS-5000 to an old Raspberry Pi via USB. Here's the modified script, in case somebody else wishes to do the same:
https://gist.github.com/mouldybread/b1bf8f6bcd047192b21fa78e08a9e6c5
@davidwowa take a look at the script. It's not a dumb protocol; you need to initiate the transfer. The easiest way would be to modify the Python script for your needs.