""" |
Viess2Mqtt |
Bridge between the Viessmann Cloud API and MQTT. |
Uses the 3rd-party package PyViCare to talk to Viessmann. |
Publishes all reading/features to MQTT. |
Filter a little bit of unnecessary information |
Need Scheduler_Node to get triggered. |
Fetch and publish run in a separate thread so that the main thread is not blocked during longer data processing. |
Keeps track of changes and updates MQTT only if changed. |
- Allow changing settings |
- Consider to publish certain features "retained" for better integration |
""" |
import datetime |
import http.client |
import json |
import logging |
import os |
import queue |
import sys |
from dataclasses import dataclass, field |
import threading |
from typing import Callable, Union |
from contextlib import suppress |
import dateutil.parser |
import PyViCare.PyViCare |
import PyViCare.PyViCareUtils |
import requests |
import urllib3 |
from PyViCare.PyViCare import PyViCare as PyViCareAPI |
from mqtt_client import mqtt_client |
sys.path.append('..') |
# pylint: disable=wrong-import-position |
from nodes import ActionNode |
# pylint: enable=wrong-import-position |
# pylint: disable=line-too-long |
logger = logging.getLogger(__name__) |
class NoneException(BaseException):
    """Exception type used as a "catch nothing" placeholder.

    ``Viess2MqttThread`` stores this class in ``exception_switch`` when the
    root logger runs at DEBUG level, so that its ``except`` clause no longer
    swallows ordinary exceptions. It derives from ``BaseException`` and is
    therefore not matched by ``except Exception``.
    """
@dataclass
class ViessDevice:
    """One Viessmann device together with its most recently fetched features.

    Only ``name`` and ``device_type`` take part in ``repr`` and comparisons;
    the device configuration and the feature list are excluded from both.
    """

    # Identifier built from the device model and the configured id.
    name: str
    # PyViCare handle used to fetch the raw feature JSON of the device.
    device_config: PyViCare.PyViCare.PyViCareDeviceConfig = field(compare=False, repr=False)
    # Class name of the auto-detected PyViCare device.
    device_type: str
    # Raw feature dictionaries as returned by the last successful refresh.
    features: list[dict] = field(compare=False, repr=False, default_factory=list)
@dataclass
class ViessConf:
    """Settings of the bridge, built from the node's configuration mapping."""

    # Credentials handed to PyViCare's ``initWithCredentials``.
    client_id: str = ""
    email: str = ""
    password: str = ""
    # Interval passed to ``wake_me_every("refresh", ..., seconds=True)``.
    wake_interval: int = 2
class Viess2Mqtt(ActionNode):
    """Action node that forwards every incoming message to a worker thread.

    The node itself only owns the queue; all Viessmann and MQTT work happens
    in ``Viess2MqttThread``, which is started as a daemon thread on creation.
    """

    def __init__(self, publish_callback: Callable, private_callback: Callable=None, name: str=None, cryptokey: str=None):
        """Set up the base node, the message queue, and start the worker.

        Args:
            publish_callback: Forwarded unchanged to ``ActionNode``.
            private_callback: Forwarded unchanged to ``ActionNode``.
            name: Node name; falls back to this module's ``__name__``.
            cryptokey: Forwarded unchanged to ``ActionNode``.
        """
        super().__init__(publish_callback, private_callback=private_callback, name=name, cryptokey=cryptokey)
        # The worker reads ``q`` and ``name`` in its constructor, so both
        # must exist before the thread object is created.
        self.q: queue.Queue = queue.Queue()
        self.name: str = name or __name__
        worker = Viess2MqttThread(self, daemon=True)
        worker.start()

    def input(self, msg):
        """Delegate the message to the worker thread to keep the main loop free."""
        self.q.put(msg)
class Viess2MqttThread(threading.Thread):
    """Worker thread that fetches Viessmann features and publishes them to MQTT.

    Messages are taken from the queue of the owning ``Viess2Mqtt`` node and
    dispatched in ``input``. A "refresh" trigger re-reads the raw feature JSON
    of every known device and publishes each feature whose timestamp is not
    older than the last publication of that feature.
    """

    primitive = (int, float, str, bool, type(None), bytearray)
    basic_iterable = (dict, list, tuple)
    basic_list = (list, tuple)
    # Metadata keys that are removed from every feature before publishing.
    _dropped_keys = ('gatewayId', 'apiVersion', 'deviceId', 'uri')
    # Keys that are removed from a feature when they are present but empty.
    _optional_keys = ('properties', 'commands', 'components')
    # NOTE(review): the reviewed source had lost the condition of the first
    # error branch in ``__refresh_data`` (the chain began with a bare ``elif``).
    # Matching on this API error type is a reconstruction - confirm it against
    # the error payloads actually returned by the Viessmann API.
    _device_error_marker = "DEVICE_COMMUNICATION_ERROR"

    def __init__(self, viess2mqtt: "Viess2Mqtt", *args, **kwargs) -> None:
        """Log in to the Viessmann API and collect the available devices.

        Args:
            viess2mqtt: Owning node; provides the queue, the name used as
                topic prefix, the configuration mapping and the output method.
            *args: Passed on to ``threading.Thread``.
            **kwargs: Passed on to ``threading.Thread``.
        """
        super().__init__(*args, **kwargs)
        self.viess2mqtt: Viess2Mqtt = viess2mqtt
        self.q: queue.Queue = viess2mqtt.q
        self.topic_prefix = viess2mqtt.name
        self.get_prefix: str = f"{self.topic_prefix}/get/"
        self.set_prefix: str = f"{self.topic_prefix}/set/"
        self.viess_config = ViessConf(**viess2mqtt.conf)
        self.vicare: PyViCareAPI = PyViCareAPI()
        try:
            self.vicare.initWithCredentials(
                self.viess_config.email,
                self.viess_config.password,
                self.viess_config.client_id,
                "token.tmp",
            )
        except (
            PyViCare.PyViCareUtils.PyViCareInternalServerError,
            http.client.RemoteDisconnected,
            urllib3.exceptions.ProtocolError,
            requests.exceptions.ConnectionError,
        ) as err:
            logging.warning("Issue to communicate the the viessmann API servers, but will continue. %s", err)
        self.devices: dict = {}
        self.last_published: dict = {}
        self.__get_devices()
        if logging.getLogger().level == logging.DEBUG:
            # With the root logger at DEBUG, processing errors are not caught
            # in ``input`` (NoneException matches no ordinary exception).
            self.exception_switch = NoneException
            logging.warning("Ignoring some Exceptions")
        else:
            self.exception_switch = Exception

    @staticmethod
    def _topic(prefix: str, *levels: str) -> str:
        """Join MQTT topic levels with "/" independently of the host OS.

        Args:
            prefix: Leading part of the topic; a trailing "/" is ignored.
            *levels: Further levels; surrounding "/" and empty levels are
                dropped, slashes inside a level are kept.

        Returns:
            The combined topic string.
        """
        cleaned = (level.strip("/") for level in levels)
        return "/".join([prefix.rstrip("/"), *(level for level in cleaned if level)])

    @staticmethod
    def __is_empty(feature: dict, key: str) -> bool:
        """Return True only if ``key`` is present and holds an empty container."""
        value = feature.get(key)
        return value is not None and hasattr(value, "__len__") and len(value) == 0

    def __mqtt_init(self) -> None:
        """Ask the owning node to wake this bridge with periodic "refresh" triggers."""
        self.viess2mqtt.wake_me_every("refresh", self.viess_config.wake_interval, seconds=True)

    def __get_device(self, dev: "PyViCare.PyViCare.PyViCareDeviceConfig") -> "ViessDevice":
        """Wrap a PyViCare device configuration in a ``ViessDevice``.

        The device name is ``<model>_<config id>``; the device type is the
        class name of the auto-detected PyViCare device.
        """
        dev_type: str = type(dev.asAutoDetectDevice()).__name__
        name = f"{dev.getModel()}_{dev.getConfig().id}"
        return ViessDevice(name, dev, dev_type)

    def __get_devices(self) -> None:
        """Register every device offered by the API client under its name."""
        # Presumably ``devices`` is missing when the login in ``__init__``
        # failed (TODO confirm); treat that as "no devices" so the promised
        # "will continue" really continues instead of raising here.
        for dev_config in getattr(self.vicare, "devices", None) or []:
            device = self.__get_device(dev_config)
            self.devices[device.name] = device

    def __refresh_data(self) -> None:
        """Fetch the raw features of all devices and publish their status.

        On success a status message and connection state ``1`` are published
        per device. An API server error publishes connection state ``0`` and
        a status message for the device that was being fetched; lower-level
        connection errors are only logged.
        """
        dev_connection_topic: str = "dev_status/connection"
        dev_statusmsg_topic: str = "dev_status/msg"
        dev_name: str = "NA"
        dev_type: str = "NA"
        try:
            for dev in self.devices.values():
                # Record the device *before* the API call so that a failure is
                # reported under the topic of the device that actually failed.
                dev_name = dev.name
                dev_type = dev.device_type
                dev.features = dev.device_config.get_raw_json()['data']
                self.viess2mqtt._output(
                    self._topic(self.get_prefix, dev_type, dev_name, dev_statusmsg_topic),
                    f"Device {dev_name} fetaures refreshed via API",
                )
                self.viess2mqtt._output(
                    self._topic(self.get_prefix, dev_type, dev_name, dev_connection_topic), 1
                )
        except PyViCare.PyViCareUtils.PyViCareInternalServerError as pcse:
            self.viess2mqtt._output(
                self._topic(self.get_prefix, dev_type, dev_name, dev_connection_topic), 0
            )
            error_text = str(pcse)
            if self._device_error_marker in error_text:
                msg = f"Connection issue to internal device. Can't do anything. {pcse}"
                logging.debug(msg)
            elif "Internal server error" in error_text:
                msg = f"(Temporary) Connection issue to viessmann cloud. Can't do anything. {pcse}"
                logging.debug(msg)
            else:
                msg = f"Conneciton issues, try to reauthenticate. But may have different reason. {pcse}"
                logging.exception(msg)
                self.vicare.oauth_manager.renewToken()
            self.viess2mqtt._output(
                self._topic(self.get_prefix, dev_type, dev_name, dev_statusmsg_topic), msg
            )
        except (
            urllib3.exceptions.ProtocolError,
            http.client.RemoteDisconnected,
            requests.exceptions.ConnectionError,
        ) as e_conn:
            logging.debug("Lowerlevel donnection issue to viessman cloud: %s", e_conn)

    def __cleanup_feature(self, feature: dict) -> dict:
        """Strip metadata and empty optional sections from ``feature`` in place.

        Missing keys are ignored instead of raising ``KeyError``.

        Returns:
            The same dictionary that was passed in.
        """
        for key in self._dropped_keys:
            feature.pop(key, None)
        for key in self._optional_keys:
            if self.__is_empty(feature, key):
                del feature[key]
        return feature

    def __skip_feature(self, feature: dict) -> bool:
        """Tell whether a feature carries nothing worth publishing.

        A feature is skipped when it is explicitly disabled, or when its
        properties, commands and components are all present and all empty.
        """
        if feature.get('isEnabled') is False:
            return True
        return all(self.__is_empty(feature, key) for key in self._optional_keys)

    def __publish_feature(self, device_prefix: list[str], feature: dict) -> None:
        """Publish one feature below ``<get_prefix>/<device_prefix>/<feature path>``.

        Each top-level key of the feature becomes its own sub-topic; dicts,
        lists and tuples are sent as JSON text. Nothing is sent when the
        feature timestamp is older than the last publication of the feature.

        Args:
            device_prefix: Topic levels identifying the device.
            feature: Raw feature dictionary; it is not modified.
        """
        if not feature.get('feature'):
            logging.error("Feature not found in %s", feature)
            return
        if self.__skip_feature(feature):
            return
        # Work on a shallow copy: the dict belongs to ``device.features`` and
        # removing keys from it would make the next publish pass over the
        # same data fail with "Feature not found".
        feature = dict(feature)
        feature_path: list[str] = feature.pop('feature').split(".")
        feature_prefix = self._topic(self.get_prefix, *device_prefix, *feature_path)
        feature = self.__cleanup_feature(feature)
        now: datetime.datetime = datetime.datetime.now(datetime.timezone.utc)
        feature_timestamp: datetime.datetime = dateutil.parser.parse(feature['timestamp'])
        # Never-published features compare against the earliest aware datetime.
        feature_last_published: datetime.datetime = self.last_published.get(
            feature_prefix, datetime.datetime(1, 1, 1, 0, 0, 0, 0, datetime.timezone.utc)
        )
        feature['age'] = (now - feature_last_published).total_seconds()
        if feature_last_published > feature_timestamp:
            return
        for key, value in feature.items():
            if self.is_basic_iterable(value):
                value = json.dumps(value)
            if self.is_primitive(value):
                self.viess2mqtt._output(self._topic(feature_prefix, key), value)
        self.last_published[feature_prefix] = now

    def __publish_device(self, device: "ViessDevice") -> None:
        """Publish every feature currently stored for ``device``."""
        for feature in device.features:
            self.__publish_feature([device.device_type, device.name], feature)

    def __publish_viess2mqtt(self) -> None:
        """Publish the stored features of all known devices."""
        for dev in self.devices.values():
            self.__publish_device(dev)

    def run(self):
        """Process queued messages forever, polling the queue once per second."""
        while True:
            try:
                msg: mqtt_client.MQTTMessage = self.q.get(timeout=1)
            except queue.Empty:
                continue
            self.input(msg)

    def input(self, msg: "mqtt_client.MQTTMessage") -> None:
        """Dispatch one message by topic.

        The node's own "init" topic schedules the periodic refresh; a
        scheduler trigger ending in "refresh" fetches and publishes all
        features; topics below the set prefix are accepted but not handled.
        Exceptions matching ``exception_switch`` are logged, not raised.
        """
        try:
            if msg.topic == f"{self.viess2mqtt.get_own_topic()}init":
                self.__mqtt_init()
            elif msg.topic.startswith(self.viess2mqtt.scheduler_trigger_topic):
                if msg.topic.endswith("refresh"):
                    self.__refresh_data()
                    self.__publish_viess2mqtt()
            elif msg.topic.startswith(self.set_prefix):
                # Changing settings is not implemented.
                pass
        except self.exception_switch as exp:
            logging.exception("Issue to process message: %s. Exceptio: %s", msg.topic, exp)

    def is_primitive(self, thing):
        """Return True if ``thing`` can be published as-is."""
        return isinstance(thing, self.primitive)

    def is_basic_iterable(self, thing):
        """Return True if ``thing`` is a dict, list or tuple."""
        return isinstance(thing, self.basic_iterable)

    def is_basic_list(self, thing):
        """Return True if ``thing`` is a list or tuple."""
        return isinstance(thing, self.basic_list)