Last active
November 21, 2018 12:10
-
-
Save rcarmo/28d3b50b3693a03b82c828f50c60d21c to your computer and use it in GitHub Desktop.
EnviroPHAT events to Azure IoT Hub, with Stream Analytics output to Azure Data Lake and PowerBI
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
# Minimalist environmental data logger to Azure IoT Hub | |
# Rui Carmo, November 2018 | |
from envirophat import light, motion, weather, leds | |
from base64 import b64encode, b64decode | |
from hashlib import sha256 | |
from time import time, sleep | |
from urllib import quote_plus, urlencode | |
from hmac import HMAC | |
from requests import Session | |
from json import dumps | |
from os import environ | |
from subprocess import call | |
from traceback import format_exc | |
from logging import getLogger, basicConfig, ERROR, DEBUG, INFO | |
basicConfig(level=DEBUG, format="%(asctime)s %(name)-12s %(levelname)-8s %(message)s", filename="/home/pi/error.log") | |
log = getLogger(__name__) | |
CONNECTION_STRING = environ["CONNECTION_STRING"] # fail immediately if not present | |
# NOTE: This assumes specific field ordering in the connection string | |
HOST_NAME, DEVICE_NAME, DEVICE_KEY = [part[part.index('=') + 1:] for part in CONNECTION_STRING.split(";")] | |
EVENT_FORMAT = environ.get("EVENT_FORMAT", "flat") | |
EVENT_INTERVAL = int(environ.get("EVENT_INTERVAL", "1")) | |
def generate_sas_token(uri, key, policy_name=None, expiry=3600): | |
ttl = time() + expiry | |
sign_key = "%s\n%d" % ((quote_plus(uri)), int(ttl)) | |
signature = b64encode(HMAC(b64decode(key), sign_key, sha256).digest()) | |
rawtoken = { | |
'sr' : uri, | |
'sig': signature, | |
'se' : str(int(ttl)) | |
} | |
if policy_name is not None: | |
rawtoken['skn'] = policy_name | |
return 'SharedAccessSignature ' + urlencode(rawtoken) | |
def collect_environment_data(): | |
lux = light.light() | |
leds.on() | |
rgb = ",".join(map(str,light.rgb())) | |
leds.off() | |
return { | |
"lux": lux, | |
"rgb": rgb, | |
"accelerometer": ",".join(map(str,motion.accelerometer())), | |
"heading": motion.heading(), # Uncalibrated | |
"temperature": weather.temperature(), # Celsius | |
"pressure": weather.pressure() # hectoPascals | |
} | |
def collect_wifi_data(): | |
with open("/proc/net/wireless", "r") as stats: | |
for line in stats.readlines(): | |
if "wlan0" in line: | |
_, _, link, level, noise, _ = filter(len,line.strip().split(" "))[:6] | |
return { | |
"link": float(link), # link quality | |
"level": float(level), # signal level | |
"noise": float(noise) # noise level | |
} | |
def collect_and_merge_data(): | |
environ = collect_environment_data() | |
wifi = collect_wifi_data() | |
if wifi: | |
environ.update(wifi) | |
values = [] | |
# Higher-level abstractions for nested JSON | |
json_fields = ["lux", "rgb", "accelerometer", "heading", "temperature", "pressure", "link", "level", "noise"] | |
# Lower-level counters for CSV and flat JSON | |
csv_fields = ["lux", "r","g","b", "x", "y", "z", "heading", "temperature", "pressure", "link", "level", "noise"] | |
for field in json_fields: | |
values.append(environ.get(field,"")) | |
csv_values = ",".join(map(str,values)).split(",") | |
return { | |
"json": dumps(environ), | |
"csv": ",".join(csv_fields) + "\n" + ",".join(csv_values), | |
"flat": dumps({x: csv_values[csv_fields.index(x)] for x in csv_fields}) | |
} | |
def main(): | |
log.debug("Run started.") | |
while(True): | |
try: | |
with Session() as s: | |
while(True): | |
data = collect_and_merge_data()[EVENT_FORMAT] | |
uri = "%s/devices/%s/messages/events?api-version=%s" % (HOST_NAME, DEVICE_NAME, "2016-11-14") | |
headers = {'Authorization': generate_sas_token(uri, DEVICE_KEY)} | |
if EVENT_FORMAT in ['json', 'flat']: | |
headers['Content-Type'] = "application/json" | |
res = s.post("https://" + uri, | |
headers = headers, | |
timeout = 5, | |
data = data) | |
log.debug("Data: %s" % data) | |
log.debug("Result: '%s', %d" % (res.text, res.status_code)) | |
sleep(EVENT_INTERVAL) | |
log.error("Session failed.") | |
except KeyboardInterrupt: | |
# Exit in an orderly fashion | |
leds.off() | |
log.error("Interrupted.") | |
break | |
except Exception as e: | |
log.error(e) | |
# In case of network failure and suchlike | |
leds.on() | |
log.error(format_exc()) | |
call(["sudo", "ifdown", "--force", "wlan0"]) | |
sleep(1) | |
call(["sudo", "ifup", "wlan0"]) | |
log.error("Interface reset.") | |
sleep(5) | |
leds.off() | |
pass | |
log.error("Exited.") | |
if __name__ == '__main__': | |
main() |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
-- System.TimeStamp AS time, -- naive timestamp | |
-- EventProcessedUtcTime AS time, -- processed by Stream Analytics | |
-- EventEnqueuedUtcTime AS time, -- from Event Hubs | |
-- IoTHub.EnqueuedTime AS time -- internal IoT Hub timestamp | |
SELECT | |
EventEnqueuedUtcTime AS time, | |
lux, r, g, b, x, y, z, heading, temperature, pressure, link, level, noise | |
INTO | |
bag | |
FROM | |
river | |
SELECT | |
EventEnqueuedUtcTime AS time, | |
MAX(lux) as lux, AVG(r) as r, AVG(g) as g, AVG(b) as b, MAX(x) as x, MAX(y) as y, MAX(z) as z, | |
AVG(heading) as heading, AVG(temperature) as temperature, AVG(pressure) as pressure, MIN(link) as link, MIN(level) as level, MAX(noise) as noise, | |
COUNT(*) AS[eventCount] | |
INTO | |
pbi | |
FROM | |
river | |
GROUP BY | |
EventEnqueuedUtcTime, | |
lux, r, g, b, x, y, z, heading, temperature, pressure, link, level, noise, | |
TumblingWindow(second, 5) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
@environment = | |
EXTRACT | |
time DateTime, | |
lux double, | |
r int, | |
g int, | |
b int, | |
x double, | |
y double, | |
z double, | |
heading double, | |
temperature double, | |
pressure double, | |
link double, | |
level double, | |
noise double | |
FROM @in | |
USING Extractors.Csv(skipFirstNRows:1); | |
OUTPUT @environment | |
TO @out | |
USING Outputters.Csv(outputHeader:false); |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
@environment = | |
EXTRACT | |
time DateTime, | |
lux double, | |
r int, | |
g int, | |
b int, | |
x double, | |
y double, | |
z double, | |
heading double, | |
temperature double, | |
pressure double, | |
link double, | |
level double, | |
noise double | |
FROM "environment/csv/2017/04/{*}.csv" | |
USING Extractors.Csv(skipFirstNRows:1); | |
@light = | |
SELECT time, lux, r, g, b, temperature | |
FROM @environment; | |
@movement = | |
SELECT time, x, y, z, heading | |
FROM @environment; | |
@physics = | |
SELECT time, heading, temperature, pressure, link, level | |
FROM @environment; | |
OUTPUT @light | |
TO "/consolidated/light.csv" | |
USING Outputters.Csv(outputHeader:true); | |
OUTPUT @movement | |
TO "/consolidated/movement.csv" | |
USING Outputters.Csv(outputHeader:true); | |
OUTPUT @physics | |
TO "/consolidated/physics.csv" | |
USING Outputters.Csv(outputHeader:true); | |
OUTPUT @environment | |
TO "/consolidated/all.csv" | |
USING Outputters.Csv(outputHeader:true); |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment