Created
December 7, 2014 21:22
-
-
Save berdario/39313b94cd08ebe6b3fb to your computer and use it in GitHub Desktop.
A toy HTTP server in pure Python3 I wrote in September 2013
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#! /usr/bin/env python | |
import sys | |
import os | |
import locale | |
from collections import defaultdict | |
from socket import socket, AF_INET, SOCK_STREAM | |
from urllib.parse import urlparse | |
from contextlib import closing | |
from functools import wraps | |
import magic | |
# Command-line arguments keyed by position, so an absent argument can be
# defaulted with ``args.get(index, default)``.
args = {position: value for position, value in enumerate(sys.argv)}

# ``magic.Magic`` instance created with ``mime_encoding=True``; used to
# look up the character encoding of text files.
encoding_guesser = magic.Magic(mime_encoding=True)

# Absolute path (bytes) of the working directory at import time.
curdir = os.path.abspath(b'.')

# Second element of ``locale.getlocale()`` (the locale's encoding name).
ENCODING = locale.getlocale()[1]

# Request methods the server recognises at all; anything else gets a 501.
HTTP_METHODS = frozenset((
    b'OPTIONS',
    b'GET',
    b'HEAD',
    b'POST',
    b'PUT',
    b'DELETE',
    b'TRACE',
    b'CONNECT',
))

# Protocol version placed at the start of every status line.
HTTP_VERSION = b'HTTP/1.1'
def parse_path(f):
    """Decorate a request handler so it receives a filesystem path.

    The wrapped handler is called as ``handler(resource, headers)`` with
    the raw request-target (bytes).  The target is converted as follows:

    * a target that does not start with ``/`` is run through ``urlparse``
      and only its path component is kept (``/`` when that is empty);
    * the leading ``/`` is dropped and the remainder is made absolute with
      ``os.path.abspath`` (an empty remainder means the current directory);
    * a result that is not ``curdir`` itself or located below it is
      replaced by ``curdir``.

    ``f`` is then called with the resulting absolute path and the
    unchanged ``headers``; its return value is passed through.
    """
    @wraps(f)
    def handler(resource, headers):
        if not resource.startswith(b'/'):
            resource = urlparse(resource).path or b'/'
        resource = os.path.abspath(resource[1:] or b'.')
        # Compare on whole path components.  A bare ``startswith(curdir)``
        # also accepts sibling directories that merely share the prefix
        # (e.g. ``/srv/www-private`` when curdir is ``/srv/www``), which
        # lets ``/../www-private/...`` escape the served directory.
        # ``os.path.join(curdir, b'')`` yields curdir with exactly one
        # trailing separator.
        inside = resource == curdir or resource.startswith(os.path.join(curdir, b''))
        if not inside:
            resource = curdir
        return f(resource, headers)
    return handler
def guess_type(path):
    """Return the Content-Type header value (bytes) for the file at *path*.

    The MIME type comes from ``magic.from_file(path, mime=True)``.  For
    types starting with ``text`` a ``; charset=...`` parameter is appended,
    using the encoding reported by ``encoding_guesser.from_file(path)``.
    """
    def as_bytes(value):
        # NOTE(review): python-magic has returned bytes in some releases
        # and str in others; the response is assembled from bytes, so
        # normalise here instead of failing on ``str.startswith(b'...')``.
        return value.encode() if isinstance(value, str) else value

    typ = as_bytes(magic.from_file(path, mime=True))
    if not typ.startswith(b'text'):
        return typ
    return typ + b"; charset=" + as_bytes(encoding_guesser.from_file(path))
def file_browser(path):
    """Describe how to serve *path* (a bytes filesystem path).

    Returns a ``(content_reader, headers)`` tuple.  ``headers`` is a dict
    of bytes header names to bytes values and always contains
    ``Connection: close``.

    * *path* does not exist: ``content_reader`` is ``None``.
    * *path* is a directory: ``content_reader()`` returns the entries whose
      name does not start with ``.``, joined by newlines; the content type
      is ``text/plain; charset=UTF-8``.
    * otherwise: ``content_reader()`` returns the file's bytes and the
      content type is whatever ``guess_type`` reports.

    The reader is a callable so the body is only produced when the caller
    actually asks for it.
    """
    headers = {b'Connection': b'close'}
    if not os.path.exists(path):
        return (None, headers)

    if os.path.isdir(path):
        def content_reader():
            visible = (name for name in os.listdir(path)
                       if not name.startswith(b'.'))
            return b"\n".join(visible)

        headers[b'Content-Type'] = b'text/plain; charset=UTF-8'
        return (content_reader, headers)

    # os.fsdecode converts with the filesystem encoding and its error
    # handler, so file names that are not valid UTF-8 no longer raise
    # UnicodeDecodeError the way a plain ``path.decode()`` does.
    headers[b'Content-Type'] = guess_type(os.fsdecode(path))

    def content_reader():
        with open(path, 'rb') as f:
            return f.read()

    return (content_reader, headers)
def build_response(resource, headers, handler):
    """Build the raw HTTP response (bytes) for *resource*.

    resource -- filesystem path handed to ``file_browser``.
    headers  -- request headers; accepted for interface symmetry but not
                consulted here.
    handler  -- callable ``handler(status_line, headers_bytes, content)``
                that assembles the final response; ``content`` is the
                zero-argument reader returned by ``file_browser``.

    When ``file_browser`` reports no content, a bare 404 response is
    returned and *handler* is not called.
    """
    content, response_headers = file_browser(resource)
    if content is None:
        return HTTP_VERSION + b" 404 File Not Found\r\n\r\n"

    # A request that was fulfilled is "200 OK"; "202 Accepted" means the
    # request was only queued for later processing, which is not the case.
    status_line = b" ".join([HTTP_VERSION, b'200', b'OK'])
    header_lines = [b':'.join(item) for item in response_headers.items()]
    # Trailing CRLF terminates the last header line.
    headers_bytes = b"\r\n".join(header_lines) + b'\r\n'
    return handler(status_line, headers_bytes, content)
@parse_path
def handle_GET(resource, headers):
    """Answer a GET request: status line, headers, blank line, body."""
    def assemble(status_line, headers_bytes, content_reader):
        # headers_bytes already ends with CRLF, so the extra separator
        # below produces the blank line that precedes the body.
        body = content_reader()
        return status_line + b"\r\n" + headers_bytes + b"\r\n" + body

    return build_response(resource, headers, assemble)
@parse_path
def handle_HEAD(resource, headers):
    """Answer a HEAD request: status line and headers, never a body."""
    def assemble(status_line, headers_bytes, content):
        # The content reader is deliberately not invoked; the response
        # ends with the blank line that closes the header block.
        return status_line + b"\r\n" + headers_bytes + b"\r\n"

    return build_response(resource, headers, assemble)
def missing_method(resource, headers):
    """Return the 405 response used for methods without a handler.

    Both arguments are ignored; they exist so the function has the same
    signature as the real method handlers.
    """
    return b"".join([HTTP_VERSION, b" 405 Method Not Allowed\r\n\r\n"])
# Dispatch table from request method to handler.  Looking up a method that
# has no entry yields ``missing_method``.
methodmap = defaultdict(
    lambda: missing_method,
    {
        b'GET': handle_GET,
        b'HEAD': handle_HEAD,
    },
)
def handle_client(sock):
    """Read one HTTP request from *sock* and send back the response.

    *sock* only needs ``recv(size)`` and ``sendall(data)``.  Data is read
    until the blank line that ends the header block has arrived; if the
    peer closes the connection before that, the function returns without
    sending anything.

    Responses:
    * 400 when the request line does not consist of exactly three
      whitespace-separated parts or a header line has no ``:``;
    * 501 when the method is not in ``HTTP_METHODS``;
    * otherwise whatever the handler in ``methodmap`` returns for
      ``(resource, headers)``, where ``headers`` maps each raw header name
      to the raw text after the first ``:``.
    """
    terminator = b"\r\n\r\n"
    data = sock.recv(1000)
    # Search for the terminator anywhere in the buffer: with ``endswith``
    # a request whose body (or next request) follows the headers in the
    # same read would never be recognised as complete.  Leading blank
    # lines are skipped so they are not mistaken for an empty header block.
    while terminator not in data.lstrip(b"\r\n"):
        newdata = sock.recv(1000)
        if not newdata:
            return
        data += newdata

    # The loop above guarantees a non-empty head, so there is always a
    # request line to unpack.
    head = data.lstrip(b"\r\n").partition(terminator)[0]
    request, *header_lines = head.splitlines()

    try:
        method, resource, http_version = request.split()
        headers = {}
        for line in header_lines:
            # Unpacking raises ValueError for a line without a colon, so
            # malformed headers are answered with 400 instead of crashing.
            name, value = line.split(b':', 1)
            headers[name] = value
    except ValueError:
        response = HTTP_VERSION + b" 400 Bad Request\r\n\r\n"
    else:
        if method not in HTTP_METHODS:
            response = HTTP_VERSION + b" 501 Not Implemented\r\n\r\n"
        else:
            response = methodmap[method](resource, headers)
    sock.sendall(response)
if __name__ == '__main__':
    # Usage: <script> [port].  Binds to all interfaces; the port is the
    # first command-line argument, 8080 when none is given.
    # The listening socket is managed by ``with`` so it is closed when the
    # loop is left (e.g. on Ctrl-C or an unhandled error) instead of being
    # left open until interpreter teardown.
    with socket(AF_INET, SOCK_STREAM) as sock:
        sock.bind(('', int(args.get(1, 8080))))
        sock.listen(1000)
        # Clients are served strictly one at a time; each connection is
        # closed as soon as its single request has been answered.
        while True:
            with closing(sock.accept()[0]) as newsock:
                handle_client(newsock)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment