Skip to content

Instantly share code, notes, and snippets.

@smittytone
Created January 23, 2020 22:12
Show Gist options
  • Save smittytone/b99612dcacce14e5785ced4282f2b16a to your computer and use it in GitHub Desktop.
Save smittytone/b99612dcacce14e5785ced4282f2b16a to your computer and use it in GitHub Desktop.
Electric Imp development device status monitor: runs on Raspberry Pi with Pimoroni inkyPhat
#!/usr/bin/env python
# IMPORTS
import requests
from time import time
from threading import Timer
import inkyphat
from PIL import ImageFont
# GLOBALS
token = {}
isLoggedIn = False
updateTimer = None
presentTimer = None
startIndex = 0
debug = True
uDevices = []
font = None
# Constants
API_BASE_URL = "https://api.electricimp.com/v5/"
EXPIRY_DELTA = 5
LOOP_TIME = 60
PAGE_SIZE = 100
LOGIN_KEY = "YOUR_ELECTRIC_IMP_ACCOUNT_LOGIN_KEY"
# See https://developer.electricimp.com/tools/impcentral/impcentral-user-guide#login-keys
# Return true if the current page is the first in
# a sequence of pages, otherwise return false
def isFirstPage(links):
isFirst = False
for link in links:
if link == "first":
currentPageLink = links.self
firstPageLink = links.first
if currentPageLink == firstPageLink:
isFirst = True
break
return isFirst
# Return the supplied URL of the next page in a sequence of pages,
# otherwise return an empty string
def getNextURL(links):
for link in links:
if link == "next":
return links.next
return ""
# Return the pure URL of the next page in a sequence of pages,
# otherwise return an empty string
def getNextPageLink(url):
if len(url) == 0:
return ""
return url[:31]
# Login to the impCentral API, ie. get an Access Token using the
# supplied account-specific Login Key
def login(key):
global token
if debug is True:
print("Logging in with Login Key " + key)
if len(key) == 0:
print("[ERROR] Could not log into the Electric Imp impCloud")
return
# Save the supplied Login Key and get a new Access Token
token = { "loginkey" : key }
getNewAccessToken()
# Get a new Access Token by calling /v5/auth/token and passing in a Login Key
def getNewAccessToken():
global isLoggedIn
global token
if debug is True:
print("Getting Access Token")
# Set up the request
body = { "key" : token["loginkey"] }
headers = { "Content-Type" : "application/json" }
url = API_BASE_URL + "auth/token"
# Send the request - this is handled synchronously
response = requests.post(url, json=body, headers=headers)
# Did we get an OK back? No!
if response.status_code != 200:
isLoggedIn = False
print("[ERROR] " + response.text + " (" + str(response.status_code) + ")")
return
# The response was good, so process it to extract the Access Token
# and its expiry time
data = response.json()
if "access_token" in data:
token["accessToken"] = str(data["access_token"])
isLoggedIn = True
if debug is True:
print("Access Token received: " + token["accessToken"][:10] + "...")
# Get the Token's expiry
if "expires_in" in data:
token["expiryTime"] = time() + data["expires_in"]
if debug is True:
print("Token Expiry: " + str(token["expiryTime"]))
else:
print("No expiry for access token")
else:
isLoggedIn = False
print("[ERROR] Did not receive an Access Token")
# Return true if the current Access Token is still valid, ie. it has
# not yet expired, otherwise return false
def isAccessTokenValid():
global token
isValid = True
if len(token) == 0 or "expiryTime" not in token:
isValid = False
else:
now = time()
if now >= token["expiryTime"] - EXPIRY_DELTA:
isValid = False
return isValid
# Make a new request to the API for the account's list of devices
def getDeviceData():
global uDevices
if debug is True:
print("Requesting device list")
# Set up the request
headers = { "User-Agent" : "PythonMonitor/smittytone",
"Authorization" : "Bearer " + token["accessToken"] }
url = API_BASE_URL + "devices?page[size]=" + str(PAGE_SIZE)
nextURL = ""
uDevices = []
while True:
if isAccessTokenValid() == False:
print("AT invalid")
return
response = requests.get(url, headers = headers)
data = response.json()
if "links" in data:
nextURL = getNextPageLink(getNextURL(data["links"]))
if "data" in data:
dv = data["data"]
for device in dv:
uDevices.append(device)
url = nextURL
if len(nextURL) == 0:
break
# Action the acquisition of a new device list from the API
def updateDevices():
global isLoggedIn
global updateTimer
if isLoggedIn == True:
if isAccessTokenValid() == False:
getNewAccessToken()
else:
login(LOGIN_KEY)
getDeviceData()
# Present all the devices we know about in sequence, over and over
def presentDevices():
global uDevices
global startIndex
global presentTimer
global font
deviceCount = len(uDevices)
line = 0
if deviceCount > 0:
cls()
for index in range(deviceCount):
device = uDevices[index]
if index >= startIndex:
# Get the device's name (or its ID if it has no name
name = device["attributes"]["name"]
if name == None:
name = device["id"]
# Get the device's online status. This is a boolean value, so
# we convert it to a suitable string
status = device["attributes"]["device_online"]
if status is True:
status = "online"
else:
status = "offline"
# Pad out the device number to two digits
stringy = str(index + 1)
if len(stringy) == 1:
stringy = "0" + stringy
# Set the print string...
stringy += (". " + name + " is " + status)
# ...and add it to the display
inkyphat.text((0,line), stringy, inkyphat.BLACK, font)
# Move the cursor down
line += 20
# If there are more than 5 devices, we need to cycle through them
if deviceCount > 5:
if index == startIndex + 4:
# Set the first device in the sequence for the next pass
startIndex = index + 1
if startIndex >= deviceCount:
# We have shown all the devices we know about, so
# go back to the start
startIndex = 0
# And update the device list
updateDevices()
inkyphat.show()
break
# Set a timer to call this function again in 10s
presentTimer = Timer(10, presentDevices)
presentTimer.start()
def cls():
for x in range(inkyphat.WIDTH):
for y in range(inkyphat.HEIGHT):
inkyphat.putpixel((x,y), inkyphat.WHITE)
# RUNTIME START
# Configure inkyPhat display
inkyphat.set_colour("black")
font = ImageFont.truetype("/usr/lib/python3/dist-packages/inkyphat/fonts/Roboto-Light.ttf", 16)
# Get an initial list of devices
updateDevices()
# Start the display refresh loop
presentDevices()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment