Skip to content

Instantly share code, notes, and snippets.

@sonpython
Last active September 28, 2024 17:44
Show Gist options
  • Save sonpython/d6346cb8a17c0c94ec046159e5fa7d36 to your computer and use it in GitHub Desktop.
Save sonpython/d6346cb8a17c0c94ec046159e5fa7d36 to your computer and use it in GitHub Desktop.
MicroPython code for collecting and displaying smart topper sensor data
from machine import UART, Pin, SPI, PWM, unique_id, reset
import time
import binascii
import struct
import network
import socket
import json
from ili9341 import Display, color565
from umqtt.simple import MQTTClient
# Built-in MQTT defaults; each can be overridden by the saved configuration.
MQTT_BROKER = "homeassistant.local"
MQTT_PORT = 1883
MQTT_USER = "mqtt-sensor"
MQTT_PASSWORD = "mqtt-sensor-password"
MQTT_CLIENT_ID = "smarttoper-sensor"
MQTT_TOPIC = "smarttoper/sensor_data"

# JSON file holding the persisted WiFi and MQTT settings.
CONFIG_FILE = 'config.json'
class _NullDisplay:
    """Stand-in used when the real display is unavailable; drawing calls do nothing."""

    def draw_text8x8(self, *args, **kwargs):
        pass

    def clear(self, *args, **kwargs):
        pass


# SPI configuration for display
try:
    spi = SPI(1, baudrate=5000000, polarity=0, phase=0, sck=Pin(18), mosi=Pin(23))
    cs_pin = Pin(5)
    dc_pin = Pin(2)
    rst_pin = Pin(4)
    print("Initializing display with 180-degree rotation...")
    # Initialize the Display object with rotation=180 for upside down
    display = Display(spi, cs=cs_pin, dc=dc_pin, rst=rst_pin, width=320, height=240, rotation=180)
    display_initialized = True
    print("Display initialized successfully.")
    # Clear the display before writing new data
    display.clear(color=color565(0, 0, 0))
    print("Cleared display.")
except Exception as e:
    print("Failed to initialize display:", e)
    display_initialized = False
    # Keep `display` bound: the rest of the script draws status text without
    # checking display_initialized, and an unbound name would raise NameError
    # and abort startup. With the stand-in those calls become no-ops.
    display = _NullDisplay()

# Initialize PWM for backlight control
backlight_pin = PWM(Pin(32))
backlight_pin.freq(1000)  # Set frequency to 1kHz
backlight_pin.duty(1023)  # Set duty to max (1023 out of 1023 for full brightness)
# Load WiFi and MQTT configuration from file
def load_config():
    """Return the saved configuration as a dict.

    Reads CONFIG_FILE as JSON. Any failure (missing file, invalid JSON, or a
    JSON document whose root is not an object) is logged and an empty dict is
    returned so callers can fall back to their defaults.
    """
    try:
        with open(CONFIG_FILE, 'r') as f:
            loaded = json.load(f)
        if not isinstance(loaded, dict):
            # Callers use config.get(...); anything but a dict is unusable.
            raise ValueError("configuration root is not an object")
        return loaded
    except Exception as e:
        print(f"Failed to load config: {e}")
        # The display may have failed to initialise; reporting the problem
        # on screen must never abort startup.
        if display_initialized:
            try:
                display.draw_text8x8(10, 10, f"Failed to load config: {e}", color565(255, 255, 255))
            except Exception as display_error:
                print(f"Failed to show config error on display: {display_error}")
        return {}
# Save configuration to file
def save_config(config):
    """Write *config* to CONFIG_FILE as JSON; a failure is logged, not raised."""
    try:
        with open(CONFIG_FILE, 'w') as config_file:
            json.dump(config, config_file)
    except Exception as err:
        print(f"Failed to save config: {err}")
# Clear WiFi credentials and configuration file
def clear_wifi_and_config():
    """Disconnect WiFi, delete the saved configuration and reset the device.

    Progress is printed and, when the display is available, also drawn on
    screen. Any unexpected failure is logged and the function returns without
    resetting.
    """
    def show(y, text):
        # Status output is best effort: a missing or failing display must not
        # stop the wipe or mask the original error.
        if not display_initialized:
            return
        try:
            display.draw_text8x8(10, y, text, color565(255, 255, 255))
        except Exception as display_error:
            print(f"Failed to update display: {display_error}")

    try:
        wlan = network.WLAN(network.STA_IF)
        wlan.active(True)
        wlan.disconnect()  # Disconnect from WiFi
        wlan.active(False)  # Disable WiFi
        print("WiFi disconnected and deactivated.")
        show(10, "WiFi disconnected and deactivated.")
        # Delete configuration file
        import os
        try:
            os.remove(CONFIG_FILE)
            print("Configuration file deleted.")
            show(30, "Configuration file deleted.")
        except OSError as remove_error:
            # errno 2 (ENOENT): nothing was saved yet, so there is nothing to
            # delete. Still reset, otherwise the device is left with WiFi off.
            if not remove_error.args or remove_error.args[0] != 2:
                raise
            print("No configuration file to delete.")
        # Reset the device to apply changes
        time.sleep(1)
        reset()
    except Exception as e:
        print(f"Failed to clear WiFi or configuration: {e}")
        show(50, f"Failed to clear WiFi or configuration: {e}")
# Read the saved settings once at startup; every key missing from the file
# falls back to the built-in default.
config = load_config()
wifi_ssid, wifi_password = (
    config.get('wifi_ssid', 'mp1'),
    config.get('wifi_password', 'Zxcasdqwe1231'),
)
MQTT_BROKER = config.get('mqtt_broker', MQTT_BROKER)
MQTT_PORT = config.get('mqtt_port', MQTT_PORT)
MQTT_USER = config.get('mqtt_user', MQTT_USER)
MQTT_PASSWORD = config.get('mqtt_password', MQTT_PASSWORD)
MQTT_CLIENT_ID = config.get('mqtt_client_id', MQTT_CLIENT_ID)
MQTT_TOPIC = config.get('mqtt_topic', MQTT_TOPIC)
def send_uart_server_monitor_mode():
    """Write the "enter server monitoring mode" command frame to the UART.

    Failures are logged and swallowed.
    """
    try:
        # Fields in wire order, packed little-endian.
        # NOTE(review): the length field is sent as 14 while the packed frame
        # is 16 bytes (1 + 1 + 2 + 10 + 1 + 1) -- confirm against the sensor
        # protocol specification which value the device expects.
        fields = (
            0x7D,           # frame header
            0x04,           # frame type
            14,             # frame length field
            b'CNU2000001',  # example device ID; should match the actual device
            0x20,           # command: enter server monitoring mode
            0x0D,           # end byte
        )
        uart.write(struct.pack('<BBH10sBB', *fields))
        print("UART command to enter server monitoring mode sent.")
    except Exception as err:
        print(f"Failed to send UART command: {err}")
# Reset button on GPIO 35, read as active-low.
# NOTE(review): on ESP32, GPIO34-39 are reportedly input-only with no internal
# pull resistors, so Pin.PULL_UP may have no effect here -- confirm an external
# pull-up is fitted.
reset_button = Pin(35, Pin.IN, Pin.PULL_UP)


def monitor_reset_button():
    """Poll the reset button forever and act on short and long presses.

    A press held for 3 seconds clears the WiFi settings and configuration
    (which resets the device) and ends the loop. A shorter press, on release,
    sends the UART command that switches the sensor to monitoring mode.
    """
    hold_ms = 3000
    pressed_at = None
    while True:
        if reset_button.value() == 0:  # Button is pressed (active low)
            if pressed_at is None:
                # ticks_ms gives millisecond resolution; time.time() only
                # counts whole seconds, which made the threshold imprecise.
                pressed_at = time.ticks_ms()
            elif time.ticks_diff(time.ticks_ms(), pressed_at) >= hold_ms:
                print("Button held for 3 seconds. Clearing WiFi and config...")
                clear_wifi_and_config()
                break  # Exit loop after clearing settings and resetting the device
        elif pressed_at is not None:
            # Button released after having been pressed.
            if time.ticks_diff(time.ticks_ms(), pressed_at) < hold_ms:
                print("Button pressed briefly. Sending command to switch to monitoring mode.")
                send_uart_server_monitor_mode()
            pressed_at = None  # Ready for the next press
        time.sleep(0.1)  # Debounce delay and reduce CPU usage
# Connect to WiFi as a station, waiting up to 10 seconds for the link.
wlan = network.WLAN(network.STA_IF)
wlan.active(True)
try:
    if not wlan.isconnected():
        print("Connecting to WiFi...")
        # Guarded so a missing display cannot raise before wlan.connect()
        # and silently skip the connection attempt.
        if display_initialized:
            display.draw_text8x8(10, 50, "Connecting to WiFi..", color565(255, 255, 255))
        wlan.connect(wifi_ssid, wifi_password)
        timeout = 10
        while not wlan.isconnected() and timeout > 0:
            time.sleep(1)
            timeout -= 1
except Exception as e:
    # `except Exception` rather than a bare except, so KeyboardInterrupt
    # still stops the script.
    print('Can not connect to wifi with current credentials', e)
    if display_initialized:
        display.draw_text8x8(10, 70, "Can't connect to wifi", color565(255, 255, 255))
_HEX_DIGITS = '0123456789abcdefABCDEF'


def _url_decode(text):
    """Decode one form-urlencoded component ('+' as space, %XX escapes)."""
    pieces = text.replace('+', ' ').split('%')
    decoded = bytearray(pieces[0].encode('utf-8'))
    for piece in pieces[1:]:
        if len(piece) >= 2 and piece[0] in _HEX_DIGITS and piece[1] in _HEX_DIGITS:
            decoded.append(int(piece[:2], 16))
            decoded.extend(piece[2:].encode('utf-8'))
        else:
            # Not a valid escape: keep the text literally.
            decoded.extend(('%' + piece).encode('utf-8'))
    return bytes(decoded).decode('utf-8')


def _parse_query(query):
    """Split a URL query string into a dict of decoded key/value pairs."""
    fields = {}
    for pair in query.split('&'):
        if not pair:
            continue
        # Split on the first '=' only, so values may themselves contain '='.
        parts = pair.split('=', 1)
        value = parts[1] if len(parts) > 1 else ''
        fields[_url_decode(parts[0])] = _url_decode(value)
    return fields


def _html_attr(value):
    """Escape a value for use inside a double-quoted HTML attribute."""
    escaped = str(value).replace('&', '&amp;').replace('"', '&quot;')
    return escaped.replace('<', '&lt;').replace('>', '&gt;')


# If not connected, start Access Point mode
if not wlan.isconnected():
    print("Failed to connect to WiFi. Starting Access Point...")
    if display_initialized:
        display.draw_text8x8(10, 90, "Failed to connect to WiFi. Starting AP", color565(255, 255, 255))
    ap = network.WLAN(network.AP_IF)
    ap.active(True)
    ap.config(essid='SmartToper_Config', password='12345678')
    if display_initialized:
        display.draw_text8x8(10, 110, "Connect to 'SmartToper_Config'", color565(255, 255, 255))
    print("Access Point started. Connect to 'SmartToper_Config' to configure.")
    # Start a web server to accept WiFi and MQTT credentials
    addr = socket.getaddrinfo('0.0.0.0', 80)[0][-1]
    s = socket.socket()
    s.bind(addr)
    s.listen(1)
    print('Listening on', addr)

    def web_page():
        """Return the configuration form, pre-filled with the current MQTT settings."""
        html = """<html>
<head><title>SmartToper Configuration</title></head>
<body>
<h2>Configure WiFi and MQTT Settings</h2>
<form action="/submit" method="get">
WiFi SSID: <input type="text" name="wifi_ssid"><br>
WiFi Password: <input type="password" name="wifi_password"><br>
MQTT Broker: <input type="text" name="mqtt_broker" value="{mqtt_broker}"><br>
MQTT Port: <input type="text" name="mqtt_port" value="{mqtt_port}"><br>
MQTT User: <input type="text" name="mqtt_user" value="{mqtt_user}"><br>
MQTT Password: <input type="password" name="mqtt_password" value="{mqtt_password}"><br>
MQTT Topic: <input type="text" name="mqtt_topic" value="{mqtt_topic}"><br>
<input type="submit" value="Save">
</form>
</body>
</html>""".format(
            mqtt_broker=_html_attr(MQTT_BROKER),
            mqtt_port=_html_attr(MQTT_PORT),
            mqtt_user=_html_attr(MQTT_USER),
            mqtt_password=_html_attr(MQTT_PASSWORD),
            mqtt_topic=_html_attr(MQTT_TOPIC),
        )
        return html

    # Serve the form until settings are submitted, then reboot.
    while True:
        cl, addr = s.accept()
        print('Client connected from', addr)
        settings_saved = False
        try:
            request = cl.recv(1024).decode('utf-8')
            print("Request:", request)
            if request.startswith('GET /submit?'):
                target = request.split(' ')[1]
                config_data = _parse_query(target.split('?', 1)[1])
                # Update configuration
                wifi_ssid = config_data.get('wifi_ssid', wifi_ssid)
                wifi_password = config_data.get('wifi_password', wifi_password)
                MQTT_BROKER = config_data.get('mqtt_broker', MQTT_BROKER)
                try:
                    MQTT_PORT = int(config_data.get('mqtt_port', MQTT_PORT))
                except (TypeError, ValueError):
                    # Empty or non-numeric port: keep the current one.
                    print("Ignoring invalid MQTT port:", config_data.get('mqtt_port'))
                MQTT_USER = config_data.get('mqtt_user', MQTT_USER)
                MQTT_PASSWORD = config_data.get('mqtt_password', MQTT_PASSWORD)
                MQTT_TOPIC = config_data.get('mqtt_topic', MQTT_TOPIC)
                # Save updated configuration
                new_config = {
                    'wifi_ssid': wifi_ssid,
                    'wifi_password': wifi_password,
                    'mqtt_broker': MQTT_BROKER,
                    'mqtt_port': MQTT_PORT,
                    'mqtt_user': MQTT_USER,
                    'mqtt_password': MQTT_PASSWORD,
                    'mqtt_client_id': MQTT_CLIENT_ID,
                    'mqtt_topic': MQTT_TOPIC
                }
                save_config(new_config)
                response = "<html><body><h2>Settings Saved. Restart the device.</h2></body></html>"
                settings_saved = True
            else:
                response = web_page()
            cl.send('HTTP/1.1 200 OK\r\nContent-Type: text/html\r\n\r\n')
            cl.send(response)
        except Exception as e:
            # One malformed or aborted request must not take the portal down.
            print("Failed to handle request:", e)
        finally:
            cl.close()
        if settings_saved:
            # Reboot to apply new settings
            reset()
else:
    print("Connected to WiFi, IP address:", wlan.ifconfig()[0])
# Initialize MQTT client. The port is passed explicitly so a port set in the
# saved configuration is actually used.
mqtt_client = MQTTClient(MQTT_CLIENT_ID, MQTT_BROKER, port=MQTT_PORT, user=MQTT_USER, password=MQTT_PASSWORD)
try:
    print("Connecting to MQTT broker...")
    mqtt_client.connect()
    print("Connected to MQTT broker.")
except Exception as e:
    # Startup continues without a broker; the failure is only logged here.
    print("Failed to connect to MQTT broker:", e)
# Initialize UART with baudrate 115200, 8 data bits, 1 stop bit.
# This happens before the button thread starts because a short press makes
# that thread write to `uart`.
uart = UART(1, baudrate=115200, bits=8, parity=None, stop=1, tx=17, rx=16)
# Start monitoring the reset button in a separate thread
import _thread
_thread.start_new_thread(monitor_reset_button, ())
# Status byte reported by the sensor -> human-readable label.
status_mapping = dict((
    (0x01, "Get out of bed"),
    (0x02, "Move"),
    (0x03, "Sit up"),
    (0x04, "Sleep"),
    (0x05, "Wake up"),
    (0x06, "Heavy object"),
    (0x07, "Snore"),
    (0x08, "Weak breathing"),
))
def parse_status(status):
    """Return the label for a status code, or "Unknown status" if it is not mapped."""
    if status in status_mapping:
        return status_mapping[status]
    return "Unknown status"
def parse_frame(data):
    """Split a hex-encoded frame into its fields.

    Layout, two hex digits per byte: header (1 byte, '7d'), frame type (1),
    frame length (2, little-endian), device ID (10, ASCII), content
    (variable), end byte (1, '0d').

    Args:
        data: the frame as a lowercase hex string.

    Returns:
        (frame_header, frame_type, frame_length, device_id, content, end_byte),
        where frame_length is an int, device_id is a str and the other fields
        are hex strings; or None (after printing "Invalid data") when the
        frame is too short, malformed, or has the wrong header or end byte.
    """
    # Smallest frame: header + type + length + ID + end = 15 bytes = 30 digits.
    if len(data) < 30 or len(data) % 2:
        print("Invalid data")
        return None
    frame_header = data[0:2]
    end_byte = data[-2:]
    if frame_header != '7d' or end_byte != '0d':  # valid byte transfer
        print("Invalid data")
        return None
    try:
        # The length field is little-endian on the wire, so decode the bytes
        # rather than reading the hex digits as one big-endian number.
        frame_length = int.from_bytes(bytes.fromhex(data[4:8]), 'little')
        device_id = bytes.fromhex(data[8:28]).decode('ascii')
    except (ValueError, UnicodeError):
        print("Invalid data")
        return None
    frame_type = data[2:4]
    content = data[28:-2]
    return frame_header, frame_type, frame_length, device_id, content, end_byte
def display_sensor_data(parse_data):
    """Draw one parsed sensor reading on the display, one field per text row.

    parse_data is the 7-tuple (serial number, timestamp, status, heart rate,
    respiration rate, SDATA, PDATA). Nothing is drawn when the display is not
    initialized; drawing errors are logged and swallowed.
    """
    serial_number, timestamp, status, heart_rate, respiration_rate, sdata, pdata = parse_data
    if not display_initialized:
        print("Display not initialized. Cannot display data.")
        return
    # (row y position, label, value, trailing text). Rows are overwritten in
    # place instead of clearing the whole screen.
    rows = (
        (10, "Serial Number", serial_number, " "),
        (30, "Timestamp", timestamp, " "),
        (50, "Status", status, " "),
        (70, "Heart Rate", heart_rate, " bpm "),
        (90, "Respiration Rate", respiration_rate, " brts/min "),
        (110, "SDATA", sdata, " "),
        (130, "PDATA", pdata, " "),
    )
    try:
        for y, label, value, tail in rows:
            display.draw_text8x8(10, y, label + ": " + str(value) + tail, color565(255, 255, 255))
        print("Displayed sensor data successfully.")
    except Exception as err:
        print("Failed to display sensor data:", err)
# Discovery configs are published with retain=True, so they are sent once per
# boot (retried until they succeed) instead of with every reading.
_discovery_published = False


def _publish_discovery():
    """Publish a retained Home Assistant discovery config for each sensor field."""
    device_config = {
        "identifiers": [MQTT_CLIENT_ID],
        "name": "Smartbed Sensor",
        "model": "ESP32",
        "manufacturer": "Custom"
    }
    # (field key in the state payload, entity name, unit or None)
    sensors = (
        ("heart_rate", "Heart Rate", "bpm"),
        ("respiration_rate", "Respiration Rate", "breaths/min"),
        ("status", "Status", None),
        ("sdata", "SDATA", None),
        ("pdata", "PDATA", None),
    )
    for key, name, unit in sensors:
        entity = {
            "name": name,
            "state_topic": MQTT_TOPIC,
            "value_template": "{{ value_json." + key + " }}",
            "unique_id": MQTT_CLIENT_ID + "_" + key,
            "device": device_config,
        }
        if unit is not None:
            entity["unit_of_measurement"] = unit
        mqtt_client.publish(
            f"homeassistant/sensor/{MQTT_CLIENT_ID}/{key}/config",
            json.dumps(entity),
            retain=True
        )


def send_data_to_mqtt(parse_data):
    """Publish one sensor reading to MQTT_TOPIC as JSON.

    parse_data is the 7-tuple (serial number, timestamp, status, heart rate,
    respiration rate, SDATA, PDATA). Publish failures are logged and
    swallowed. The Home Assistant discovery messages are published after the
    first reading, and again on later readings until they go through.
    """
    global _discovery_published
    serial_number, timestamp, status, heart_rate, respiration_rate, sdata, pdata = parse_data
    # Create a dictionary with sensor data
    sensor_data = {
        "serial_number": serial_number,
        "timestamp": timestamp,
        "status": status,
        "heart_rate": heart_rate,
        "respiration_rate": respiration_rate,
        "sdata": sdata,
        "pdata": pdata
    }
    # Serialise with the json module: rewriting quotes in str(dict) breaks as
    # soon as a value contains a quote character.
    sensor_data_str = json.dumps(sensor_data)
    # Publish the sensor data
    try:
        mqtt_client.publish(MQTT_TOPIC, sensor_data_str)
        print(f"Published data to MQTT: {sensor_data_str}")
    except Exception as e:
        print("Failed to publish data to MQTT:", e)
    # Publish MQTT Discovery messages to Home Assistant
    if not _discovery_published:
        try:
            _publish_discovery()
            _discovery_published = True
            print("Published MQTT discovery messages to Home Assistant.")
        except Exception as e:
            print("Failed to publish MQTT discovery messages:", e)
def parse_sensor_data(hex_data):
    """Decode the 12-byte content of a monitoring frame (device frame type 0x85).

    hex_data is the content as 24 hex digits. Returns the tuple
    (serial_number, time_raw, status, heart_rate, respiration_rate, sdata,
    pdata), or None when hex_data is not exactly 24 characters long.
    """
    if len(hex_data) != 24:
        return None
    payload = bytes.fromhex(hex_data)
    # Little-endian layout: serial (1 byte), time (4), status (1),
    # heart rate (1), respiration rate x10 (1), SDATA (2), PDATA (2).
    (serial_number, time_raw, status_code, heart_rate,
     respiration_raw, sdata, pdata) = struct.unpack('<BIBBBHH', payload)
    status = parse_status(status_code)
    respiration_rate = respiration_raw / 10  # sent as tenths
    # Print the values for debugging purposes
    print(f"Serial Number: {serial_number}")
    print(f"Timestamp: {time_raw}")
    print(f"Status: {status}")
    print(f"Heart Rate: {heart_rate} bpm")
    print(f"Respiration Rate: {respiration_rate} breaths/min")
    print(f"SDATA: {sdata}")
    print(f"PDATA: {pdata}")
    return serial_number, time_raw, status, heart_rate, respiration_rate, sdata, pdata
# Function to read and parse sensor data from UART
def read_sensor_data():
    """Poll the UART once per second forever and process incoming frames.

    Each chunk read is hex-encoded and parsed as a frame. Frames of type '85'
    (monitoring mode) are decoded, shown on the display and published to
    MQTT. Until the first such frame arrives, a waiting message and the tail
    of the raw data are shown on the display. Errors in one iteration are
    logged and the loop continues.
    """
    initialized = False
    while True:
        try:
            if uart.any():
                data = uart.read()  # Read all data from UART
                if data:
                    # Convert the received data to a hex string
                    hex_data = binascii.hexlify(data).decode('utf-8')
                    # Print hex string for checking (optional)
                    print("Hex data:", hex_data)
                    # Guarded so a missing display cannot raise here and skip
                    # parsing and publishing altogether.
                    if not initialized and display_initialized:
                        display.draw_text8x8(10, 50, "Initializing in 3 minutes", color565(255, 255, 255))
                        display.draw_text8x8(10, 70, hex_data[-30:], color565(255, 255, 255))
                    # Call the function to parse the data
                    parsed_frame = parse_frame(hex_data)
                    if parsed_frame:
                        frame_header, frame_type, frame_length, device_id, content, end_byte = parsed_frame
                        print(frame_header, frame_type, frame_length, device_id, content, end_byte)
                        if frame_type == '85':  # parse data from device in monitoring mode
                            initialized = True
                            data_parsed = parse_sensor_data(content)
                            if data_parsed is None:
                                # parse_sensor_data rejects content that is
                                # not exactly 24 hex digits.
                                print("Unexpected sensor payload length; frame skipped.")
                            else:
                                display_sensor_data(data_parsed)
                                send_data_to_mqtt(data_parsed)
        except Exception as e:
            print('Exception while reading sensor data:', e)
        # Wait for a bit before reading the next data
        time.sleep(1)


# Call the function to read sensor data
read_sensor_data()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment