Last active
March 17, 2023 18:07
-
-
Save justind000/2055b02bdfd203abe425398fa2564bf1 to your computer and use it in GitHub Desktop.
Helium - InfluxDB Bridge
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import ssl, sys, json, base64 | |
import paho.mqtt.client, msgpack, influxdb_client | |
from influxdb_client import InfluxDBClient, Point | |
from influxdb_client.client.write_api import SYNCHRONOUS | |
# MQTT broker settings. The same host name is also used to build the
# InfluxDB URL below, so both services are expected on one server.
ufire_server = ""
mqtt_port = 8883
mqtt_username = mqtt_password = ""

# InfluxDB settings: organization, target bucket and API token.
influx_organization = ""
influx_bucket = ""
influx_token = "=="

# InfluxDB client (HTTPS, port 9999) and a write API that performs
# synchronous writes.
client = InfluxDBClient(
    url="https://{}:9999".format(ufire_server),
    token=influx_token,
    org=influx_organization,
)
write_api = client.write_api(write_options=SYNCHRONOUS)
def on_connect(client, userdata, flags, rc):
    """MQTT connect callback: subscribe to the Helium topics once connected.

    Args:
        client: The MQTT client instance that invoked the callback.
        userdata: User data attached to the client (unused).
        flags: Response flags sent by the broker (unused).
        rc: Connection result code; 0 means the connection was accepted.
    """
    if rc != 0:
        # A non-zero result code means the connection was not accepted, so
        # reporting success or subscribing here would be misleading.
        print('MQTT connection failed, result code %s' % rc)
        return
    print('MQTT connected')
    # Subscribing from the connect callback means the subscription is
    # re-established every time the client (re)connects.
    client.subscribe(topic='helium/#', qos=2)
def _escape_lp(text, special):
    """Backslash-escape every character of `special` that occurs in `text`.

    Used for the measurement, tag and field-key parts of an InfluxDB
    line-protocol record, where these characters act as delimiters.
    """
    text = str(text)
    for char in special:
        text = text.replace(char, "\\" + char)
    return text


def _format_field_value(value):
    """Render one decoded payload value as a line-protocol field value.

    Strings are double-quoted, with embedded backslashes and quotes escaped.
    Every other value is rendered with str(), so numbers are written
    exactly as they were before.
    """
    if isinstance(value, str):
        return '"' + value.replace("\\", "\\\\").replace('"', '\\"') + '"'
    return str(value)


def on_message(client, userdata, message):
    """MQTT message callback: decode one message and write it to InfluxDB.

    The message payload is a JSON document whose 'payload' entry holds a
    base64-encoded MsgPack map. Each entry of that map becomes a field of
    one line-protocol record; 'app_eui' and 'name' become tags, and the
    'rssi' / 'snr' of the first hotspot are added as extra fields. The
    measurement name is the configured bucket name.

    A message that cannot be decoded is reported and skipped instead of
    raising out of the callback.

    Args:
        client: The MQTT client instance that invoked the callback (unused).
        userdata: User data attached to the client (unused).
        message: Received message; `topic` and `payload` are read.
    """
    print('------------------------------')
    print('MQTT topic: %s' % message.topic)

    # Parse the JSON envelope and pull out everything needed from it up
    # front, so a missing key or empty hotspot list is handled in one place.
    try:
        j = json.loads(message.payload)
        b64 = base64.b64decode(j['payload'])
        eui = j['app_eui']
        name = j['name']
        hotspot = j['hotspots'][0]
        rssi = str(hotspot['rssi'])
        snr = str(hotspot['snr'])
    except (ValueError, KeyError, IndexError, TypeError) as err:
        print('Skipping malformed message: %r' % err)
        return

    # now decode from MsgPack to a python dict
    try:
        mp_dict = msgpack.unpackb(b64)
    except (ValueError, msgpack.exceptions.UnpackException) as err:
        print('Skipping message with invalid MsgPack payload: %r' % err)
        return
    if not isinstance(mp_dict, dict):
        print('Skipping message: MsgPack payload is not a map')
        return

    # Measurement plus the device EUI and device name as tags. Delimiter
    # characters are escaped so e.g. a name containing a space stays valid.
    prefix = "%s,eui=%s,name=%s" % (
        _escape_lp(influx_bucket, ', '),
        _escape_lp(eui, ',= '),
        _escape_lp(name, ',= '),
    )

    # One field per payload entry. None has no line-protocol representation,
    # so such entries are dropped rather than invalidating the whole record.
    fields = {
        _escape_lp(key, ',= '): _format_field_value(value)
        for key, value in mp_dict.items()
        if value is not None
    }
    # add a few extra values (rendered unquoted, as before)
    fields['rssi'] = rssi
    fields['snr'] = snr

    line = prefix + " " + ",".join(key + "=" + text for key, text in fields.items())
    # print it so we can see
    print(line)
    # and send it to the server
    write_api.write(bucket=influx_bucket, record=line)
def main(ca_certs=None):
    """Connect to the MQTT broker over TLS and process messages until stopped.

    Args:
        ca_certs: Optional path to a CA certificate file used to verify the
            broker's certificate. When None (the default), the system's
            default CA certificates are used.
    """
    # setup the MQTT client
    # (named mqtt_client so it does not shadow the module-level InfluxDB
    # `client`)
    mqtt_client = paho.mqtt.client.Client(client_id='', clean_session=True)
    mqtt_client.username_pw_set(mqtt_username, mqtt_password)
    mqtt_client.on_connect = on_connect
    mqtt_client.on_message = on_message
    # The CA file was previously hard-coded to DST_Root_CA_X3.pem, a root
    # certificate that expired on 2021-09-30. Defaulting to the system CA
    # store avoids depending on that single file being present and valid.
    mqtt_client.tls_set(ca_certs=ca_certs, tls_version=ssl.PROTOCOL_TLSv1_2)
    mqtt_client.connect(host=ufire_server, port=mqtt_port)
    # Blocks here, dispatching to on_connect / on_message.
    mqtt_client.loop_forever()
# Script entry point: run the bridge, then terminate with exit status 0
# once main() returns.
if __name__ == '__main__':
    main()
    raise SystemExit(0)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment