Last active
August 18, 2019 16:07
-
-
Save alryaz/bdd90afac94e9c1499adbccf1c1e8a33 to your computer and use it in GitHub Desktop.
First attempt at developing a Home Assistant-discoverable Mi Scale integration
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python | |
# -*- coding: utf-8 -*- | |
from __future__ import print_function | |
import argparse | |
import binascii | |
import time | |
import os | |
import csv | |
import sys | |
import json | |
import logging | |
from bluepy import btle | |
import paho.mqtt.client as mqtt | |
from datetime import datetime | |
###########
# User-editable settings.
MISCALE_MAC = '<REDACTED>'
#MISCALE_MAC = None # Uncomment this to trigger autoscan
USER_IDENTIFIER = 12345678

# MQTT broker credentials and endpoint.
MQTT_USERNAME = '<REDACTED>'
MQTT_PASSWORD = '<REDACTED>'
MQTT_HOST = '<REDACTED>'
MQTT_PORT = 1883
MQTT_TIMEOUT = 60

# Home Assistant MQTT discovery behaviour.
HOMEASSISTANT_DISCOVERY = True
HOMEASSISTANT_DISCOVERY_PREFIX = 'homeassistant'
# Lazy discovery sends the discovery message on the first weighing, so the
# measurement unit reported by the scale can be included in it.
HOMEASSISTANT_LAZY_DISCOVERY = True

LOGGING_LEVEL = logging.DEBUG
# When set, measurements are converted to this unit before publishing.
FORCE_UNIT = None
###########

# GATT UUIDs of the weight service and the characteristics used on it.
WEIGHT_MEASUREMENT_SERVICE = '0000181d-0000-1000-8000-00805f9b34fb'
WEIGHT_MEASUREMENT_CHARACTERISTIC = '00002a9d-0000-1000-8000-00805f9b34fb'
WEIGHT_MEASUREMENT_HISTORY_CHARACTERISTIC = '00002a2f-0000-3512-2118-0009af100700'
CURRENT_TIME_CHARACTERISTIC = '00002a2b-0000-1000-8000-00805f9b34fb'

# Log to a file and, through an extra handler on the root logger, to the
# console as well.
logging.basicConfig(filename="xiaomi_scale.log", level=LOGGING_LEVEL)
_root_logger = logging.getLogger()
_root_logger.addHandler(logging.StreamHandler())

# Five-byte payload written to the history characteristic after a stop
# signal.  Only the low 16 bits of USER_IDENTIFIER are used (high byte first).
# NOTE(review): the meaning of the leading 4, 255, 255 bytes is not visible
# in this file - confirm against the scale protocol.
userIdentifier = bytes((
    4,
    0xFF,
    0xFF,
    (USER_IDENTIFIER >> 8) & 0xFF,
    USER_IDENTIFIER & 0xFF,
))
class MiScaleBluetoothDelegate(btle.DefaultDelegate):
    """bluepy notification delegate for a Mi Scale connection.

    Notifications on the time characteristic are used to check (and, when
    needed, rewrite) the date stored on the scale.  Any other non-empty
    notification is treated as weight data: a leading byte of 3 is answered
    on the history characteristic, and 10- or 20-byte payloads are forwarded
    to ``deviceObject.ProcessWeight``.
    """

    def __init__(self, deviceObject, measurementHandle, historyHandle, timeHandle):
        """Remember the owning device wrapper and the characteristic handles.

        Args:
            deviceObject: Object exposing ``device`` (the connected
                peripheral) and ``ProcessWeight(data)``.
            measurementHandle: Value handle of the weight measurement
                characteristic (stored, not used for routing here).
            historyHandle: Value handle of the history characteristic.
            timeHandle: Value handle of the current-time characteristic.
        """
        btle.DefaultDelegate.__init__(self)
        self.deviceObject = deviceObject
        self.measurementHandle = measurementHandle
        self.historyHandle = historyHandle
        self.timeHandle = timeHandle
        logging.debug('Initializing notification handler')

    def handleNotification(self, cHandle, data):
        """Route an incoming notification by characteristic handle.

        Args:
            cHandle: Handle of the characteristic that produced the data.
            data: Raw notification payload (bytes-like).
        """
        if cHandle == self.timeHandle:
            self._synchronize_date(data)
        elif len(data) > 0:
            self._handle_weight_payload(data)

    def _synchronize_date(self, data):
        """Compare the scale's date with the host's and rewrite it if wrong."""
        logging.debug('Processing time synchronization request')
        if len(data) < 4:
            # Year (2 bytes), month and day are required for the comparison.
            logging.warning('Time payload too short to parse: ' + data.hex())
            return

        # Payload starts with a little-endian year, then month and day.
        scaleYear = int.from_bytes(data[0:2], 'little')
        scaleMonth = data[2]
        scaleDay = data[3]
        current_dt = datetime.now()
        if (scaleYear, scaleMonth, scaleDay) == (current_dt.year, current_dt.month, current_dt.day):
            return

        logging.info('Date is incorrect, updating date on the MiScale')
        # Time is not displayed in debug, it will be incorrect anyway
        logging.debug('Current date on MiScale is: ' + '{:04d}-{:02d}-{:02d}'.format(scaleYear, scaleMonth, scaleDay))
        dt_byte = [
            current_dt.year % 256,
            current_dt.year >> 8,
            current_dt.month,
            current_dt.day,
            current_dt.hour,
            current_dt.minute,
            current_dt.second,
            # NOTE(review): the trailing 3, 0, 0 are hard-coded; in the
            # standard Current Time layout these positions would be
            # day-of-week, fractions and adjust reason - confirm.
            3,
            0,
            0,
        ]
        self.deviceObject.device.writeCharacteristic(self.timeHandle, bytearray(dt_byte))
        logging.debug('MiScale date updated with current date and time')

    def _handle_weight_payload(self, data):
        """Acknowledge a stop signal and forward 10-byte weight records."""
        #print('data incoming')
        #print(data_hex + " " + str(len(data)))
        if data[0] == 3:
            logging.debug('Processing stop signal')
            # Single byte 0x03.  The previous literal b'\0x03' was four
            # bytes (NUL, 'x', '0', '3') because '\0' is an octal escape.
            self.deviceObject.device.writeCharacteristic(self.historyHandle, b'\x03')
            self.deviceObject.device.writeCharacteristic(self.historyHandle, userIdentifier)

        payload_length = len(data)
        if payload_length == 20:
            # A history notification carries two consecutive 10-byte records.
            logging.debug('Processing history notification')
            self.deviceObject.ProcessWeight(data[0:10])
            self.deviceObject.ProcessWeight(data[10:20])
        elif payload_length == 10:
            logging.debug('Processing single data notification')
            self.deviceObject.ProcessWeight(data[0:10])
class MiScaleDevice():
    """Bridge between one Mi Scale (v1) and an MQTT broker.

    Constructing an instance connects to the MQTT broker, connects to the
    scale over Bluetooth LE, subscribes to weight notifications and then
    stays in the notification loop, reconnecting whenever the scale
    disconnects.  Stabilized weights are published to ``mqtt_topic``;
    optionally a Home Assistant MQTT discovery message is published too.
    """

    # Multipliers keyed by (received unit, expected unit).
    # 1 jin = 0.5 kg, 1 lbs = 0.45359237 kg.
    _UNIT_FACTORS = {
        ('jin', 'kg'): 0.5,
        ('lbs', 'kg'): 0.45359237,
        ('kg', 'lbs'): 2.2046226218488,
        ('jin', 'lbs'): 2.2046226218488 * 0.5,
        ('kg', 'jin'): 2,
        ('lbs', 'jin'): 0.90718474,
    }

    def __init__(self, address):
        """Connect to the broker and the scale, then process notifications.

        This call does not return: it loops forever, reconnecting after each
        ``btle.BTLEDisconnectError``.  Any other exception propagates.

        Args:
            address: Bluetooth MAC address of the scale, colon-separated.
        """
        print('initializing miscale')
        self.strippedAddress = address.replace(':', '').lower()
        self.mqtt_client = None
        self.mqtt_topic = 'sensor/miscale_' + self.strippedAddress + '/state'
        self.connected = False
        self.unit = None
        self.hass_discovery_sent = False
        self._start_client()
        self.address = address

        if FORCE_UNIT:
            #@TODO: check for unit validity
            logging.debug('Forcing unit conversion to: ' + FORCE_UNIT)
            self.unit = FORCE_UNIT

        while True:
            try:
                logging.info('Connecting to Mi Scale ' + address + '...')
                self._connect_miscale()
                self._setup_miscale_v1()  # assume we have v1

                eager_discovery = HOMEASSISTANT_DISCOVERY and not HOMEASSISTANT_LAZY_DISCOVERY
                if eager_discovery and not self.hass_discovery_sent:
                    if self.unit is None:
                        logging.warning('Using kilograms as the default unit')
                        self.unit = 'kg'
                    self._publish_homeassistant_discovery()

                logging.debug('Beginning notification loop')
                while True:
                    if self.device.waitForNotifications(1.0):
                        logging.debug('Notification processing finished')
            except btle.BTLEDisconnectError:
                logging.debug('Device went away, reconnecting')
                # Pause before retrying so a missing scale does not cause a
                # tight reconnect loop.
                self._reconnect()

    def ReadPeopleData(self, file):
        """Load people records from a CSV file with a header row.

        The ``weight_min`` and ``weight_max`` columns are converted to int
        and ``height`` to float; all other columns stay strings.

        Args:
            file: Path of the CSV file.

        Returns:
            A list with one dict per CSV row.
        """
        people = []
        with open(file, "r", newline="") as file_obj:
            for row in csv.DictReader(file_obj):
                row['weight_min'] = int(row['weight_min'])
                row['weight_max'] = int(row['weight_max'])
                row['height'] = float(row['height'])
                people.append(row)
        return people

    def ProcessWeight(self, data):
        """Decode one 10-byte weight record and publish it when stabilized.

        Byte 0 holds flags (bit 0: lbs, bit 4: jin, bit 5: stabilized,
        bit 7: weight removed) and bytes 1-2 the little-endian raw weight.
        Bytes 3-9 carry the scale's timestamp (little-endian year, month,
        day, hours, minutes, seconds), which is not used here.

        Args:
            data: The 10-byte record (bytes-like).
        """
        logging.debug('Processing weight data: ' + data.hex())

        flags = data[0]
        isLBSUnit = flags & 1
        isJinUnit = (flags >> 4) & 1
        isStabilized = (flags >> 5) & 1
        isWeightRemoved = (flags >> 7) & 1

        scaleWeight = round(0.01 * int.from_bytes(data[1:3], 'little'), 1)
        if isLBSUnit:
            unit = 'lbs'
        elif isJinUnit:
            unit = 'jin'
        else:
            # With neither unit flag set the value is halved to get kg.
            scaleWeight = scaleWeight / 2
            unit = 'kg'
        logging.debug('MiScale is configured to use "' + unit + '" as measurement')

        if isStabilized == 1 and isWeightRemoved != 1:
            logging.info('Received stabilized weight information: ' + str(scaleWeight) + ' ' + unit)
            self.PublishWeightInformation(scaleWeight, unit)
        else:
            logging.debug('Current weight: ' + str(scaleWeight) + ' ' + unit)

    def PublishWeightInformation(self, scaleWeight, unit):
        """Publish a weight over MQTT, converting it to the expected unit.

        The first received unit becomes the expected unit unless one was set
        already.  With lazy discovery enabled, the Home Assistant discovery
        message is sent before the first published weight.

        Args:
            scaleWeight: Weight value expressed in ``unit``.
            unit: One of ``'kg'``, ``'lbs'`` or ``'jin'``.
        """
        if self.unit is None:
            logging.debug('Received unit information. Scale uses "' + unit + '" for measurement')
            self.unit = unit

        # Checked on its own (not only when the unit was just learned) so
        # that lazy discovery still happens when a unit was forced up front.
        if not self.hass_discovery_sent and HOMEASSISTANT_DISCOVERY and HOMEASSISTANT_LAZY_DISCOVERY:
            self._publish_homeassistant_discovery()

        if unit != self.unit:
            logging.warning('Received weight in unit "' + unit + '", although expected unit is "' + self.unit + '". This might either be a protocol error or a configuration mistake. Fixing.')
            # Unknown unit pairs leave the value unchanged.
            factor = self._UNIT_FACTORS.get((unit, self.unit), 1)
            scaleWeight = round(factor * scaleWeight, 1)
            logging.debug('Converted weight: ' + str(scaleWeight) + ' ' + self.unit)

        logging.debug('Publishing weight with topic "' + self.mqtt_topic + '" (MQTT message: "' + str(scaleWeight) + '")')
        self.mqtt_client.publish(self.mqtt_topic, scaleWeight)
        logging.debug('Scale weight published')

    def PrintCurrentDeviceAbilities(self):
        """Print every service, characteristic and descriptor of the device."""
        for service in self.device.getServices():
            print(str(service) + ' (' + str(service.uuid) + ')')
            for characteristic in service.getCharacteristics():
                print(' ' + str(characteristic) + ' (' + str(characteristic.uuid) + ')')
                for descriptor in characteristic.getDescriptors():
                    print(' ' + str(descriptor) + ' (' + str(descriptor.uuid) + ')')

    def _reconnect(self):
        """Announce a reconnect and wait one second before it is attempted."""
        print('device disconnected, reconnecting')
        time.sleep(1.0)

    def _publish_homeassistant_discovery(self):
        """Publish the retained Home Assistant discovery config for the sensor.

        Requires ``self.device_info`` (set by ``_connect_miscale``) and
        ``self.unit`` to be populated.
        """
        sensor_id = "miscale_" + self.strippedAddress
        display_name = "Mi Scale " + self.strippedAddress
        homeAssistantTopic = HOMEASSISTANT_DISCOVERY_PREFIX + '/sensor/miscale_' + self.strippedAddress + '/config'
        message = json.dumps({
            #"device_class": "sensor",
            "name": display_name,
            "state_topic": self.mqtt_topic,
            "unit_of_measurement": self.unit,
            "icon": "mdi:scale",
            "unique_id": sensor_id,
            "device": {
                "identifiers": [sensor_id, "miscale_" + self.device_info['serial']],
                "manufacturer": "Xiaomi Inc.",
                "model": "Mi Scale v1",
                "name": display_name,
                "sw_version": self.device_info['firmware']
            }
        })
        logging.info('Publishing HomeAssistant MQTT Discovery message')
        logging.debug("HASS Discovery topic " + homeAssistantTopic)
        logging.debug("HASS Discovery message " + message)
        self.mqtt_client.publish(homeAssistantTopic, message, qos=0, retain=True)
        # This flag is what the callers test; it was previously written to a
        # misspelled attribute and therefore never became True.
        self.hass_discovery_sent = True

    def _connect_miscale(self):
        """Open the BLE connection and read basic device information.

        Sets ``self.device`` and ``self.device_info`` (firmware, serial,
        name, appearance), then prints the latter.
        """
        logging.debug('Connecting to bluetooth peripheral: ' + str(self.address))
        self.device = btle.Peripheral(self.address)

        dis = self.device.getServiceByUUID('0000180a-0000-1000-8000-00805f9b34fb')
        snc = dis.getCharacteristics('00002a25-0000-1000-8000-00805f9b34fb')[0]
        fwc = dis.getCharacteristics('00002a28-0000-1000-8000-00805f9b34fb')[0]

        gas = self.device.getServiceByUUID('00001800-0000-1000-8000-00805f9b34fb')
        dnc = gas.getCharacteristics('00002a00-0000-1000-8000-00805f9b34fb')[0]
        # NOTE(review): this reads the same UUID (2a00) as the name above, so
        # 'appearance' currently duplicates 'name'.  Left unchanged because
        # a different characteristic may not decode as UTF-8 - confirm.
        apc = gas.getCharacteristics('00002a00-0000-1000-8000-00805f9b34fb')[0]

        self.device_info = {
            'firmware': fwc.read().decode('utf-8'),
            'serial': snc.read().decode('utf-8'),
            'name': dnc.read().decode('utf-8'),
            'appearance': apc.read().decode('utf-8'),
        }
        print(self.device_info)

    def _setup_miscale_v1(self):
        """Install the notification delegate and enable weight notifications."""
        print('setting up miscale v1')
        wms = self.device.getServiceByUUID(WEIGHT_MEASUREMENT_SERVICE)
        wmc = wms.getCharacteristics(WEIGHT_MEASUREMENT_CHARACTERISTIC)[0]
        wmhc = wms.getCharacteristics(WEIGHT_MEASUREMENT_HISTORY_CHARACTERISTIC)[0]
        ctc = wms.getCharacteristics(CURRENT_TIME_CHARACTERISTIC)[0]

        helper = MiScaleBluetoothDelegate(self, wmc.valHandle, wmhc.valHandle, ctc.valHandle)
        # Feed the current time value through the delegate once so the
        # scale's date is checked right after connecting.
        helper.handleNotification(ctc.valHandle, ctc.read())
        self.device.withDelegate(helper)

        # Only the measurement characteristic is written to here; nothing is
        # written to the history characteristic during setup.
        # NOTE(review): assumes the handle after valHandle is the
        # characteristic's client configuration descriptor - confirm.
        self.device.writeCharacteristic(wmc.valHandle + 1, b'\x01\x00')

    def _start_client(self):
        """Create the MQTT client, connect to the broker and start its loop."""
        def _on_connect(client, _, flags, return_code):
            # A non-zero CONNACK code means the broker refused the connection.
            self.connected = (return_code == 0)
            print("MQTT connection returned result: %s" % mqtt.connack_string(return_code))

        def _on_message(client, _, message):
            print("MQTT received '" + str(message.payload) + "' on topic " + str(message.topic))

        def _on_publish(client, _, mid):
            print("MQTT message " + str(mid) + " published")

        self.mqtt_client = mqtt.Client()
        self.mqtt_client.username_pw_set(MQTT_USERNAME, MQTT_PASSWORD)
        self.mqtt_client.on_connect = _on_connect
        self.mqtt_client.on_message = _on_message
        self.mqtt_client.on_publish = _on_publish
        self.mqtt_client.connect(MQTT_HOST, MQTT_PORT, MQTT_TIMEOUT)
        self.mqtt_client.loop_start()
def main():
    """Resolve the scale's MAC address (scanning if needed) and start the bridge.

    When ``MISCALE_MAC`` is ``None`` a 10-second Bluetooth scan looks for a
    device whose advertised value 9 equals ``'MI_SCALE'`` and stores its
    address in ``MISCALE_MAC``.  The process exits with status 1 when the
    scan cannot be run or finds no scale.
    """
    global MISCALE_MAC
    if MISCALE_MAC is None:
        logging.info('Running preliminary Bluetooth device scan')
        try:
            devices = btle.Scanner().scan(10.0)
        except btle.BTLEManagementError:
            logging.error('Could not scan for devices. Please, set your MiScale\'s MAC address or run script with permissions for bluetooth scanning.')
            sys.exit(1)

        for device in devices:
            # NOTE(review): 9 is presumably the "Complete Local Name"
            # advertising data type - confirm.
            if device.getValueText(9) == 'MI_SCALE':
                MISCALE_MAC = device.addr
                break
        else:
            # Without this check MiScaleDevice(None) would fail with an
            # AttributeError while normalising the address.
            logging.error('No MiScale was found during the Bluetooth scan. Please, set your MiScale\'s MAC address.')
            sys.exit(1)

    # @TODO: Make multiple devices available
    MiScaleDevice(MISCALE_MAC)


if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment