Skip to content

Instantly share code, notes, and snippets.

@theelous3
Created May 22, 2018 20:28
Show Gist options
  • Save theelous3/c660c023532b0eb0674166ef7af98145 to your computer and use it in GitHub Desktop.
Save theelous3/c660c023532b0eb0674166ef7af98145 to your computer and use it in GitHub Desktop.
"""
Server that accepts a connection, spawns a client handler. Client handler
accepts a request, sends a 404, and kills the connection. Client should attempt
to reuse the connection, failing. Client should retry a new request, passing.
"""
import time
import socket
import h11
from threading import Thread
def server():
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
s.bind(('localhost', 25001))
s.listen(5)
while True:
print('listening')
sock, _ = s.accept()
Thread(target=handler, args=[sock]).start()
def handler(sock):
hcon = h11.Connection(our_role=h11.SERVER)
print('got client')
http_next_event(sock, hcon)
print('got one event')
http_send(
hcon,
sock,
h11.Response(status_code=404,
http_version=b'1.1',
reason=b'NOT FOUND',
headers=[('connection', 'close')])
)
http_send(
hcon,
sock,
h11.Data(data=b'404 Nothin')
)
http_send(
hcon,
sock,
h11.EndOfMessage()
)
time.sleep(1)
sock.close()
print('socket closed, sleeping')
time.sleep(999)
def http_next_event(sock, hcon):
while True:
event = hcon.next_event()
if event is h11.NEED_DATA:
hcon.receive_data(sock.recv(2048))
continue
return event
def http_send(hcon, sock, *events):
for event in events:
data = hcon.send(event)
if data is not None:
sock.sendall(data)
server()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment