ble2mqtt was very unstable for me and I only use the wp6003 sensor anyway, so I wrote this small python script. based on https://github.com/zu2/wp6003 add configuration.yaml to home assistants configuration.yaml.
Requirements: bleak paho click
ble2mqtt was very unstable for me and I only use the wp6003 sensor anyway, so I wrote this small python script. based on https://github.com/zu2/wp6003 add configuration.yaml to home assistants configuration.yaml.
Requirements: bleak paho click
mqtt: | |
sensor: | |
- name: "Temperature" | |
state_topic: "bedroom/wp6003" | |
unit_of_measurement: "°C" | |
value_template: "{{ value_json.temperature }}" | |
- name: "TVOC" | |
state_topic: "bedroom/wp6003" | |
unit_of_measurement: "mg/m³" | |
value_template: "{{ value_json.tvoc }}" | |
- name: "HCHO" | |
state_topic: "bedroom/wp6003" | |
unit_of_measurement: "mg/m³" | |
value_template: "{{ value_json.hcho }}" | |
- name: "CO2" | |
state_topic: "bedroom/wp6003" | |
unit_of_measurement: "ppm" | |
value_template: "{{ value_json.co2 }}" |
import asyncio | |
import click | |
import datetime | |
import json | |
import paho.mqtt.client as mqtt | |
from bleak import BleakClient, BleakGATTCharacteristic | |
SERVICE = '0000FFF0-0000-1000-8000-00805F9B34FB' | |
COMMAND = '0000FFF1-0000-1000-8000-00805F9B34FB' | |
SENSOR = '0000FFF4-0000-1000-8000-00805F9B34FB' | |
mqtt_client = mqtt.Client() | |
mqtt_topic = None | |
def callback(sender: BleakGATTCharacteristic, data: bytearray): | |
global mqtt_topic | |
global mqtt_client | |
if len(data) < 16: | |
print("Hmm, little data here:", data) | |
return | |
print('Notify:',datetime.datetime.now(),' ',data.hex()) | |
print("Time: 20",data[1],"/",data[2],"/",data[3]," ",data[4],":",data[5],sep='') | |
temperature = (data[6]*256+data[7])/10 | |
print('Temp:',temperature,'℃') | |
tvoc = (data[10]*256+data[11])/1000 | |
print('TVOC:',tvoc,'mg/㎥') | |
hcho = (data[12]*256+data[13])/1000 | |
print('HCHO:',hcho,'mg/㎥') | |
co2 = (data[16]*256+data[17]) | |
print('CO2 :',co2,'ppm') | |
mqtt_client.connect("127.0.0.1") # maybe needed maybe not, dunno | |
mqtt_client.publish(mqtt_topic, payload=json.dumps({"temperature": temperature, "tvoc": tvoc, "hcho": hcho, "co2": co2})) | |
async def main_async(address, interval): | |
async with BleakClient(address) as client: | |
print("Getting service...") | |
service = client.services.get_service(SERVICE) | |
print("Getting command...") | |
command = service.get_characteristic(COMMAND) | |
print("Getting sensor...") | |
sensor = service.get_characteristic(SENSOR) | |
print("Sending init command...") | |
await client.write_gatt_char(command, bytes.fromhex("ee")) | |
print("Sending datetime...") | |
t = datetime.datetime.now() | |
await client.write_gatt_char(command, bytes([0xaa,t.year%100,t.month,t.day,t.hour,t.minute,t.second])) | |
print("Sending 2nd init command...") | |
await client.write_gatt_char(command, bytes.fromhex("ae")) | |
print("Sending 3rd init command...") | |
await client.write_gatt_char(command, bytes.fromhex("ab")) | |
print("Read Sensor data:") | |
await client.start_notify(sensor, callback) | |
while True: | |
await asyncio.sleep(interval) | |
await client.write_gatt_char(command, bytes.fromhex("ab")) | |
@click.command() | |
@click.option('--mac', required=True, help='Mac adress of WP6003 sensor.') | |
@click.option('--interval', default=10, help='Interval in seconds when to retrieve data.') | |
@click.option('--topic', default="bedroom/wp6003", help='topic on which the data is published on the local mqtt server') | |
def main(mac, interval, topic): | |
global mqtt_topic | |
mqtt_topic = topic | |
asyncio.run(main_async(mac, interval)) | |
if __name__ == "__main__": | |
main() |