Last active
September 28, 2024 17:44
-
-
Save sonpython/d6346cb8a17c0c94ec046159e5fa7d36 to your computer and use it in GitHub Desktop.
Micropython code for collect and display topper sensor
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from machine import UART, Pin, SPI, PWM, unique_id, reset | |
import time | |
import binascii | |
import struct | |
import network | |
import socket | |
import json | |
from ili9341 import Display, color565 | |
from umqtt.simple import MQTTClient | |
# Default MQTT Configuration | |
MQTT_BROKER = "homeassistant.local" | |
MQTT_PORT = 1883 | |
MQTT_USER = "mqtt-sensor" | |
MQTT_PASSWORD = "mqtt-sensor-password" | |
MQTT_CLIENT_ID = f"smarttoper-sensor" | |
MQTT_TOPIC = "smarttoper/sensor_data" | |
# File to store WiFi and MQTT configuration | |
CONFIG_FILE = 'config.json' | |
# SPI configuration for display | |
try: | |
spi = SPI(1, baudrate=5000000, polarity=0, phase=0, sck=Pin(18), mosi=Pin(23)) | |
cs_pin = Pin(5) | |
dc_pin = Pin(2) | |
rst_pin = Pin(4) | |
print("Initializing display with 180-degree rotation...") | |
# Initialize the Display object with rotation=180 for upside down | |
display = Display(spi, cs=cs_pin, dc=dc_pin, rst=rst_pin, width=320, height=240, rotation=180) | |
display_initialized = True | |
print("Display initialized successfully.") | |
# Clear the display before writing new data | |
display.clear(color=color565(0, 0, 0)) | |
print("Cleared display.") | |
except Exception as e: | |
print("Failed to initialize display:", e) | |
display_initialized = False | |
# Initialize PWM for backlight control | |
backlight_pin = PWM(Pin(32)) | |
backlight_pin.freq(1000) # Set frequency to 1kHz | |
backlight_pin.duty(1023) # Set duty to max (1023 out of 1023 for full brightness) | |
# Load WiFi and MQTT configuration from file | |
def load_config(): | |
try: | |
with open(CONFIG_FILE, 'r') as f: | |
return json.load(f) | |
except Exception as e: | |
print(f"Failed to load config: {e}") | |
display.draw_text8x8(10, 10, f"Failed to load config: {e}", color565(255, 255, 255)) | |
return {} | |
# Save configuration to file | |
def save_config(config): | |
try: | |
with open(CONFIG_FILE, 'w') as f: | |
json.dump(config, f) | |
except Exception as e: | |
print(f"Failed to save config: {e}") | |
# Clear WiFi credentials and configuration file | |
def clear_wifi_and_config(): | |
try: | |
wlan = network.WLAN(network.STA_IF) | |
wlan.active(True) | |
wlan.disconnect() # Disconnect from WiFi | |
wlan.active(False) # Disable WiFi | |
print("WiFi disconnected and deactivated.") | |
display.draw_text8x8(10, 10, f"WiFi disconnected and deactivated.", color565(255, 255, 255)) | |
# Delete configuration file | |
import os | |
os.remove(CONFIG_FILE) | |
print("Configuration file deleted.") | |
display.draw_text8x8(10, 30, f"WiFi disconnected and deactivated.", color565(255, 255, 255)) | |
# Reset the device to apply changes | |
time.sleep(1) | |
reset() | |
except Exception as e: | |
print(f"Failed to clear WiFi or configuration: {e}") | |
display.draw_text8x8(10, 50, f"Failed to clear WiFi or configuration: {e}", color565(255, 255, 255)) | |
# Load configuration on startup | |
config = load_config() | |
wifi_ssid = config.get('wifi_ssid', 'mp1') | |
wifi_password = config.get('wifi_password', 'Zxcasdqwe1231') | |
MQTT_BROKER = config.get('mqtt_broker', MQTT_BROKER) | |
MQTT_PORT = config.get('mqtt_port', MQTT_PORT) | |
MQTT_USER = config.get('mqtt_user', MQTT_USER) | |
MQTT_PASSWORD = config.get('mqtt_password', MQTT_PASSWORD) | |
MQTT_CLIENT_ID = config.get('mqtt_client_id', MQTT_CLIENT_ID) | |
MQTT_TOPIC = config.get('mqtt_topic', MQTT_TOPIC) | |
def send_uart_server_monitor_mode(): | |
try: | |
frame_header = 0x7D | |
frame_type = 0x04 | |
frame_length = 14 # Frame length: header (1) + type (1) + length (2) + ID (10) + content (1) + end (1) | |
device_id = b'CNU2000001' # Example ID, should match your device's actual ID | |
content = 0x20 # Command to enter server monitoring mode | |
end_byte = 0x0D | |
# Create the frame in the required format | |
frame = struct.pack('<BBH10sBB', frame_header, frame_type, frame_length, device_id, content, end_byte) | |
uart.write(frame) | |
print("UART command to enter server monitoring mode sent.") | |
except Exception as e: | |
print(f"Failed to send UART command: {e}") | |
# Setup reset button on GPIO 0 | |
reset_button = Pin(35, Pin.IN, Pin.PULL_UP) # Assume button is connected to GPIO 0 with a pull-up resistor | |
def monitor_reset_button(): | |
press_start_time = None | |
while True: | |
if reset_button.value() == 0: # Button is pressed (active low) | |
if press_start_time is None: | |
press_start_time = time.time() # Record the time when button is first pressed | |
elif time.time() - press_start_time >= 3: # Check if button is held for 5 seconds | |
print("Button held for 3 seconds. Clearing WiFi and config...") | |
clear_wifi_and_config() | |
break # Exit loop after clearing settings and resetting the device | |
else: | |
# If the button is released and press_start_time is set | |
if press_start_time is not None: | |
press_duration = time.time() - press_start_time | |
if press_duration < 3: | |
print("Button pressed briefly. Sending command to switch to monitoring mode.") | |
send_uart_server_monitor_mode() | |
press_start_time = None # Reset press_start_time after handling the press | |
time.sleep(0.1) # Debounce delay and reduce CPU usage | |
# Connect to WiFi | |
wlan = network.WLAN(network.STA_IF) | |
wlan.active(True) | |
try: | |
if not wlan.isconnected(): | |
print("Connecting to WiFi...") | |
display.draw_text8x8(10, 50, f"Connecting to WiFi..", color565(255, 255, 255)) | |
wlan.connect(wifi_ssid, wifi_password) | |
timeout = 10 | |
while not wlan.isconnected() and timeout > 0: | |
time.sleep(1) | |
timeout -= 1 | |
except: | |
print('Can not connect to wifi with current credentials') | |
display.draw_text8x8(10, 70, f"Can't connect to wifi", color565(255, 255, 255)) | |
# If not connected, start Access Point mode | |
if not wlan.isconnected(): | |
print("Failed to connect to WiFi. Starting Access Point...") | |
display.draw_text8x8(10, 90, f"Failed to connect to WiFi. Starting AP", color565(255, 255, 255)) | |
ap = network.WLAN(network.AP_IF) | |
ap.active(True) | |
ap.config(essid='SmartToper_Config', password='12345678') | |
display.draw_text8x8(10, 110, f"Connect to 'SmartToper_Config'", color565(255, 255, 255)) | |
print("Access Point started. Connect to 'SmartToper_Config' to configure.") | |
# Start a web server to accept WiFi and MQTT credentials | |
addr = socket.getaddrinfo('0.0.0.0', 80)[0][-1] | |
s = socket.socket() | |
s.bind(addr) | |
s.listen(1) | |
print('Listening on', addr) | |
def web_page(): | |
html = """<html> | |
<head><title>SmartToper Configuration</title></head> | |
<body> | |
<h2>Configure WiFi and MQTT Settings</h2> | |
<form action="/submit" method="get"> | |
WiFi SSID: <input type="text" name="wifi_ssid"><br> | |
WiFi Password: <input type="password" name="wifi_password"><br> | |
MQTT Broker: <input type="text" name="mqtt_broker" value="{mqtt_broker}"><br> | |
MQTT Port: <input type="text" name="mqtt_port" value="{mqtt_port}"><br> | |
MQTT User: <input type="text" name="mqtt_user" value="{mqtt_user}"><br> | |
MQTT Password: <input type="password" name="mqtt_password" value="{mqtt_password}"><br> | |
MQTT Topic: <input type="text" name="mqtt_topic" value="{mqtt_topic}"><br> | |
<input type="submit" value="Save"> | |
</form> | |
</body> | |
</html>""".format( | |
mqtt_broker=MQTT_BROKER, | |
mqtt_port=MQTT_PORT, | |
mqtt_user=MQTT_USER, | |
mqtt_password=MQTT_PASSWORD, | |
mqtt_topic=MQTT_TOPIC, | |
) | |
return html | |
while True: | |
cl, addr = s.accept() | |
print('Client connected from', addr) | |
request = cl.recv(1024) | |
request = request.decode('utf-8') | |
print("Request:", request) | |
if request.startswith('GET /submit?'): | |
params = request.split(' ')[1].split('?')[1] | |
params = params.split('&') | |
config_data = {} | |
for param in params: | |
key, value = param.split('=') | |
config_data[key] = value | |
# Update configuration | |
wifi_ssid = config_data.get('wifi_ssid', wifi_ssid) | |
wifi_password = config_data.get('wifi_password', wifi_password) | |
MQTT_BROKER = config_data.get('mqtt_broker', MQTT_BROKER) | |
MQTT_PORT = int(config_data.get('mqtt_port', MQTT_PORT)) | |
MQTT_USER = config_data.get('mqtt_user', MQTT_USER) | |
MQTT_PASSWORD = config_data.get('mqtt_password', MQTT_PASSWORD) | |
MQTT_TOPIC = config_data.get('mqtt_topic', MQTT_TOPIC) | |
# Save updated configuration | |
new_config = { | |
'wifi_ssid': wifi_ssid, | |
'wifi_password': wifi_password, | |
'mqtt_broker': MQTT_BROKER, | |
'mqtt_port': MQTT_PORT, | |
'mqtt_user': MQTT_USER, | |
'mqtt_password': MQTT_PASSWORD, | |
'mqtt_client_id': MQTT_CLIENT_ID, | |
'mqtt_topic': MQTT_TOPIC | |
} | |
save_config(new_config) | |
response = "<html><body><h2>Settings Saved. Restart the device.</h2></body></html>" | |
cl.send('HTTP/1.1 200 OK\r\nContent-Type: text/html\r\n\r\n') | |
cl.send(response) | |
cl.close() | |
# Reboot to apply new settings | |
reset() | |
else: | |
response = web_page() | |
cl.send('HTTP/1.1 200 OK\r\nContent-Type: text/html\r\n\r\n') | |
cl.send(response) | |
cl.close() | |
else: | |
print("Connected to WiFi, IP address:", wlan.ifconfig()[0]) | |
# Initialize MQTT client | |
mqtt_client = MQTTClient(MQTT_CLIENT_ID, MQTT_BROKER, user=MQTT_USER, password=MQTT_PASSWORD) | |
try: | |
print("Connecting to MQTT broker...") | |
mqtt_client.connect() | |
print("Connected to MQTT broker.") | |
except Exception as e: | |
print("Failed to connect to MQTT broker:", e) | |
# Start monitoring the reset button in a separate loop | |
import _thread | |
_thread.start_new_thread(monitor_reset_button, ()) | |
# Initialize UART with baudrate 115200, 8 data bits, 1 stop bit | |
uart = UART(1, baudrate=115200, bits=8, parity=None, stop=1, tx=17, rx=16) | |
status_mapping = { | |
0x01: "Get out of bed", | |
0x02: "Move", | |
0x03: "Sit up", | |
0x04: "Sleep", | |
0x05: "Wake up", | |
0x06: "Heavy object", | |
0x07: "Snore", | |
0x08: "Weak breathing" | |
} | |
def parse_status(status): | |
return status_mapping.get(status, "Unknown status") | |
def parse_frame(data): | |
frame_header = data[0:2] # Header 1 byte (hex) | |
frame_type = data[2:4] # Frame type 1 byte (hex) | |
frame_length = int(data[4:8], 16) # Frame length 2 byte (little-endian) | |
device_id = bytes.fromhex(data[8:28]).decode('ascii') # ID 10 byte (ASCII) | |
content = data[28:-2] # Content | |
end_byte = data[-2:] # End byte 1 byte (hex) | |
if frame_header == '7d' and end_byte == '0d': # valid byte transfer | |
return frame_header, frame_type, frame_length, device_id, content, end_byte | |
else: | |
print("Invalid data") | |
return None | |
def display_sensor_data(parse_data): | |
serial_number, timestamp, status, heart_rate, respiration_rate, sdata, pdata = parse_data | |
if not display_initialized: | |
print("Display not initialized. Cannot display data.") | |
return | |
try: | |
# Instead of clearing the entire display, we overwrite the previous content | |
# Write the data on the display using the draw_text8x8 method | |
display.draw_text8x8(10, 10, f"Serial Number: {serial_number} ", color565(255, 255, 255)) # Adding spaces to clear previous text | |
display.draw_text8x8(10, 30, f"Timestamp: {timestamp} ", color565(255, 255, 255)) | |
display.draw_text8x8(10, 50, f"Status: {status} ", color565(255, 255, 255)) | |
display.draw_text8x8(10, 70, f"Heart Rate: {heart_rate} bpm ", color565(255, 255, 255)) | |
display.draw_text8x8(10, 90, f"Respiration Rate: {respiration_rate} brts/min ", color565(255, 255, 255)) | |
display.draw_text8x8(10, 110, f"SDATA: {sdata} ", color565(255, 255, 255)) | |
display.draw_text8x8(10, 130, f"PDATA: {pdata} ", color565(255, 255, 255)) | |
print("Displayed sensor data successfully.") | |
except Exception as e: | |
print("Failed to display sensor data:", e) | |
def send_data_to_mqtt(parse_data): | |
serial_number, timestamp, status, heart_rate, respiration_rate, sdata, pdata = parse_data | |
# Create a dictionary with sensor data | |
sensor_data = { | |
"serial_number": serial_number, | |
"timestamp": timestamp, | |
"status": status, | |
"heart_rate": heart_rate, | |
"respiration_rate": respiration_rate, | |
"sdata": sdata, | |
"pdata": pdata | |
} | |
# Convert dictionary to a JSON-like string | |
sensor_data_str = str(sensor_data).replace("'", '"') | |
# Publish the sensor data | |
try: | |
mqtt_client.publish(MQTT_TOPIC, sensor_data_str) | |
print(f"Published data to MQTT: {sensor_data_str}") | |
except Exception as e: | |
print("Failed to publish data to MQTT:", e) | |
# Publish MQTT Discovery messages to Home Assistant | |
try: | |
device_config = { | |
"identifiers": [MQTT_CLIENT_ID], | |
"name": "Smartbed Sensor", | |
"model": "ESP32", | |
"manufacturer": "Custom" | |
} | |
# Heart Rate | |
mqtt_client.publish( | |
f"homeassistant/sensor/{MQTT_CLIENT_ID}/heart_rate/config", | |
'{"name": "Heart Rate", "state_topic": "' + MQTT_TOPIC + '", "value_template": "{{ value_json.heart_rate }}", "unit_of_measurement": "bpm", "unique_id": "' + MQTT_CLIENT_ID + '_heart_rate", "device": ' + str(device_config).replace("'", '"') + '}', | |
retain=True | |
) | |
# Respiration Rate | |
mqtt_client.publish( | |
f"homeassistant/sensor/{MQTT_CLIENT_ID}/respiration_rate/config", | |
'{"name": "Respiration Rate", "state_topic": "' + MQTT_TOPIC + '", "value_template": "{{ value_json.respiration_rate }}", "unit_of_measurement": "breaths/min", "unique_id": "' + MQTT_CLIENT_ID + '_respiration_rate", "device": ' + str(device_config).replace("'", '"') + '}', | |
retain=True | |
) | |
# Status | |
mqtt_client.publish( | |
f"homeassistant/sensor/{MQTT_CLIENT_ID}/status/config", | |
'{"name": "Status", "state_topic": "' + MQTT_TOPIC + '", "value_template": "{{ value_json.status }}", "unique_id": "' + MQTT_CLIENT_ID + '_status", "device": ' + str(device_config).replace("'", '"') + '}', | |
retain=True | |
) | |
# SDATA | |
mqtt_client.publish( | |
f"homeassistant/sensor/{MQTT_CLIENT_ID}/sdata/config", | |
'{"name": "SDATA", "state_topic": "' + MQTT_TOPIC + '", "value_template": "{{ value_json.sdata }}", "unique_id": "' + MQTT_CLIENT_ID + '_sdata", "device": ' + str(device_config).replace("'", '"') + '}', | |
retain=True | |
) | |
# PDATA | |
mqtt_client.publish( | |
f"homeassistant/sensor/{MQTT_CLIENT_ID}/pdata/config", | |
'{"name": "PDATA", "state_topic": "' + MQTT_TOPIC + '", "value_template": "{{ value_json.pdata }}", "unique_id": "' + MQTT_CLIENT_ID + '_pdata", "device": ' + str(device_config).replace("'", '"') + '}', | |
retain=True | |
) | |
print("Published MQTT discovery messages to Home Assistant.") | |
except Exception as e: | |
print("Failed to publish MQTT discovery messages:", e) | |
def parse_sensor_data(hex_data): | |
if len(hex_data) == 24: | |
data = bytes.fromhex(hex_data) | |
# Device sends frame type [0x85] | |
# Use struct to decode bytes | |
serial_number = data[0] # Byte 1: Serial number | |
time_raw = struct.unpack('<I', data[1:5])[0] # Byte 2-5: Time (unsigned 4-byte integer, little-endian) | |
status = parse_status(data[5]) # Byte 6: Status | |
heart_rate = data[6] # Byte 7: Heart rate | |
respiration_rate = int(data[7]) / 10 # Byte 8: Respiration rate (divided by 10) | |
sdata = struct.unpack('<H', data[8:10])[0] # Byte 9-10: SDATA (unsigned 2-byte integer, little-endian) | |
pdata = struct.unpack('<H', data[10:12])[0] # Byte 11-12: PDATA (unsigned 2-byte integer, little-endian) | |
# Print the values for debugging purposes | |
print(f"Serial Number: {serial_number}") | |
print(f"Timestamp: {time_raw}") | |
print(f"Status: {status}") | |
print(f"Heart Rate: {heart_rate} bpm") | |
print(f"Respiration Rate: {respiration_rate} breaths/min") | |
print(f"SDATA: {sdata}") | |
print(f"PDATA: {pdata}") | |
return serial_number, time_raw, status, heart_rate, respiration_rate, sdata, pdata | |
# Function to read and parse sensor data from UART | |
def read_sensor_data(): | |
initialized = False | |
while True: | |
try: | |
if uart.any(): | |
data = uart.read() # Read all data from UART | |
if data: | |
# Convert the received data to a hex string | |
hex_data = binascii.hexlify(data).decode('utf-8') | |
# Print hex string for checking (optional) | |
print("Hex data:", hex_data) | |
if not initialized: | |
display.draw_text8x8(10, 50, f"Initializing in 3 minutes", color565(255, 255, 255)) | |
display.draw_text8x8(10, 70, hex_data[-30:], color565(255, 255, 255)) | |
# Call the function to parse the data | |
parsed_frame = parse_frame(hex_data) | |
if parsed_frame: | |
frame_header, frame_type, frame_length, device_id, content, end_byte = parsed_frame | |
print(frame_header, frame_type, frame_length, device_id, content, end_byte) | |
if frame_type == '85': # parse data from device in monitoring mode | |
initialized = True | |
data_parsed = parse_sensor_data(content) | |
display_sensor_data(data_parsed) | |
send_data_to_mqtt(data_parsed) | |
except Exception as e: | |
print('Exception while reading sensor data:', e) | |
# Wait for a bit before reading the next data | |
time.sleep(1) | |
# Call the function to read sensor data | |
read_sensor_data() | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment