Created
September 3, 2022 11:52
-
-
Save nitori/e855e41893778906289825cabcc33dde to your computer and use it in GitHub Desktop.
Quick and dirty ~1-hour HTTP server written in Python, using only socket, os and threading.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| from __future__ import annotations | |
| import threading | |
| import socket | |
| import os | |
# Directory that is served: the folder containing this script.
# realpath() (not just abspath()) is used so that DOCROOT is in the same
# canonical, symlink-free form as the request paths it is compared against;
# otherwise a symlinked install directory makes every request look like it
# escapes the document root.
DOCROOT = os.path.realpath(os.path.dirname(os.path.abspath(__file__)))

# Maps a file extension (including the leading dot) to the value sent in the
# Content-Type response header. Extensions not listed here are served as
# 'application/octet-stream' by the request handler.
EXT_MIME_MAP = {
    '.html': 'text/html; charset=utf-8',
    '.css': 'text/css; charset=utf-8',
    '.txt': 'text/plain; charset=utf-8',
    '.csv': 'text/csv; charset=utf-8',
    '.py': 'text/plain; charset=utf-8',
    '.js': 'application/javascript; charset=utf-8',
    '.png': 'image/png',
    '.jpg': 'image/jpeg',
    '.jpeg': 'image/jpeg',
    '.gif': 'image/gif',
    '.svg': 'image/svg+xml',
    '.ico': 'image/x-icon',
    '.json': 'application/json; charset=utf-8',
}
def parse_header(header: bytes):
    """Parse the head of an HTTP request (everything before the blank line).

    Args:
        header: Raw request line and header lines, separated by CRLF and
            without the terminating empty line.

    Returns:
        A ``(method, path, headers)`` tuple. ``path`` is returned exactly as
        sent (still percent-encoded). ``headers`` maps lower-cased field
        names to their whitespace-stripped values.

    Raises:
        ValueError: If the request line does not consist of exactly three
            space-separated parts, or a header line contains no colon.
    """
    # latin1 maps every byte to one code point, so decoding can never fail.
    request_line, *field_lines = header.decode('latin1').split('\r\n')

    parts = request_line.split(' ')
    if len(parts) != 3:
        raise ValueError(f'malformed request line: {request_line!r}')
    method, path, _version = parts

    headers = {}
    for line in field_lines:
        key, sep, value = line.partition(':')
        if not sep:
            raise ValueError(f'malformed header line: {line!r}')
        # Header values are NOT percent-encoded at the HTTP layer, so they
        # are stored verbatim. Decoding them would corrupt values such as
        # cookies or Referer URLs and fail on a stray '%'.
        headers[key.lower().strip()] = value.strip()
    return method, path, headers
def client_handler(stop_event: threading.Event, sock: socket.socket, addr):
    """Serve HTTP requests on one client connection until it ends.

    Reads from ``sock``, splits the byte stream into requests (header block
    plus an optional ``Content-Length`` body) and passes each complete
    request to ``handle_request``. The connection is kept open for further
    requests and is always closed when this function returns.

    Args:
        stop_event: Set by the server to ask the handler to finish.
        sock: Connected client socket; closed on return.
        addr: Peer address, used only for log messages.
    """
    buf = b''
    method = path = headers = None
    with sock:  # guarantees the connection is released on every exit path
        while not stop_event.is_set():
            # Wait with a short timeout so a set stop_event is noticed even
            # while the client is idle.
            sock.settimeout(1.0)
            try:
                data = sock.recv(1024)
            except socket.timeout:
                continue
            except OSError as exc:
                print(f'Connection error from {addr}: {exc}')
                break
            if not data:
                break  # peer closed the connection
            buf += data

            # Responses are written in blocking mode so that a slow client
            # cannot abort a large transfer with a timeout.
            sock.settimeout(None)
            try:
                # Drain everything that is already buffered: the body may
                # have arrived together with its headers, and several
                # requests may be pipelined in a single packet.
                while True:
                    if headers is None:
                        if b'\r\n\r\n' not in buf:
                            break  # header block still incomplete
                        header, buf = buf.split(b'\r\n\r\n', 1)
                        method, path, headers = parse_header(header)
                    if 'content-length' in headers:
                        content_length = int(headers['content-length'])
                        if content_length < 0:
                            raise ValueError('negative Content-Length')
                        if len(buf) < content_length:
                            break  # body still incomplete
                        body, buf = buf[:content_length], buf[content_length:]
                        handle_request(sock, method, path, headers, body)
                    else:
                        handle_request(sock, method, path, headers)
                    headers = None
            except ValueError as exc:
                # Malformed request (bad request line, header or length):
                # the stream can no longer be framed, so drop the client.
                print(f'Bad request from {addr}: {exc}')
                break
            except OSError as exc:
                print(f'Connection error from {addr}: {exc}')
                break
def handle_request(sock: socket.socket, method, path, headers, body=None):
    """Answer one request by sending a file from below ``DOCROOT``.

    A path ending in ``/`` is mapped to its ``index.html``. Requests that
    resolve outside ``DOCROOT`` get a 403 response, and anything that is not
    a regular file gets a 404.

    Args:
        sock: Connected client socket the response is written to.
        method: Request method; the body is omitted for ``HEAD``.
        path: Raw (percent-encoded) request target.
        headers: Parsed request headers (currently unused).
        body: Request body, if any (currently unused).
    """
    # The query string is not part of the file name. It is removed before
    # decoding so that an encoded '?' (%3F) stays in the path.
    path = urldecode(path.split('?', 1)[0])
    use_path = path + 'index.html' if path.endswith('/') else path

    # Compare canonical paths on both sides; a symlinked document root
    # would otherwise never match its own resolved children.
    docroot = os.path.realpath(DOCROOT)
    try:
        local_path = os.path.realpath(os.path.join(docroot, use_path.lstrip('/')))
        inside_docroot = os.path.commonpath([docroot, local_path]) == docroot
    except ValueError:
        # Embedded NUL byte, or a path on another drive (Windows).
        inside_docroot = False
    if not inside_docroot:
        error_handler(sock, path, 403, 'Forbidden')
        return

    # isfile() rather than exists(): a directory requested without a
    # trailing slash must not get a 200 header followed by a failed open().
    if not os.path.isfile(local_path):
        error_handler(sock, path, 404, 'Not Found')
        return

    try:
        f = open(local_path, 'rb')
    except OSError:
        # Unreadable file: report it before any 200 header is sent.
        error_handler(sock, path, 403, 'Forbidden')
        return

    with f:
        extension = os.path.splitext(local_path)[1].lower()
        mime_type = EXT_MIME_MAP.get(extension, 'application/octet-stream')
        # Size is taken from the open handle so it matches the bytes sent.
        filesize = os.fstat(f.fileno()).st_size
        print(f'{method} [200] {path} - {mime_type} - {filesize} bytes')

        response_head = (
            'HTTP/1.1 200 OK\r\n'
            f'Content-Length: {filesize}\r\n'
            f'Content-Type: {mime_type}\r\n'
            '\r\n'
        )
        sock.sendall(response_head.encode('ascii'))

        if method == 'HEAD':
            return  # HEAD responses carry headers only
        while chunk := f.read(16 << 10):
            sock.sendall(chunk)
def error_handler(sock: socket.socket, path: str, status: int, message: str):
    """Send a minimal HTML error response and log it.

    Args:
        sock: Connected client socket the response is written to.
        path: Requested path, used only for the log line.
        status: HTTP status code, e.g. ``404``.
        message: Reason phrase, e.g. ``'Not Found'``; used both in the
            status line and as the page heading.
    """
    print(f'GET {path} [{status}]')
    payload = f'<h1>{message}</h1>'.encode('utf-8')
    # The reason phrase must describe the status; "404 OK" is misleading.
    response_head = (
        f'HTTP/1.1 {status} {message}\r\n'
        f'Content-Length: {len(payload)}\r\n'
        'Content-Type: text/html\r\n'
        '\r\n'
    ).encode('ascii')
    sock.sendall(response_head + payload)
def urldecode(path):
    """Percent-decode ``path`` and interpret the resulting bytes as UTF-8.

    ``path`` is expected to be text obtained by decoding raw request bytes
    as latin1 (one character per byte). A ``%`` that is not followed by two
    hexadecimal digits is kept literally, and bytes that are not valid UTF-8
    are replaced with U+FFFD instead of raising. ``+`` is not treated as a
    space.

    Args:
        path: Percent-encoded string containing only code points below 256.

    Returns:
        The decoded string.

    Raises:
        UnicodeEncodeError: If ``path`` contains a character above U+00FF.
    """
    hex_digits = b'0123456789abcdefABCDEF'
    head, *rest = path.encode('latin1').split(b'%')
    pieces = [head]
    for part in rest:
        code = part[:2]
        if len(code) == 2 and all(digit in hex_digits for digit in code):
            pieces.append(bytes([int(code, 16)]))
            pieces.append(part[2:])
        else:
            # Not a valid escape sequence: keep the '%' as it was sent.
            pieces.append(b'%' + part)
    return b''.join(pieces).decode('utf-8', errors='replace')
def main(host: str = '127.0.0.1', port: int = 80):
    """Run the server until interrupted with Ctrl-C.

    Accepts connections on ``host``:``port`` and serves each one in its own
    thread. On Ctrl-C the listening socket is closed, the handlers are asked
    to stop and the function waits for them to finish.

    Args:
        host: Interface address to bind to.
        port: TCP port to listen on (ports below 1024 usually need elevated
            privileges).
    """
    stop_event = threading.Event()
    threads = []
    # The context manager closes the listening socket on every exit path.
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as server:
        # Allow an immediate restart while old connections are in TIME_WAIT.
        server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        server.bind((host, port))
        server.listen()
        try:
            while True:
                sock, addr = server.accept()
                t = threading.Thread(target=client_handler, args=(stop_event, sock, addr))
                t.start()
                # Forget finished handlers so the list does not grow forever.
                threads = [worker for worker in threads if worker.is_alive()]
                threads.append(t)
        except KeyboardInterrupt:
            # Ctrl-C is the normal way to stop the server; fall through to
            # the orderly shutdown below.
            pass

    stop_event.set()
    for t in threads:
        t.join()


if __name__ == '__main__':
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment