Skip to content

Instantly share code, notes, and snippets.

@thinkJD
Created August 14, 2017 14:29
Show Gist options
  • Save thinkJD/b6297798a81d51341c542d70767c6e00 to your computer and use it in GitHub Desktop.
Save thinkJD/b6297798a81d51341c542d70767c6e00 to your computer and use it in GitHub Desktop.
Readout Thunderboard Sense and write the values to influxDB
# vim: tabstop=8 expandtab shiftwidth=4 softtabstop=4
from bluepy.btle import Peripheral
import time
from influxdb import InfluxDBClient
from influxdb import SeriesHelper
# InfluxDB connections settings
host = ''
port = ''
user = ''
password = ''
dbname = ''
myclient = InfluxDBClient(host, port, user, password, dbname)
# bluetooth connection
devices = [('DEVICE_NAME','MAC')]
class SeriesHelper(SeriesHelper):
# Meta class stores time series helper configuration.
class Meta:
client = myclient
series_name = 'environment'
fields = ['temp', 'pressure', 'uv', 'light', 'sound', 'humidity', 'co2', 'voc']
tags = ['location']
bulk_size = 5
# autocommit must be set to True when using bulk_size
autocommit = True
def debug_print(location, data):
print ('### {} ###'.format(location))
for key, value in data.items():
print ('{}:\t{}'.format(key,value))
print('')
def push_to_influx(data, location):
SeriesHelper(location=location, temp=data['temperature'],
pressure=data['pressure'], uv=data['uv'],
light=data['light'], sound=data['sound'],
humidity=data['humidity'], co2=data['co2'], voc=data['voc'])
SeriesHelper.commit()
def read_sensor_data(p):
sen_data = {}
for service in p.getServices():
# environmental service
if '181a' in str(service.uuid):
for ch in service.getCharacteristics():
if '2a76' in str(ch.uuid):
sen_data['uv'] = int.from_bytes(ch.read(), byteorder='little')
if '2a6d' in str(ch.uuid):
sen_data['pressure'] = int.from_bytes(ch.read(), byteorder='little') / 1000
if '2a6e' in str(ch.uuid):
sen_data['temperature'] = int.from_bytes(ch.read(), byteorder='little') / 100
if '2a6f' in str(ch.uuid):
sen_data['humidity'] = int.from_bytes(ch.read(), byteorder='little') / 100
if 'c8546913-bfd9-45eb-8dde-9f8754f4a32e' in str(ch.uuid):
sen_data['light'] = int.from_bytes(ch.read(), byteorder='little')
if 'c8546913-bf02-45eb-8dde-9f8754f4a32e' in str(ch.uuid):
sen_data['sound'] = int.from_bytes(ch.read(), byteorder='little') / 100
# air quality service
if 'efd658ae-c400-ef33-76e7-91b00019103b' in str(service.uuid):
for ch in service.getCharacteristics():
if 'c401' in str(ch.uuid):
sen_data['co2'] = int.from_bytes(ch.read(), byteorder='little')
if 'c402'in str(ch.uuid):
sen_data['voc'] = int.from_bytes(ch.read(), byteorder='little')
return sen_data
def main():
print('try btle connect')
btDevice = {}
for device in devices:
p = Peripheral(device[1])
if p is not None:
print ('{} connected'.format(device[0]))
btDevice[device[0]] = p
while 1:
for key, val in btDevice.items():
data = read_sensor_data(val)
debug_print(key, data)
push_to_influx(data, key)
time.sleep(2)
if __name__ == '__main__':
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment