|
""" |
|
Viess2Mqtt |
|
Bridge between the Viessmann Cloud API and MQTT.
|
Uses the third-party package PyViCare to talk to Viessmann.
|
Publishes all reading/features to MQTT. |
|
Filters out a little bit of unnecessary information.
|
Needs Scheduler_Node to get triggered.
|
|
|
Fetch and publish run in a separate thread so that the main thread is not blocked during longer data processing.
|
|
|
Keeps track of changes and updates MQTT only if changed. |
|
|
|
TODO: |
|
- Allow changing settings |
|
- Consider to publish certain features "retained" for better integration |
|
""" |
|
|
|
import datetime |
|
import http.client |
|
import json |
|
import logging |
|
import os |
|
import queue |
|
import sys |
|
from dataclasses import dataclass, field |
|
import threading |
|
from typing import Callable, Union |
|
from contextlib import suppress |
|
|
|
|
|
import dateutil.parser |
|
import PyViCare.PyViCare |
|
import PyViCare.PyViCareUtils |
|
import requests |
|
import urllib3 |
|
from PyViCare.PyViCare import PyViCare as PyViCareAPI |
|
from mqtt_client import mqtt_client |
|
|
|
sys.path.append('..') |
|
|
|
# pylint: disable=wrong-import-position |
|
from nodes import ActionNode |
|
|
|
# pylint: enable=wrong-import-position |
|
# pylint: disable=line-too-long |
|
|
|
logger = logging.getLogger(__name__) |
|
|
|
class NoneException(BaseException):
    """Exception type that this module never raises.

    ``Viess2MqttThread`` selects it as its ``except`` target when the root
    logger is at DEBUG level, so that the handler in ``input`` matches
    nothing and errors propagate instead of being logged and swallowed.
    """
|
|
|
@dataclass
class ViessDevice:
    """One Viessmann device together with its most recently fetched features.

    Only ``name`` and ``device_type`` take part in ``repr`` and equality;
    the API handle and the feature payload are excluded from both.
    """

    # Identifier built by Viess2MqttThread as "<model>_<config id>".
    name: str
    # PyViCare handle used to query this device.
    device_config: PyViCare.PyViCare.PyViCareDeviceConfig = field(compare=False, repr=False)
    # Class name of the auto-detected PyViCare device object.
    device_type: str
    # Raw feature dicts taken from the 'data' entry of the last API response.
    features: list[dict] = field(default_factory=list, compare=False, repr=False)
|
|
|
|
|
@dataclass
class ViessConf:
    """Settings needed to talk to the Viessmann cloud.

    Every field has a default, so the class can be built from a partial
    configuration mapping via ``ViessConf(**conf)``.
    """

    # Client id passed to PyViCare's credential-based login.
    client_id: str = ""
    # Account e-mail passed to PyViCare's credential-based login.
    email: str = ""
    # Account password. Kept out of repr() so it cannot leak into log lines
    # or tracebacks that print the configuration object.
    password: str = field(default="", repr=False)
    # Interval handed to the node's wake-up scheduling together with
    # seconds=True (presumably seconds between refreshes - confirm against
    # ActionNode.wake_me_every).
    wake_interval: int = 2
|
|
|
|
|
class Viess2Mqtt(ActionNode):
    """Node front end that hands every incoming message to a worker thread.

    The node owns a queue; ``input`` only enqueues, and a daemon
    ``Viess2MqttThread`` started from the constructor consumes the queue.
    """

    def __init__(self, publish_callback: Callable, private_callback: Callable = None, name: str = None, cryptokey: str = None):
        """Initialise the base node, create the queue and start the worker.

        Args:
            publish_callback: Forwarded unchanged to ``ActionNode``.
            private_callback: Forwarded unchanged to ``ActionNode``.
            name: Node name; falls back to this module's ``__name__`` when
                empty or ``None``. The worker uses it as MQTT topic prefix.
            cryptokey: Forwarded unchanged to ``ActionNode``.
        """
        super().__init__(publish_callback, private_callback=private_callback, name=name, cryptokey=cryptokey)
        self.q: queue.Queue = queue.Queue()
        self.name: str = name or __name__
        # The worker reads self.q and self.name in its constructor, so it is
        # created only after both are set.
        worker = Viess2MqttThread(self, daemon=True)
        worker.start()

    def input(self, msg):
        """Enqueue ``msg`` for the worker thread so the caller is not blocked."""
        self.q.put(msg)
|
|
|
class Viess2MqttThread(threading.Thread):
    """Worker thread that polls the Viessmann API and publishes features to MQTT.

    Messages are taken from the owning node's queue. An ``init`` message
    registers the periodic wake-up; a scheduler ``refresh`` message fetches
    the features of every known device and publishes those whose timestamp
    is not older than the last publication of the same feature.
    """

    primitive = (int, float, str, bool, type(None), bytearray)
    basic_iterable = (dict, list, tuple)
    basic_list = (list, tuple)

    # Keys removed from every feature before publishing.
    _REDUNDANT_KEYS = ("gatewayId", "apiVersion", "deviceId", "uri")
    # Container keys that are dropped when empty; a feature in which all
    # three are present and empty is not published at all.
    _OPTIONAL_CONTAINERS = ("properties", "commands", "components")

    def __init__(self, viess2mqtt: "Viess2Mqtt", *args, **kwargs) -> None:
        """Read the node's configuration and try to log in to the API.

        Args:
            viess2mqtt: Owning node; supplies the queue (``q``), the topic
                prefix (``name``), the configuration mapping (``conf``) and
                the publish function (``_output``).
            *args: Forwarded to ``threading.Thread``.
            **kwargs: Forwarded to ``threading.Thread``.
        """
        super().__init__(*args, **kwargs)
        self.viess2mqtt: Viess2Mqtt = viess2mqtt
        self.q: queue.Queue = viess2mqtt.q
        self.topic_prefix = viess2mqtt.name
        self.get_prefix: str = f"{self.topic_prefix}/get/"
        self.set_prefix: str = f"{self.topic_prefix}/set/"
        self.viess_config = ViessConf(**viess2mqtt.conf)
        self.vicare: PyViCareAPI = PyViCareAPI()
        self.devices: dict = {}
        self.last_published: dict = {}
        # False until a login plus device discovery has succeeded once.
        self.__api_ready: bool = False
        self.__connect()
        if logging.getLogger().level == logging.DEBUG:
            # At DEBUG level nothing is caught in input(), so errors surface.
            self.exception_switch = NoneException
            logger.warning("Ignoring some Exceptions")
        else:
            self.exception_switch = Exception

    @staticmethod
    def __topic(*parts: str) -> str:
        """Join topic fragments with '/', independent of the host platform.

        Trailing slashes of the first fragment and surrounding slashes of the
        others are dropped so that no empty topic level is produced.
        """
        head, *tail = parts
        segments = [head.rstrip("/")] + [part.strip("/") for part in tail]
        return "/".join(segment for segment in segments if segment)

    @staticmethod
    def __is_empty(feature: dict, key: str) -> bool:
        """Return True when ``key`` is present in ``feature`` with an empty value."""
        return key in feature and not feature[key]

    def __connect(self) -> bool:
        """Log in to the API and discover devices.

        Returns:
            True when login and discovery succeeded, False when one of the
            known communication errors occurred (it is logged, not raised).
        """
        try:
            self.vicare.initWithCredentials(
                self.viess_config.email,
                self.viess_config.password,
                self.viess_config.client_id,
                "token.tmp",
            )
        except (
            PyViCare.PyViCareUtils.PyViCareInternalServerError,
            http.client.RemoteDisconnected,
            urllib3.exceptions.ProtocolError,
            requests.exceptions.ConnectionError,
        ) as err:
            logger.warning("Issue to communicate the the viessmann API servers, but will continue. %s", err)
            # Device discovery is skipped here and retried on the next
            # refresh; presumably the device list is only available after a
            # successful login - confirm against PyViCare.
            return False
        self.__get_devices()
        self.__api_ready = True
        return True

    def __mqtt_init(self) -> None:
        """Ask the node to wake this worker with 'refresh' at the configured interval."""
        self.viess2mqtt.wake_me_every("refresh", self.viess_config.wake_interval, seconds=True)

    def __get_device(self, dev: "PyViCare.PyViCare.PyViCareDeviceConfig") -> "ViessDevice":
        """Wrap a PyViCare device config in a ``ViessDevice``.

        The device name is "<model>_<config id>"; the device type is the class
        name of the auto-detected PyViCare device object.
        """
        dev_type: str = dev.asAutoDetectDevice().__class__.__name__
        conf = dev.getConfig()
        model = dev.getModel()
        return ViessDevice(f"{model}_{conf.id}", dev, dev_type)

    def __get_devices(self) -> None:
        """Register every device reported by the API under its name."""
        for dev_config in self.vicare.devices:
            device = self.__get_device(dev_config)
            self.devices[device.name] = device

    def __refresh_data(self) -> None:
        """Fetch fresh features for every device and publish its status topics.

        If the initial login failed, it is retried first. Each device is
        handled on its own, so a failure of one device neither aborts the
        others nor gets reported under another device's topic.
        """
        if not self.__api_ready and not self.__connect():
            return
        for dev in self.devices.values():
            self.__refresh_device(dev)

    def __refresh_device(self, dev: "ViessDevice") -> None:
        """Fetch the features of one device and publish connection and status."""
        connection_topic = self.__topic(self.get_prefix, dev.device_type, dev.name, "dev_status/connection")
        statusmsg_topic = self.__topic(self.get_prefix, dev.device_type, dev.name, "dev_status/msg")
        dev_name = dev.name
        try:
            dev.features = dev.device_config.get_raw_json()['data']
        except PyViCare.PyViCareUtils.PyViCareInternalServerError as pcse:
            self.viess2mqtt._output(connection_topic, 0)
            if "DEVICE_COMMUNICATION_ERROR" in str(pcse):
                msg = f"Connection issue to internal device. Can't do anything. {pcse}"
                logger.debug(msg)
            elif "Internal server error" in str(pcse):
                msg = f"(Temporary) Connection issue to viessmann cloud. Can't do anything. {pcse}"
                logger.debug(msg)
            else:
                msg = f"Conneciton issues, try to reauthenticate. But may have different reason. {pcse}"
                logger.exception(msg)
                self.vicare.oauth_manager.renewToken()
            self.viess2mqtt._output(statusmsg_topic, msg)
        except (urllib3.exceptions.ProtocolError, http.client.RemoteDisconnected, requests.exceptions.ConnectionError) as e_conn:
            logger.debug("Lowerlevel donnection issue to viessman cloud: %s", e_conn)
        else:
            self.viess2mqtt._output(statusmsg_topic, f"Device {dev_name} fetaures refreshed via API")
            self.viess2mqtt._output(connection_topic, 1)

    def __cleanup_feature(self, feature: dict) -> dict:
        """Return a copy of ``feature`` without redundant keys and empty containers.

        The input dict is left untouched, because the same dict may be seen
        again on a later refresh. Absent keys are ignored.
        """
        cleaned = {key: value for key, value in feature.items() if key not in self._REDUNDANT_KEYS}
        for key in self._OPTIONAL_CONTAINERS:
            if self.__is_empty(cleaned, key):
                del cleaned[key]
        return cleaned

    def __skip_feature(self, feature: dict) -> bool:
        """Tell whether ``feature`` carries nothing worth publishing.

        A feature is skipped when it is explicitly disabled, or when
        properties, commands and components are all present and empty.
        """
        if feature.get('isEnabled') is False:
            return True
        return all(self.__is_empty(feature, key) for key in self._OPTIONAL_CONTAINERS)

    def __publish_feature(self, device_prefix: list[str], feature: dict) -> None:
        """Publish one feature below the device's topic if it is recent enough.

        Args:
            device_prefix: Topic levels identifying the device.
            feature: Raw feature dict from the API; it is not modified.
        """
        feature_name = feature.get('feature')
        if not feature_name:
            logger.error("Feature not found in %s", feature)
            return
        if self.__skip_feature(feature):
            return
        feature_prefix = self.__topic(self.get_prefix, *device_prefix, *feature_name.split("."))
        payload = self.__cleanup_feature(feature)
        # The feature name is already encoded in the topic.
        payload.pop('feature', None)
        now: datetime.datetime = datetime.datetime.now(datetime.timezone.utc)
        feature_timestamp: datetime.datetime = dateutil.parser.parse(payload['timestamp'])
        if feature_timestamp.tzinfo is None:
            # NOTE(review): a timestamp without zone information is assumed to
            # be UTC so that it can be compared with aware datetimes - confirm.
            feature_timestamp = feature_timestamp.replace(tzinfo=datetime.timezone.utc)
        never = datetime.datetime(1, 1, 1, 0, 0, 0, 0, datetime.timezone.utc)
        feature_last_published: datetime.datetime = self.last_published.get(feature_prefix, never)
        # Seconds since this feature was last published by this worker.
        payload['age'] = (now - feature_last_published).total_seconds()
        if feature_last_published > feature_timestamp:
            # The API timestamp is older than the last publication.
            return
        for key, value in payload.items():
            if self.is_basic_iterable(value):
                value = json.dumps(value)
            if self.is_primitive(value):
                self.viess2mqtt._output(self.__topic(feature_prefix, key), value)
        self.last_published[feature_prefix] = now

    def __publish_device(self, device: "ViessDevice") -> None:
        """Publish every feature currently stored on ``device``."""
        for feature in device.features:
            self.__publish_feature([device.device_type, device.name], feature)

    def __publish_viess2mqtt(self) -> None:
        """Publish the features of all known devices."""
        for dev in self.devices.values():
            self.__publish_device(dev)

    def run(self):
        """Consume the queue forever and process each message with ``input``."""
        while True:
            try:
                msg: mqtt_client.MQTTMessage = self.q.get(timeout=1)
            except queue.Empty:
                continue
            self.input(msg)

    def input(self, msg: "mqtt_client.MQTTMessage") -> None:
        """Dispatch one message by topic.

        Exceptions of type ``self.exception_switch`` are logged instead of
        terminating the worker.
        """
        try:
            if msg.topic == f"{self.viess2mqtt.get_own_topic()}init":
                self.__mqtt_init()
            elif msg.topic.startswith(self.viess2mqtt.scheduler_trigger_topic):
                if msg.topic.endswith("refresh"):
                    self.__refresh_data()
                    self.__publish_viess2mqtt()
            elif msg.topic.startswith(self.set_prefix):
                # Changing settings is not implemented yet.
                pass
        except self.exception_switch as exp:
            logger.exception("Issue to process message: %s. Exceptio: %s", msg.topic, exp)

    def is_primitive(self, thing):
        """Return True if ``thing`` is an instance of one of the ``primitive`` types."""
        return isinstance(thing, self.primitive)

    def is_basic_iterable(self, thing):
        """Return True if ``thing`` is a dict, list or tuple."""
        return isinstance(thing, self.basic_iterable)

    def is_basic_list(self, thing):
        """Return True if ``thing`` is a list or tuple."""
        return isinstance(thing, self.basic_list)