Created
          May 12, 2022 03:03 
        
      - 
      
- 
        Save shapr/201a7c5eae4bba78a00905da58416226 to your computer and use it in GitHub Desktop. 
    add parameter parsing to adafruit httpserver
  
        
  
    
      This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
      Learn more about bidirectional Unicode characters
    
  
  
    
  | # SPDX-FileCopyrightText: Copyright (c) 2022 Dan Halbert for Adafruit Industries | |
| # | |
| # SPDX-License-Identifier: MIT | |
| """ | |
| `adafruit_httpserver` | |
| ================================================================================ | |
| Simple HTTP Server for CircuitPython | |
| * Author(s): Dan Halbert | |
| Implementation Notes | |
| -------------------- | |
| **Software and Dependencies:** | |
| * Adafruit CircuitPython firmware for the supported boards: | |
| https://github.com/adafruit/circuitpython/releases | |
| """ | |
| try: | |
| from typing import Any, Callable, Optional | |
| except ImportError: | |
| pass | |
| from errno import EAGAIN, ECONNRESET | |
| import os | |
| import re | |
| __version__ = "0.0.0-auto.0" | |
| __repo__ = "https://github.com/adafruit/Adafruit_CircuitPython_HTTPServer.git" | |
| class HTTPStatus: # pylint: disable=too-few-public-methods | |
| """HTTP status codes.""" | |
| def __init__(self, value, phrase): | |
| """Define a status code. | |
| :param int value: Numeric value: 200, 404, etc. | |
| :param str phrase: Short phrase: "OK", "Not Found', etc. | |
| """ | |
| self.value = value | |
| self.phrase = phrase | |
| def __repr__(self): | |
| return f'HTTPStatus({self.value}, "{self.phrase}")' | |
| def __str__(self): | |
| return f"{self.value} {self.phrase}" | |
| HTTPStatus.NOT_FOUND = HTTPStatus(404, "Not Found") | |
| """404 Not Found""" | |
| HTTPStatus.OK = HTTPStatus(200, "OK") # pylint: disable=invalid-name | |
| """200 OK""" | |
| HTTPStatus.INTERNAL_SERVER_ERROR = HTTPStatus(500, "Internal Server Error") | |
| """500 Internal Server Error""" | |
| class _HTTPRequest: | |
| _REQUEST_RE = re.compile(r"(\S+)\s+(\S+)\s") | |
| def __init__( | |
| self, path: str = "", method: str = "", raw_request: bytes = None, params: str = "" | |
| ) -> None: | |
| self.params = "" | |
| if raw_request is None: | |
| self.path = path | |
| self.method = method | |
| else: | |
| # Parse request data from raw request | |
| match = self._REQUEST_RE.match(raw_request.decode("utf8")) | |
| if match: | |
| target = match.group(2).split("?") | |
| self.path = target[0] | |
| if len(target) > 1: | |
| self.params = target[1] | |
| ps = self.params.split('&') | |
| dict_params = {} | |
| for p in ps: | |
| equals_index = p.find('=') | |
| # k,v = p.split('=') # this doesn't handle url?param=base64= | |
| k,v = (p[:equals_index],p[equals_index+1:]) # no maxsplit, argh! | |
| dict_params[k] = v | |
| self.params = dict_params | |
| self.method = match.group(1) | |
| else: | |
| raise ValueError("Unparseable raw_request:", raw_request) | |
| def __hash__(self) -> int: | |
| return hash(self.method) ^ hash(self.path) | |
| def __eq__(self, other: "_HTTPRequest") -> bool: | |
| return self.method == other.method and self.path == other.path | |
| def __repr__(self) -> str: | |
| return f"_HTTPRequest(path={repr(self.path)}, method={repr(self.method)}, params={repr(self.params)})" | |
| class MIMEType: | |
| """Common MIME types. | |
| From https://developer.mozilla.org/en-US/docs/Web/HTTP/Basics_of_HTTP/MIME_types/Common_types | |
| """ | |
| TEXT_PLAIN = "text/plain" | |
| _MIME_TYPES = { | |
| "aac": "audio/aac", | |
| "abw": "application/x-abiword", | |
| "arc": "application/x-freearc", | |
| "avi": "video/x-msvideo", | |
| "azw": "application/vnd.amazon.ebook", | |
| "bin": "application/octet-stream", | |
| "bmp": "image/bmp", | |
| "bz": "application/x-bzip", | |
| "bz2": "application/x-bzip2", | |
| "csh": "application/x-csh", | |
| "css": "text/css", | |
| "csv": "text/csv", | |
| "doc": "application/msword", | |
| "docx": "application/vnd.openxmlformats-officedocument.wordprocessingml.document", | |
| "eot": "application/vnd.ms-fontobject", | |
| "epub": "application/epub+zip", | |
| "gz": "application/gzip", | |
| "gif": "image/gif", | |
| "html": "text/html", | |
| "htm": "text/html", | |
| "ico": "image/vnd.microsoft.icon", | |
| "ics": "text/calendar", | |
| "jar": "application/java-archive", | |
| "jpeg .jpg": "image/jpeg", | |
| "js": "text/javascript", | |
| "json": "application/json", | |
| "jsonld": "application/ld+json", | |
| "mid": "audio/midi", | |
| "midi": "audio/midi", | |
| "mjs": "text/javascript", | |
| "mp3": "audio/mpeg", | |
| "cda": "application/x-cdf", | |
| "mp4": "video/mp4", | |
| "mpeg": "video/mpeg", | |
| "mpkg": "application/vnd.apple.installer+xml", | |
| "odp": "application/vnd.oasis.opendocument.presentation", | |
| "ods": "application/vnd.oasis.opendocument.spreadsheet", | |
| "odt": "application/vnd.oasis.opendocument.text", | |
| "oga": "audio/ogg", | |
| "ogv": "video/ogg", | |
| "ogx": "application/ogg", | |
| "opus": "audio/opus", | |
| "otf": "font/otf", | |
| "png": "image/png", | |
| "pdf": "application/pdf", | |
| "php": "application/x-httpd-php", | |
| "ppt": "application/vnd.ms-powerpoint", | |
| "pptx": "application/vnd.openxmlformats-officedocument.presentationml.presentation", | |
| "rar": "application/vnd.rar", | |
| "rtf": "application/rtf", | |
| "sh": "application/x-sh", | |
| "svg": "image/svg+xml", | |
| "swf": "application/x-shockwave-flash", | |
| "tar": "application/x-tar", | |
| "tiff": "image/tiff", | |
| "tif": "image/tiff", | |
| "ts": "video/mp2t", | |
| "ttf": "font/ttf", | |
| "txt": TEXT_PLAIN, | |
| "vsd": "application/vnd.visio", | |
| "wav": "audio/wav", | |
| "weba": "audio/webm", | |
| "webm": "video/webm", | |
| "webp": "image/webp", | |
| "woff": "font/woff", | |
| "woff2": "font/woff2", | |
| "xhtml": "application/xhtml+xml", | |
| "xls": "application/vnd.ms-excel", | |
| "xlsx": "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet", | |
| "xml": "application/xml", | |
| "xul": "application/vnd.mozilla.xul+xml", | |
| "zip": "application/zip", | |
| "7z": "application/x-7z-compressed", | |
| } | |
| @staticmethod | |
| def mime_type(filename): | |
| """Return the mime type for the given filename. If not known, return "text/plain".""" | |
| return MIMEType._MIME_TYPES.get(filename.split(".")[-1], MIMEType.TEXT_PLAIN) | |
| class HTTPResponse: | |
| """Details of an HTTP response. Use in @`HTTPServer.route` decorator functions.""" | |
| _HEADERS_FORMAT = ( | |
| "HTTP/1.1 {}\r\n" | |
| "Content-Type: {}\r\n" | |
| "Content-Length: {}\r\n" | |
| "Connection: close\r\n" | |
| "\r\n" | |
| ) | |
| def __init__( | |
| self, | |
| *, | |
| status: tuple = HTTPStatus.OK, | |
| content_type: str = MIMEType.TEXT_PLAIN, | |
| body: str = "", | |
| filename: Optional[str] = None, | |
| root: str = "", | |
| ) -> None: | |
| """Create an HTTP response. | |
| :param tuple status: The HTTP status code to return, as a tuple of (int, "message"). | |
| Common statuses are available in `HTTPStatus`. | |
| :param str content_type: The MIME type of the data being returned. | |
| Common MIME types are available in `MIMEType`. | |
| :param Union[str|bytes] body: | |
| The data to return in the response body, if ``filename`` is not ``None``. | |
| :param str filename: If not ``None``, | |
| return the contents of the specified file, and ignore ``body``. | |
| :param str root: root directory for filename, without a trailing slash | |
| """ | |
| self.status = status | |
| self.content_type = content_type | |
| self.body = body.encode() if isinstance(body, str) else body | |
| self.filename = filename | |
| self.root = root | |
| def send(self, conn: Any) -> None: | |
| # TODO: Use Union[SocketPool.Socket | socket.socket] for the type annotation in some way. | |
| """Send the constructed response over the given socket.""" | |
| if self.filename: | |
| try: | |
| file_length = os.stat(self.root + self.filename)[6] | |
| self._send_file_response(conn, self.filename, self.root, file_length) | |
| except OSError: | |
| self._send_response( | |
| conn, | |
| HTTPStatus.NOT_FOUND, | |
| MIMEType.TEXT_PLAIN, | |
| f"{HTTPStatus.NOT_FOUND} {self.filename}\r\n", | |
| ) | |
| else: | |
| self._send_response(conn, self.status, self.content_type, self.body) | |
| def _send_response(self, conn, status, content_type, body): | |
| self._send_bytes( | |
| conn, self._HEADERS_FORMAT.format(status, content_type, len(body)) | |
| ) | |
| self._send_bytes(conn, body) | |
| def _send_file_response(self, conn, filename, root, file_length): | |
| self._send_bytes( | |
| conn, | |
| self._HEADERS_FORMAT.format( | |
| self.status, MIMEType.mime_type(filename), file_length | |
| ), | |
| ) | |
| with open(root + filename, "rb") as file: | |
| while bytes_read := file.read(8192): | |
| self._send_bytes(conn, bytes_read) | |
| def _send_bytes(self, conn, buf): # pylint: disable=no-self-use | |
| bytes_sent = 0 | |
| bytes_to_send = len(buf) | |
| view = memoryview(buf) | |
| while bytes_sent < bytes_to_send: | |
| try: | |
| bytes_sent += conn.send(view[bytes_sent:]) | |
| except OSError as exc: | |
| if exc.errno == EAGAIN: | |
| continue | |
| if exc.errno == ECONNRESET: | |
| return | |
| class HTTPServer: | |
| """A basic socket-based HTTP server.""" | |
| def __init__(self, socket_source: Any) -> None: | |
| # TODO: Use a Protocol for the type annotation. | |
| # The Protocol could be refactored from adafruit_requests. | |
| """Create a server, and get it ready to run. | |
| :param socket: An object that is a source of sockets. This could be a `socketpool` | |
| in CircuitPython or the `socket` module in CPython. | |
| """ | |
| self._buffer = bytearray(1024) | |
| self.routes = {} | |
| self._socket_source = socket_source | |
| self._sock = None | |
| def route(self, path: str, method: str = "GET"): | |
| """Decorator used to add a route. | |
| :param str path: filename path | |
| :param str method: HTTP method: "GET", "POST", etc. | |
| Example:: | |
| @server.route(path, method) | |
| def route_func(request): | |
| return HTTPResponse(body="hello world") | |
| """ | |
| def route_decorator(func: Callable) -> Callable: | |
| self.routes[_HTTPRequest(path, method)] = func | |
| return func | |
| return route_decorator | |
| def serve_forever(self, host: str, port: int = 80, root: str = "") -> None: | |
| """Wait for HTTP requests at the given host and port. Does not return. | |
| :param str host: host name or IP address | |
| :param int port: port | |
| :param str root: root directory to serve files from | |
| """ | |
| self._sock = self._socket_source.socket( | |
| self._socket_source.AF_INET, self._socket_source.SOCK_STREAM | |
| ) | |
| self._sock.bind((host, port)) | |
| self._sock.listen(1) | |
| while True: | |
| try: | |
| conn, _ = self._sock.accept() | |
| except OSError: | |
| continue | |
| with conn: | |
| # If reading fails, close connection and retry. | |
| try: | |
| length, _ = conn.recvfrom_into(self._buffer) | |
| except OSError: | |
| continue | |
| request = _HTTPRequest(raw_request=self._buffer[:length]) | |
| # If a route exists for this request, call it. Otherwise try to serve a file. | |
| route = self.routes.get(request, None) | |
| if route: | |
| response = route(request) | |
| elif request.method == "GET": | |
| response = HTTPResponse(filename=request.path, root=root) | |
| else: | |
| response = HTTPResponse(status=HTTPStatus.INTERNAL_SERVER_ERROR) | |
| response.send(conn) | 
  
    Sign up for free
    to join this conversation on GitHub.
    Already have an account?
    Sign in to comment