Skip to content

Instantly share code, notes, and snippets.

@franzwong
Created November 24, 2024 04:13
Show Gist options
  • Save franzwong/17a53587bc342ac0b009daa42d2f97db to your computer and use it in GitHub Desktop.
Save franzwong/17a53587bc342ac0b009daa42d2f97db to your computer and use it in GitHub Desktop.
Sample HTTP/2 server with hyper-h2 (the Python `h2` library)
import collections
import io
import json
import socket
import ssl
import time
import h2
import h2.connection
import h2.config
from h2.events import RequestReceived, DataReceived, StreamEnded, ConnectionTerminated, StreamReset, WindowUpdated, \
RemoteSettingsChanged
from h2.exceptions import StreamClosedError, ProtocolError
from h2.settings import SettingCodes
# Size of one read chunk in bytes (8 KiB).
READ_CHUNK_SIZE = 8 * 1024

# Per-stream request state: the request headers plus a buffer that
# accumulates the request body as DATA frames arrive.
RequestData = collections.namedtuple(
    typename='RequestData',
    field_names=('headers', 'data'),
)
def main():
    """Run a blocking HTTP/2-over-TLS echo server on 127.0.0.1:8443.

    Connections are served one at a time on the calling thread. Every
    completed request is answered with a JSON document holding the request
    headers and the UTF-8 decoded request body. The certificate and private
    key are loaded from ``server.crt`` and ``server.key``.

    The accept loop runs until it is interrupted with Ctrl-C
    (``KeyboardInterrupt``); the listening socket is closed on the way out.
    """
    host = '127.0.0.1'
    port = 8443

    server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    server_socket.bind((host, port))
    server_socket.listen(5)
    print(f"Listening on {host}:{port}...")

    context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    context.load_cert_chain(certfile="server.crt", keyfile="server.key")
    context.set_ciphers("ECDHE+AESGCM:ECDHE+CHACHA20:DHE+AESGCM:DHE+CHACHA20")
    context.set_alpn_protocols(["h2", "http/1.1"])
    # Wrapping the listening socket makes accept() perform the TLS handshake.
    secure_socket = context.wrap_socket(server_socket, server_side=True)

    try:
        while True:
            try:
                client_socket, _ = secure_socket.accept()
            except (ssl.SSLError, ConnectionError) as exc:
                # A failed handshake concerns only that client; keep serving.
                print(f"TLS handshake failed: {exc}")
                continue
            print("Client connection accepted")

            # The context manager closes the client socket on every exit
            # path (EOF, GOAWAY, protocol error, I/O error).
            with client_socket:
                # "http/1.1" is advertised via ALPN but only HTTP/2 is
                # implemented. Clients that negotiated nothing are still
                # served (HTTP/2 with prior knowledge).
                protocol = client_socket.selected_alpn_protocol()
                if protocol not in (None, "h2"):
                    print(f"Unsupported ALPN protocol: {protocol}")
                    continue
                try:
                    _serve_connection(client_socket)
                except (ssl.SSLError, ConnectionError) as exc:
                    print(f"Connection error: {exc}")
    except KeyboardInterrupt:
        print("\nShutting down server...")
    finally:
        secure_socket.close()


def _serve_connection(client_socket):
    """Speak HTTP/2 on one accepted socket until the peer goes away.

    Returns on EOF, after the peer's GOAWAY, or after an HTTP/2 protocol
    error. Socket errors propagate to the caller, which owns the socket.
    """
    config = h2.config.H2Configuration(client_side=False, header_encoding='utf-8')
    h2_conn = h2.connection.H2Connection(config=config)
    h2_conn.initiate_connection()
    client_socket.sendall(h2_conn.data_to_send())

    # Both maps are per connection: stream ids are only unique within one
    # HTTP/2 connection.
    requests = {}  # stream id -> RequestData still being received
    pending = {}   # stream id -> response bytes not yet handed to h2

    while True:
        received = client_socket.recv(65535)
        if not received:
            return

        try:
            events = h2_conn.receive_data(received)
        except ProtocolError as exc:
            print(f"Protocol error: {exc}")
            # Pass on whatever h2 queued in reaction (normally a GOAWAY).
            _flush(client_socket, h2_conn)
            return

        terminated = False
        for event in events:
            if isinstance(event, RequestReceived):
                headers = dict(event.headers)
                print(f"Method: {headers[':method']}")
                requests[event.stream_id] = RequestData(headers, io.BytesIO())
            elif isinstance(event, DataReceived):
                print("Data received")
                request_data = requests.get(event.stream_id)
                if request_data is not None:
                    request_data.data.write(event.data)
                # Hand the consumed bytes back to the peer's send window;
                # without this a large request body stalls once the initial
                # window is used up.
                h2_conn.acknowledge_received_data(
                    event.flow_controlled_length, event.stream_id
                )
            elif isinstance(event, StreamEnded):
                request_data = requests.pop(event.stream_id, None)
                if request_data is not None:
                    _start_response(h2_conn, event.stream_id, request_data, pending)
            elif isinstance(event, ConnectionTerminated):
                print("close")
                terminated = True
                break
            elif isinstance(event, StreamReset):
                print("reset")
                requests.pop(event.stream_id, None)
                pending.pop(event.stream_id, None)
            elif isinstance(event, WindowUpdated):
                print(f"window updated (delta: {event.delta})")
                # Stream id 0 means the connection-wide window grew, which
                # may unblock every stream.
                blocked = list(pending) if event.stream_id == 0 else [event.stream_id]
                for stream_id in blocked:
                    _send_pending(h2_conn, stream_id, pending)
            elif isinstance(event, RemoteSettingsChanged):
                if SettingCodes.INITIAL_WINDOW_SIZE in event.changed_settings:
                    print(f"initial window size: {event.changed_settings[SettingCodes.INITIAL_WINDOW_SIZE]}")
                    # A larger initial window enlarges every stream window.
                    for stream_id in list(pending):
                        _send_pending(h2_conn, stream_id, pending)

        # One write per batch of events; sendall() never leaves a partial
        # frame on the wire.
        _flush(client_socket, h2_conn)
        if terminated:
            return


def _start_response(h2_conn, stream_id, request_data, pending):
    """Queue the JSON echo response for a completely received request."""
    # errors='replace' keeps a non-UTF-8 request body from raising.
    body = request_data.data.getvalue().decode('utf-8', errors='replace')
    payload = json.dumps(
        {"headers": request_data.headers, "body": body}, indent=4
    ).encode("utf8")
    response_headers = (
        (':status', '200'),
        ('content-type', 'application/json'),
        ('content-length', str(len(payload))),
        ('server', 'asyncio-h2'),
    )
    try:
        h2_conn.send_headers(stream_id, response_headers)
    except (StreamClosedError, ProtocolError):
        # The stream was closed or reset before we could answer.
        return
    pending[stream_id] = payload
    _send_pending(h2_conn, stream_id, pending)


def _send_pending(h2_conn, stream_id, pending):
    """Hand h2 as much of a stream's pending body as flow control allows."""
    body = pending.get(stream_id)
    if body is None:
        return
    try:
        while body:
            window = h2_conn.local_flow_control_window(stream_id)
            if window < 1:
                # Window exhausted: keep the rest and resume when a
                # WINDOW_UPDATE or SETTINGS change arrives. Never block here,
                # because those frames can only be seen by reading the socket.
                break
            chunk_size = min(window, h2_conn.max_outbound_frame_size, len(body))
            h2_conn.send_data(
                stream_id,
                body[:chunk_size],
                end_stream=(chunk_size == len(body)),
            )
            body = body[chunk_size:]
    except (StreamClosedError, ProtocolError):
        # The stream is gone; drop whatever was left to send.
        body = b''

    if body:
        pending[stream_id] = body
    else:
        del pending[stream_id]


def _flush(client_socket, h2_conn):
    """Write every byte h2 has queued for the peer to the socket."""
    outbound = h2_conn.data_to_send()
    if outbound:
        client_socket.sendall(outbound)
# Start the server only when this file is executed as a script, not when it
# is imported.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment