Skip to content

Instantly share code, notes, and snippets.

@michaelkuty
Created June 20, 2020 20:18
Show Gist options
  • Save michaelkuty/5ecc7b8acd04fb2ca21f0cdb6be1167b to your computer and use it in GitHub Desktop.
Save michaelkuty/5ecc7b8acd04fb2ca21f0cdb6be1167b to your computer and use it in GitHub Desktop.
Home Assistant Sonoff RF Bridge App Daemon
# RF codes located in appdaemon/config/rf_codes.yaml
93B408:
  device_name: remote_ctl_1
  name: button_1_left
  description: Turn on scene
  # sync: 0x1D4C
  # low: 0x010E
C10A39:
  device_name: motion_12
  name: motion_12
  description: Motion 12
  delay: 30
import appdaemon.plugins.hass.hassapi as hass
import yaml
class RFBridge(hass.Hass):
    """AppDaemon app exposing RF codes as ``sensor.<name>`` entities.

    The RF code database is read from ``<config_dir>/config/rf_codes.yaml``;
    it maps an RF code to a device mapping with a ``name`` and optional
    ``device_name``, ``description``, ``delay`` and extra parameters.

    Every known device gets a sensor that starts as ``off``. When an
    ``esphome.rf_code_received`` event matches a device, its sensor is set to
    ``on``; the resulting ``state_changed`` event schedules a reset back to
    ``off`` after the device's ``delay`` (seconds, default 1).
    """

    def initialize(self) -> None:
        """Load the RF database, subscribe to events and schedule sensor setup."""
        with open(f'{self.config_dir}/config/rf_codes.yaml', 'r') as file:
            # The file only holds plain mappings, so the safe loader is
            # sufficient; an empty file parses to None, hence the fallback.
            self.rf_db = yaml.safe_load(file) or {}
        # entity_id -> {'delay': seconds}; filled once a device was triggered.
        self.known_sensors = {}
        self.adbase = self.get_ad_api()
        # Subscribe only after the state used by the callback exists.
        self.listen_event(self.event_received, "esphome.rf_code_received")
        self.listen_event(self.event_received, "state_changed")
        self.adbase.run_in(self._init_devices, 5)

    def _init_devices(self, kwargs):
        """Create an ``off`` sensor for every device in the RF database.

        ``kwargs`` is whatever the scheduler passes to the callback; it is
        not used.
        """
        self.log(f"Initing known RF {len(self.rf_db)} devices")
        for code, device in self.rf_db.items():
            if not isinstance(device, dict) or 'name' not in device:
                # Without a name there is no entity id to publish under.
                self.log(f"Skipping RF code {code}: entry has no name")
                continue
            # emit device sensor
            attrs = {
                "code": code,
                "device_name": device.get('device_name'),
                "description": device.get('description'),
            }
            attrs.update(device)
            self.set_state(
                f"sensor.{device['name']}",
                state='off',
                attributes=attrs,
            )

    def _find_entity(self, data):
        """Return the device entry matching the event ``data``, or ``None``.

        The device is looked up by ``data['code']`` (upper-case first, then
        lower-case). Every key present in both ``data`` and the device entry
        must then compare equal (case-insensitive string comparison); if the
        two share no keys, the match by code alone is accepted.
        """
        code = data.get('code')
        if code is None:
            return None
        code = str(code)
        # try to get UPPER and lower codes
        self.log(f"Trying to find {code.upper()} in RF DB")
        device = self.rf_db.get(code.upper())
        if device is None:
            device = self.rf_db.get(code.lower())
        if not device:
            # device not known
            return None
        self.log(f"Found {device} continue to match additional parameters")
        # Check all shared parameters, not just the first one encountered,
        # so the result does not depend on the key order of the event.
        for key in data:
            if key not in device:
                continue
            if str(device[key]).lower() != str(data[key]).lower():
                self.log(f"Device {device} not match with additional parameters {data}")
                return None
        # no shared keys, or all of them matched
        return device

    def _schedule_reset(self, data):
        """Schedule the switch back to ``off`` for a sensor that changed state.

        Only entities recorded in ``known_sensors`` are handled, and only
        when the state actually changed to something other than ``off``.
        """
        entity_id = data.get('entity_id')
        if entity_id not in self.known_sensors:
            return
        # Either side can be None (e.g. entity just created or removed).
        old_value = (data.get('old_state') or {}).get('state')
        new_value = (data.get('new_state') or {}).get('state')
        if new_value is None or new_value == old_value:
            return
        if new_value == 'off':
            # The reset itself produces this transition; scheduling another
            # reset here could switch off a sensor that was re-triggered in
            # the meantime before its own delay has elapsed.
            return

        def reset_to_off(*_args, **_kwargs):
            # Accept any arguments the scheduler passes; none are needed.
            attrs = dict(self.get_state(entity_id, attribute='attributes') or {})
            # Stored as a string, like last_change_on.
            attrs['last_change_off'] = str(self.get_now())
            self.set_state(entity_id, state='off', attributes=attrs)

        self.adbase.run_in(reset_to_off, self.known_sensors[entity_id]['delay'])

    def event_received(self, event_name, data, kwargs):
        """Handle both subscribed event types.

        ``state_changed`` events only schedule the reset of known sensors.
        Any other event is treated as a received RF code: the matching
        device's sensor is set to ``on`` with the event data, the device
        entry and a ``last_change_on`` timestamp as attributes.
        """
        if event_name == 'state_changed':
            self._schedule_reset(data)
            return
        self.log(f"RF Bridge Code event received. Event was: {data}")
        device = self._find_entity(data)
        if not device:
            self.log(f"RF Code {data.get('code')} did't match any device")
            return
        entity_id = f"sensor.{device['name']}"
        self.log(f"RF Code {data['code']} match device: {device}")
        # Build the attributes without mutating the incoming event payload.
        attrs = dict(data)
        attrs.update(device)
        attrs.update({
            "device_name": device.get('device_name'),
            "description": device.get('description'),
            "last_change_on": str(self.get_now()),
        })
        # Register before emitting so the state_changed event triggered by
        # set_state always finds the sensor and its delay.
        self.known_sensors[entity_id] = {'delay': device.get('delay', 1)}
        # emit device sensor
        self.set_state(
            entity_id,
            state='on',
            attributes=attrs,
        )
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment