Last active
October 1, 2020 03:33
-
-
Save collina/0a54bbc34b15b479bf251435fee9daea to your computer and use it in GitHub Desktop.
CoAware for m5stack
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# CoAware | |
# Collin Anderson (cda.io) | |
# MIT License | |
import os | |
import ubluetooth | |
import ubinascii | |
import m5ui | |
import time | |
import ntptime | |
import wifiCfg | |
import _thread | |
import ujson | |
from m5stack import lcd, power, btnA, btnB, btnC, const | |
from uiflow import wait_ms | |
from libs.m5mqtt import MQTTClient | |
# Core | |
TEXT_APP_NAME = 'CoAware' | |
advertisements = {} | |
advertisements_queue = [] | |
def ntp_setup(): | |
if not wifiCfg.is_connected(): | |
wifiCfg.autoConnect() | |
if wifiCfg.is_connected(): | |
ntptime.client() | |
ntp_setup() | |
start_time = time.time() | |
# Timing Variables | |
TIME_ADV_STALE = const(60 * 2) | |
TIME_AUTOSLEEP = const(60 * 6) | |
TIME_SYNC = const(1000 * 60 * 1) | |
# Interface variable | |
COLOR_TITLE = const(0x8C403A) | |
COLOR_APP_BACKGROUND = const(0x400101) | |
COLOR_TEXT_TITLE = const(0xFFFFFF) | |
COLOR_TEXT_BASE = const(0xFFFFFF) | |
COLOR_TEXT_FAINT = const(0xD9A796) | |
COLOR_TEXT_PROMINENT = const(0xC2EBBE) | |
COLOR_TEXT_ERROR= const(0xC9E0B8) | |
COLOR_SIGNAL_BAR_LIT = const(0x33cc00) | |
COLOR_SIGNAL_BAR_DARK = COLOR_APP_BACKGROUND | |
COLOR_CHARGING = const(0xC2EBBE) | |
GEOMETRY_LIVE_COUNT_LESS_TEN = const(25) | |
GEOMETRY_LIVE_COUNT_MORE_TEN = const(5) | |
MAX_ROWS = const(6) | |
rows_advertisements_address = [] | |
rows_advertisements_time = [] | |
rows_advertisements_liveness = [] | |
lcd.clear() | |
m5ui.setScreenColor(COLOR_APP_BACKGROUND) | |
box_live_count = m5ui.M5TextBox(GEOMETRY_LIVE_COUNT_LESS_TEN, 45, "0", lcd.FONT_DejaVu72, COLOR_TEXT_PROMINENT, rotate=0) | |
box_seen_count = m5ui.M5TextBox(70, 150, "0", lcd.FONT_DejaVu18, COLOR_TEXT_BASE, rotate=0) | |
box_uptime = m5ui.M5TextBox(70, 180, "-", lcd.FONT_DejaVu18, COLOR_TEXT_BASE, rotate=0) | |
box_memory = m5ui.M5TextBox(200, 4, "", lcd.FONT_Small, COLOR_TEXT_BASE, rotate=0) | |
time_window = m5ui.M5TextBox(200, 2, "", lcd.FONT_Ubuntu, COLOR_TEXT_ERROR, rotate=0) | |
power_window = m5ui.M5Rect(280, 4, 2, 12, COLOR_TEXT_PROMINENT, COLOR_TEXT_PROMINENT) | |
debug_line_1 = m5ui.M5TextBox(10, 200, "", lcd.FONT_Small, COLOR_TEXT_ERROR, rotate=0) | |
debug_line_2 = m5ui.M5TextBox(10, 220, "", lcd.FONT_Small, COLOR_TEXT_ERROR, rotate=0) | |
def interface_setup(): | |
lcd.setBrightness(40) | |
m5ui.M5Title(title=TEXT_APP_NAME, x=10 , fgcolor=COLOR_TEXT_TITLE, bgcolor=COLOR_TITLE) | |
m5ui.M5TextBox(25, 115, "Live", lcd.FONT_DejaVu18, COLOR_TEXT_FAINT, rotate=0) | |
m5ui.M5TextBox(10, 150, "Seen", lcd.FONT_DejaVu18, COLOR_TEXT_FAINT, rotate=0) | |
m5ui.M5TextBox(10, 180, "Up", lcd.FONT_DejaVu18, COLOR_TEXT_FAINT, rotate=0) | |
m5ui.M5Rect(280, 4, 32, 12, COLOR_TITLE, COLOR_TEXT_PROMINENT) | |
# m5ui.M5TextBox(50, 215, "Opt", lcd.FONT_Small, COLOR_TEXT_FAINT, rotate=0) | |
# m5ui.M5TextBox(140, 215, "Opt", lcd.FONT_Small, COLOR_TEXT_FAINT, rotate=0) | |
m5ui.M5TextBox(230, 215, "Screen", lcd.FONT_Small, COLOR_TEXT_FAINT, rotate=0) | |
for i in range(1, MAX_ROWS + 1): | |
rows_advertisements_liveness.append([ | |
m5ui.M5Rect(130, i * 28 + 10 + 6, 2, 3, COLOR_SIGNAL_BAR_DARK), | |
m5ui.M5Rect(130 + 4 * 1, i * 28 + 10 + 4, 2, 5, COLOR_SIGNAL_BAR_DARK), | |
m5ui.M5Rect(130 + 4 * 2, i * 28 + 10 + 2, 2, 7, COLOR_SIGNAL_BAR_DARK), | |
m5ui.M5Rect(130 + 4 * 3, i * 28 + 10, 2, 9, COLOR_SIGNAL_BAR_DARK) | |
]) | |
rows_advertisements_address.append(m5ui.M5TextBox(155, i * 28 + 6, '', lcd.FONT_Ubuntu, COLOR_TEXT_BASE, rotate=0)) | |
rows_advertisements_time.append(m5ui.M5TextBox(283, i * 28 + 6, '', lcd.FONT_Ubuntu, COLOR_TEXT_BASE, rotate=0)) | |
def interface_update(): | |
i = 0 | |
alive_beacons = 0 | |
elapsed_time = format_appropriate_time(time.time() - start_time) | |
box_uptime.setText(elapsed_time) | |
box_seen_count.setText(str(len(advertisements))) | |
for advertisement in sorted(advertisements.values(), key=lambda x: x['last_seen'], reverse=True): | |
is_alive = (time.time() - advertisement['last_seen']) < TIME_ADV_STALE | |
if i < MAX_ROWS: | |
rows_advertisements_address[i].setText(advertisement['address_hex']) | |
rows_advertisements_time[i].setText(format_appropriate_time(time.time() - advertisement['last_seen'])) | |
if is_alive: | |
interface_signal_bars(i, advertisement['rssi']) | |
rows_advertisements_address[i].setColor(COLOR_TEXT_PROMINENT) | |
rows_advertisements_time[i].setColor(COLOR_TEXT_PROMINENT) | |
else: | |
interface_signal_bars(i, None) | |
rows_advertisements_address[i].setColor(COLOR_TEXT_FAINT) | |
rows_advertisements_time[i].setColor(COLOR_TEXT_FAINT) | |
if is_alive: | |
alive_beacons += 1 | |
i += 1 | |
box_live_count.setText(str(alive_beacons)) | |
interface_battery_update() | |
def interface_battery_update(): | |
battery_level = power.getBatteryLevel() | |
if battery_level == 100: | |
power_window.setSize(width=32) | |
elif battery_level >= 75: | |
power_window.setSize(width=24) | |
elif battery_level >= 50: | |
power_window.setSize(width=16) | |
elif battery_level >= 25: | |
power_window.setSize(width=8) | |
elif battery_level >= 0: | |
power_window.setSize(width=0) | |
if power.isCharging(): | |
power_window.setBgColor(COLOR_CHARGING) | |
else: | |
power_window.setBgColor(COLOR_TEXT_BASE) | |
def interface_signal_bars(i, rssi): | |
rows_advertisements_liveness[i][0].setBorderColor(COLOR_SIGNAL_BAR_DARK) | |
rows_advertisements_liveness[i][1].setBorderColor(COLOR_SIGNAL_BAR_DARK) | |
rows_advertisements_liveness[i][2].setBorderColor(COLOR_SIGNAL_BAR_DARK) | |
rows_advertisements_liveness[i][3].setBorderColor(COLOR_SIGNAL_BAR_DARK) | |
if rssi is None: | |
return | |
if rssi > -85: | |
rows_advertisements_liveness[i][0].setBorderColor(COLOR_SIGNAL_BAR_LIT) | |
if rssi > -70: | |
rows_advertisements_liveness[i][1].setBorderColor(COLOR_SIGNAL_BAR_LIT) | |
if rssi > -55: | |
rows_advertisements_liveness[i][2].setBorderColor(COLOR_SIGNAL_BAR_LIT) | |
if rssi > -40: | |
rows_advertisements_liveness[i][3].setBorderColor(COLOR_SIGNAL_BAR_LIT) | |
# Utilities | |
MQTT_NAME = 'AdafruitIO' | |
MQTT_HOST = 'io.adafruit.com' | |
MQTT_PORT = 1883 | |
MQTT_USERNAME = None | |
MQTT_KEY = None | |
MQTT_FEED = MQTT_USERNAME + '/feeds/beacons' | |
advertisement_logger = None | |
mqtt_client = None | |
last_interaction = time.time() | |
screen_on = True | |
def log_setup(): | |
LOCATION_DATA_NAME = "coaware-{}.dat" | |
global advertisement_logger | |
try: | |
os.listdir('/sd') | |
location_data_path = "/sd/coaware" | |
except OSError: | |
location_data_path = "/flash/coaware" | |
try: | |
os.listdir(location_data_path) | |
except OSError: | |
os.mkdir(location_data_path) | |
log_location = location_data_path + '/' + LOCATION_DATA_NAME.format(start_time) | |
advertisement_logger = open(log_location, mode='wt') | |
def mqtt_setup(): | |
global mqtt_client | |
if MQTT_NAME and MQTT_HOST and MQTT_PORT and MQTT_USERNAME and MQTT_KEY: | |
mqtt_client = MQTTClient(MQTT_NAME, MQTT_HOST, MQTT_PORT, MQTT_USERNAME, MQTT_KEY, 300) | |
mqtt_client.connect() | |
def log_sync(): | |
global advertisements_queue | |
try: | |
while True: | |
wait_ms(TIME_SYNC) | |
sync_time = time.time() | |
for address_hex, beacon in format_summarized_advertisements(advertisements_queue).items(): | |
if mqtt_client: | |
mqtt_client.publish(MQTT_FEED, ujson.dumps(beacon)) | |
if not mqtt_client.is_conn_issue(): | |
advertisements_queue = [a for a in advertisements_queue if a[1] != address_hex] | |
advertisement_logger.write("{},{},{},{},{},{},{}\n".format(sync_time, address_hex, beacon['data_hex'], | |
beacon['first_seen'], beacon['last_seen'], beacon['rssi_min'], beacon['rssi_max'])) | |
else: | |
elapsed_time = format_appropriate_time(time.time() - start_time) | |
mqtt_setup() | |
debug_line_2.setText("log_sync: reconnect {}, took {}".format(elapsed_time, time.time() - sync_time)) | |
else: | |
advertisement_logger.write("{},{},{},{},{},{},{}\n".format(sync_time, address_hex, beacon['data_hex'], | |
beacon['first_seen'], beacon['last_seen'], beacon['rssi_min'], beacon['rssi_max'])) | |
advertisement_logger.flush() | |
except Exception as e_message: | |
debug_line_2.setText("log_sync: {}".format(e_message)) | |
def format_summarized_advertisements(advertisements_queue): | |
advertisements_summarized = {} | |
for (address_type, address_hex, rssi, data_hex, time_now) in advertisements_queue: | |
if address_hex not in advertisements_summarized: | |
advertisements_summarized[address_hex] = { | |
'type': address_type, | |
'address_hex': address_hex, | |
'data_hex': data_hex, | |
'first_seen': time_now, | |
'last_seen': time_now, | |
'rssi_min': rssi, | |
'rssi_max': rssi, | |
} | |
advertisements_summarized[address_hex]['first_seen'] = min(time_now, advertisements_summarized[address_hex]['first_seen']) | |
advertisements_summarized[address_hex]['last_seen'] = max(time_now, advertisements_summarized[address_hex]['last_seen']) | |
advertisements_summarized[address_hex]['rssi_min'] = min(rssi, advertisements_summarized[address_hex]['rssi_min']) | |
advertisements_summarized[address_hex]['rssi_max'] = max(rssi, advertisements_summarized[address_hex]['rssi_max']) | |
return advertisements_summarized | |
def format_appropriate_time(elapsed_time): | |
if elapsed_time // (60 * 60 * 24) >= 1: | |
return "{}d".format(elapsed_time // (60 * 60 * 24)) | |
elif elapsed_time // (60 * 60) >= 1: | |
return "{}h".format(elapsed_time // (60 * 60)) | |
elif elapsed_time // 60 >= 1: | |
return "{}m".format(elapsed_time // 60) | |
return "{}s".format(elapsed_time) | |
def toggle_screen(): | |
global screen_on | |
global last_interaction | |
if screen_on: | |
lcd.setBrightness(0) | |
lcd.tft_writecmd(0x28) | |
else: | |
lcd.setBrightness(40) | |
lcd.tft_writecmd(0x29) | |
last_interaction = time.time() | |
screen_on = not screen_on | |
def button_setup(): | |
btnA.wasPressed(handler_button_a) | |
btnB.wasPressed(handler_button_b) | |
btnC.wasPressed(handler_button_c) | |
def handler_button_a(): | |
if not screen_on: | |
toggle_screen() | |
else: | |
pass | |
def handler_button_b(): | |
if not screen_on: | |
toggle_screen() | |
else: | |
pass | |
def handler_button_c(): | |
toggle_screen() | |
# Bluetooth variables | |
bt = ubluetooth.BLE() | |
_IRQ_SCAN_RESULT = const(5) | |
_IRQ_SCAN_DONE = const(6) | |
def bt_handler(event, bt_data): | |
try: | |
if event == _IRQ_SCAN_RESULT: | |
time_now = time.time() | |
address_type, address, iscon, rssi, adv_data = bt_data | |
address_hex = ubinascii.hexlify(address).decode() | |
adv_data_hex = ubinascii.hexlify(adv_data).decode() | |
adv_data_decoded = bt_decode_adv_data(adv_data) | |
if len(adv_data_decoded) > 2 and adv_data_decoded[1][1] == b'o\xfd': | |
if address_hex not in advertisements: | |
advertisements[address_hex] = { | |
'type': address_type, | |
'address': address, | |
'address_hex': address_hex, | |
'rssi': rssi, | |
'data': adv_data_decoded, | |
'first_seen': time_now, | |
'last_seen': time_now, | |
} | |
else: | |
advertisements[address_hex]['last_seen'] = time_now | |
advertisements[address_hex]['rssi'] = rssi | |
advertisements_queue.append((address_type, address_hex, rssi, adv_data_hex, time_now)) | |
except Exception as e_message: | |
debug_line_2.setText("{} @ {}".format(str(time.time()), e_message)) | |
def bt_setup(): | |
bt.active(True) | |
bt.gap_scan(0, 4000, 1000) # Listen for 1 second every 4 seconds | |
bt.irq(bt_handler) | |
def bt_decode_adv_data(adv_data): | |
offset = 0 | |
advertisement = [] | |
while offset < len(adv_data): | |
field_len = int.from_bytes(adv_data[offset:offset + 1], 'little') | |
if field_len == 0 or offset + field_len > len(adv_data): | |
return advertisement | |
field_type = int.from_bytes(adv_data[offset + 1:offset + 2], 'little') | |
field_value = adv_data[offset + 2:offset + 2 + field_len - 1] | |
advertisement.append((field_type, field_value)) | |
offset += field_len + 1 | |
return advertisement | |
# Main Functions | |
def main(): | |
try: | |
log_setup() | |
button_setup() | |
interface_setup() | |
mqtt_setup() | |
bt_setup() | |
_thread.start_new_thread(log_sync, ()) | |
while True: | |
interface_update() | |
wait_ms(1000) | |
if screen_on and time.time() > (TIME_AUTOSLEEP + last_interaction): | |
toggle_screen() | |
except Exception as e_message: | |
debug_line_2.setText("{} @ {}".format(str(time.time()), e_message)) | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment