Created
February 10, 2023 01:05
-
-
Save deepcoder/3aa75b2368e048c030bc5f79f9870680 to your computer and use it in GitHub Desktop.
python3 code to test decoding Aranet4 CO2 sensor advertising packet. TODO : add code to OpenMQTTGateway decoder library
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#! /usr/bin/env python3 | |
# aranet4-decode.py | |
# decode aranet4 co2 monitor BLE advertising packets | |
# 202302091406 | |
# | |
# https://github.com/Anrijs/Aranet4-ESP32/blob/main/src/Aranet4.h | |
# ":" (colon) in C struct - what does it mean? [duplicate] | |
# https://stackoverflow.com/questions/8564532/colon-in-c-struct-what-does-it-mean | |
# | |
# typedef struct AranetManufacturerData { | |
# uint16_t manufacturer_id; | |
# uint8_t disconnected : 1, | |
# __unknown1 : 1, | |
# calib_state : 2, | |
# dfu_mode : 1, | |
# integrations : 1, | |
# __unknown2 : 2; | |
# struct { | |
# uint8_t patch; | |
# uint8_t minor; | |
# uint16_t major; | |
# } version; | |
# uint8_t hw_rev; | |
# uint16_t __unknown3; | |
# AranetData data; | |
# typedef struct AranetData { | |
# uint16_t co2 = 0; | |
# uint16_t temperature = 0; | |
# uint16_t pressure = 0; | |
# uint8_t humidity = 0; | |
# uint8_t battery = 0; | |
# uint8_t unkn = 0; | |
# uint16_t interval = 0; | |
# uint16_t ago = 0; | |
PROGRAM_NAME = "aranet4-decode" | |
VERSION_MAJOR = "1" | |
VERSION_MINOR = "6" | |
WORKING_DIRECTORY = "" | |
import sys | |
import threading | |
from datetime import datetime, timezone | |
import time | |
import json | |
import paho.mqtt.client as mqtt | |
# for healthcheck web server | |
import os | |
from http.server import HTTPServer, CGIHTTPRequestHandler | |
# global thread control flag | |
STOP_THREADS = False | |
# MAKE SURE this is same value as in Dockerfile. !!!!! | |
HEALTHCHECK_PORT=8998 | |
# mqtt setup | |
MQTT_SERVER = "192.168.2.242" | |
MQTT_TOPIC_BASE = "aranet4" | |
# manufacture id from advertising packet must match this | |
SENSOR_MANUFACTURER_ID = 1794 | |
# the routine created a 16 bit integer value from 2 string ascii values | |
# data bytes are in reverse endian order in bluetooth, compared to intel. so for 16 bit integer, 1st byte is low half, 2nd byte high half | |
def le16(data: str, start: int = 0) -> int: | |
l = int(data[start : start + 2], 16) | |
h = int(data[start + 2 : start + 4], 16) | |
v = l + (h * 256) | |
return v | |
def on_message(mqttc, userdata, message): | |
msg = json.loads(str(message.payload.decode("utf-8"))) | |
if "id" in msg: | |
# get time stamp of info retrieved in UTC time | |
retrieve_time = datetime.now(timezone.utc).strftime("%Y%m%d%H%M%S") | |
# get bluetooth mac address of sensor | |
aranet4_mac = msg["id"] | |
# print(aranet4_mac) | |
# print("received message: " , msg) | |
# command line argument contains the full advertising packet as a hex value string | |
# ap = sys.argv[1] | |
# get raw advertising packet data | |
ap = msg["manufacturerdata"] | |
# get bluetooth signal quality | |
rssi = msg["rssi"] | |
# check if device manufacturer id is correct | |
device_id = le16(ap, 0) | |
if device_id == SENSOR_MANUFACTURER_ID : | |
# typedef struct AranetManufacturerData { | |
# uint16_t manufacturer_id; | |
# uint8_t disconnected : 1, | |
# __unknown1 : 1, | |
# calib_state : 2, | |
# dfu_mode : 1, | |
# integrations : 1, | |
# __unknown2 : 2; | |
# struct { | |
# uint8_t patch; | |
# uint8_t minor; | |
# uint16_t major; | |
# } version; | |
# uint8_t hw_rev; | |
# uint16_t __unknown3; | |
device_flags = str(bin(int(ap[4:5], 8))[2:].zfill(8)) | |
software_patch = int(ap[8:9]) | |
software_minor = int(ap[9:10]) | |
software_major = le16(ap, 10) | |
hardware_version = int(ap[14:15]) | |
# 2nd half of advertising packet is the dynamic sensor values, first half is info on device id, firmware version and hardware version and stuff | |
sp = ap[20:] | |
# check if we get long advertising string or short one, if not long then the 'smart home integrations' are probably turned off for device | |
if len(sp) != 0 : | |
# print(ap) | |
# print(sp) | |
# co2 value | |
c = le16(sp, 0) | |
# temperature in either fahrenheit or centigrade | |
t = round((le16(sp, 4) / 20.0) * 1.8 + 32.0, 1) | |
# t = le16(sp, 4) / 20.0 | |
# atmospheric pressure | |
p = int(round(le16(sp, 8) / 10.0, 0)) | |
# humidity in % | |
h = int(round(le16(sp, 10) / 255, 0)) | |
# battery level in % | |
b = int(round(le16(sp, 12) / 255, 0)) | |
# interval, not sure what this is | |
i = int(round(le16(sp, 14), 0)) | |
# this is sensor reading setting, 60, 120 or... seconds between reading, set in app | |
a = int(round(le16(sp, 18), 0)) | |
# set the subtopic based on mac address of aranet4 bluetooth address | |
MQTT_TOPIC_UNIT = aranet4_mac | |
# create json message string | |
xjs = \ | |
{ | |
"retrieve_time" : retrieve_time, \ | |
"co2" : c, \ | |
"temperature" : t, \ | |
"pressure" : p, \ | |
"humidity" : h, \ | |
"battery" : b, \ | |
"interval" : i, \ | |
"reading_interval" : a, \ | |
"rssi" : rssi \ | |
} | |
# publish message to mqtt | |
mqttc.publish(MQTT_TOPIC_BASE + "/" + MQTT_TOPIC_UNIT, json.dumps(xjs), 1) | |
print(retrieve_time, aranet4_mac, device_flags, software_major, software_minor, software_patch, hardware_version) | |
# print(retrieve_time, c, "CO2 ppm", t, "deg f", p, "hPa", h, "hum%", b, "batt%", i, "intv", a, "ago", "rssi", rssi) | |
else : | |
print(retrieve_time, aranet4_mac, "Smart Home integration may not be enabled, no sensor advertising data", device_flags, software_major, software_minor, software_patch, hardware_version) | |
# wrong manufacturer | |
else : | |
print("Device has wrong manufacturer id : ", device_id) | |
def healthcheck_webserver(): | |
global STOP_THREADS | |
global HEALTHCHECK_PORT | |
# Make sure the server is created at current directory | |
os.chdir('.') | |
# Create server object listening the port set at top of program | |
server_object = HTTPServer(server_address=('0.0.0.0', HEALTHCHECK_PORT), RequestHandlerClass=CGIHTTPRequestHandler) | |
# Start the web server | |
server_object.serve_forever() | |
while not STOP_THREADS: | |
time.sleep(1) | |
def main(): | |
global STOP_THREADS | |
# start simple web server for healthcheck | |
healthcheck=threading.Thread(target=healthcheck_webserver) | |
healthcheck.start() | |
try : | |
# connect to MQTT server | |
mqttc = mqtt.Client(PROGRAM_NAME) # Create instance of client with client ID | |
mqttc.connect(MQTT_SERVER, 1883) # Connect to (broker, port, keepalive-time) | |
print("Program start : " + PROGRAM_NAME + " Version : " + VERSION_MAJOR + "." + VERSION_MINOR) | |
# Start mqtt | |
mqttc.loop_start() | |
except Exception as e: | |
print("cannot initialize MQTT connection: " + MQTT_SERVER + " " + str(e)) | |
sys.exit(1) | |
mqttc.subscribe("home/OpenMQTTGateway/BTtoMQTT/#") | |
# procedure to execute on MQTT message in topic received | |
mqttc.on_message = on_message | |
try : | |
while 1 == 1 : | |
time.sleep(30) | |
except KeyboardInterrupt : | |
STOP_THREADS = True | |
mqttc.disconnect() | |
mqttc.loop_stop() | |
print("Ctrl-c exiting") | |
sys.exit(0) | |
except Exception as e: | |
print("Unhandled error : " + str(e)) | |
sys.exit(1) | |
if __name__ == '__main__': | |
main() | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
HI @deepcoder,
we stumbled across your gist here, with the comment
If you feel like collaborating on such a decoder, please open a discussion for Theengs Decoder.
Thanks