Created
January 18, 2021 14:48
-
-
Save zobayer1/4ddba85a59b3c802da91193a81cf23f7 to your computer and use it in GitHub Desktop.
Wrapper class for Python Paho MQTT client with callbacks and auto reconnect.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: utf-8 -*- | |
import click | |
from paho.mqtt.client import Client | |
class MQTTClient(object): | |
"""Manages Paho MQTT client lifecycle and callbacks""" | |
def __init__(self, config: dict, message_processor=None): | |
self.config = config | |
self.client = Client( | |
client_id=config.mqtt_client, | |
clean_session=config.mqtt_clean_session, | |
userdata={"client": config.mqtt_client}, | |
) | |
self.client.username_pw_set(config.mqtt_username, config.mqtt_password) | |
if self.config.mqtt_debug: | |
self.client.on_log = self._on_log | |
self.client.on_connect = self._on_connect | |
self.client.on_subscribe = self._on_subscribe | |
self.client.on_message = self._on_message | |
self.client.on_publish = self._on_publish | |
self.client.on_disconnect = self._on_disconnect | |
self.client.connect(config.mqtt_host, config.mqtt_port, 60) | |
if message_processor: | |
self.message_processor = message_processor | |
def _on_log(self, client, userdata, level, buf): | |
click.echo(f"{buf}, origin: {userdata['client']}") | |
def _on_connect(self, client, userdata, flags, rc): | |
click.echo( | |
f"Connected {userdata['client']}, result code: {str(rc)} {str(flags)}" | |
) | |
click.echo(f"Subscribing to all topics...") | |
self.client.subscribe(self.config.mqtt_topics) | |
def _on_subscribe(self, client, userdata, mid, granted_qos): | |
click.echo( | |
f"Subscribed {userdata['client']}, mid: {mid}, granted qos: {granted_qos}" | |
) | |
click.echo(f"Listening for {userdata['client']} messages...") | |
def _on_disconnect(self, client, userdata, rc): | |
click.echo(f"Disconnected {userdata['client']}, result code: {str(rc)}") | |
def _on_message(self, client, userdata, msg): | |
if hasattr(self, "message_processor"): | |
self.message_processor(client, userdata, msg) | |
else: | |
click.echo( | |
f"Topic: {msg.topic}, Mid: {msg.mid}, Payload: {msg.payload.decode('utf-8')}" | |
) | |
def _on_publish(self, client, userdata, mid): | |
click.echo(f"Published by {userdata['client']}, mid: {mid}") | |
def listen(self): | |
try: | |
self.client.loop_forever() | |
except KeyboardInterrupt: | |
click.echo( | |
f"Received KeyboardInterrupt, disconnecting {self.config.mqtt_client}" | |
) | |
self.client.disconnect() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment