Last active
March 26, 2023 18:16
-
-
Save evanreichard/2bc07840d589f0ec2714b09428a959c7 to your computer and use it in GitHub Desktop.
Sonoff Tasmota RF Bridge Demultiplexer
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# NOTE:
# The following is a _subset_ of Home Assistant's "automations.yaml"
# Forward every RF Bridge MQTT message (received RF codes and the bridge's
# LWT status) to the rfbridge_demux Python script.
- alias: '[RFBridge] Demultiplexer'
  trigger:
    # RF codes received by the bridge
    - platform: mqtt
      topic: tele/RFBridge/RESULT
    # Bridge availability (Last Will and Testament) messages
    - platform: mqtt
      topic: tele/RFBridge/LWT
  action:
    - service: python_script.rfbridge_demux
      data_template:
        # The whole trigger object is handed to the script as "payload"
        payload: '{{trigger}}'
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# NOTE:
# The following is a _subset_ of Home Assistant's "configuration.yaml"
# Used to normalize MQTT topics
# Enables the python_script integration (provides python_script.rfbridge_demux)
python_script:

# One moisture sensor per RF water sensor. Each listens on its own
# normalized "home/<name>" topic and falls back to OFF 10 seconds after
# the last update.
mqtt:
  binary_sensor:
    - name: Half Bath Water Sensor
      state_topic: 'home/half_bath_water_sensor'
      device_class: moisture
      off_delay: 10
    - name: Laundry Room Water Sensor
      state_topic: 'home/laundry_room_water_sensor'
      device_class: moisture
      off_delay: 10
    - name: HVAC Drip Pan Water Sensor
      state_topic: 'home/hvac_drip_pan_water_sensor'
      device_class: moisture
      off_delay: 10
    - name: Guest Bath Sink Water Sensor
      state_topic: 'home/guest_bath_sink_water_sensor'
      device_class: moisture
      off_delay: 10
    - name: Master Bath Sink Water Sensor
      state_topic: 'home/master_bath_sink_water_sensor'
      device_class: moisture
      off_delay: 10
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# https://community.home-assistant.io/t/sonoff-rf-bridge-strategies-for-receiving-data/108181
# Sensor Payload Definition:
#   '<PAYLOAD>': ['<PUBLISH_TOPIC>', '<STATE>', '<RETAIN>']
# Lookup table keyed by the RF code the bridge reports. Each value is a
# list of [publish topic suffix, state to publish, MQTT retain flag].
# Values stay lists because they are formatted verbatim into log output.
sensors = {
    'F5ACFB': ['half_bath_water_sensor', 'ON', 'false'],
    'F58AFB': ['laundry_room_water_sensor', 'ON', 'false'],
    'F4AEFB': ['hvac_drip_pan_water_sensor', 'ON', 'false'],
    'F819FB': ['guest_bath_sink_water_sensor', 'ON', 'false'],
    'F4A5FB': ['master_bath_sink_water_sensor', 'ON', 'false'],
}
""" | |
Initial Entrypoint | |
""" | |
def process_data():
    """Route the incoming MQTT trigger to the matching handler.

    Reads the trigger from ``data['payload']`` (``data`` and ``logger`` are
    not defined in this file and are expected to be supplied by the
    runtime). Messages on "tele/RFBridge/RESULT" carrying an RF code are
    passed to ``set_update_state``; an "Online" message on
    "tele/RFBridge/LWT" runs ``set_init_state``; any other LWT payload is
    logged as a warning. Other topics are ignored.
    """
    message = data.get('payload', {})
    source_topic = str(message.get('topic'))

    # Bridge availability message
    if source_topic == "tele/RFBridge/LWT":
        bridge_status = message.get('payload')
        if bridge_status != 'Online':
            logger.warning('<rfbridge_demux> LWT Status Unknown: {}'.format(bridge_status))
            return
        set_init_state()
        return

    # Anything that is not an RF result is of no interest
    if source_topic != "tele/RFBridge/RESULT":
        return

    # Walk payload_json -> RfReceived -> Data, tolerating missing keys
    rf_json = message.get('payload_json', {})
    rf_received = rf_json.get('RfReceived', {})
    rf_code = rf_received.get('Data')
    if rf_code is not None:
        set_update_state(rf_code)
""" | |
On initial load we subscribe to "tele/RFBridge/LWT". This only returns
"Online" or "Offline". If we're "Online", set sensors to OFF. This is
acceptable because these sensors continuously send updates if they're ON.
""" | |
def set_init_state():
    """Publish OFF for every configured sensor whose state is 'unknown'.

    For each entry in ``sensors`` the current state of
    ``binary_sensor.<topic suffix>`` is looked up:

    * 'unknown'      -> publish 'OFF' to ``home/<topic suffix>``
    * 'on' / 'off'   -> leave untouched
    * anything else  -> log a warning

    A sensor whose entity is not registered (the state lookup returns
    ``None``) is logged and skipped, so one missing entity no longer raises
    and prevents the remaining sensors from being initialised.
    """
    for sensor in sensors.values():
        name = sensor[0]
        state_obj = hass.states.get('binary_sensor.{}'.format(name))

        # The state machine returns None for entities it does not know about
        if state_obj is None:
            logger.warning('<rfbridge_demux> Sensor {} Entity Not Found -> Skipping'.format(name))
            continue

        sensor_state = state_obj.state
        logger.info('<rfbridge_demux> Sensor {} State: {}'.format(name, sensor_state))

        # LWT Online & Current Unknown State -> Set OFF
        if sensor_state == 'unknown':
            logger.info('<rfbridge_demux> Sensor {} State Unknown -> Setting: OFF'.format(name))
            service_data = {
                'topic': 'home/{}'.format(name),
                'payload': 'OFF',
                'retain': '{}'.format(sensor[2]),
                'qos': 0
            }
            hass.services.call('mqtt', 'publish', service_data, False)
        elif sensor_state in ['on', 'off']:
            logger.info('<rfbridge_demux> Sensor {} State Known -> Skipping'.format(name))
        else:
            logger.warning('<rfbridge_demux> Sensor {} State Unknown: {}'.format(name, sensor_state))
""" | |
Given a Sensor ID, publish the new state. If the Sensor ID is unknown, | |
then publish an unknown state and print a warning. | |
""" | |
def set_update_state(sensor_id):
    """Publish the MQTT message that corresponds to a received RF code.

    A code found in ``sensors`` publishes that sensor's configured state to
    ``home/<topic suffix>`` with its configured retain flag. Any other code
    is logged as a warning and the raw code is published, unretained, to
    ``home/unknown``.

    sensor_id: the RF code reported by the bridge.
    """
    if sensor_id not in sensors:
        logger.warning('<rfbridge_demux> Unknown Sensor: {}'.format(sensor_id))
        topic = 'home/unknown'
        state = '{}'.format(sensor_id)
        retain = 'false'
    else:
        entry = sensors[sensor_id]
        logger.info('<rfbridge_demux> Updating Sensor: {}'.format(entry))
        topic = 'home/{}'.format(entry[0])
        state = '{}'.format(entry[1])
        retain = '{}'.format(entry[2])

    # Single publish call shared by both the known and unknown paths
    hass.services.call(
        'mqtt',
        'publish',
        {'topic': topic, 'payload': state, 'retain': retain, 'qos': 0},
        False
    )
# Script entry point: handle the trigger that invoked this script.
process_data()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment