Skip to content

Instantly share code, notes, and snippets.

@Cougar
Created August 3, 2020 18:30
Show Gist options
  • Save Cougar/7e10626fa524aa92ffa33dcb6068d927 to your computer and use it in GitHub Desktop.
Save Cougar/7e10626fa524aa92ffa33dcb6068d927 to your computer and use it in GitHub Desktop.
Python3 MQTT sender example
#!/usr/bin/python3
"""
Create database
curl -XPOST http://hostname:8086/query --data-urlencode 'q=CREATE DATABASE "cc128"'
"""
"""
DEBUG:asyncio:Using selector: EpollSelector
Socket opened
DEBUG:__main__:Sending CONNECT (u1, p1, wr0, wq0, wf0, c1, k60) client_id=b'cc128'
misc_loop started
Socket is readable, calling loop_read
DEBUG:__main__:Received CONNACK (0, 0)
INFO:__main__:on_connect: 0
...
DEBUG:__main__:Sending PUBLISH ....
...
Socket is readable, calling loop_read
Socket closed
ERROR:__main__:on_disconnect: 1
"""
port = '/dev/ttyUSB1'
mqtt_host = '127.0.0.1'
mqtt_port = 1883
mqtt_username = 'xxx'
mqtt_password = '12345'
mqtt_topic = 'test/energy/cc128'
import tornado.ioloop
import socket
import serial
import sys
import time
import xmltodict
import json
import datetime
import logging
try:
import chromalog
chromalog.basicConfig(level=logging.DEBUG, format='%(asctime)s [%(levelname)s] [%(name)s] [%(module)s] [%(funcName)s:%(lineno)d] %(message)s')
except Exception as ex:
print("chromalog import error: " + str(ex))
pass
logging.basicConfig(stream=sys.stderr, level=logging.DEBUG, format='%(asctime)s [%(module)14s] [%(levelname)7s] %(message)s')
log = logging.getLogger(__name__)
cc = serial.Serial(port, 57600, timeout=10)
import paho.mqtt.client as mqtt
class MQTTSender():
def __init__(self, ioloop, client_id, host, port, username, password, topic):
self.log = logging.getLogger('MQTTSender')
self._ioloop = ioloop
self._host = host
self._port = port
self._topic = topic
self._connected = False
self._mqttc = mqtt.Client(client_id)
self._mqttc.enable_logger(self.log)
self._mqttc.on_log = self._on_log
self._mqttc.on_connect = self._on_connect
self._mqttc.on_disconnect = self._on_disconnect
# self._mqttc.on_publish = self._on_publish
self._mqttc.username_pw_set(username=username, password=password)
self._mqttc.connect(host=self._host, port=self._port)
self._mqttc.socket().setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, 2048)
self._ioloop.add_handler(self._mqttc.socket(), self._mqttc.loop_read, tornado.ioloop.IOLoop.READ)
self._ioloop.add_handler(self._mqttc.socket(), self._mqttc.loop_write, tornado.ioloop.IOLoop.WRITE)
self._connect()
# self._loop_misc()
def _loop_misc(self):
self.log.debug('call loop_misc()')
self._mqttc.loop_misc()
self._ioloop.add_timeout(datetime.timedelta(seconds=10), self._loop_misc)
def _connect(self):
try:
self._mqttc.connect(host=self._host, port=self._port)
except Exception as ex:
self.log.error("MQTT connection error: %s", str(ex))
self._mqttc.socket().setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, 2048)
def _on_connect(self, mqttc, obj, flags, rc):
self.log.info("on_connect: %s", rc)
self._connected = True
def _on_disconnect(self, mqttc, userdata, rc):
self.log.error("on_disconnect: %s", rc)
self._connected = False
def _on_publish(self, mqttc, obj, mid):
self.log.info("on_publish: %s", mid)
def publish(self, topic, msg):
if not self._connected and False:
self.log.warning("MQTT not connected")
else:
try:
self._mqttc.publish('{}/{}'.format(self._topic, topic), msg, qos=0)
except Exception as ex:
self.log.error("MQTT publish error: %s", str(ex))
def _on_log(self, userdata, level, buf):
self.log.debug("_on_log(%s, %s, %s)".format(str(userdata), str(level), buf))
if level == mqtt.MQTT_LOG_DEBUG:
self.log.debug(buf)
else:
self.log.error(buf)
class Cc128Serial():
def __init__(self, ioloop, publish_cb, port, baud=57600):
self.log = logging.getLogger('Cc128Serial')
self._publish_cb = publish_cb
self._serial = serial.Serial(port, 57600, timeout=10)
self._buf = ''
ioloop.add_handler(self._serial, self._reader, tornado.ioloop.IOLoop.READ)
def _reader(self):
msg = self._serial.read().decode()
if msg == '\n':
self._process()
self._buf = ''
else:
self._buf += msg
def _process(self):
self.log.info("Cc128Serial._process: %s", self._buf)
try:
x = xmltodict.parse(self._buf)
# self.log.debug("xmldict: %s", str(x))
except Exception as ex:
self.log.error("xml parse error: %s", ex)
return
try:
x
except:
self.log.error("xml output missing")
return
# self.log.debug("JSON: %s", json.dumps(x)) #, indent=2))
if not 'msg' in x:
self.log.error("msg is missing")
return
self._publish_cb('msg', json.dumps(x['msg']))
if not 'tmpr' in x['msg']:
self.log.debug('no tmpr data')
return
tmpr = x['msg']['tmpr']
try:
sensor_id = int(x['msg']['id'])
except Exception as ex:
self.log.error("can't read id: %s", str(ex))
return
if (sensor_id == 3185):
try:
ch1 = int(x['msg']['ch1']['watts'])
ch2 = int(x['msg']['ch2']['watts'])
ch3 = int(x['msg']['ch3']['watts'])
except Exception as ex:
self.log.error("int conversion error: %s", str(ex))
return
tot = ch1 + ch2 + ch3
self._publish_cb('{}/ch1/watts'.format(sensor_id), ch1)
self._publish_cb('{}/ch2/watts'.format(sensor_id), ch2)
self._publish_cb('{}/ch3/watts'.format(sensor_id), ch3)
self._publish_cb('{}/total/watts'.format(sensor_id), tot)
elif (sensor_id == 3307):
try:
ch1 = int(x['msg']['ch1']['watts'])
except Exception as ex:
self.log.error("int conversion error: %s", str(ex))
return
tot = ch1
else:
self.log.error("unknown sensor id: %d", sensor_id)
ioloop = tornado.ioloop.IOLoop.instance()
mqttc = MQTTSender(ioloop, "test-cc128", mqtt_host, mqtt_port, mqtt_username, mqtt_password, mqtt_topic)
Cc128Serial(ioloop, mqttc.publish, port, 57600)
try:
ioloop.start()
except KeyboardInterrupt:
log.info('KeyboardInterrupt')
pass
finally:
loop.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment