Last active
May 2, 2022 12:20
-
-
Save amitkeret/c756148fe4679fac6eaf39c590ac9e82 to your computer and use it in GitHub Desktop.
Check peripheral connectivity to Raspberry Pi and report via MQTT
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python
# -*- coding: utf-8 -*-

# --- Settings (the "*****" values are placeholders; fill in before use) ---

# Credentials handed to the MQTT client before it connects.
mqttuser = "*****"
mqttpwd = "*****"
# Broker address and port the client connects to.
mqtthost = "*****"
mqttport = 1883
# Passed as the keepalive argument of the MQTT connect call.
mqttkeepalive = 60
# Name of this Pi: used as-is in the sensor display names and lower-cased
# in the MQTT topics and unique ids.
piname = "*****"
import sys, os, re, subprocess, json, time, datetime, hashlib | |
import paho.mqtt.client as paho | |
# Directory holding this script; the state and log files live next to it.
script_dir = os.path.dirname(os.path.abspath(__file__))
STATE_FILE = os.path.join(script_dir, "periph_mqtt_state.md5")
LOG_FILE = os.path.join(script_dir, "periph_mqtt.log")

# Shape of the address tokens picked out of the `hcitool con` output.
BT_ADDRESS = re.compile(r'(\w{2}:){5}\w{2}')


def to_text(raw):
    """Return *raw* as text, decoding it first when it is a byte string.

    subprocess.check_output() returns bytes on Python 3; the string handling
    below needs text, so everything is normalised here.
    """
    if isinstance(raw, bytes):
        return raw.decode("utf-8", "replace")
    return raw


def parse_power_status(raw):
    """Extract the bare power state from a cec-client 'power status:' line."""
    return to_text(raw).strip().replace('power status: ', '')


def find_bt_devices(raw):
    """Return the address-shaped tokens found in `hcitool con` output."""
    return [token for token in to_text(raw).split() if BT_ADDRESS.match(token)]


def state_digest(power, btdevices):
    """Return an MD5 hex digest of the serialized [power, btdevices] state."""
    return hashlib.md5(json.dumps([power, btdevices]).encode()).hexdigest()


def read_last_digest(path):
    """Return the digest stored at *path*, or "" when it cannot be read.

    The state file does not exist before the first successful run; treating
    that as "no previous state" lets the first run publish instead of crash.
    """
    try:
        with open(path, "r") as state_file:
            return state_file.read().strip()
    except IOError:
        return ""


def build_payloads(piname, power, btdevices, lastmsg):
    """Build everything that gets published for the current state.

    Returns a tuple ``(topic, sensors, attributes, ha_sensors)``:
    the base topic, the state payload, the attributes payload and the
    Home Assistant auto-discovery config for each binary sensor.
    """
    btdevices_count = len(btdevices)
    btdevices_state = "on" if btdevices_count > 0 else "off"

    # State topic payload
    sensors = {
        "power": power,
        "bt_devices": btdevices_state,
    }

    # Attributes topic payload; every sensor also carries the timestamp
    attributes = {
        "power": {},
        "bt_devices": {
            "devices_count": btdevices_count,
            "devices": btdevices,
        },
    }
    for entry in attributes.values():
        entry["last_message"] = lastmsg

    # Home Assistant auto-discovery definitions
    ha_sensors = {
        "power": {
            "name": "Power",
            "device_class": "power",
            "payload_on": "on",
            "payload_off": "standby",
        },
        "bt_devices": {
            "name": "Connected BT devices",
            "icon": "mdi:bluetooth" + ("-off" if btdevices_count == 0 else ""),
            "payload_on": "on",
            "payload_off": "off",
            "device_class": "connectivity",
        },
    }

    pinamelc = piname.lower()
    topic = "system-sensors/binary_sensor/%s" % (pinamelc)
    for index, sensor in ha_sensors.items():
        sensor["name"] = "%s %s" % (piname, sensor["name"])
        sensor["unique_id"] = "%s_binary_sensor_%s" % (pinamelc, index)
        sensor["state_topic"] = "%s/state" % (topic)
        sensor["value_template"] = "{{ value_json.%s }}" % (index)
        sensor["json_attributes_topic"] = "%s/attr" % (topic)
        sensor["json_attributes_template"] = "{{ value_json.%s | tojson }}" % (index)

    return topic, sensors, attributes, ha_sensors


def publish_state(piname, topic, sensors, attributes, ha_sensors):
    """Connect to the broker, publish state, attributes and discovery config.

    Any connection error propagates to the caller so that the run fails
    before the new state is recorded as "already reported".
    """
    client = paho.Client()
    client.username_pw_set(mqttuser, mqttpwd)
    # The will has to be registered before connect() to take effect.
    client.will_set("%s/availability" % (topic), "offline", 1, True)
    client.connect(mqtthost, mqttport, mqttkeepalive)
    # Run the network loop so the QoS 1 publishes below are actually serviced.
    client.loop_start()
    try:
        # Publish states
        client.publish("%s/state" % (topic), json.dumps(sensors), 1, False)
        client.publish("%s/attr" % (topic), json.dumps(attributes), 1, False)
        # (Alternative design: publish each sensor, retained, on its own
        # "<topic>/<sensor>" and "<topic>/<sensor>/attr" topics.)

        # Publish Home Assistant auto-discovery config
        pinamelc = piname.lower()
        for index, sensor in ha_sensors.items():
            client.publish(
                "homeassistant/binary_sensor/%s/%s/config" % (pinamelc, index),
                json.dumps(sensor), 1, True)
        client.publish("%s/availability" % (topic), "online", 1, True)
        # Give the background loop a moment to flush the messages.
        time.sleep(1)
    finally:
        client.disconnect()
        client.loop_stop()


def main():
    """Check power and Bluetooth state and report it when it has changed."""
    # Power state
    try:
        output = subprocess.check_output(
            "echo 'pow 0' | cec-client -s | grep 'power status:'", shell=True)
    except subprocess.CalledProcessError:
        # The pipeline exits non-zero when grep finds no status line.
        sys.exit("periph_mqtt: no 'power status:' line from cec-client")
    power = parse_power_status(output)

    # Unsure why this happens; but ignore this run and exit
    if power == "unknown":
        sys.exit()

    # Connected BT devices
    btdevices = find_bt_devices(subprocess.check_output(['hcitool', 'con']))

    # Compare the serialized state with the one reported last time;
    # no need to continue if nothing has changed.
    digest = state_digest(power, btdevices)
    if digest == read_last_digest(STATE_FILE):
        sys.exit()

    # Last message (UTC, ISO)
    lastmsg = time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime())

    topic, sensors, attributes, ha_sensors = build_payloads(
        piname, power, btdevices, lastmsg)
    publish_state(piname, topic, sensors, attributes, ha_sensors)

    # Record the state only after it was published, so a failed run is
    # retried on the next invocation instead of being silently skipped.
    with open(STATE_FILE, "w") as state_file:
        state_file.write(digest)

    # Log this run's timestamp
    with open(LOG_FILE, "a") as log_file:
        log_file.write(lastmsg + " " + json.dumps([power, btdevices]) + "\n")


if __name__ == "__main__":
    main()
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# systemd unit that keeps the peripheral-state reporting loop running.
[Unit]
Description=Continuous loop reporting state of peripherals.
# The loop reports to an MQTT broker, so start it once the network is up.
Wants=network-online.target
After=network-online.target

[Service]
Type=simple
ExecStart=/bin/bash /scripts/periph_mqtt.sh
# Bring the loop back if the shell process ever exits abnormally.
Restart=on-failure
RestartSec=10

[Install]
WantedBy=multi-user.target
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/bin/bash
# Run the peripheral check over and over, pausing 10 seconds between runs.
while :; do
    echo "Running..."
    python /scripts/periph_mqtt.py
    sleep 10
done
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
crontab can't schedule a job more often than once per minute, so an endless loop (run as a systemd service) is used instead.