Last active
August 1, 2021 02:36
-
-
Save rpavlik/23e359fb5cf69b90f0868c18c5639b6f to your computer and use it in GitHub Desktop.
air quality on funhouse/clue
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# SPDX-FileCopyrightText: 2017 Scott Shawcroft, written for Adafruit Industries | |
# SPDX-FileCopyrightText: Copyright (c) 2021 Melissa LeBlanc-Williams for Adafruit Industries | |
# SPDX-FileCopyrightText: 2021, Ryan Pavlik <[email protected]> | |
# SPDX-License-Identifier: MIT | |
try: | |
from typing import Optional | |
except ImportError: | |
pass | |
from pmhelper import PMDATA_ROWS | |
from netstuff import NetStuff | |
import time | |
import board | |
import microcontroller | |
from adafruit_pm25.i2c import PM25_I2C | |
from adafruit_scd30 import SCD30 | |
from adafruit_funhouse import FunHouse | |
from adafruit_simple_text_display import SimpleTextDisplay | |
from dew_point import apply_temp_compensation_to_relative_humidity, compute_dew_point | |
# i2c = busio.I2C(board.SCL, board.SDA, frequency=50000) | |
i2c = board.I2C() | |
funhouse = FunHouse(default_bg=None) | |
TEMP_ADJUST = -14 | |
pm25 = None # type: Optional[PM25_I2C] | |
scd = None # type: Optional[SCD30] | |
try: | |
scd = SCD30(i2c) | |
have_scd = True | |
scd.measurement_interval = 5 | |
except ValueError: | |
have_scd = False | |
try: | |
pm25 = PM25_I2C(i2c) | |
except ValueError: | |
pass | |
def c_to_f(temp): | |
return temp * 1.8 + 32 | |
def ljust(s, n): | |
return s + " " * (n - len(s)) | |
if have_scd: | |
# Tell it our ambient pressure on startup for best data | |
scd.ambient_pressure = funhouse.peripherals.pressure | |
max_co2 = 0 # type: int | |
last_co2 = "No CO2 meas" | |
reset_countdown = None # type: Optional[int] | |
netstuff = NetStuff(funhouse) | |
class AutoOffScreen: | |
def __init__(self, duration=10, initial_duration=60) -> None: | |
self.turn_off = None # type: Optional[int] | |
self.duration = duration | |
self.set_turn_off(time.monotonic() + initial_duration) | |
def set_turn_off(self, off_time): | |
self.on = True | |
if self.turn_off: | |
self.turn_off = max(self.turn_off, off_time) | |
else: | |
self.turn_off = off_time | |
def poll(self): | |
now = time.monotonic() | |
if funhouse.peripherals.pir_sensor: | |
# turn on/push out turn-off time | |
self.set_turn_off(now + self.duration) | |
return True | |
if self.on and self.turn_off is not None and now >= self.turn_off: | |
self.on = False | |
self.turn_off = None | |
return self.on | |
screen = AutoOffScreen() | |
data_display = SimpleTextDisplay(text_scale=2) | |
SCD30_CAL_VALUE = 400 | |
def update_during_recalibrate(): | |
while not scd.data_available: | |
time.sleep(0.5) | |
data_display[4].text = "CO2 meas: {:.1f}".format(scd.CO2) | |
data_display.show() | |
def recalibrate(): | |
global reset_countdown | |
data_display[0].text = "" | |
data_display[1].text = "" | |
data_display[2].text = "" | |
data_display[3].text = "Will recal to {}ppm".format(SCD30_CAL_VALUE) | |
data_display[4].text = "Must be outside!" | |
data_display[5].text = "Release to continue" | |
data_display[6].text = "" | |
data_display[7].text = "" | |
data_display[8].text = "" | |
data_display.show() | |
while funhouse.peripherals.button_sel: | |
time.sleep(0.5) | |
scd.ambient_pressure = funhouse.peripherals.pressure | |
update_during_recalibrate() | |
data_display[5].text = "Press select again to confirm" | |
data_display.show() | |
while not funhouse.peripherals.button_sel: | |
update_during_recalibrate() | |
data_display[5].text = "Release and move away!" | |
data_display.show() | |
while funhouse.peripherals.button_sel: | |
update_during_recalibrate() | |
calibrate_at = time.monotonic() + 15 | |
left = calibrate_at - time.monotonic() | |
while left > 0: | |
data_display[5].text = "Calibrate in {:1f}".format(left) | |
update_during_recalibrate() | |
left = calibrate_at - time.monotonic() | |
data_display[5].text = "Calibrating!" | |
data_display.show() | |
scd.forced_recalibration_reference = SCD30_CAL_VALUE | |
time.sleep(5) | |
data_display[5].text = "Calibrated, reset to resume normal ops" | |
data_display.show() | |
update_during_recalibrate() | |
reset_countdown = 2 | |
while True: | |
show_display = screen.poll() | |
data_display[0].text = "Pressure: {:.3f}hPa".format(funhouse.peripherals.pressure) | |
netstuff.pressure = funhouse.peripherals.pressure | |
# PM25 sensor stuff | |
if pm25 is not None: | |
try: | |
aqdata = pm25.read() | |
except RuntimeError: | |
print("Unable to read from sensor, retrying...") | |
continue | |
for row, row_details in enumerate(PMDATA_ROWS, 3): | |
row_text = "{}: {}".format( | |
row_details.raw_label, aqdata[row_details.raw_key] | |
) | |
if row_details.pm_label: | |
pm_text = "{}: {}".format( | |
row_details.pm_label, aqdata[row_details.pm_key] | |
) | |
row_text = ljust(row_text, 10) + pm_text | |
data_display[row].text = row_text | |
netstuff.pm25 = aqdata["pm25 standard"] | |
# Onboard temp/humidity sensor | |
temp, humidity = ( | |
funhouse.peripherals.temperature, | |
funhouse.peripherals.relative_humidity, | |
) | |
humidity = apply_temp_compensation_to_relative_humidity(temp, humidity, TEMP_ADJUST) | |
temp = temp + TEMP_ADJUST | |
temp_source = "AHT" | |
if reset_countdown is not None: | |
reset_countdown -= 1 | |
if reset_countdown <= 0: | |
microcontroller.reset() | |
# CO2 sensor | |
if scd is not None: | |
try: | |
if scd.data_available: | |
if not funhouse.peripherals.captouch6: | |
# captouch 6 is for forcing display of onboard funhouse aht temp/humidity | |
temp, humidity = scd.temperature, scd.relative_humidity | |
temp_source = "SCD" | |
last_co2 = "CO2: {:.1f} PPM".format(scd.CO2) | |
netstuff.co2 = scd.CO2 | |
max_co2 = max(max_co2, scd.CO2) | |
if funhouse.peripherals.button_sel: | |
data_display[2].text = "Max CO2: {:.1f} PPM".format(max_co2) | |
else: | |
data_display[2].text = last_co2 | |
# Handle recalibration | |
if funhouse.peripherals.button_sel: | |
recalibrate() | |
except ValueError: | |
print("oops") | |
scd = None | |
data_display[2].text = "SCD crashed!" | |
reset_countdown = 10 | |
# Let's display dewpoint instead of less-useful RH% and temp | |
dew_point = compute_dew_point(temp, humidity) | |
# data_display[1].text = "{:.1f}C, {:.1f}%RH".format(temp, humidity) | |
data_display[1].text = "Dew Pt {:.1f}F ({})".format(c_to_f(dew_point), temp_source) | |
netstuff.poll() | |
# Handle display power and sleep | |
if show_display: | |
funhouse.display.brightness = 0.5 | |
data_display.show() | |
else: | |
funhouse.display.brightness = 0 | |
for _ in range(50): | |
time.sleep(0.1) | |
show_display = screen.poll() | |
if show_display: | |
funhouse.display.brightness = 0.5 | |
data_display.show() |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Copyright 2021, Ryan Pavlik <[email protected]> | |
# SPDX-License-Identifier: Unlicense | |
import time | |
import board | |
from adafruit_clue import clue | |
from adafruit_pm25.i2c import PM25_I2C | |
from adafruit_scd30 import SCD30 | |
from pmhelper import PMDATA_ROWS | |
i2c = board.I2C() | |
pm25 = PM25_I2C(i2c) | |
try: | |
scd = SCD30(i2c) | |
have_scd = True | |
scd.measurement_interval = 5 | |
except ValueError: | |
have_scd = False | |
def c_to_f(temp): | |
return temp * 1.8 + 32 | |
clue_data = clue.simple_text_display(text_scale=2) | |
def ljust(s, n): | |
return s + " " * (n-len(s)) | |
if have_scd: | |
# Tell it our ambient pressure on startup for best data | |
scd.ambient_pressure = clue.pressure | |
max_co2 = 0 | |
last_co2 = "No CO2 meas" | |
class AutoOffScreen: | |
def __init__(self, duration=10, initial_duration=60) -> None: | |
self.turn_off = None | |
self.duration = duration | |
self.set_turn_off(time.monotonic() + initial_duration) | |
def set_turn_off(self, off_time): | |
self.on = True | |
if self.turn_off: | |
self.turn_off = max(self.turn_off, off_time) | |
else: | |
self.turn_off = off_time | |
def poll(self): | |
now = time.monotonic() | |
if clue.proximity > 3: | |
self.set_turn_off(now + self.duration) | |
return True | |
if self.on and now >= self.turn_off: | |
self.on = False | |
self.turn_off = None | |
return self.on | |
screen = AutoOffScreen() | |
while True: | |
# print(clue.proximity) | |
show_display = screen.poll() | |
clue_data[0].text = "Pressure: {:.3f}hPa".format(clue.pressure) | |
try: | |
aqdata = pm25.read() | |
except RuntimeError: | |
print("Unable to read from sensor, retrying...") | |
continue | |
for i, row_details in enumerate(PMDATA_ROWS): | |
row_text = "{}: {}".format(row_details.raw_label, aqdata[row_details.raw_key]) | |
if row_details.pm_label: | |
pm_text = "{}: {}".format(row_details.pm_label, aqdata[row_details.pm_key]) | |
row_text = ljust(row_text, 10) + pm_text | |
row = 3 + i | |
clue_data[row].text = row_text | |
if have_scd and scd.data_available: | |
clue_data[1].text = "{:.1f}F, {:.1f}%RH".format(c_to_f(scd.temperature), scd.relative_humidity) | |
last_co2 = "CO2: {:.1f} PPM".format(scd.CO2) | |
max_co2 = max(max_co2, scd.CO2) | |
if clue.button_a: | |
clue_data[2].text = "Max CO2: {:.1f} PPM".format(max_co2) | |
else: | |
clue_data[2].text = last_co2 | |
if show_display: | |
clue.display.brightness = 0.5 | |
clue_data.show() | |
else: | |
clue.display.brightness = 0 | |
for _ in range(50): | |
time.sleep(0.1) | |
show_display = screen.poll() | |
if show_display: | |
clue.display.brightness = 0.5 | |
clue_data.show() |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# SPDX-FileCopyrightText: 2021 Ryan Pavlik <[email protected]> | |
# | |
# SPDX-License-Identifier: MIT | |
import math | |
def compute_dew_point(temperature, relative_humidity): | |
T = temperature | |
RH = relative_humidity | |
return ( | |
243.04 | |
* (math.log(RH / 100.0) + ((17.625 * T) / (243.04 + T))) | |
/ (17.625 - math.log(RH / 100.0) - ((17.625 * T) / (243.04 + T))) | |
) | |
def compute_relative_humidity(temperature, dew_point): | |
TD = dew_point | |
T = temperature | |
return 100 * ( | |
math.exp((17.625 * TD) / (243.04 + TD)) / math.exp((17.625 * T) / (243.04 + T)) | |
) | |
def apply_temp_compensation_to_relative_humidity( | |
temperature, relative_humidity, temp_adjust | |
): | |
dew_point = compute_dew_point(temperature, relative_humidity) | |
return compute_relative_humidity(temperature + temp_adjust, dew_point) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# SPDX-FileCopyrightText: 2017 Scott Shawcroft, written for Adafruit Industries | |
# SPDX-FileCopyrightText: Copyright (c) 2021 Melissa LeBlanc-Williams for Adafruit Industries | |
# SPDX-FileCopyrightText: 2021 Ryan Pavlik <[email protected]> | |
# | |
# SPDX-License-Identifier: MIT | |
# | |
import time | |
# pylint: disable=unused-argument | |
def connected(client): | |
print("Connected to Adafruit IO! Subscribing...") | |
client.subscribe("buzzer") | |
client.subscribe("neopixels") | |
def subscribe(client, userdata, topic, granted_qos): | |
print("Subscribed to {0} with QOS level {1}".format(topic, granted_qos)) | |
def disconnected(client): | |
print("Disconnected from Adafruit IO!") | |
class NetStuff: | |
def __init__(self, funhouse): | |
self.funhouse = funhouse | |
self.last_pir = None | |
# Initialize a new MQTT Client object | |
self.funhouse.network.init_io_mqtt() | |
self.funhouse.network.on_mqtt_connect = connected | |
self.funhouse.network.on_mqtt_disconnect = disconnected | |
self.funhouse.network.on_mqtt_subscribe = subscribe | |
self.funhouse.network.on_mqtt_message = self.message | |
print("Connecting to Adafruit IO...") | |
self.funhouse.network.mqtt_connect() | |
self.sensorwrite_timestamp = time.monotonic() | |
self.temp = None | |
self.humidity = None | |
self.pressure = None | |
self.co2 = None | |
def message(self, client, feed_id, payload): | |
print("Feed {0} received new value: {1}".format(feed_id, payload)) | |
if feed_id == "buzzer": | |
if int(payload) == 1: | |
self.funhouse.peripherals.play_tone(2000, 0.25) | |
if feed_id == "neopixels": | |
print(payload) | |
color = int(payload[1:], 16) | |
self.funhouse.peripherals.dotstars.fill(color) | |
def poll(self): | |
self.funhouse.network.mqtt_loop() | |
# every 10 seconds, write temp/hum/press | |
now = time.monotonic() | |
if (now - self.sensorwrite_timestamp) > 10: | |
self.funhouse.peripherals.led = True | |
print("Sending data to adafruit IO!") | |
if self.temp: | |
self.funhouse.network.mqtt_publish("temperature", self.temp) | |
if self.humidity: | |
self.funhouse.network.mqtt_publish("humidity", int(self.humidity)) | |
if self.pressure: | |
self.funhouse.network.mqtt_publish("pressure", self.pressure) | |
if self.co2: | |
self.funhouse.network.mqtt_publish("co2", int(self.co2)) | |
self.sensorwrite_timestamp = now | |
# Send PIR only if changed! | |
if ( | |
self.last_pir is None | |
or self.last_pir != self.funhouse.peripherals.pir_sensor | |
): | |
last_pir = self.funhouse.peripherals.pir_sensor | |
self.funhouse.network.mqtt_publish("pir", "%d" % last_pir) | |
self.funhouse.peripherals.led = False |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Copyright 2021, Ryan Pavlik <[email protected]> | |
# SPDX-License-Identifier: Unlicense | |
from collections import namedtuple | |
_PMData = namedtuple("PMData", ["raw_label", "raw_key", "pm_label", "pm_key"]) | |
_avail_pm_ratings = set(("PM1.0", "PM2.5", "PM10.0")) | |
PMDATA_ROWS = [] | |
for size in ["0.3", "0.5", "1.0", "2.5", "5.0", "10.0"]: | |
# We have raw data for all sizes | |
raw_label = "{}um".format(size) | |
raw_key = "particles {}".format(raw_label.replace(".", "")) | |
# We don't have a "PM" value for all sizes | |
pm_label = "PM{}".format(size) | |
pm_key = None | |
if pm_label in _avail_pm_ratings: | |
pm_key = "{} standard".format(pm_label.replace(".", "").lower()) | |
else: | |
# Nothing for this size, so wipe out the label | |
pm_label = None | |
PMDATA_ROWS.append(_PMData(raw_label, raw_key, pm_label, pm_key)) | |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# SPDX-FileCopyrightText: 2020 Kattni Rembor for Adafruit Industries | |
# SPDX-FileCopyrightText: 2021 Ryan Pavlik <[email protected]> | |
# | |
# SPDX-License-Identifier: MIT | |
import board | |
class SimpleTextDisplay: | |
"""Easily display lines of text on a displayio display.""" | |
# Color variables available for import. | |
RED = (255, 0, 0) | |
YELLOW = (255, 255, 0) | |
ORANGE = (255, 150, 0) | |
GREEN = (0, 255, 0) | |
TEAL = (0, 255, 120) | |
CYAN = (0, 255, 255) | |
BLUE = (0, 0, 255) | |
PURPLE = (180, 0, 255) | |
MAGENTA = (255, 0, 150) | |
WHITE = (255, 255, 255) | |
BLACK = (0, 0, 0) | |
GOLD = (255, 222, 30) | |
PINK = (242, 90, 255) | |
AQUA = (50, 255, 255) | |
JADE = (0, 255, 40) | |
AMBER = (255, 100, 0) | |
VIOLET = (255, 0, 255) | |
SKY = (0, 180, 255) | |
RAINBOW = (RED, ORANGE, YELLOW, GREEN, BLUE, PURPLE) | |
def __init__( # pylint: disable=too-many-arguments | |
self, | |
title=None, | |
title_color=0xFFFFFF, | |
title_scale=1, | |
text_scale=1, | |
font=None, | |
colors=None, | |
display=None, | |
): | |
"""Display lines of text on a displayio display. Lines of text are created in order as shown | |
in the example below. If you skip a number, the line will be shown blank on the display, | |
e.g. if you include ``[0]`` and ``[2]``, the second line on the display will be empty, and | |
the text specified for lines 0 and 2 will be displayed on the first and third line. | |
Remember, Python begins counting at 0, so the first line on the display is 0 in the code. | |
Setup occurs before the loop. For data to be dynamically updated on the display, you must | |
include the data call in the loop by using ``.text =``. For example, if setup is saved as | |
``clue_data = simple_text_display()`` then ``clue_data[0].text = clue.proximity`` must be | |
inside the ``while True:`` loop for the proximity data displayed to update as the | |
values change. You must call ``show()`` at the end of the list for anything to display. | |
See example below for usage. | |
:param str title: The title displayed above the data. Set ``title="Title text"`` to provide | |
a title. Defaults to None. | |
:param title_color: The color of the title. Not necessary if no title is provided. Defaults | |
to white (255, 255, 255). | |
:param int title_scale: Scale the size of the title. Not necessary if no title is provided. | |
Defaults to 1. | |
:param int text_scale: Scale the size of the data lines. Scales the title as well. | |
Defaults to 1. | |
:param str font: The font to use to display the title and data. Defaults to built in | |
``terminalio.FONT``. | |
:param colors: A list of colors for the lines of data on the display. If you provide a | |
single color, all lines will be that color. Otherwise it will cycle through | |
the list you provide if the list is less than the number of lines displayed. | |
Default colors are used if ``colors`` is not set. For example, if creating | |
two lines of data, ``colors=((255, 255, 255), (255, 0, 0))`` would set the | |
first line white and the second line red, and if you created four lines of | |
data with the same setup, it would alternate white and red. | |
:param display: The display to use. If not specified, will try to use the built-in one via | |
``board`` | |
.. image :: ../docs/_static/display_funhouse_data.jpg | |
:alt: Display Data demo | |
This example displays three lines with acceleration, gyro and magnetic data on the display. | |
Remember to call ``show()`` after the list to update the display. | |
.. code-block:: python | |
from adafruit_funhouse import FunHouse | |
from simple_text_display import SimpleTextDisplay | |
funhouse = FunHouse() | |
sensor_data = SimpleTextDisplay(title="Sensor Data!", title_scale=2) | |
while True: | |
sensor_data[0].text = "Temperature: {:.2f} degrees C".format( | |
funhouse.peripherals.temperature | |
) | |
sensor_data[1].text = "Humidity: {:.2f}% RH".format( | |
funhouse.peripherals.relative_humidity | |
) | |
sensor_data[2].text = "Pressure: {:.2f} hPa".format(funhouse.peripherals.pressure) | |
sensor_data.show() | |
""" | |
# pylint: disable=import-outside-toplevel | |
import displayio | |
import terminalio | |
from adafruit_display_text import label | |
# pylint: enable=import-outside-toplevel | |
if not colors: | |
colors = ( | |
SimpleTextDisplay.VIOLET, | |
SimpleTextDisplay.GREEN, | |
SimpleTextDisplay.RED, | |
SimpleTextDisplay.CYAN, | |
SimpleTextDisplay.ORANGE, | |
SimpleTextDisplay.BLUE, | |
SimpleTextDisplay.MAGENTA, | |
SimpleTextDisplay.SKY, | |
SimpleTextDisplay.YELLOW, | |
SimpleTextDisplay.PURPLE, | |
) | |
self._colors = colors | |
self._label = label | |
if display is None: | |
display = board.DISPLAY | |
self._display = display | |
self._font = terminalio.FONT | |
if font: | |
self._font = font | |
self.text_group = displayio.Group(max_size=20, scale=text_scale) | |
if title: | |
# Fail gracefully if title is longer than 60 characters. | |
if len(title) > 60: | |
raise ValueError("Title must be 60 characters or less.") | |
title = label.Label( | |
self._font, | |
text=title, | |
max_glyphs=60, | |
color=title_color, | |
scale=title_scale, | |
) | |
title.x = 0 | |
title.y = 8 | |
self._y = title.y + 18 | |
self.text_group.append(title) | |
else: | |
self._y = 3 | |
self._lines = [] | |
for num in range(1): | |
self._lines.append(self.add_text_line(color=colors[num % len(colors)])) | |
def __getitem__(self, item): | |
"""Fetch the Nth text line Group""" | |
if len(self._lines) - 1 < item: | |
for _ in range(item - (len(self._lines) - 1)): | |
self._lines.append( | |
self.add_text_line(color=self._colors[item % len(self._colors)]) | |
) | |
return self._lines[item] | |
def add_text_line(self, color=0xFFFFFF): | |
"""Adds a line on the display of the specified color and returns the label object.""" | |
text_label = self._label.Label(self._font, text="", max_glyphs=45, color=color) | |
text_label.x = 0 | |
text_label.y = self._y | |
self._y = text_label.y + 13 | |
self.text_group.append(text_label) | |
return text_label | |
def show(self): | |
"""Call show() to display the data list.""" | |
self._display.show(self.text_group) | |
def show_terminal(self): | |
"""Revert to terminalio screen.""" | |
self._display.show(None) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment