Skip to content

Instantly share code, notes, and snippets.

@anecdata
Last active October 30, 2024 16:00
Show Gist options
  • Save anecdata/3fb29aec1279318727e1df9857a010d3 to your computer and use it in GitHub Desktop.
Save anecdata/3fb29aec1279318727e1df9857a010d3 to your computer and use it in GitHub Desktop.
CircuitPython asyncio HTTP server
# SPDX-FileCopyrightText: 2023 anecdata
#
# SPDX-License-Identifier: MIT
import time
import traceback
import asyncio
import board
import digitalio
import wifi
import socketpool
from adafruit_httpserver.server import HTTPServer
from adafruit_httpserver.response import HTTPResponse
import neopixel
try:
import espidf
except Exception as e:
traceback.print_exception(e, e, e.__traceback__)
from secrets import secrets
# TCP port handed to server.start() and shown in the "Listening on" status URL.
PORT = 8080
# Passed to server.start() as root_path and used as the prefix for boot_out.txt.
ROOT_DIR = "/"
def connect():
    """Join the Wi-Fi network described by ``secrets``, retrying until it works.

    Every attempt calls ``wifi.radio.connect`` with the ``ssid`` and
    ``password`` entries of ``secrets``. On success the assigned IPv4
    address is printed and the function returns. A ``ConnectionError`` is
    printed and counted; once more than 8 attempts have failed,
    ``supervisor.reload()`` is called.
    """
    # Imported locally: the module-level imports do not bring ``supervisor``
    # into scope, so the reload below would otherwise raise NameError.
    import supervisor

    print("Connecting... ", end="")
    connect_errors = 0
    while True:
        try:
            print("Connected?", end=" ")
            wifi.radio.connect(secrets["ssid"], secrets["password"])
            print(f"{wifi.radio.ipv4_address}")
            break
        except ConnectionError as e:
            print(f"{e} IPv4={wifi.radio.ipv4_address}")
            connect_errors += 1
            if connect_errors > 8:
                print("🔴 Too many consecutive connect failures. Reloading...")
                supervisor.reload()
# Startup pause so a serial console can be attached by hand first.
time.sleep(5)

# On-board LED when the board defines one; switched to an output with an
# initial value of False.
led = digitalio.DigitalInOut(board.LED) if hasattr(board, "LED") else None
if led:
    led.switch_to_output(False)

# One status NeoPixel when the board defines one; otherwise an empty list so
# that later ``if pix:`` checks are falsy.
pix = []
if hasattr(board, "NEOPIXEL"):
    pix = neopixel.NeoPixel(
        board.NEOPIXEL,
        1,
        brightness=0.25,
        pixel_order=neopixel.GRB,
    )
if pix:
    pix[0] = 0

# Bring up the network, then build the socket pool and the HTTP server on it.
connect()
pool = socketpool.SocketPool(wifi.radio)

print("Initializing Server...", end=" ")
server = HTTPServer(pool)
print("Done.")
@server.route("/")
def base(request):
    """Answer ``/`` with the ``boot_out.txt`` file located under ``ROOT_DIR``."""
    print('\nServing route("/")...')
    boot_log_path = ROOT_DIR + "boot_out.txt"
    return HTTPResponse(filename=boot_log_path)
@server.route("/test")
def base(request):
    """Answer ``/test`` with a plain-text dump of the incoming request.

    The body lists the request line, raw request, header/body bytes,
    headers, body and query parameters, separated by blank lines.

    NOTE(review): this reuses the name ``base`` from the ``/`` handler, so the
    module-level name ends up bound to this function; presumably both routes
    still work because registration happens in the decorator - confirm.
    """
    print('\nServing route("/test")...')
    sections = [
        f"{request.method} {request.path} {request.http_version}",
        f"raw_request: {request.raw_request}",
        f"header_body_bytes: {request.header_body_bytes}",
        f"headers: {request.headers}",
        f"body: {request.body}",
        f"query_params: {request.query_params}",
    ]
    report = "\n\n".join(sections)
    return HTTPResponse(body=report)
async def poll(interval):
    """Start the HTTP server, then service it forever.

    The server is started on the radio's current IPv4 address with ``PORT``
    and ``ROOT_DIR``. Each loop iteration reconnects Wi-Fi if the radio has
    no IPv4 address, turns the LED/NeoPixel indicators on around
    ``server.poll()``, and turns them off again afterwards.

    Args:
        interval: Seconds passed to ``asyncio.sleep`` between iterations.
    """
    print("Starting Server...")
    server.start(str(wifi.radio.ipv4_address), port=PORT, root_path=ROOT_DIR)
    while True:
        try:
            if not wifi.radio.ipv4_address:
                if pix:
                    pix[0] = (0, 255, 128)
                print("\nReconnecting...")
                connect()
            if led:
                led.value = True
            if pix:
                pix[0] = (0, 0, 255)
            server.poll()
            if led:
                led.value = False
            if pix:
                pix[0] = (0, 0, 0)
        except Exception as ex:
            # Best effort: report the failure and keep serving.
            traceback.print_exception(ex, ex, ex.__traceback__)
        # Always yield, even after an error. The previous ``continue`` in the
        # handler skipped this await, so a repeatedly failing iteration spun
        # without ever letting other tasks run.
        await asyncio.sleep(interval)
async def listening(interval):
    """Print an animated "Listening on ..." status line forever.

    The line ends with a carriage return, so each update overwrites the
    previous one on the console.

    Args:
        interval: Seconds passed to ``asyncio.sleep`` between updates.
    """
    print("Starting Status...")
    # Frames share one width so the text after the dot stays put while the
    # dot sweeps left to right; unequal or repeated frames make the line
    # jitter and the animation stall.
    loading = (
        "•    ",
        " •   ",
        "  •  ",
        "   • ",
        "    •",
    )
    loading_index = 0
    while True:
        loading_index = (loading_index + 1) % len(loading)
        print(
            f"{loading[loading_index]} Listening on http://{wifi.radio.ipv4_address}:{PORT}",
            end="\r",
        )
        await asyncio.sleep(interval)
async def main():
    """Schedule the server poll task and the status task, then wait on both."""
    print("Starting Main...")

    print("Starting poll_task...")
    server_task = asyncio.create_task(poll(0))

    print("Starting listening_task...")
    status_task = asyncio.create_task(listening(1))

    print("Gathering tasks...")
    await asyncio.gather(server_task, status_task)


# Script entry point: run the event loop until main() finishes.
asyncio.run(main())
@anecdata
Copy link
Author

anecdata commented Dec 15, 2022

Updated for adafruit_httpserver v1.0.0.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment