Created
August 14, 2017 14:29
-
-
Save thinkJD/b6297798a81d51341c542d70767c6e00 to your computer and use it in GitHub Desktop.
Readout Thunderboard Sense and write the values to influxDB
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# vim: tabstop=8 expandtab shiftwidth=4 softtabstop=4 | |
from bluepy.btle import Peripheral | |
import time | |
from influxdb import InfluxDBClient | |
from influxdb import SeriesHelper | |
# InfluxDB connections settings | |
host = '' | |
port = '' | |
user = '' | |
password = '' | |
dbname = '' | |
myclient = InfluxDBClient(host, port, user, password, dbname) | |
# bluetooth connection | |
devices = [('DEVICE_NAME','MAC')] | |
class SeriesHelper(SeriesHelper): | |
# Meta class stores time series helper configuration. | |
class Meta: | |
client = myclient | |
series_name = 'environment' | |
fields = ['temp', 'pressure', 'uv', 'light', 'sound', 'humidity', 'co2', 'voc'] | |
tags = ['location'] | |
bulk_size = 5 | |
# autocommit must be set to True when using bulk_size | |
autocommit = True | |
def debug_print(location, data): | |
print ('### {} ###'.format(location)) | |
for key, value in data.items(): | |
print ('{}:\t{}'.format(key,value)) | |
print('') | |
def push_to_influx(data, location): | |
SeriesHelper(location=location, temp=data['temperature'], | |
pressure=data['pressure'], uv=data['uv'], | |
light=data['light'], sound=data['sound'], | |
humidity=data['humidity'], co2=data['co2'], voc=data['voc']) | |
SeriesHelper.commit() | |
def read_sensor_data(p): | |
sen_data = {} | |
for service in p.getServices(): | |
# environmental service | |
if '181a' in str(service.uuid): | |
for ch in service.getCharacteristics(): | |
if '2a76' in str(ch.uuid): | |
sen_data['uv'] = int.from_bytes(ch.read(), byteorder='little') | |
if '2a6d' in str(ch.uuid): | |
sen_data['pressure'] = int.from_bytes(ch.read(), byteorder='little') / 1000 | |
if '2a6e' in str(ch.uuid): | |
sen_data['temperature'] = int.from_bytes(ch.read(), byteorder='little') / 100 | |
if '2a6f' in str(ch.uuid): | |
sen_data['humidity'] = int.from_bytes(ch.read(), byteorder='little') / 100 | |
if 'c8546913-bfd9-45eb-8dde-9f8754f4a32e' in str(ch.uuid): | |
sen_data['light'] = int.from_bytes(ch.read(), byteorder='little') | |
if 'c8546913-bf02-45eb-8dde-9f8754f4a32e' in str(ch.uuid): | |
sen_data['sound'] = int.from_bytes(ch.read(), byteorder='little') / 100 | |
# air quality service | |
if 'efd658ae-c400-ef33-76e7-91b00019103b' in str(service.uuid): | |
for ch in service.getCharacteristics(): | |
if 'c401' in str(ch.uuid): | |
sen_data['co2'] = int.from_bytes(ch.read(), byteorder='little') | |
if 'c402'in str(ch.uuid): | |
sen_data['voc'] = int.from_bytes(ch.read(), byteorder='little') | |
return sen_data | |
def main(): | |
print('try btle connect') | |
btDevice = {} | |
for device in devices: | |
p = Peripheral(device[1]) | |
if p is not None: | |
print ('{} connected'.format(device[0])) | |
btDevice[device[0]] = p | |
while 1: | |
for key, val in btDevice.items(): | |
data = read_sensor_data(val) | |
debug_print(key, data) | |
push_to_influx(data, key) | |
time.sleep(2) | |
if __name__ == '__main__': | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment