Last active
May 1, 2023 13:03
-
-
Save OctoNezd/af1cc19c4c853b60884052d5d9fcb63e to your computer and use it in GitHub Desktop.
AppDaemon app that exposes an air conditioner as an MQTT climate device and controls it through an OpenBeken IR blaster over MQTT
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import appdaemon.plugins.mqtt.mqttapi as mqtt | |
import ctypes | |
import json | |
import functools | |
def dict_inv(old):
    """Return a new dict that maps every value of *old* back to its key.

    When several keys share one value, the key visited last wins. The
    values of *old* must be hashable because they become keys.
    """
    return {value: key for key, value in old.items()}
# Coolix operating-mode codes, keyed by the protocol constant name.
# NOTE(review): kCoolixFan (0b100) needs three bits, while the Mode field of
# CoolixCommandCt_Bits is declared two bits wide - confirm how fan mode is
# meant to be encoded.
modes = {
    "kCoolixCool": 0b000,
    "kCoolixDry": 0b001,
    "kCoolixAuto": 0b010,
    "kCoolixHeat": 0b011,
    "kCoolixFan": 0b100,
}
# Reverse lookup: mode code -> constant name.
modes_inv = dict_inv(modes)

# Coolix fan-speed codes, keyed by the protocol constant name.
fan_states = {
    "kCoolixFanMin": 0b100,
    "kCoolixFanMed": 0b010,
    "kCoolixFanMax": 0b001,
    "kCoolixFanAuto": 0b101,
    "kCoolixFanAuto0": 0b000,
    "kCoolixFanZoneFollow": 0b110,
    "kCoolixFanFixed": 0b111,
}
# Reverse lookup: fan code -> constant name.
fan_states_inv = dict_inv(fan_states)

# Coolix mode constant name -> mode string used on the MQTT mode topic.
modes_ha = {
    "kCoolixCool": "cool",
    "kCoolixDry": "dry",
    "kCoolixAuto": "auto",
    "kCoolixHeat": "heat",
    "kCoolixFan": "fan_only",
}
# Reverse lookup: MQTT mode string -> Coolix mode constant name.
modes_ha_inv = dict_inv(modes_ha)

# MQTT fan-mode string -> Coolix fan constant name.
fan_states_ha_inv = {
    "auto": "kCoolixFanAuto0",
    "low": "kCoolixFanMin",
    "medium": "kCoolixFanMed",
    "high": "kCoolixFanMax",
}
# Reverse lookup: Coolix fan constant name -> MQTT fan-mode string.
fan_states_ha = dict_inv(fan_states_ha_inv)
# Supported setpoint range, in degrees Celsius.
kCoolixTempMin = 17
kCoolixTempMax = 30

# Special Temp-field value that is part of Fan mode (not a real setpoint).
kCoolixFanTempCode = 0b1110
# SensorTemp-field value written when no sensor temperature is supplied.
kCoolixSensorTempIgnoreCode = 0b11111

# Temp-field code for each whole degree; index 0 is kCoolixTempMin (17C)
# and the last entry is kCoolixTempMax (30C).
temps = [
    0b0000,  # 17C
    0b0001,  # 18C
    0b0011,  # 19C
    0b0010,  # 20C
    0b0110,  # 21C
    0b0111,  # 22C
    0b0101,  # 23C
    0b0100,  # 24C
    0b1100,  # 25C
    0b1101,  # 26C
    0b1001,  # 27C
    0b1000,  # 28C
    0b1010,  # 29C
    0b1011,  # 30C
]
# Fixed 24-bit Coolix command words (hex equivalents in the comments).
kCoolixOff = 0b1011_0010_0111_1011_1110_0000     # 0xB27BE0
kCoolixSwing = 0b1011_0010_0110_1011_1110_0000   # 0xB26BE0
# NOTE(review): this literal equals 0xB2F5A2, but the original comment said
# 0xB5F5A2 (which is the value of kCoolixTurbo) - confirm which is intended.
kCoolixSwingH = 0b1011_0010_1111_0101_1010_0010  # 0xB2F5A2
kCoolixSwingV = 0b1011_0010_0000_1111_1110_0000  # 0xB20FE0
kCoolixSleep = 0b1011_0010_1110_0000_0000_0011   # 0xB2E003
kCoolixTurbo = 0b1011_0101_1111_0101_1010_0010   # 0xB5F5A2
kCoolixLed = 0b1011_0101_1111_0101_1010_0101     # 0xB5F5A5
kCoolixClean = 0b1011_0101_1111_0101_1010_1010   # 0xB5F5AA
kCoolixCmdFan = 0b1011_0010_1011_1111_1110_0100  # 0xB2BFE4
class CoolixCommandCt_Bits(ctypes.LittleEndianStructure):
    """Bit-field view of a Coolix command word.

    Every field is a ``c_uint32`` bit field; the declared widths add up to
    24 bits. Fields are listed in declaration order.
    """

    # (field name, width in bits), expanded into ctypes bit-field triples.
    _fields_ = [
        (field_name, ctypes.c_uint32, bit_width)
        for field_name, bit_width in (
            ('unk1', 1),
            ('ZoneFollow1', 1),
            ('Mode', 2),
            ('Temp', 4),
            ('SensorTemp', 5),
            ('Fan', 3),
            ('unk2', 3),
            ('ZoneFollow2', 1),
            ('fixed', 4),
        )
    ]
class CoolixCommandCt(ctypes.Union):
    """A Coolix command, accessible as a whole word or as named bit fields.

    ``raw`` and ``bits`` share the same storage, so writing one is visible
    through the other.
    """

    _fields_ = [
        # The complete command word as one unsigned 32-bit integer.
        ('raw', ctypes.c_uint32),
        # The same storage split into the CoolixCommandCt_Bits fields.
        ('bits', CoolixCommandCt_Bits),
    ]
def generate_coolix_message(state=True, mode='Auto', fan='Auto0', temp=25, zfollow=False):
    """Build a Coolix command from high-level settings.

    Args:
        state: Falsy requests the fixed power-off command.
        mode: Suffix of a ``modes`` key (``'kCoolix' + mode``), e.g. ``'Cool'``.
        fan: Suffix of a ``fan_states`` key (``'kCoolixFan' + fan``),
            e.g. ``'Auto0'``.
        temp: Integer setpoint in Celsius; values outside
            ``kCoolixTempMin``..``kCoolixTempMax`` are clamped to that range.
        zfollow: Truthy sets both ZoneFollow flag bits.

    Returns:
        The integer ``kCoolixOff`` when *state* is falsy; otherwise a
        ``CoolixCommandCt`` whose ``raw`` attribute holds the command word.

    Raises:
        KeyError: If *mode* or *fan* does not name a known constant.
    """
    if not state:
        return kCoolixOff

    mode_code = modes['kCoolix' + mode]
    fan_code = fan_states['kCoolixFan' + fan]

    # Clamp first: a setpoint below the minimum would otherwise become a
    # negative list index and silently select a code from the hot end.
    setpoint = max(kCoolixTempMin, min(kCoolixTempMax, temp))
    temp_code = temps[setpoint - kCoolixTempMin]

    if mode_code == modes['kCoolixFan']:
        # The Mode field is only two bits wide, so 0b100 would be truncated
        # to 0b00 (cool). Fan mode is sent as Dry plus the special fan
        # temperature code instead.
        mode_code = modes['kCoolixDry']
        temp_code = kCoolixFanTempCode

    # The ZoneFollow fields are single bits; normalise to 0/1 so the caller's
    # flag is honoured (any other integer would be truncated by the field).
    # NOTE(review): no sensor temperature is supplied here (SensorTemp stays
    # at the "ignore" code) - confirm the unit accepts zone-follow this way.
    zone_bit = int(bool(zfollow))

    res = CoolixCommandCt()
    res.bits.unk1 = 0x0
    res.bits.ZoneFollow1 = zone_bit
    res.bits.Mode = mode_code
    res.bits.Temp = temp_code
    res.bits.SensorTemp = kCoolixSensorTempIgnoreCode
    res.bits.Fan = fan_code
    res.bits.unk2 = 0x2
    res.bits.ZoneFollow2 = zone_bit
    res.bits.fixed = 0xb
    return res
# Command word used when switching on from the "off" state (0xB21FC8):
# On, 25C, Mode: Auto, Fan: Auto, Zone Follow: Off, Sensor Temp: Ignore.
DEF_ON_STATE = 0b1011_0010_0001_1111_1100_1000

# Defaults reported over MQTT while the unit is off.
DT = 25      # default temperature, Celsius
DM = "auto"  # default mode
DF = "auto"  # default fan mode

# Identity and topic configuration.
UID = "homeComfeeAc"       # unique id used in the discovery topic and payload
MY_NS = "ad/ac"            # prefix of this app's mode/temp/fan topics
IR_BLASTER = "homeir"      # device name inside the IR blaster's command topic
DEVICE_NAME = "Comfee AC"  # name published in the discovery payload
class CoolixAcGeneric(mqtt.Mqtt):
    """AppDaemon MQTT app that drives a Coolix AC through an IR blaster.

    The current command word lives in ``self.cmd``. Payloads arriving on the
    ``MY_NS`` ``/mode``, ``/temp`` and ``/fan`` topics modify that word, and
    the result is published to ``cmnd/<IR_BLASTER>/IRSend``. A Home Assistant
    MQTT discovery payload is re-published periodically.
    """

    def initialize(self):
        """Start from the "off" command, publish state and register listeners."""
        self.cmd = CoolixCommandCt()
        self.cmd.raw = kCoolixOff
        self.log("Hello from AppDaemon")
        self.log("You are now ready to run Apps!")
        self.update_mqtt_state()
        self.run_every(self.setup_ha_discovery, "now", 10)
        self.subscribe()
        self.blast_ir()

    def blast_ir(self):
        """Publish the current command word to the IR blaster's IRSend topic."""
        self.log("blasting new ir state: %s", hex(self.cmd.raw))
        self.mqtt_publish(f"cmnd/{IR_BLASTER}/IRSend",
                          f"COOLIX,24,{hex(self.cmd.raw)},1")

    def subscribe(self):
        """Subscribe to the command topics and attach one handler per topic."""
        handlers = (
            ("/mode", self.set_ac_state),
            ("/temp", self.set_ac_temp),
            ("/fan", self.set_fan_mode),
        )
        # Subscribe to every topic that has a listener; previously only
        # "/mode" was subscribed although "/temp" and "/fan" were listened to.
        for suffix, _handler in handlers:
            self.mqtt_subscribe(MY_NS + suffix)
        self.mqtt_subscribe(IR_BLASTER + "")
        for suffix, handler in handlers:
            self.listen_event(self.create_ir_mw(handler),
                              "MQTT_MESSAGE", topic=MY_NS + suffix)

    def create_ir_mw(self, func):
        """Wrap a setter so it can be used as an MQTT_MESSAGE event callback.

        The wrapper switches an "off" command to ``DEF_ON_STATE`` before
        calling *func* with the message payload, then transmits the result.
        If *func* rejects the payload, the previous command is restored and
        nothing is transmitted.
        """
        @functools.wraps(func)
        def wrapped(_, change, __):
            previous_raw = self.cmd.raw
            if previous_raw == kCoolixOff:
                self.cmd.raw = DEF_ON_STATE
            try:
                func(change["payload"])
            except (KeyError, ValueError, TypeError) as err:
                # Roll back so a bad payload cannot leave the unit marked
                # "on" without any IR command having been sent.
                self.cmd.raw = previous_raw
                self.log("ignoring invalid payload on %s: %r",
                         change.get("topic"), err, level="WARNING")
                return
            self.blast_ir()
        return wrapped

    def _is_fan_only(self):
        """Return True when the command encodes fan-only mode.

        Fan-only is represented as Dry mode combined with the special fan
        temperature code.
        """
        return (self.cmd.raw != kCoolixOff
                and self.cmd.bits.Mode == modes["kCoolixDry"]
                and self.cmd.bits.Temp == kCoolixFanTempCode)

    def set_ac_state(self, new_mode):
        """Apply a mode string (``"off"`` or a ``modes_ha`` value).

        Raises:
            KeyError: If *new_mode* is not a known mode string.
        """
        if new_mode == "off":
            self.cmd.raw = kCoolixOff
            return
        desired_mode = modes_ha_inv[new_mode]
        if desired_mode == "kCoolixFan":
            # The Mode field is two bits wide, so writing 0b100 would be
            # truncated to 0b00 (cool). Encode fan-only as Dry plus the
            # fan temperature code instead.
            self.cmd.bits.Mode = modes["kCoolixDry"]
            self.cmd.bits.Temp = kCoolixFanTempCode
        else:
            if self.cmd.bits.Temp not in temps:
                # Leaving fan-only: the Temp field holds no real setpoint,
                # so fall back to the default temperature.
                self.cmd.bits.Temp = temps[DT - kCoolixTempMin]
            self.cmd.bits.Mode = modes[desired_mode]
        self.log("desired mode: %s", desired_mode)

    def set_ac_temp(self, new_temp):
        """Apply a setpoint in Celsius, clamped to the supported range.

        Raises:
            ValueError: If *new_temp* cannot be parsed as a number.
        """
        if self._is_fan_only():
            # Overwriting the fan temperature code would silently turn
            # fan-only into Dry mode.
            self.log("ignoring temperature %s while in fan_only mode", new_temp)
            return
        # Clamp before indexing: a value below the minimum would otherwise
        # become a negative index, and one above the maximum an IndexError.
        bounded = max(kCoolixTempMin, min(kCoolixTempMax, float(new_temp)))
        self.cmd.bits.Temp = temps[int(bounded) - kCoolixTempMin]

    def set_fan_mode(self, new_mode):
        """Apply a fan-mode string (a ``fan_states_ha_inv`` key).

        Raises:
            KeyError: If *new_mode* is not a known fan-mode string.
        """
        desired_mode = fan_states_ha_inv[new_mode]
        self.cmd.bits.Fan = fan_states[desired_mode]
        self.log('desired fan state: %s', desired_mode)

    def setup_ha_discovery(self, *_):
        """Publish the Home Assistant MQTT discovery config for this device."""
        self.mqtt_publish("homeassistant/climate/" + UID +
                          "/config", payload=json.dumps({
                              "unique_id": UID,
                              "temperature_unit": "C",
                              "temperature_state_topic": MY_NS + "/temp",
                              "temperature_command_topic": MY_NS + "/temp",
                              "temperature_state_template": "{{ value_json }}",
                              "fan_mode_command_topic": MY_NS + "/fan",
                              "fan_mode_state_topic": MY_NS + "/fan",
                              "mode_command_topic": MY_NS + "/mode",
                              "mode_state_topic": MY_NS + "/mode",
                              "optimistic": True,
                              "precision": 1,
                              "device": {
                                  "identifiers": UID
                              },
                              "max_temp": kCoolixTempMax,
                              "min_temp": kCoolixTempMin,
                              "name": DEVICE_NAME,
                              "initial": DT
                          }))
        self.log("Published myself to homeassistant discovery")

    def update_mqtt_state(self):
        """Publish the mode, temperature and fan state held in ``self.cmd``."""
        if self.cmd.raw == kCoolixOff:
            self.mqtt_publish(topic=MY_NS + "/mode", payload="off")
            self.mqtt_publish(topic=MY_NS + "/temp", payload=DT)
            self.mqtt_publish(topic=MY_NS + "/fan", payload=DF)
            return

        temp_code = self.cmd.bits.Temp
        if temp_code in temps:
            # Skipped for codes outside the table (e.g. the fan temperature
            # code), which have no setpoint to report.
            self.mqtt_publish(
                topic=MY_NS + "/temp",
                payload=temps.index(temp_code) + kCoolixTempMin)

        if self._is_fan_only():
            mode_name = modes_ha["kCoolixFan"]
        else:
            mode_name = modes_ha[modes_inv[self.cmd.bits.Mode]]
        self.mqtt_publish(topic=MY_NS + "/mode", payload=mode_name)

        # Not every 3-bit fan code has a name; report those as "auto".
        fan_name = fan_states_inv.get(self.cmd.bits.Fan)
        self.mqtt_publish(
            topic=MY_NS + "/fan", payload=fan_states_ha.get(fan_name, "auto"))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment