Last active
July 8, 2022 16:12
-
-
Save efc/a53860ee297eaa8d7d1d2615d83ba9ce to your computer and use it in GitHub Desktop.
Controller for lock based on Adafruit ESP32-S2 Feather
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Lock via wsgi server on Adafruit Feather ESP32S2 | |
# SPDX-FileCopyrightText: 2021 Tenseg LLC | |
# SPDX-License-Identifier: MIT | |
# Save to CIRCUITPY as code.py | |
# Adafruit ESP32-S2 Feather pinouts... | |
# - (GND) to ground (and to LED resistor) | |
# - (A1) to red LED | |
# - (A2) to servo (TowerPro SG-5010) yellow data | |
# - (USB) to servo red power | |
# - (5) to reed switch | |
# - (6) to grey switch | |
# - (9) to white switch | |
# - (10) to pink switch | |
# - (11) to orange switch | |
# see https://learn.adafruit.com/adafruit-esp32-s2-feather/pinouts | |
import time | |
import ssl | |
import json | |
import microcontroller | |
import wifi | |
import socketpool | |
import adafruit_requests | |
import board | |
import pwmio | |
from adafruit_motor import servo | |
import wsgiserver as server | |
from adafruit_wsgi.wsgi_app import WSGIApp | |
import neopixel | |
import digitalio | |
print("Lock via wsgi server on ESP32S2") | |
# depends on secrets.py definitions | |
# NOTE: I have commented out the aio_user so that we | |
# effectively turn of AIO logging, which seems to have some | |
# sort of conflict with the HB listener. Not sure why? | |
try: | |
from secrets import secrets | |
except ImportError: | |
print( | |
""" | |
Please create a secrets.py file with content like this... | |
secrets = { | |
'ssid': 'SSID', | |
'password': 'PASSWORD', | |
'homebridge_server': 'http://127.0.0.1', | |
'homebridge_auth': 'Basic BASE64_ENCODED_AUTH', | |
'homebridge_listener_port': '8282', | |
'homebridge_buttons_port': '5000', | |
# 'aio_user': 'ADAFRUIT USERNAME', | |
'aio_feed': 'ADAFRUIT FEED KEY', | |
'aio_key': 'ADAFRUIT SECRET KEY' | |
} | |
""" | |
) | |
raise | |
########################################################### | |
# TIMING | |
########################################################### | |
# reboot every six hours | |
REBOOT_TIME = time.monotonic() + (60 * 60 * 6) | |
def check_for_restart(): | |
"""Checks the time passed and reboots at the appropriate time.""" | |
now = time.monotonic() | |
if now > REBOOT_TIME: | |
microcontroller.reset() | |
_autolock_delay = 60 * 5 # 5 minutes, may be overridden by /unlock | |
_autolock_timer_start = 0 # set each time we start an autolock timer | |
def set_autolock_delay(seconds=60 * 5): | |
"""Change the seconds we wait for an autolock to take effect.""" | |
global _autolock_delay | |
_autolock_delay = seconds | |
def set_autolock_timer(caller=""): | |
"""Start the autolock countdown.""" | |
global _autolock_timer_start | |
_autolock_timer_start = time.monotonic() | |
log(f"Autolock timer started at {int(_autolock_timer_start)} by {caller}") | |
set_goal('autolock', caller) | |
def autolock_delay_is_expired(): | |
"""Returns true when the autolock has expired.""" | |
now = time.monotonic() | |
return _autolock_timer_start < now - _autolock_delay | |
########################################################### | |
# GOALS | |
########################################################### | |
_goal = '' # only lock, autolock, and unlock are significant | |
def set_goal(target="", caller=""): | |
""" | |
Set the goal. | |
Only lock, autolock, and unlock are significant. | |
Any other value means there is no current goal. | |
""" | |
global _goal | |
_goal = target | |
log(f"Goal set to {target} by {caller}") | |
def get_goal(): | |
"""Return the value of the current goal.""" | |
global _goal | |
return _goal | |
def set_goal_toggle(caller=""): | |
""" | |
Toggle the goal between lock and unlock | |
based on the current state of the door lock servo. | |
Returns "locking" or "unlocking". | |
""" | |
if caller != "": | |
caller += " " | |
caller += "toggle" | |
if door_is_locked(): | |
set_goal("unlock", caller) | |
return "unlocking" | |
else: | |
set_goal("lock", caller) | |
return "locking" | |
def enact_goal(): | |
"""Take an action based on our goal as long as the door is closed.""" | |
goal = get_goal() | |
if 'lock' is goal or ('autolock' is goal and autolock_delay_is_expired()): | |
lock_door() | |
if 'unlock' is goal: | |
unlock_door() | |
########################################################### | |
# NETWORKING & WIFI | |
########################################################### | |
print("Connecting WiFi") | |
wifi.radio.connect(secrets['ssid'], secrets['password']) | |
HOST = repr(wifi.radio.ipv4_address) | |
PORT = 80 | |
pool = socketpool.SocketPool(wifi.radio) | |
requests = adafruit_requests.Session(pool, ssl.create_default_context()) | |
def tell_homebridge_listener(path, queries={}): | |
# Send a message to the homebridge listener and return the response | |
url = f"{secrets['homebridge_server']}:{secrets['homebridge_listener_port']}/{path}?" | |
for key, value in queries.items(): | |
url = f'{url}{key}="{value}"&' | |
url = url[:-1] # remove the final character from the URL | |
response_text = '' | |
try: | |
response = requests.get(url) | |
response_text = response.text | |
log(f"Sent {url} Response '{response_text}'") | |
except Exception as e: | |
log(f"While attempting to send {url}") | |
template = "Exception {0}:\n{1!r}" | |
message = template.format(type(e).__name__, e.args) | |
log(message) | |
return response_text | |
def token_is_valid(token): | |
# Check that the incomming token is the same as that stored by homebridge | |
response_text = tell_homebridge_listener('validate', {'token': token}) | |
return response_text == 'valid' | |
def unauthorized(caller=""): | |
log(f"Unauthorized {caller}") | |
return ("401 Unauthorized", [], "401 Unauthorized") | |
def homebridge_get(path): | |
# Pokes Homebridge API URLs | |
# note that homebridge_auth is already base64 encoded | |
try: | |
response = requests.get( | |
secrets['homebridge_server'] + ':' + | |
secrets['homebridge_buttons_port'] + '/' + path, | |
headers={'Authorization': secrets['homebridge_auth']} | |
) | |
return response | |
except Exception as e: | |
log(f'While attempting homebridge path {path}') | |
template = "Exception {0}:\n{1!r}" | |
message = template.format(type(e).__name__, e.args) | |
log(message) | |
return None | |
########################################################### | |
# LOGGING (NOTE: this is currently broken) | |
########################################################### | |
_log = "" # accumulates the text of this log entry | |
_last_log_time = 0 # facilitates accumulation of the log entries and delay of reporting | |
def log(message): | |
"""Add new line with this message to the log entry.""" | |
global _log, _last_log_time | |
print(message) | |
if _log: | |
_log += "\n" | |
_last_log_time = time.monotonic() | |
_log += message | |
def write_logs(): | |
""" | |
Write log entry after a brief delay. | |
The delay allows us to accumulate log messages | |
so that we make fewer calls to the logging API. | |
""" | |
global _log, _last_log_time | |
log_delay = time.monotonic() - 10 | |
if _log and log_delay > _last_log_time: | |
adafruit_io_log(_log) | |
_log = "" | |
def adafruit_io_log(message): | |
""" | |
Push a log entry to the Adafruit IO API. | |
NOTE: this is currently not working, it causes conflicts with the Homebridge API. | |
For now it has been disabled by commenting out the aio_user secret. | |
""" | |
if 'aio_user' in secrets and 'aio_feed' in secrets and 'aio_key' in secrets: | |
try: | |
response = requests.post( | |
url=f"https://io.adafruit.com/api/v2/{secrets['aio_user']}/feeds/{secrets['aio_feed']}/data", | |
headers={ | |
"Content-Type": "application/json", | |
"X-Aio-Key": secrets['aio_key'], | |
}, | |
data=json.dumps({ | |
"value": message, | |
}) | |
) | |
return response | |
except Exception as e: | |
print(f'While attempting AIO log') | |
template = "Exception {0}:\n{1!r}" | |
message = template.format(type(e).__name__, e.args) | |
print(message) | |
return None | |
########################################################### | |
# INDICATORS & SENSORS | |
########################################################### | |
# Set up LED | |
LED = digitalio.DigitalInOut(board.A1) | |
LED.direction = digitalio.Direction.OUTPUT | |
# Set up neopixel | |
PIXEL = neopixel.NeoPixel( | |
board.NEOPIXEL, 1, brightness=0.008 | |
) | |
BLACK = (0, 0, 0) | |
WHITE = (255, 255, 255) | |
LOCKED_COLOR = RED = (255, 0, 0) # locked | |
UNLOCKED_COLOR = GREEN = (0, 255, 0) # unlocked | |
OPENED_COLOR = BLUE = (0, 0, 255) # door just opened | |
ERROR_COLOR = YELLOW = (255, 255, 0) | |
ORANGE = (255, 150, 0) # door just closed | |
CLOSED_COLOR = PURPLE = (255, 0, 255) # error | |
# Set up the servo assuming signal on pin A2 | |
PWM = pwmio.PWMOut(board.A2, duty_cycle=2 ** 15, frequency=50) | |
MOTOR = servo.Servo(PWM) | |
LOCKED_ANGLE = 160 | |
UNLOCKED_ANGLE = 0 | |
SERVO_CHANGE_DELAY = 0.2 | |
class ToggelingPin: | |
"""Simplify setup of pins for simple toggling switch sensors.""" | |
def __init__(self, pin, name='', on_true=None, on_false=None, true_color=None, false_color=None): | |
"""Save our attributes to self and setup digital I/O.""" | |
self.pin = pin | |
self.name = name if name else f'{self.pin}' | |
self.on_true = on_true | |
self.on_false = on_false | |
self.true_color = true_color | |
self.false_color = false_color | |
self.sensor = digitalio.DigitalInOut(pin) | |
self.sensor.direction = digitalio.Direction.INPUT | |
self.sensor.pull = digitalio.Pull.UP | |
self.state = True | |
def check_state(self): | |
""" | |
Check and return the true/false state of the pin. | |
If the state has changed, inform our callback functions. | |
Note that true for these pins is an open circuit, false is closed. | |
""" | |
state = self.sensor.value | |
if state != self.state: | |
self.state = state | |
if state: | |
if self.true_color: | |
PIXEL.fill(self.true_color) | |
if self.on_true: | |
self.on_true(self) | |
else: | |
log(f"{time.monotonic():.1f}: {self.pin} true") | |
else: | |
if self.false_color: | |
PIXEL.fill(self.false_color) | |
if self.on_false: | |
self.on_false(self) | |
else: | |
log(f"{time.monotonic():.1f}: {self.pin} false") | |
return state | |
def value(self): | |
""" | |
Convenience function to return these current sensor value. | |
This reports the value without checking agains old state. | |
""" | |
return self.sensor.value | |
_sensors = {} # references to sensors, buttons, and their handlers | |
def setup_sensors(): | |
"""Adds entries for each sensor and button.""" | |
_sensors['door'] = ToggelingPin( | |
board.D5, | |
'Door', | |
door_closed, | |
door_opened | |
) | |
_sensors['white'] = ToggelingPin( | |
board.D9, | |
'White Key', | |
button_released, | |
white_button_pressed, | |
BLACK, | |
WHITE | |
) | |
_sensors['gray'] = ToggelingPin( | |
board.D6, | |
'Gray Key', | |
button_released, | |
gray_button_pressed, | |
BLACK, | |
BLUE | |
) | |
_sensors['pink'] = ToggelingPin( | |
board.D10, | |
'Pink Key', # red | |
button_released, | |
pink_button_pressed, | |
BLACK, | |
RED | |
) | |
_sensors['orange'] = ToggelingPin( | |
board.D11, | |
'Toggle Lock Key', | |
button_released, | |
orange_button_pressed, | |
BLACK, | |
ORANGE | |
) | |
def check_sensors(): | |
"""Called from the event loop.""" | |
for sensor in _sensors.values(): | |
sensor.check_state() | |
def button_pressed(pin): | |
"""Generic callback for a pressed button.""" | |
log(f"{time.monotonic():.1f}: {pin.name} pressed") | |
def orange_button_pressed(pin): | |
"""This is actually the green "toggle door" button.""" | |
button_pressed(pin) | |
set_goal_toggle("orange_button_pressed") | |
def pink_button_pressed(pin): | |
"""This is actually the red "unlock garage" button.""" | |
button_pressed(pin) | |
response = homebridge_get("?accessoryId=Kitchen.Pink&state=true") | |
def white_button_pressed(pin): | |
"""This is the white "garage light" button.""" | |
button_pressed(pin) | |
response = homebridge_get("?accessoryId=Kitchen.White&state=true") | |
def gray_button_pressed(pin): | |
"""This is the black "lock everything" button.""" | |
button_pressed(pin) | |
response = homebridge_get("?accessoryId=Kitchen.Gray&state=true") | |
def button_released(pin): | |
"""Generic callback for a relesed button.""" | |
log(f"{time.monotonic():.1f}: {pin.name} released") | |
def door_opened(pin): | |
"""The reed switch indicates the door was opened.""" | |
if door_is_locked(): | |
log(f"{time.monotonic():.1f}: {pin.name} error, door opened while locked") | |
else: | |
log(f"{time.monotonic():.1f}: {pin.name} opened") | |
def door_closed(pin): | |
""" | |
The reed switch indicates the door was closed. | |
Closing the door resets the autolock timer. | |
""" | |
set_autolock_timer('door_closed') | |
if door_is_locked(): | |
log(f"{time.monotonic():.1f}: {pin.name} error, door closed while locked") | |
else: | |
log(f"{time.monotonic():.1f}: {pin.name} closed") | |
def door_is_open(): | |
return not _sensors['door'].value() | |
def door_is_locked(): | |
return 600 > MOTOR.angle > LOCKED_ANGLE / 2 | |
########################################################### | |
# ACTIONS | |
########################################################### | |
def lock_door(): | |
"""Attempt to lock the door as long as it is closed.""" | |
if door_is_open(): | |
log(f"{time.monotonic():.1f}: error, locking not attempted while door open ({MOTOR.angle:.0f})") | |
tell_homebridge_listener("unlocked") | |
set_goal('none, tried to lock while door was open', 'lock_door') | |
else: | |
MOTOR.angle = LOCKED_ANGLE | |
time.sleep(SERVO_CHANGE_DELAY) | |
log(f"{time.monotonic():.1f}: Locking successful ({MOTOR.angle:.0f})") | |
tell_homebridge_listener("locked") | |
set_goal('none, successfully locked', 'lock_door') | |
def unlock_door(): | |
"""Attempt to unlock the door as long as it is closed.""" | |
if door_is_open(): | |
log(f"{time.monotonic():.1f}: error, unlocking not attempted while door open ({MOTOR.angle:.0f})") | |
tell_homebridge_listener("unlocked") | |
set_goal('none, tried to unlock while already open', 'unlock_door') | |
else: | |
MOTOR.angle = UNLOCKED_ANGLE | |
time.sleep(SERVO_CHANGE_DELAY) | |
log(f"{time.monotonic():.1f}: Unlocking successful ({MOTOR.angle:.0f})") | |
tell_homebridge_listener("unlocked") | |
# we set the goal to lock so that the autolock will happen | |
set_autolock_timer('unlock_door') | |
########################################################### | |
# WEB SERVER | |
########################################################### | |
WEB_APP = WSGIApp() # creates a web app to act as our server | |
print(f"try: http://{HOST}:{PORT}/lock") | |
@WEB_APP.route("/lock") | |
def lock_route(request): # pylint: disable=unused-argument | |
if token_is_valid(request.query_params.get("token")): | |
set_goal('lock', 'api/lock') | |
return ("200 OK", [], "Securing lock") | |
else: | |
return unauthorized('/lock') | |
print(f"try: http://{HOST}:{PORT}/unlock") | |
@WEB_APP.route("/unlock") | |
def unlock_route(request): # pylint: disable=unused-argument | |
if token_is_valid(request.query_params.get("token")): | |
try: | |
set_autolock_delay(int(request.query_params.get("auto"))) | |
except Exception: | |
pass # it is ok for there to be no auto query | |
set_goal('unlock', 'api/unlock') | |
return ("200 OK", [], "Unsecuring lock") | |
else: | |
return unauthorized('/unlock') | |
print(f"try: http://{HOST}:{PORT}/toggle") | |
@WEB_APP.route("/toggle") | |
def toggle_route(request): # pylint: disable=unused-argument | |
if token_is_valid(request.query_params.get("token")): | |
set_goal_toggle("api/toggle") | |
return ("200 OK", [], f"{'Securing' if ('lock' is get_goal()) else 'Unsecuring'} lock") | |
else: | |
return unauthorized('/toggle') | |
print(f"try: http://{HOST}:{PORT}/currentstatus") | |
@WEB_APP.route("/currentstatus") | |
def servo_status(request): # pylint: disable=unused-argument | |
if token_is_valid(request.query_params.get("token")): | |
status = "unlocked" | |
if door_is_locked(): | |
status = "locked" | |
return ("200 OK", [], status) | |
else: | |
return unauthorized('/currentstatus') | |
print(f"try: http://{HOST}:{PORT}/currentgoal") | |
@WEB_APP.route("/currentgoal") | |
def servo_goal(request): # pylint: disable=unused-argument | |
if token_is_valid(request.query_params.get("token")): | |
return ("200 OK", [], get_goal()) | |
else: | |
return unauthorized('/currentgoal') | |
print(f"try: http://{HOST}:{PORT}/status") | |
@WEB_APP.route("/status") | |
def weblock_status(request): # pylint: disable=unused-argument | |
if token_is_valid(request.query_params.get("token")): | |
current = 1 if door_is_locked() else 0 | |
target = current | |
if 'lock' is get_goal(): | |
target = 1 | |
if 'unlock' is get_goal(): | |
target = 0 | |
json = f'{{"target": {target}, "current": {current}}}' | |
return ("200 OK", [], json) | |
else: | |
return unauthorized('/status') | |
print(f"try: http://{HOST}:{PORT}/angle") | |
@WEB_APP.route("/angle") | |
def servo_angle(request): # pylint: disable=unused-argument | |
if token_is_valid(request.query_params.get("token")): | |
return ("200 OK", [], f'{MOTOR.angle:.0f}') | |
else: | |
return unauthorized('/angle') | |
# Set up our server, passing in our web_app as the application | |
# using the wsgi server made for the ESP32 | |
WEB_SERVER = server.WSGIServer(80, application=WEB_APP) | |
log(f"Listening at: http://{HOST}:{PORT}/") | |
########################################################### | |
# EVENT LOOP | |
########################################################### | |
log("Lock Restarting") | |
set_goal('lock', 'startup') | |
# We start with a locked door, just in case | |
setup_sensors() | |
# Start the web server | |
WEB_SERVER.start() | |
while True: | |
# our main loop where we have the server poll for incoming requests | |
WEB_SERVER.update_poll() | |
# check the state of our sensors | |
check_sensors() | |
# take action based on the goals set by API and sensors | |
enact_goal() | |
# write stored logs | |
write_logs() | |
# flash the LED if the door is locked | |
if door_is_locked(): | |
LED.value = int(time.monotonic()) % 4 is not 0 | |
PIXEL.fill(LOCKED_COLOR) | |
else: | |
LED.value = False | |
if door_is_open(): | |
PIXEL.fill(OPENED_COLOR) | |
else: | |
PIXEL.fill(UNLOCKED_COLOR) | |
# reboot every now and then | |
check_for_restart() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
See http://eric.clst.org/tech/smoothlock/ for an example of how this code was used.