Created
January 23, 2020 22:12
-
-
Save smittytone/b99612dcacce14e5785ced4282f2b16a to your computer and use it in GitHub Desktop.
Electric Imp development device status monitor: runs on Raspberry Pi with Pimoroni inkyPhat
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
# IMPORTS | |
import requests | |
from time import time | |
from threading import Timer | |
import inkyphat | |
from PIL import ImageFont | |
# GLOBALS | |
token = {} | |
isLoggedIn = False | |
updateTimer = None | |
presentTimer = None | |
startIndex = 0 | |
debug = True | |
uDevices = [] | |
font = None | |
# Constants | |
API_BASE_URL = "https://api.electricimp.com/v5/" | |
EXPIRY_DELTA = 5 | |
LOOP_TIME = 60 | |
PAGE_SIZE = 100 | |
LOGIN_KEY = "YOUR_ELECTRIC_IMP_ACCOUNT_LOGIN_KEY" | |
# See https://developer.electricimp.com/tools/impcentral/impcentral-user-guide#login-keys | |
# Return true if the current page is the first in | |
# a sequence of pages, otherwise return false | |
def isFirstPage(links): | |
isFirst = False | |
for link in links: | |
if link == "first": | |
currentPageLink = links.self | |
firstPageLink = links.first | |
if currentPageLink == firstPageLink: | |
isFirst = True | |
break | |
return isFirst | |
# Return the supplied URL of the next page in a sequence of pages, | |
# otherwise return an empty string | |
def getNextURL(links): | |
for link in links: | |
if link == "next": | |
return links.next | |
return "" | |
# Return the pure URL of the next page in a sequence of pages, | |
# otherwise return an empty string | |
def getNextPageLink(url): | |
if len(url) == 0: | |
return "" | |
return url[:31] | |
# Login to the impCentral API, ie. get an Access Token using the | |
# supplied account-specific Login Key | |
def login(key): | |
global token | |
if debug is True: | |
print("Logging in with Login Key " + key) | |
if len(key) == 0: | |
print("[ERROR] Could not log into the Electric Imp impCloud") | |
return | |
# Save the supplied Login Key and get a new Access Token | |
token = { "loginkey" : key } | |
getNewAccessToken() | |
# Get a new Access Token by calling /v5/auth/token and passing in a Login Key | |
def getNewAccessToken(): | |
global isLoggedIn | |
global token | |
if debug is True: | |
print("Getting Access Token") | |
# Set up the request | |
body = { "key" : token["loginkey"] } | |
headers = { "Content-Type" : "application/json" } | |
url = API_BASE_URL + "auth/token" | |
# Send the request - this is handled synchronously | |
response = requests.post(url, json=body, headers=headers) | |
# Did we get an OK back? No! | |
if response.status_code != 200: | |
isLoggedIn = False | |
print("[ERROR] " + response.text + " (" + str(response.status_code) + ")") | |
return | |
# The response was good, so process it to extract the Access Token | |
# and its expiry time | |
data = response.json() | |
if "access_token" in data: | |
token["accessToken"] = str(data["access_token"]) | |
isLoggedIn = True | |
if debug is True: | |
print("Access Token received: " + token["accessToken"][:10] + "...") | |
# Get the Token's expiry | |
if "expires_in" in data: | |
token["expiryTime"] = time() + data["expires_in"] | |
if debug is True: | |
print("Token Expiry: " + str(token["expiryTime"])) | |
else: | |
print("No expiry for access token") | |
else: | |
isLoggedIn = False | |
print("[ERROR] Did not receive an Access Token") | |
# Return true if the current Access Token is still valid, ie. it has | |
# not yet expired, otherwise return false | |
def isAccessTokenValid(): | |
global token | |
isValid = True | |
if len(token) == 0 or "expiryTime" not in token: | |
isValid = False | |
else: | |
now = time() | |
if now >= token["expiryTime"] - EXPIRY_DELTA: | |
isValid = False | |
return isValid | |
# Make a new request to the API for the account's list of devices | |
def getDeviceData(): | |
global uDevices | |
if debug is True: | |
print("Requesting device list") | |
# Set up the request | |
headers = { "User-Agent" : "PythonMonitor/smittytone", | |
"Authorization" : "Bearer " + token["accessToken"] } | |
url = API_BASE_URL + "devices?page[size]=" + str(PAGE_SIZE) | |
nextURL = "" | |
uDevices = [] | |
while True: | |
if isAccessTokenValid() == False: | |
print("AT invalid") | |
return | |
response = requests.get(url, headers = headers) | |
data = response.json() | |
if "links" in data: | |
nextURL = getNextPageLink(getNextURL(data["links"])) | |
if "data" in data: | |
dv = data["data"] | |
for device in dv: | |
uDevices.append(device) | |
url = nextURL | |
if len(nextURL) == 0: | |
break | |
# Action the acquisition of a new device list from the API | |
def updateDevices(): | |
global isLoggedIn | |
global updateTimer | |
if isLoggedIn == True: | |
if isAccessTokenValid() == False: | |
getNewAccessToken() | |
else: | |
login(LOGIN_KEY) | |
getDeviceData() | |
# Present all the devices we know about in sequence, over and over | |
def presentDevices(): | |
global uDevices | |
global startIndex | |
global presentTimer | |
global font | |
deviceCount = len(uDevices) | |
line = 0 | |
if deviceCount > 0: | |
cls() | |
for index in range(deviceCount): | |
device = uDevices[index] | |
if index >= startIndex: | |
# Get the device's name (or its ID if it has no name | |
name = device["attributes"]["name"] | |
if name == None: | |
name = device["id"] | |
# Get the device's online status. This is a boolean value, so | |
# we convert it to a suitable string | |
status = device["attributes"]["device_online"] | |
if status is True: | |
status = "online" | |
else: | |
status = "offline" | |
# Pad out the device number to two digits | |
stringy = str(index + 1) | |
if len(stringy) == 1: | |
stringy = "0" + stringy | |
# Set the print string... | |
stringy += (". " + name + " is " + status) | |
# ...and add it to the display | |
inkyphat.text((0,line), stringy, inkyphat.BLACK, font) | |
# Move the cursor down | |
line += 20 | |
# If there are more than 5 devices, we need to cycle through them | |
if deviceCount > 5: | |
if index == startIndex + 4: | |
# Set the first device in the sequence for the next pass | |
startIndex = index + 1 | |
if startIndex >= deviceCount: | |
# We have shown all the devices we know about, so | |
# go back to the start | |
startIndex = 0 | |
# And update the device list | |
updateDevices() | |
inkyphat.show() | |
break | |
# Set a timer to call this function again in 10s | |
presentTimer = Timer(10, presentDevices) | |
presentTimer.start() | |
def cls(): | |
for x in range(inkyphat.WIDTH): | |
for y in range(inkyphat.HEIGHT): | |
inkyphat.putpixel((x,y), inkyphat.WHITE) | |
# RUNTIME START | |
# Configure inkyPhat display | |
inkyphat.set_colour("black") | |
font = ImageFont.truetype("/usr/lib/python3/dist-packages/inkyphat/fonts/Roboto-Light.ttf", 16) | |
# Get an initial list of devices | |
updateDevices() | |
# Start the display refresh loop | |
presentDevices() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment