Last active
March 2, 2022 09:32
-
-
Save bonzini/b917e978959c08ebe6898aea12ec569b to your computer and use it in GitHub Desktop.
termostato con zigbee2mqtt
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#! /usr/bin/env python3 | |
from collections import defaultdict | |
import asyncio | |
import paho.mqtt.client | |
import json | |
import weakref | |
class ActiveSubscription: | |
def __init__(self, topic, obj, client): | |
# when obj dies, all references between the client and | |
# obj's corresponding ActiveSubscription go away | |
self.topic = topic | |
self.client = client | |
self.wr = weakref.ref(obj, self.unsubscribe) | |
def unsubscribe(self, wr): | |
if self.wr is None: | |
return | |
self.wr = None | |
self.client.do_unsubscribe(self.topic, wr) | |
class MqttClient: | |
def __init__(self, host, port=1883, keepalive=60, bind_address="", | |
username=None, password=None, | |
client_id="", transport="tcp"): | |
self.loop = asyncio.get_event_loop() | |
self.client_id = client_id or 'mqtt' | |
self.subscribers = defaultdict(set) | |
self.subscriptions = dict() | |
self.client = paho.mqtt.client.Client(client_id, transport=transport) | |
self.client.on_message = lambda client, userdata, message: \ | |
self.loop.call_soon_threadsafe(self.on_message, message) | |
if username: | |
self.client.username_pw_set(username, password) | |
self.client.connect(host, port, keepalive, bind_address) | |
self.client.loop_start() | |
def on_message(self, message): | |
#print(f'[{self.client_id}]', message.topic, message.payload) | |
for wr in list(self.subscribers[message.topic]): | |
obj = wr() | |
if obj is not None: | |
obj.on_message(message) | |
def will_set(self, topic, payload=None, qos=0, retain=False): | |
self.client.will_set(topic, payload, qos, retain) | |
def publish(self, topic, payload=None, qos=0, retain=False): | |
self.client.publish(topic, payload, qos, retain) | |
def is_subscribed(self, obj): | |
return weakref.ref(obj) in self.subscriptions | |
def subscribe(self, obj, topic): | |
if self.is_subscribed(obj): | |
raise Exception("two subscriptions for the same object") | |
need_subscribe = not self.subscribers[topic] | |
proxy = ActiveSubscription(topic, obj, self) | |
self.subscribers[topic].add(proxy.wr) | |
self.subscriptions[proxy.wr] = proxy | |
if need_subscribe: | |
self.client.subscribe(topic) | |
print(f'[{self.client_id}] subscribe', topic) | |
def do_unsubscribe(self, topic, wr): | |
self.subscribers[topic].remove(wr) | |
del self.subscriptions[wr] | |
if not self.subscribers[topic]: | |
self.client.unsubscribe(topic) | |
print(f'[{self.client_id}] unsubscribe', topic) | |
def unsubscribe(self, obj): | |
wr = weakref.ref(obj) | |
proxy = self.subscriptions[wr] | |
proxy.unsubscribe(wr) | |
class Topic: | |
def __init__(self, client, topic, json=False): | |
self.loop = asyncio.get_event_loop() | |
self.client = client | |
self.topic = topic | |
self.future = self.loop.create_future() | |
self.json = json | |
self.client.subscribe(self, topic) | |
def on_message(self, msg): | |
value = msg.payload.decode() | |
self.value = json.loads(value) if self.json else value | |
if not self.future.done(): | |
self.future.set_result(True) | |
async def get(self): | |
await self.future | |
return self.value | |
def close(self): | |
self.client.unsubscribe(self) | |
class Subscription: | |
def __init__(self, client, topic, queue, json=False): | |
self.client = client | |
self.queue = queue | |
self.json = json | |
self.client.subscribe(self, topic) | |
def on_message(self, msg): | |
value = msg.payload.decode() | |
value = json.loads(value) if self.json else value | |
self.queue.put_nowait((msg.topic, value)) | |
def close(self): | |
self.client.unsubscribe(self) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#! /usr/bin/env python3 | |
import argparse | |
import asyncio | |
import atexit | |
import contextlib | |
import time | |
from mqtt import MqttClient, Topic, Subscription | |
from utils import repeat, run_then_cancel | |
SECONDI_COPIA = 3600 | |
SECONDI_CONTROLLO = 60 | |
class Termostato: | |
def __init__(self, client): | |
self.modo = Topic(client, 'controllo-temperatura/modo') | |
self.t1 = Topic(client, 'controllo-temperatura/t1') | |
self.t2 = Topic(client, 'controllo-temperatura/t2') | |
async def temp_obiettivo(self): | |
ora = time.localtime().tm_hour | |
modo = await self.modo.get() | |
if modo == 'spento': | |
return 6 | |
if modo == 'basso' or (modo == 'acceso' and (ora < 6 or ora > 20)): | |
return float(await self.t1.get()) | |
else: | |
return float(await self.t2.get()) | |
class Controllo: | |
def __init__(self, client, controllo, in_base, out_base): | |
self.client = client | |
self.controllo = controllo | |
self.obiettivo = None | |
self.tempo = 100 | |
self.stato_desiderato = None | |
self.sensore = Topic(client, in_base, json=True) | |
self.presa = Topic(client, out_base, json=True) | |
self.out_topic = out_base + '/set/state' | |
async def una_volta(self): | |
self.tempo += 1 | |
stato = (await self.presa.get())['state'] | |
sensore = (await self.sensore.get())['temperature'] | |
obiettivo = await self.controllo.temp_obiettivo() | |
if stato != self.stato_desiderato or obiettivo != self.obiettivo or self.tempo >= 10: | |
self.obiettivo = obiettivo | |
if stato == 'ON': | |
self.stato_desiderato = 'OFF' if sensore > obiettivo + 0.5 else 'ON' | |
elif stato == 'OFF': | |
self.stato_desiderato = 'ON' if sensore < obiettivo - 0.5 else 'OFF' | |
print(self.out_topic, self.tempo, sensore, stato, obiettivo, self.stato_desiderato) | |
if self.stato_desiderato != stato: | |
self.client.publish(self.out_topic, self.stato_desiderato) | |
self.tempo = 0 | |
class CopiaPeriodica: | |
def __init__(self, in_client, out_client, prefix, topics): | |
self.out_client = out_client | |
self.prefix = prefix | |
self.topics = [Topic(in_client, x) for x in topics] | |
async def una_volta(self): | |
for t in self.topics: | |
payload = await t.get() | |
self.out_client.publish(t.topic, await t.get(), retain=True) | |
#print(self.prefix, t.topic, payload) | |
class Copia: | |
def __init__(self, in_client, out_client, prefix, topics): | |
self.queue = asyncio.Queue() | |
self.out_client = out_client | |
self.prefix = prefix | |
self.subscriptions = [Subscription(in_client, x, self.queue) for x in topics] | |
async def vai(self): | |
with contextlib.suppress(asyncio.CancelledError): | |
while True: | |
topic, payload = await self.queue.get() | |
self.out_client.publish(topic, payload, retain=True) | |
#print(self.prefix, topic, payload) | |
def leggi(esterno, locale): | |
leggi_esterno = Copia(esterno, locale, '<<', ['controllo-temperatura/modo', | |
'controllo-temperatura/t1', | |
'controllo-temperatura/t2']) | |
return leggi_esterno.vai() | |
def pubblica(locale, esterno): | |
scrivi_esterno = CopiaPeriodica(locale, esterno, '>>', ['zigbee2mqtt/temp1', 'zigbee2mqtt/presa1', | |
'zigbee2mqtt/temp2', 'zigbee2mqtt/presa2']) | |
return repeat(scrivi_esterno.una_volta, SECONDI_COPIA) | |
def controllo(locale, in_base, out_base): | |
termostato = Termostato(locale) | |
controllo = Controllo(locale, termostato, in_base, out_base) | |
return repeat(controllo.una_volta, SECONDI_CONTROLLO) | |
def main(loop): | |
parser = argparse.ArgumentParser(description='termostato') | |
parser.add_argument('-u', '--username', metavar='USER', help='username') | |
parser.add_argument('server', metavar='HOST', nargs='?', help='host') | |
args = parser.parse_args() | |
locale = MqttClient('localhost', client_id='locale') | |
loop.create_task(controllo(locale, 'zigbee2mqtt/temp1', 'zigbee2mqtt/presa1'), name='locale1') | |
loop.create_task(controllo(locale, 'zigbee2mqtt/temp2', 'zigbee2mqtt/presa2'), name='locale2') | |
if args.server: | |
esterno = MqttClient(args.server, client_id='esterno', username=args.username, keepalive=60) | |
esterno.will_set('controllo-temperatura/attivo', '0', retain=True) | |
esterno.publish('controllo-temperatura/attivo', '1', retain=True) | |
atexit.register(esterno.publish, 'controllo-temperatura/attivo', '0', retain=True) | |
loop.create_task(leggi(esterno, locale), name='leggi') | |
loop.create_task(pubblica(locale, esterno), name='pubblica') | |
if __name__ == '__main__': | |
loop = asyncio.get_event_loop() | |
main(loop) | |
run_then_cancel(loop) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#! /usr/bin/env python3 | |
import asyncio | |
import contextlib | |
import signal | |
async def repeat(func, interval): | |
with contextlib.suppress(asyncio.CancelledError): | |
while True: | |
loop = asyncio.get_running_loop() | |
timer = loop.create_future() | |
h = loop.call_later(interval, timer.set_result, True) | |
try: | |
await func() | |
await timer | |
except Exception as e: | |
h.cancel() | |
if not timer.done(): | |
timer.cancel() | |
raise e | |
h.cancel() | |
def run_then_cancel(loop): | |
loop.add_signal_handler(signal.SIGTERM, loop.stop) | |
loop.add_signal_handler(signal.SIGINT, loop.stop) | |
try: | |
loop.run_forever() | |
finally: | |
for t in asyncio.all_tasks(loop=loop): | |
if not (t.done() or t.cancelled()): | |
t.cancel() | |
loop.run_until_complete(t) | |
loop.close() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment