Skip to content

Instantly share code, notes, and snippets.

@nitori
Created September 3, 2022 11:52
Show Gist options
  • Select an option

  • Save nitori/e855e41893778906289825cabcc33dde to your computer and use it in GitHub Desktop.

Select an option

Save nitori/e855e41893778906289825cabcc33dde to your computer and use it in GitHub Desktop.
Quick and dirty ~1-hour HTTP server written in Python, using only socket, os and threading.
from __future__ import annotations
import threading
import socket
import os
# Directory that files are served from: the directory containing this script.
# It is resolved with realpath (not just abspath) because request paths are
# resolved with realpath before being compared against it; with an unresolved
# symlink in DOCROOT that containment check could never succeed.
DOCROOT = os.path.dirname(os.path.realpath(__file__))

# Maps a file extension (including the leading dot) to the Content-Type
# header value sent for files with that extension.
EXT_MIME_MAP = {
    '.html': 'text/html; charset=utf-8',
    '.css': 'text/css; charset=utf-8',
    '.txt': 'text/plain; charset=utf-8',
    '.csv': 'text/csv; charset=utf-8',
    '.py': 'text/plain; charset=utf-8',
    '.js': 'application/javascript; charset=utf-8',
    '.png': 'image/png',
    '.jpg': 'image/jpeg',
    '.jpeg': 'image/jpeg',
    '.gif': 'image/gif',
    '.svg': 'image/svg+xml',
    '.ico': 'image/x-icon',
    '.json': 'application/json; charset=utf-8',
}
def parse_header(header: bytes) -> tuple[str, str, dict[str, str]]:
    """Parse the head of an HTTP request (request line plus header fields).

    Args:
        header: Raw bytes up to, but not including, the blank line that
            terminates the header section.

    Returns:
        A ``(method, path, headers)`` tuple. ``path`` is returned exactly as
        sent (still percent-encoded). ``headers`` maps lower-cased, stripped
        field names to stripped field values.

    Raises:
        ValueError: If the request line does not consist of exactly three
            space-separated parts, or a header line has no ``:`` separator.
    """
    # latin1 maps every byte to one code point, so decoding can never fail.
    # Leading blank lines before the request line are tolerated.
    text = header.decode('latin1').lstrip('\r\n')
    request_line, *field_lines = text.split('\r\n')

    parts = request_line.split(' ')
    if len(parts) != 3:
        raise ValueError(f'malformed request line: {request_line!r}')
    method, path, _version = parts

    headers = {}
    for line in field_lines:
        if not line:
            continue
        name, sep, value = line.partition(':')
        if not sep:
            raise ValueError(f'malformed header line: {line!r}')
        # Header values are NOT percent-encoded, so they are stored verbatim;
        # URL-decoding them would corrupt values containing '%' and could
        # raise on sequences such as '%zz'.
        headers[name.strip().lower()] = value.strip()
    return method, path, headers
def client_handler(stop_event: threading.Event, sock: socket.socket, addr):
    """Serve HTTP requests arriving on one client connection.

    Reads from ``sock`` until the peer closes the connection or
    ``stop_event`` is set, dispatching every complete request to
    ``handle_request``. The socket is always closed on exit.

    Note that ``stop_event`` is only checked between ``recv`` calls, so a
    handler blocked on an idle connection notices it only once ``recv``
    returns (data arrives or the socket is shut down).

    Args:
        stop_event: Set by the server to ask handlers to stop.
        sock: Connected client socket; closed by this function.
        addr: Peer address as returned by ``accept`` (used for logging only).
    """
    buf = b''
    pending = None  # (method, path, headers) of a request still awaiting its body
    try:
        while not stop_event.is_set():
            data = sock.recv(1024)
            if not data:
                break
            buf += data

            # Drain every complete request already buffered. This handles a
            # body that arrived in the same packet as its headers, as well as
            # pipelined requests, without waiting for another recv().
            while True:
                if pending is None:
                    head, sep, rest = buf.partition(b'\r\n\r\n')
                    if not sep:
                        break  # header section not complete yet
                    buf = rest
                    pending = parse_header(head)
                method, path, headers = pending

                if 'content-length' in headers:
                    content_length = int(headers['content-length'])
                    if content_length < 0:
                        raise ValueError('negative Content-Length')
                    if len(buf) < content_length:
                        break  # body not complete yet
                    body, buf = buf[:content_length], buf[content_length:]
                    pending = None
                    handle_request(sock, method, path, headers, body)
                else:
                    pending = None
                    handle_request(sock, method, path, headers)
    except ValueError:
        # Malformed request line, header, or Content-Length: answer 400 and
        # drop the connection since the stream can no longer be framed.
        try:
            error_handler(sock, '(malformed request)', 400, 'Bad Request')
        except OSError:
            pass
    except OSError as exc:
        # Peer reset / broken pipe / local I/O failure: nothing more to do.
        print(f'connection {addr} closed: {exc}')
    finally:
        sock.close()
def handle_request(sock: socket.socket, method, path, headers, body=None):
    """Serve the static file under DOCROOT that ``path`` refers to.

    Sends a complete HTTP/1.1 response on ``sock``: 200 with the file
    contents, 400 if the path cannot be decoded, 403 if it resolves outside
    DOCROOT, or 404 if it does not name a readable regular file. A path
    ending in ``/`` is mapped to ``index.html`` in that directory.

    Args:
        sock: Connected client socket to write the response to.
        method: Request method; ``HEAD`` gets headers only, every other
            method is served like ``GET``.
        path: Request target as sent by the client (percent-encoded, may
            carry a query string).
        headers: Parsed request headers (currently unused).
        body: Request body, if any (currently unused).
    """
    # The query string and fragment are not part of the file name.
    target = path.split('?', 1)[0].split('#', 1)[0]
    try:
        url_path = urldecode(target)
    except ValueError:
        error_handler(sock, path, 400, 'Bad Request')
        return

    use_path = url_path + 'index.html' if url_path.endswith('/') else url_path

    # Resolve both sides so symlinks and '..' segments cannot escape DOCROOT.
    docroot = os.path.realpath(DOCROOT)
    try:
        local_path = os.path.realpath(
            os.path.join(docroot, use_path.lstrip('/')), strict=False
        )
        inside = os.path.commonpath([docroot, local_path]) == docroot
    except ValueError:
        # Embedded NUL byte, or a path on a different drive (Windows).
        inside = False
    if not inside:
        error_handler(sock, url_path, 403, 'Forbidden')
        return

    # isfile (not exists): a directory requested without a trailing slash
    # must not reach open().
    if not os.path.isfile(local_path):
        error_handler(sock, url_path, 404, 'Not Found')
        return
    try:
        f = open(local_path, 'rb')
    except OSError:
        # Vanished or unreadable between the check above and the open.
        error_handler(sock, url_path, 404, 'Not Found')
        return

    with f:
        # Size is taken from the open handle so it matches what is read.
        filesize = os.fstat(f.fileno()).st_size
        extension = os.path.splitext(local_path)[1].lower()
        mime_type = EXT_MIME_MAP.get(extension, 'application/octet-stream')
        print(f'{method} [200] {url_path} - {mime_type} - {filesize} bytes')

        response_head = (
            'HTTP/1.1 200 OK\r\n'
            f'Content-Length: {filesize}\r\n'
            f'Content-Type: {mime_type}\r\n'
            '\r\n'
        )
        sock.sendall(response_head.encode('ascii'))
        if method == 'HEAD':
            return

        # Never send more than the advertised Content-Length, even if the
        # file grows while it is being streamed.
        remaining = filesize
        while remaining > 0:
            chunk = f.read(min(16 << 10, remaining))
            if not chunk:
                break
            sock.sendall(chunk)
            remaining -= len(chunk)
def error_handler(sock: socket.socket, path: str, status: int, message: str):
    """Send a minimal HTML error response and log it.

    Args:
        sock: Connected client socket to write the response to.
        path: Request path, used only in the log line.
        status: HTTP status code to send.
        message: Reason phrase for the status line; also shown as the page
            heading. Inserted into the HTML as-is, so callers should pass
            fixed text, not client-supplied data.
    """
    print(f'GET {path} [{status}]')
    payload = f'<h1>{message}</h1>'.encode('utf-8')
    # The reason phrase must describe the status (e.g. "404 Not Found"),
    # not a hard-coded "OK".
    head = (
        f'HTTP/1.1 {status} {message}\r\n'
        f'Content-Length: {len(payload)}\r\n'
        'Content-Type: text/html; charset=utf-8\r\n'
        '\r\n'
    ).encode('latin1')
    # One send for the whole response instead of five tiny ones.
    sock.sendall(head + payload)
def urldecode(path):
    """Percent-decode ``path`` and interpret the resulting bytes as UTF-8.

    ``path`` is expected to be a string whose characters each stand for one
    byte (i.e. it was decoded from the wire as latin1). ``%XX`` sequences
    with two hexadecimal digits are replaced by the byte they encode; a
    ``%`` that is not followed by two hex digits is kept literally. Bytes
    that do not form valid UTF-8 are replaced with U+FFFD rather than
    raising.

    Args:
        path: Percent-encoded text.

    Returns:
        The decoded text.

    Raises:
        UnicodeEncodeError: If ``path`` contains characters above U+00FF.
    """
    hex_digits = frozenset(b'0123456789abcdefABCDEF')
    raw = path.encode('latin1')
    decoded = bytearray()
    pos, end = 0, len(raw)
    # Single pass with an index: no repeated string slicing/concatenation.
    while pos < end:
        byte = raw[pos]
        if (
            byte == 0x25  # '%'
            and pos + 2 < end
            # Explicit digit check: int(..., 16) alone would also accept
            # signs and whitespace such as '+f' or ' 1'.
            and raw[pos + 1] in hex_digits
            and raw[pos + 2] in hex_digits
        ):
            decoded.append(int(raw[pos + 1:pos + 3], 16))
            pos += 3
        else:
            decoded.append(byte)
            pos += 1
    return decoded.decode('utf-8', errors='replace')
def main(host='127.0.0.1', port=80):
    """Run the server until interrupted with Ctrl-C.

    Accepts connections on ``(host, port)`` and serves each one on its own
    thread running ``client_handler``. On KeyboardInterrupt the listening
    socket is closed, all handlers are asked to stop, and the function
    returns once they have finished.

    Args:
        host: Interface address to bind to.
        port: TCP port to listen on.
    """
    stop_event = threading.Event()
    workers = []  # (thread, client socket) for connections still being served

    # create_server binds and listens; the with-block guarantees the
    # listening socket is closed however the loop ends.
    with socket.create_server((host, port)) as server:
        try:
            while True:
                sock, addr = server.accept()
                # Forget finished handlers so the list does not grow forever.
                workers = [(t, s) for t, s in workers if t.is_alive()]
                t = threading.Thread(
                    target=client_handler, args=(stop_event, sock, addr)
                )
                t.start()
                workers.append((t, sock))
        except KeyboardInterrupt:
            pass

    stop_event.set()
    # Handlers may be blocked in recv() on idle keep-alive connections and
    # would never see stop_event; shutting the sockets down wakes them up.
    for _, sock in workers:
        try:
            sock.shutdown(socket.SHUT_RDWR)
        except OSError:
            pass  # already closed by its handler
    for t, _ in workers:
        t.join()


if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment