Created
August 3, 2020 18:30
-
-
Save Cougar/7e10626fa524aa92ffa33dcb6068d927 to your computer and use it in GitHub Desktop.
Python3 MQTT sender example
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python3
"""
Create the database:
curl -XPOST http://hostname:8086/query --data-urlencode 'q=CREATE DATABASE "cc128"'
"""
"""
Example log output:
DEBUG:asyncio:Using selector: EpollSelector
Socket opened
DEBUG:__main__:Sending CONNECT (u1, p1, wr0, wq0, wf0, c1, k60) client_id=b'cc128'
misc_loop started
Socket is readable, calling loop_read
DEBUG:__main__:Received CONNACK (0, 0)
INFO:__main__:on_connect: 0
...
DEBUG:__main__:Sending PUBLISH ....
...
Socket is readable, calling loop_read
Socket closed
ERROR:__main__:on_disconnect: 1
"""
import datetime
import json
import logging
import socket
import sys
import time

import paho.mqtt.client as mqtt
import serial
import tornado.ioloop
import xmltodict

# --- Configuration ----------------------------------------------------------
port = '/dev/ttyUSB1'              # serial device handed to Cc128Serial
mqtt_host = '127.0.0.1'
mqtt_port = 1883
mqtt_username = 'xxx'
mqtt_password = '12345'
mqtt_topic = 'test/energy/cc128'   # prefix prepended to every published topic

# --- Logging ----------------------------------------------------------------
# chromalog is optional: when it is available it configures the root logger,
# otherwise the failure is reported and the plain logging setup below is used.
try:
    import chromalog
    chromalog.basicConfig(level=logging.DEBUG, format='%(asctime)s [%(levelname)s] [%(name)s] [%(module)s] [%(funcName)s:%(lineno)d] %(message)s')
except Exception as ex:
    print("chromalog import error: " + str(ex))
# logging.basicConfig() does nothing when the root logger already has
# handlers, so this only takes effect if nothing was configured above.
logging.basicConfig(stream=sys.stderr, level=logging.DEBUG, format='%(asctime)s [%(module)14s] [%(levelname)7s] %(message)s')
log = logging.getLogger(__name__)

# NOTE: the serial port is intentionally NOT opened here. Cc128Serial opens
# the port itself; a second, unused module-level handle on the same device
# only held an extra file descriptor open.
class MQTTSender():
    """Publish messages to an MQTT broker, driving paho-mqtt from a Tornado IOLoop.

    paho-mqtt is used in its "external event loop" mode: instead of paho's
    own network thread, the IOLoop tells us when the broker socket is
    readable or writable and we forward that to ``loop_read()`` /
    ``loop_write()``. ``loop_misc()`` is called on a timer so that keepalive
    pings are sent; the same timer retries the connection when the socket is
    gone.

    Args:
        ioloop: Tornado IOLoop used for socket handlers and timers.
        client_id: MQTT client id passed to ``mqtt.Client``.
        host: broker host name or address.
        port: broker TCP port.
        username: user name for broker authentication.
        password: password for broker authentication.
        topic: topic prefix; ``publish()`` appends ``/<topic>`` to it.
    """

    # Seconds between two housekeeping ticks (loop_misc / reconnect attempt).
    _MISC_INTERVAL = 10

    def __init__(self, ioloop, client_id, host, port, username, password, topic):
        self.log = logging.getLogger('MQTTSender')
        self._ioloop = ioloop
        self._host = host
        self._port = port
        self._topic = topic
        self._connected = False

        self._mqttc = mqtt.Client(client_id)
        # Only on_log is hooked up (no enable_logger as well), otherwise
        # every paho log line would be emitted twice.
        self._mqttc.on_log = self._on_log
        self._mqttc.on_connect = self._on_connect
        self._mqttc.on_disconnect = self._on_disconnect
        # Socket life-cycle callbacks keep the IOLoop registration in sync
        # with whatever socket paho is currently using.
        self._mqttc.on_socket_open = self._on_socket_open
        self._mqttc.on_socket_close = self._on_socket_close
        self._mqttc.on_socket_register_write = self._on_socket_register_write
        self._mqttc.on_socket_unregister_write = self._on_socket_unregister_write
        self._mqttc.username_pw_set(username=username, password=password)

        # Connect exactly once, then start the periodic housekeeping timer.
        self._connect()
        self._schedule_misc()

    def _schedule_misc(self):
        """Arrange for ``_loop_misc`` to run after ``_MISC_INTERVAL`` seconds."""
        self._ioloop.add_timeout(
            datetime.timedelta(seconds=self._MISC_INTERVAL), self._loop_misc)

    def _loop_misc(self):
        """Periodic tick: run paho housekeeping, or reconnect if the socket is gone."""
        self.log.debug('call loop_misc()')
        if self._mqttc.socket() is None:
            # No socket means the connection was never made or was lost.
            self._connect()
        else:
            self._mqttc.loop_misc()
        self._schedule_misc()

    def _connect(self):
        """Open the connection to the broker; failures are logged, not raised."""
        try:
            self._mqttc.connect(host=self._host, port=self._port)
        except Exception as ex:
            self.log.error("MQTT connection error: %s", str(ex))
            return
        sock = self._mqttc.socket()
        if sock is not None:
            sock.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, 2048)

    # --- IOLoop <-> paho socket glue ----------------------------------------

    def _on_socket_open(self, mqttc, userdata, sock):
        """paho opened a socket: start watching it for incoming data."""
        self._ioloop.add_handler(
            sock, self._on_socket_event, tornado.ioloop.IOLoop.READ)

    def _on_socket_close(self, mqttc, userdata, sock):
        """paho is about to close the socket: stop watching it."""
        self._ioloop.remove_handler(sock)

    def _on_socket_register_write(self, mqttc, userdata, sock):
        """paho has queued outgoing data: also watch for writability."""
        io = tornado.ioloop.IOLoop
        self._ioloop.update_handler(sock, io.READ | io.WRITE)

    def _on_socket_unregister_write(self, mqttc, userdata, sock):
        """paho's outgoing queue is empty: watch for readability only."""
        self._ioloop.update_handler(sock, tornado.ioloop.IOLoop.READ)

    def _on_socket_event(self, fd, events):
        """Single IOLoop handler for the broker socket.

        Tornado calls handlers as ``handler(fd, events)``; the event mask is
        translated into the matching paho loop call here.
        """
        io = tornado.ioloop.IOLoop
        if events & (io.READ | io.ERROR):
            # On an error/hang-up the read attempt lets paho notice the
            # broken connection and run its disconnect handling.
            self._mqttc.loop_read()
        if events & io.WRITE:
            self._mqttc.loop_write()

    # --- paho callbacks ------------------------------------------------------

    def _on_connect(self, mqttc, obj, flags, rc):
        """CONNACK received; only ``rc == 0`` means the broker accepted us."""
        self.log.info("on_connect: %s", rc)
        self._connected = (rc == 0)
        if not self._connected:
            self.log.error("MQTT connection refused: %s", rc)

    def _on_disconnect(self, mqttc, userdata, rc):
        """Connection closed; a non-zero ``rc`` is logged as an error."""
        if rc == 0:
            self.log.info("on_disconnect: %s", rc)
        else:
            self.log.error("on_disconnect: %s", rc)
        self._connected = False

    def _on_publish(self, mqttc, obj, mid):
        """Optional publish-complete callback (not registered by default)."""
        self.log.info("on_publish: %s", mid)

    def _on_log(self, mqttc, userdata, level, buf):
        """Forward a paho log line to ``self.log`` at the matching level."""
        if level == mqtt.MQTT_LOG_DEBUG:
            self.log.debug(buf)
        elif level in (mqtt.MQTT_LOG_INFO, mqtt.MQTT_LOG_NOTICE):
            self.log.info(buf)
        elif level == mqtt.MQTT_LOG_WARNING:
            self.log.warning(buf)
        else:
            self.log.error(buf)

    # --- public API ----------------------------------------------------------

    def publish(self, topic, msg):
        """Publish ``msg`` (QoS 0) on ``<topic prefix>/<topic>``.

        The message is dropped with a warning while the broker connection is
        not established. Errors are logged and never raised to the caller.

        Args:
            topic: topic suffix appended to the configured prefix.
            msg: payload handed to paho's ``publish``.
        """
        if not self._connected:
            self.log.warning("MQTT not connected")
            return
        full_topic = '{}/{}'.format(self._topic, topic)
        try:
            info = self._mqttc.publish(full_topic, msg, qos=0)
        except Exception as ex:
            self.log.error("MQTT publish error: %s", str(ex))
            return
        # paho reports delivery problems through the result code instead of
        # raising; 0 is MQTT_ERR_SUCCESS.
        if info.rc:
            self.log.error("MQTT publish error: rc=%s", info.rc)
class Cc128Serial():
    """Read newline-terminated XML messages from a serial port and publish them.

    Each complete line is parsed with ``xmltodict``. The ``msg`` element is
    published as JSON under the topic ``msg``; if it carries a ``tmpr``
    element and a known sensor ``id``, the per-channel watt readings and
    their sum are published as well.

    Args:
        ioloop: Tornado IOLoop on which the serial port is watched.
        publish_cb: callable ``publish_cb(topic, value)`` used for output.
        port: serial device name passed to ``serial.Serial``.
        baud: serial baud rate (default 57600).
    """

    # Channels reported by each known sensor id.
    _SENSOR_CHANNELS = {
        3185: ('ch1', 'ch2', 'ch3'),
        3307: ('ch1',),
    }

    def __init__(self, ioloop, publish_cb, port, baud=57600):
        self.log = logging.getLogger('Cc128Serial')
        self._publish_cb = publish_cb
        self._serial = serial.Serial(port, baud, timeout=10)
        self._buf = ''        # text of the line currently being processed
        self._pending = b''   # raw bytes received after the last newline
        ioloop.add_handler(self._serial, self._reader, tornado.ioloop.IOLoop.READ)

    def _reader(self, fd=None, events=None):
        """IOLoop handler: drain the port and process every complete line.

        Tornado calls handlers as ``handler(fd, events)``; both arguments are
        accepted (and ignored) so the method can also be called without them.
        """
        # Read everything that is already buffered (at least one byte).
        chunk = self._serial.read(self._serial.in_waiting or 1)
        if not chunk:
            return
        # Everything before the last newline is complete; the tail is kept
        # until more data arrives.
        *lines, self._pending = (self._pending + chunk).split(b'\n')
        for raw in lines:
            # Decode whole lines (never single bytes) and tolerate line noise
            # instead of raising UnicodeDecodeError inside the IOLoop.
            self._buf = raw.decode('utf-8', errors='replace')
            try:
                self._process()
            finally:
                self._buf = ''

    def _process(self):
        """Parse the line in ``self._buf`` and publish what it contains."""
        self.log.info("Cc128Serial._process: %s", self._buf)
        try:
            doc = xmltodict.parse(self._buf)
        except Exception as ex:
            self.log.error("xml parse error: %s", ex)
            return
        if 'msg' not in doc:
            self.log.error("msg is missing")
            return
        msg = doc['msg']
        self._publish_cb('msg', json.dumps(msg))
        self._publish_readings(msg)

    def _publish_readings(self, msg):
        """Publish per-channel and total watts for a parsed ``msg`` element.

        Args:
            msg: the parsed ``msg`` element; anything that is not a mapping
                containing ``tmpr`` is skipped.
        """
        if not isinstance(msg, dict) or 'tmpr' not in msg:
            self.log.debug('no tmpr data')
            return
        try:
            sensor_id = int(msg['id'])
        except (KeyError, TypeError, ValueError) as ex:
            self.log.error("can't read id: %s", str(ex))
            return
        channels = self._SENSOR_CHANNELS.get(sensor_id)
        if channels is None:
            self.log.error("unknown sensor id: %d", sensor_id)
            return
        try:
            watts = [int(msg[channel]['watts']) for channel in channels]
        except (KeyError, TypeError, ValueError) as ex:
            self.log.error("int conversion error: %s", str(ex))
            return
        for channel, value in zip(channels, watts):
            self._publish_cb('{}/{}/watts'.format(sensor_id, channel), value)
        self._publish_cb('{}/total/watts'.format(sensor_id), sum(watts))
# Wire the serial reader to the MQTT sender and run until interrupted.
ioloop = tornado.ioloop.IOLoop.instance()
mqttc = MQTTSender(ioloop, "test-cc128", mqtt_host, mqtt_port, mqtt_username, mqtt_password, mqtt_topic)
Cc128Serial(ioloop, mqttc.publish, port, 57600)
try:
    ioloop.start()
except KeyboardInterrupt:
    log.info('KeyboardInterrupt')
finally:
    # Close the loop that was actually started; the previous `loop.close()`
    # referred to an undefined name and raised NameError on shutdown.
    ioloop.close()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment