Skip to content

Instantly share code, notes, and snippets.

@smartynov
Last active November 24, 2017 15:01
Show Gist options
  • Save smartynov/1da64f4d2e7234c6ada44cb513bd3d0d to your computer and use it in GitHub Desktop.
Save smartynov/1da64f4d2e7234c6ada44cb513bd3d0d to your computer and use it in GitHub Desktop.
Simple Python MQTT to PostgreSQL data collector
#!/usr/bin/env python3
import config
import time
import psycopg2
import paho.mqtt.client as mqtt
# Parameterised INSERT statement; the two %s placeholders are bound by
# psycopg2 at execute time (topic-derived name, then the payload text).
sql = "INSERT INTO raw (type, data) VALUES (%s, %s)"

# Names already written to the database during this run. Only membership
# matters; the stored value is a dummy.
seen_topics = dict()

# A single connection and cursor are shared by the whole script.
conn = psycopg2.connect(**config.db)
cur = conn.cursor()
def on_connect(client, userdata, flags, rc):
    """Subscribe to every configured topic once the broker accepts us.

    Assigned to ``client.on_connect`` and invoked by paho-mqtt when the
    broker answers the connection request.

    Args:
        client: The MQTT client instance; used to issue the subscriptions.
        userdata: Unused.
        flags: Unused.
        rc: Connection result code reported by the broker; 0 means success.
    """
    print("Connected with result code " + str(rc))
    if rc != 0:
        # A non-zero code means the broker refused the connection, so
        # there is no session to subscribe on.
        return
    for topic in config.mqtt['topics']:
        client.subscribe(topic)
def on_message(client, userdata, msg):
    """Store the first message received on each topic in the database.

    The topic is normalised into a dotted, lower-case name (``a/B/c`` ->
    ``a.b.c``) and the payload is decoded as UTF-8 text. Every message is
    echoed to stdout, but only the first one per name is inserted; later
    messages for an already-stored name are ignored.

    Args:
        client: Unused.
        userdata: Unused.
        msg: The received message; ``msg.topic`` and ``msg.payload`` are read.

    Raises:
        UnicodeDecodeError: If the payload is not valid UTF-8.
    """
    name = msg.topic.replace('/', '.').lower()
    data = msg.payload.decode("utf-8")
    print("{0}: {1}".format(name, data))
    if name in seen_topics:
        return
    try:
        cur.execute(sql, (name, data))
        conn.commit()
    except psycopg2.Error as exc:
        # A failed statement leaves the shared transaction in an aborted
        # state; without a rollback every later insert would fail as well.
        conn.rollback()
        print("Failed to store {0}: {1}".format(name, exc))
        # Leave the name unmarked so the next message on it is retried.
        return
    seen_topics[name] = 1
# Build the MQTT client. clean_session=False asks the broker to keep the
# session for this client id between runs.
client = mqtt.Client(
    client_id=config.mqtt['client_id'], clean_session=False)
client.on_connect = on_connect
client.on_message = on_message
client.username_pw_set(
    config.mqtt['auth']['username'], config.mqtt['auth']['password'])

try:
    client.connect(config.mqtt['host'])
    # Use the monotonic clock so a wall-clock adjustment cannot shorten or
    # extend the collection window.
    stop_at = time.monotonic() + config.mqtt['timeout']
    while time.monotonic() < stop_at:
        rc = client.loop(timeout=1)
        if rc != mqtt.MQTT_ERR_SUCCESS:
            # loop() returns immediately with an error once the connection
            # is gone; stop instead of spinning until the deadline.
            print("MQTT loop failed with code " + str(rc))
            break
finally:
    # Release network and database resources even if something above raised.
    client.disconnect()
    cur.close()
    conn.close()
print("Done ({0} topics seen)".format(len(seen_topics)))
# Example configuration for the MQTT -> PostgreSQL collector.
# Copy this file to config.py and fill in the values.

# Keyword arguments handed to psycopg2.connect() by the collector.
db = dict(
    host='127.0.0.1',
    database='stat',
    user='stat',
    password='',
)

# MQTT broker settings read by the collector.
mqtt = dict(
    host='127.0.0.1',
    client_id='collect-mqtt',
    # Seconds the collector keeps listening before it exits.
    timeout=35,
    auth=dict(username='', password=''),
    # Topic filters to subscribe to; "+" matches a single topic level.
    topics=[
        "stat/+/tele/STATE",
        "stat/+/tele/ENERGY",
    ],
)
-- Schema for the table the collector inserts into.
-- WARNING: re-running this script discards any existing "raw" table and
-- all rows in it.
-- IF EXISTS lets the script run on a fresh database where the table has
-- not been created yet.
DROP TABLE IF EXISTS raw;
CREATE TABLE raw (
    id bigserial NOT NULL PRIMARY KEY,
    -- Insertion time, stored as a zone-less timestamp holding UTC.
    time timestamp without time zone NOT NULL DEFAULT (now() at time zone 'utc'),
    -- Name of the message source, as written by the collector.
    type text,
    -- Message payload; must be valid JSON to be accepted by this column.
    data jsonb
);
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment