# Source: GitHub gist ergo70/9ced3c07ec0181c91f6391a5562f56bb (last active December 6, 2018 14:51).
# NOTE: GitHub flagged this file as containing bidirectional Unicode text that may be
# interpreted or compiled differently than it appears. To review it, open the file in
# an editor that reveals hidden Unicode characters.
import json
import os
import struct
import sys
import time
import uuid
from datetime import datetime
from decimal import Decimal

from AWSIoTPythonSDK.core.greengrass.discovery.providers import DiscoveryInfoProvider
from AWSIoTPythonSDK.core.protocol.connection.cores import ProgressiveBackOffCore
from AWSIoTPythonSDK.exception.AWSIoTExceptions import DiscoveryInvalidRequestException
from AWSIoTPythonSDK.MQTTLib import AWSIoTMQTTClient
from bluepy.btle import UUID, Peripheral, Scanner
from boto3 import resource
from boto3.dynamodb.conditions import Key
# Byte values written to SensorTag characteristics: ENABLE is written to the
# sensor enable characteristics in connect(); TEST and DISABLE are written to
# the POST mode characteristic in POST().
DISABLE = b'\x00'
ENABLE = b'\x01'
TEST = b'\x02'

# Light sensor: data is read in read_lux(), enable is written in connect().
lux_enable_uuid = UUID("f000aa72-0451-4000-b000-000000000000")
lux_data_uuid = UUID("f000aa71-0451-4000-b000-000000000000")

# Temperature / relative humidity sensor: data is read in read_temp_relH().
temp_relH_enable_uuid = UUID("f000aa22-0451-4000-b000-000000000000")
temp_relH_data_uuid = UUID("f000aa21-0451-4000-b000-000000000000")

# Self-test characteristics used by POST(): mode is written, data is read.
POST_mode_uuid = UUID("f000aa66-0451-4000-b000-000000000000")
POST_data_uuid = UUID("f000aa65-0451-4000-b000-000000000000")

# Battery level characteristic read by read_battery().
battery_level_uuid = UUID("2a19")
# --- DynamoDB access: credentials, region and table name come from the environment.
# A missing variable raises KeyError at import time.
ACCESS_KEY_ID = os.environ['ACCESS_KEY_ID']
SECRET_ACCESS_KEY = os.environ['SECRET_ACCESS_KEY']
REGION = os.environ['AWS_REGION']
TABLE = os.environ['TABLE']
dynamodb_resource = resource(
    'dynamodb',
    region_name=REGION,
    aws_access_key_id=ACCESS_KEY_ID,
    aws_secret_access_key=SECRET_ACCESS_KEY,
)
table = dynamodb_resource.Table(TABLE)

# --- Greengrass discovery / MQTT identity, also taken from the environment.
host = os.environ['ENDPOINT']
rootCAPath = os.environ['ROOT_CA_FILE']
certificatePath = os.environ['CERT_FILE']
privateKeyPath = os.environ['KEY_FILE']
clientId = os.environ['THING_NAME']
topic = 'foo/bar'
thingName = clientId  # the thing name doubles as the MQTT client id

# --- Polling interval (passed to time.sleep) and Bluetooth adapter index.
sleep_interval = int(os.environ['SLEEP'])
ble_interface = int(os.environ['BLE_INTERFACE'])

# --- Discovery settings used by connectGG().
MAX_DISCOVERY_RETRIES = 10
GROUP_CA_PATH = "./groupCA/"
backOffCore = ProgressiveBackOffCore()

# Discover GGCs
discoveryInfoProvider = DiscoveryInfoProvider()
discoveryInfoProvider.configureEndpoint(host)
discoveryInfoProvider.configureCredentials(rootCAPath, certificatePath, privateKeyPath)
discoveryInfoProvider.configureTimeout(10)  # 10 sec
# Generic callback for messages delivered by the MQTT client
def customOnMessage(message):
    """Print the topic and payload of a received MQTT message.

    Args:
        message: Object exposing ``topic`` and ``payload`` attributes.
    """
    details = (message.topic, message.payload)
    print('Received message on topic %s: %s\n' % details)
def connectGG():
    """Discover this thing's Greengrass core and open an MQTT connection to it.

    Discovery is attempted up to ``MAX_DISCOVERY_RETRIES`` times, backing off
    via ``backOffCore`` between attempts. The first group CA returned is
    written to a uniquely named file under ``GROUP_CA_PATH`` and used as the
    CA for the MQTT client. Each connectivity option advertised for the first
    core is then tried in order until one connects.

    Returns:
        The connected ``AWSIoTMQTTClient``.

    Raises:
        SystemExit: with code -1 when discovery fails, or -2 when none of the
            core's endpoints accepts the connection.
    """
    retryCount = MAX_DISCOVERY_RETRIES
    discovered = False
    groupCA = None
    coreInfo = None
    while retryCount != 0:
        try:
            discoveryInfo = discoveryInfoProvider.discover(thingName)
            caList = discoveryInfo.getAllCas()
            coreList = discoveryInfo.getAllCores()

            # We only pick the first ca and core info
            groupId, ca = caList[0]
            coreInfo = coreList[0]
            print("Discovered GGC: %s from Group: %s" % (coreInfo.coreThingArn, groupId))

            print("Now we persist the connectivity/identity information...")
            groupCA = GROUP_CA_PATH + groupId + "_CA_" + str(uuid.uuid4()) + ".crt"
            # exist_ok avoids the check-then-create race of exists() + makedirs()
            os.makedirs(GROUP_CA_PATH, exist_ok=True)
            with open(groupCA, "w") as groupCAFile:
                groupCAFile.write(ca)

            discovered = True
            print("Now proceed to the connecting flow...")
            break
        except DiscoveryInvalidRequestException as e:
            # An invalid request will not succeed on retry, so give up at once.
            print("Invalid discovery request detected!")
            print("Type: %s" % str(type(e)))
            # Not every exception has .message in Python 3; fall back to str(e).
            print("Error message: %s" % getattr(e, "message", e))
            print("Stopping...")
            break
        except Exception as e:
            # Exception (not BaseException) so Ctrl-C / SystemExit are not
            # swallowed and retried.
            print("Error in discovery!")
            print("Type: %s" % str(type(e)))
            print("Error message: %s" % getattr(e, "message", e))
            retryCount -= 1
            print("\n%d/%d retries left\n" % (retryCount, MAX_DISCOVERY_RETRIES))
            print("Backing off...\n")
            backOffCore.backOff()

    if not discovered:
        print("Discovery failed after %d retries. Exiting...\n" % (MAX_DISCOVERY_RETRIES))
        sys.exit(-1)

    # Iterate through all connection options for the core and use the first successful one
    myAWSIoTMQTTClient = AWSIoTMQTTClient(clientId)
    myAWSIoTMQTTClient.configureCredentials(groupCA, privateKeyPath, certificatePath)
    myAWSIoTMQTTClient.onMessage = customOnMessage

    connected = False
    for connectivityInfo in coreInfo.connectivityInfoList:
        currentHost = connectivityInfo.host
        currentPort = connectivityInfo.port
        print("Trying to connect to core at %s:%d" % (currentHost, currentPort))
        myAWSIoTMQTTClient.configureEndpoint(currentHost, currentPort)
        try:
            myAWSIoTMQTTClient.connect()
            connected = True
            break
        except Exception as e:
            print("Error in connect!")
            print("Type: %s" % str(type(e)))
            print("Error message: %s" % getattr(e, "message", e))

    if not connected:
        print("Cannot connect to core %s. Exiting..." % coreInfo.coreThingArn)
        sys.exit(-2)

    return myAWSIoTMQTTClient
def detect(interface):
    """Scan for BLE devices and return those advertising as 'CC2650 SensorTag'.

    Args:
        interface: Value passed to the bluepy ``Scanner`` constructor to
            select the Bluetooth adapter.

    Returns:
        A list of ``(address, address_type)`` tuples, one per matching
        device. The list is empty when no matching device was seen.
    """
    scanner = Scanner(interface)
    scanner.scan()

    sensors = []
    for dev in scanner.getDevices():
        for (adtype, desc, value) in dev.getScanData():
            if value == 'CC2650 SensorTag':
                print(" %s = %s" % (desc, value))
                print("Device %s (%s), RSSI=%d dB" % (dev.addr, dev.addrType, dev.rssi))
                sensors.append((dev.addr, dev.addrType))
                # Stop at the first matching field so a device is listed
                # (and later connected to) only once.
                break
    return sensors
def connect(sensors):
    """Connect to each sensor, run the self test and enable its sensors.

    Args:
        sensors: Iterable of ``(address, address_type)`` pairs, as returned
            by ``detect``.

    Returns:
        A list of connected ``Peripheral`` objects, in the order of
        ``sensors``.

    Raises:
        Whatever the BLE layer raises. On any failure every link opened so
        far is closed before the error propagates, because the caller never
        receives the list and so cannot close them itself.
    """
    connections = []
    try:
        for sensor in sensors:
            connections.append(Peripheral(sensor[0], sensor[1]))
        POST(connections)
        for connection in connections:
            # Switch on the light sensor, then the temperature/humidity sensor.
            for enable_uuid in (lux_enable_uuid, temp_relH_enable_uuid):
                ch = connection.getCharacteristics(uuid=enable_uuid)[0]
                ch.write(ENABLE, True)
    except BaseException:
        for connection in connections:
            try:
                connection.disconnect()
            except Exception:
                # Best-effort cleanup; the original error is re-raised below.
                pass
        raise
    return connections
def POST(connections):
    """Put each sensor into test mode, report the result, then leave test mode.

    For every connection, ``TEST`` is written to the POST mode
    characteristic and the POST data characteristic is read if it is
    readable. A value of 0x7e or 0x7f is printed as OK, anything else as a
    failure. ``DISABLE`` is then written to the mode characteristic.

    Args:
        connections: Iterable of connected peripherals exposing ``addr`` and
            ``getCharacteristics``.
    """
    passing_values = (b'\x7e', b'\x7f')
    for connection in connections:
        mode_ch = connection.getCharacteristics(uuid=POST_mode_uuid)[0]
        mode_ch.write(TEST, True)

        data_ch = connection.getCharacteristics(uuid=POST_data_uuid)[0]
        if data_ch.supportsRead():
            onebyte = data_ch.read()
            if onebyte not in passing_values:
                print("Sensor " + connection.addr + " fail: " + str(onebyte))
            else:
                print("Sensor " + connection.addr + " OK")

        # NOTE(review): indentation was lost in the source; the mode reset is
        # taken to run for every sensor, readable or not - confirm.
        mode_ch = connection.getCharacteristics(uuid=POST_mode_uuid)[0]
        mode_ch.write(DISABLE, True)
def disconnect(connections):
    """Close every BLE connection and print a confirmation for each one.

    Args:
        connections: Iterable of connected peripherals exposing ``addr`` and
            ``disconnect()``.
    """
    for link in connections:
        # Read the address before the link is closed.
        address = link.addr
        link.disconnect()
        print("Sensor " + address + " disconnected")
def read_battery(connection):
    """Read the battery level characteristic and return it as an integer.

    The value is a single unsigned byte; it is also printed followed by
    ``' %'``.

    Args:
        connection: Connected peripheral exposing ``getCharacteristics``.

    Returns:
        The battery level as an ``int``.

    Raises:
        RuntimeError: if the characteristic does not support reading.
            (Previously this path fell through to an UnboundLocalError on
            the ``return`` statement.)
    """
    ch = connection.getCharacteristics(uuid=battery_level_uuid)[0]
    if not ch.supportsRead():
        raise RuntimeError("Battery level characteristic is not readable")

    onebyte = ch.read()
    battery_level = struct.unpack('B', onebyte)[0]
    print(str(battery_level) + ' %')
    return battery_level
def read_lux(connection):
    """Read the light sensor characteristic and return the illuminance in lux.

    The raw value is a 16-bit word: the low 12 bits are a mantissa and the
    high 4 bits an exponent. The result is ``mantissa * 0.01 * 2**exponent``
    rounded to one decimal; it is also printed followed by ``' Lux'``.

    Args:
        connection: Connected peripheral exposing ``getCharacteristics``.

    Returns:
        The illuminance as a ``float``.

    Raises:
        RuntimeError: if the characteristic does not support reading.
            (Previously this path fell through to an UnboundLocalError on
            the ``return`` statement.)
    """
    ch = connection.getCharacteristics(uuid=lux_data_uuid)[0]
    if not ch.supportsRead():
        raise RuntimeError("Lux data characteristic is not readable")

    twobytes = ch.read()
    # Explicit little-endian so decoding does not depend on the host CPU.
    # NOTE(review): assumes the sensor sends little-endian data - confirm.
    rawdata = struct.unpack('<H', twobytes)[0]
    mantissa = rawdata & 0x0FFF
    exponent = (rawdata & 0xF000) >> 12
    # 1 << exponent equals the original "1 if e == 0 else 2 << (e - 1)".
    scale = 1 << exponent
    lux = round(mantissa * (0.01 * scale), 1)
    print(str(lux) + ' Lux')
    return lux
def read_temp_relH(connection):
    """Read the temperature / relative humidity characteristic.

    The payload is two 16-bit words: raw temperature first, raw humidity
    second. Both converted values are rounded to one decimal and printed.

    Args:
        connection: Connected peripheral exposing ``getCharacteristics``.

    Returns:
        A ``(temp, relH)`` tuple of floats: degrees Celsius and percent
        relative humidity.

    Raises:
        RuntimeError: if the characteristic does not support reading.
            (Previously this path fell through to an UnboundLocalError on
            the ``return`` statement.)
    """
    ch = connection.getCharacteristics(uuid=temp_relH_data_uuid)[0]
    if not ch.supportsRead():
        raise RuntimeError("Temperature/humidity data characteristic is not readable")

    fourbytes = ch.read()
    # Explicit little-endian so decoding does not depend on the host CPU.
    # NOTE(review): assumes the sensor sends little-endian data - confirm.
    rawdata_temp, rawdata_relH = struct.unpack('<HH', fourbytes)
    # The two lowest humidity bits are masked out before conversion.
    rawdata_relH &= 0b1111111111111100

    # NOTE(review): the usual HDC1000 formula subtracts 40; the 42.4 here
    # looks like a deliberate calibration offset - confirm before changing.
    temp = round((rawdata_temp / 65536.0) * 165.0 - 42.4, 1)
    print(str(temp) + ' °C')
    relH = round((rawdata_relH / 65536.0) * 100.0, 1)
    print(str(relH) + ' % rH')
    return temp, relH
def save_to_dynamoDB(ts, sensor_addr, lux, celsius, rH, battery):
    """Store one measurement in the DynamoDB table and print the response.

    The item's ``measurement`` attribute is ``ts + '|' + sensor_addr``. The
    four readings are stored as ``Decimal`` values built from their string
    form.

    Args:
        ts: Timestamp string.
        sensor_addr: Sensor address string.
        lux, celsius, rH, battery: Numeric readings.
    """
    item = {"measurement": ts + '|' + sensor_addr, "sensor": sensor_addr, "ts": ts}
    readings = (("lux", lux), ("celsius", celsius), ("rH", rH), ("battery", battery))
    for column, reading in readings:
        # Decimal(str(...)) is how the original converts each reading.
        item[column] = Decimal(str(reading))
    print(table.put_item(Item=item))
def save_to_GG(MQTTClient, ts, sensor_addr, lux, celsius, rH, battery):
    """Publish one measurement as JSON on ``topic`` and print what was sent.

    Nothing is published or printed when ``MQTTClient`` is falsy.

    Args:
        MQTTClient: Client exposing ``publish(topic, payload, qos)``, or a
            falsy value to skip publishing.
        ts: Timestamp string.
        sensor_addr: Sensor address string.
        lux, celsius, rH, battery: JSON-serialisable readings.
    """
    if not MQTTClient:
        return

    readings = {"lux": lux, "celsius": celsius, "rH": rH, "battery": battery}
    envelope = {"measurement": {"sensor": sensor_addr, "ts": ts, "data": readings}}
    messageJson = json.dumps(envelope, sort_keys=True)
    MQTTClient.publish(topic, messageJson, 0)
    print('Published topic %s: %s\n' % (topic, messageJson))
# Main polling loop: connect to the sensors and the Greengrass core, then read
# every sensor and store/publish its values until the process is interrupted.
# Both handles are pre-initialised so the cleanup in ``finally`` cannot hit a
# NameError (and mask the real error) when setup fails part-way through.
connections = []
MQTTClient = None
try:
    connections = connect(detect(ble_interface))
    MQTTClient = connectGG()
    while True:
        for connection in connections:
            print("Reading sensor: " + str(connection.addr))
            lux = read_lux(connection)
            celsius, rH = read_temp_relH(connection)
            battery = read_battery(connection)
            ts = str(datetime.utcnow())
            save_to_dynamoDB(ts, str(connection.addr), lux, celsius, rH, battery)
            save_to_GG(MQTTClient, ts, str(connection.addr), lux, celsius, rH, battery)
        # NOTE(review): indentation was lost in the source; the sleep is taken
        # to run once per polling round, not once per sensor - confirm.
        time.sleep(sleep_interval)
finally:
    try:
        disconnect(connections)
    finally:
        # Close the MQTT link even if a BLE disconnect failed, but only if it
        # was actually opened.
        if MQTTClient is not None:
            MQTTClient.disconnect()
# (GitHub page footer, not part of the script: "Sign up for free to join this
# conversation on GitHub. Already have an account? Sign in to comment.")