Skip to content

Instantly share code, notes, and snippets.

@ties
Last active November 25, 2017 17:00
Show Gist options
  • Save ties/6f6b1c7267ca240e75d91d1351e27748 to your computer and use it in GitHub Desktop.
Save ties/6f6b1c7267ca240e75d91d1351e27748 to your computer and use it in GitHub Desktop.
# /etc/telegraf/telegraf.d/mqtt_consumer.conf
# Read metrics from MQTT topic(s)
[[inputs.mqtt_consumer]]
servers = ["localhost:1883"]
## MQTT QoS, must be 0, 1, or 2
qos = 0
## Topics to subscribe to
topics = [
"telegraf/host01/cpu",
"telegraf/+/mem",
"sensors/#",
]
persistent_session = false
client_id = ""
## username and password to connect MQTT server.
# username = "telegraf"
# password = "metricsmetricsmetricsmetrics"
## Optional SSL Config
# ssl_ca = "/etc/telegraf/ca.pem"
# ssl_cert = "/etc/telegraf/cert.pem"
# ssl_key = "/etc/telegraf/key.pem"
## Use SSL but skip chain & host verification
# insecure_skip_verify = false
## Data format to consume.
## Each data format has its own unique set of configuration options, read
## more about them here:
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
data_format = "json"
"""
Transform MHZ19 data read by ESPEasy (published using PiDome MQTT conventions)
and push it onto a topic in a format supported by Telegraf.
"""
from collections import defaultdict
import logging
from hbmqtt.client import MQTTClient
from hbmqtt.mqtt.constants import QOS_0, QOS_1, QOS_2
import os
import asyncio
import time
import json
# Latest known readings per device: {device_id: {measurement_name: value}}.
measurements = defaultdict(dict)


async def bridge_coro():
    """Bridge per-value device topics to one JSON message per device.

    Connects to the broker named by the ``MQTT_HOST`` environment variable
    (port 1883), subscribes to ``/hooks/devices/#`` and caches every numeric
    value it receives in ``measurements``.  Whenever a ``PPM`` value arrives,
    all values cached so far for that device are published as a JSON object
    on ``sensors/mhz19_<device_id>``.

    The coroutine runs until an error occurs.  The error is logged with its
    traceback and re-raised, which ends the bridge.

    Raises:
        KeyError: if ``MQTT_HOST`` is not set in the environment.
        Exception: anything raised while receiving, parsing or publishing a
            message, e.g. ``IndexError`` for a topic with fewer than six
            ``/``-separated parts or ``ValueError`` for a non-numeric payload.
    """
    client = MQTTClient()
    host = os.environ['MQTT_HOST']
    await client.connect(f'mqtt://{host}:1883/')
    await client.subscribe([
        ('/hooks/devices/#', QOS_0),
    ])

    while True:
        try:
            message = await client.deliver_message()
            packet = message.publish_packet

            # The topic starts with '/', so tokens[0] is the empty string:
            # ['', 'hooks', 'devices', <device_id>, <...>, <measurement>].
            tokens = packet.variable_header.topic_name.split('/')
            device_id = tokens[3]
            measurement = tokens[5]

            payload = float(packet.payload.data.decode('ascii'))
            measurements[device_id][measurement] = payload

            # A PPM value triggers publication of everything cached for the
            # device; other values are only stored until the next PPM.
            if measurement == 'PPM':
                body = json.dumps(measurements[device_id]).encode('ascii')
                await client.publish('sensors/mhz19_{}'.format(device_id), body)
        except Exception:
            # logging.exception() requires a message argument; calling it
            # without one raises TypeError and hides the original error.
            logging.exception('Error while bridging MQTT message')
            # Bare raise keeps the original exception and traceback intact.
            raise
if __name__ == '__main__':
    # Timestamped log lines that include the logger name and source location.
    logging.basicConfig(
        format="[%(asctime)s] %(name)s {%(filename)s:%(lineno)d} %(levelname)s - %(message)s",
        level=logging.INFO,
    )

    # Run the bridge coroutine on the default event loop until it finishes.
    event_loop = asyncio.get_event_loop()
    event_loop.run_until_complete(bridge_coro())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment