Last active
October 30, 2024 16:00
-
-
Save anecdata/3fb29aec1279318727e1df9857a010d3 to your computer and use it in GitHub Desktop.
CircuitPython asyncio HTTP server
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| # SPDX-FileCopyrightText: 2023 anecdata | |
| # | |
| # SPDX-License-Identifier: MIT | |
| import time | |
| import traceback | |
| import asyncio | |
| import board | |
| import digitalio | |
| import wifi | |
| import socketpool | |
| from adafruit_httpserver.server import HTTPServer | |
| from adafruit_httpserver.response import HTTPResponse | |
| import neopixel | |
| try: | |
| import espidf | |
| except Exception as e: | |
| traceback.print_exception(e, e, e.__traceback__) | |
| from secrets import secrets | |
| PORT = 8080 | |
| ROOT_DIR = "/" | |
| def connect(): | |
| print("Connecting... ", end="") | |
| connect_errors = 0 | |
| while True: | |
| try: | |
| print(f"Connected?", end=" ") | |
| wifi.radio.connect(secrets["ssid"], secrets["password"]) | |
| print(f"{wifi.radio.ipv4_address}") | |
| break | |
| except ConnectionError as e: | |
| print(f"{e} IPv4={wifi.radio.ipv4_address}") | |
| connect_errors += 1 | |
| if connect_errors > 8: | |
| print(f"🔴 Too many consecutive connect failures. Reloading...") | |
| supervisor.reload() | |
| time.sleep(5) # wait for manual serial connect | |
| led = None | |
| if hasattr(board, "LED"): | |
| led = digitalio.DigitalInOut(board.LED) | |
| if led: | |
| led.switch_to_output(False) | |
| pix = [] | |
| if hasattr(board, "NEOPIXEL"): | |
| pix = neopixel.NeoPixel(board.NEOPIXEL, 1, brightness=0.25, pixel_order=neopixel.GRB) | |
| if pix: | |
| if pix: pix[0] = 0 | |
| connect() | |
| pool = socketpool.SocketPool(wifi.radio) | |
| print(f"Initializing Server...", end=" ") | |
| server = HTTPServer(pool) | |
| print(f"Done.") | |
| @server.route("/") | |
| def base(request): | |
| print(f'\nServing route("/")...') | |
| return HTTPResponse(filename=f"{ROOT_DIR}boot_out.txt") | |
| @server.route("/test") | |
| def base(request): | |
| print(f'\nServing route("/test")...') | |
| r = "\n\n".join((f"{request.method} {request.path} {request.http_version}", | |
| f"raw_request: {request.raw_request}", | |
| f"header_body_bytes: {request.header_body_bytes}", | |
| f"headers: {request.headers}", | |
| f"body: {request.body}", | |
| f"query_params: {request.query_params}", | |
| )) | |
| return HTTPResponse(body=r) | |
| async def poll(interval): | |
| print(f"Starting Server...") | |
| server.start(str(wifi.radio.ipv4_address), port=PORT, root_path=ROOT_DIR) | |
| while True: | |
| try: | |
| if not wifi.radio.ipv4_address: | |
| if pix: pix[0] = (0, 255, 128) | |
| print(f"\nReconnecting...") | |
| connect() | |
| if led: led.value = True | |
| if pix: pix[0] = (0, 0, 255) | |
| server.poll() | |
| if led: led.value = False | |
| if pix: pix[0] = (0, 0,0) | |
| except Exception as ex: | |
| traceback.print_exception(ex, ex, ex.__traceback__) | |
| continue | |
| await asyncio.sleep(interval) | |
| async def listening(interval): | |
| print(f"Starting Status...") | |
| loading = ( | |
| "• ", | |
| " • ", | |
| " • ", | |
| " • ", | |
| " •", | |
| ) | |
| loading_index = 0 | |
| while True: | |
| loading_index = (loading_index + 1) % len(loading) | |
| print(f"{loading[loading_index]} Listening on http://{wifi.radio.ipv4_address}:{PORT}", end="\r") | |
| await asyncio.sleep(interval) | |
| async def main(): | |
| print(f"Starting Main...") | |
| print(f"Starting poll_task...") | |
| poll_task = asyncio.create_task(poll(0)) | |
| print(f"Starting listening_task...") | |
| listening_task = asyncio.create_task(listening(1)) | |
| print(f"Gathering tasks...") | |
| await asyncio.gather(poll_task, listening_task,) | |
| asyncio.run(main()) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Updated for adafruit_httpserver v1.0.0.