Created
August 15, 2023 19:24
-
-
Save lloesche/6c03ae3ffb5115852fa53dca7c803f05 to your computer and use it in GitHub Desktop.
Python async server that uses multiple CPU cores while sharing a single socket.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/bin/env python3 | |
import signal | |
import socket | |
import multiprocessing | |
import asyncio | |
import logging | |
import argparse | |
from threading import Event | |
from asyncio import StreamReader, StreamWriter | |
log_format = f"%(asctime)s|%(levelname)5s|%(process)d|%(threadName)10s %(message)s" | |
logging.basicConfig(level=logging.WARNING, format=log_format) | |
log = logging.getLogger("multicoreserver") | |
log.setLevel(logging.DEBUG) | |
HTTP_RESPONSE = b"""HTTP/1.1 200 OK | |
Content-Type: text/plain | |
Connection: close | |
Hello, World! | |
""" | |
async def handle_client(reader: StreamReader, writer: StreamWriter) -> None: | |
request: bytes = await reader.read(4096) | |
if request.startswith(b"GET"): | |
writer.write(HTTP_RESPONSE) | |
await writer.drain() | |
writer.close() | |
await writer.wait_closed() | |
async def worker_task(sock: socket.socket, command_queue: multiprocessing.Queue) -> None: | |
try: | |
server: asyncio.AbstractServer = await asyncio.start_server(handle_client, sock=sock) | |
except Exception as e: | |
log.error(f"Error while starting server: {e}") | |
return | |
try: | |
while True: | |
if not command_queue.empty(): | |
try: | |
command = command_queue.get() | |
except EOFError: | |
break | |
except Exception as e: | |
log.error(f"Error while getting command from queue: {e}") | |
continue | |
if command == "shutdown": | |
break | |
await asyncio.sleep(1) | |
finally: | |
server.close() | |
await server.wait_closed() | |
def worker(sock: socket.socket, command_queue: multiprocessing.Queue) -> None: | |
signal.signal(signal.SIGINT, signal.SIG_IGN) | |
loop: asyncio.AbstractEventLoop = asyncio.new_event_loop() | |
asyncio.set_event_loop(loop) | |
try: | |
loop.run_until_complete(worker_task(sock, command_queue)) | |
finally: | |
loop.close() | |
def main() -> None: | |
parser = argparse.ArgumentParser(description="Multi-core HTTP Server") | |
parser.add_argument("-p", "--port", type=int, default=8080, help="Port to listen on. (default: 8080)") | |
parser.add_argument("-n", "--num-workers", type=int, default=16, help="Number of worker processes. (default: 16)") | |
args = parser.parse_args() | |
port = args.port | |
num_workers = args.num_workers | |
command_queue = multiprocessing.Queue() | |
shutdown_event = Event() | |
log.info(f"Starting server on port {port}...") | |
with socket.socket(socket.AF_INET6, socket.SOCK_STREAM) as sock: | |
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) | |
sock.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, 0) | |
sock.bind(("::", port)) | |
sock.listen(5) | |
processes = [] | |
def start_new_process(): | |
try: | |
p: multiprocessing.Process = multiprocessing.Process(target=worker, args=(sock, command_queue)) | |
p.start() | |
except Exception as e: | |
log.error(f"Error while starting worker process: {e}") | |
else: | |
log.debug(f"Started worker process with PID {p.pid}") | |
processes.append(p) | |
for _ in range(num_workers): | |
start_new_process() | |
def signal_handler(signum, frame): | |
shutdown_event.set() | |
signal.signal(signal.SIGINT, signal_handler) | |
signal.signal(signal.SIGTERM, signal_handler) | |
while not shutdown_event.is_set(): | |
for p in list(processes): | |
if not p.is_alive(): | |
log.warning(f"Worker process with PID {p.pid} died. Restarting it...") | |
processes.remove(p) | |
start_new_process() | |
shutdown_event.wait(timeout=1) | |
log.info("Shutting down...") | |
try: | |
for _ in processes: | |
command_queue.put("shutdown") | |
except Exception as e: | |
log.error(f"Error while sending shutdown command to worker processes: {e}") | |
for p in processes: | |
p.join(timeout=5) | |
if p.is_alive(): | |
log.warning(f"Worker process with PID {p.pid} did not shut down in time. Forcefully terminating it...") | |
p.terminate() | |
log.info("Shutdown complete") | |
if __name__ == "__main__": | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Just answered my own question via https://learn.microsoft.com/en-us/windows/win32/winsock/using-so-reuseaddr-and-so-exclusiveaddruse
Looks like
SO_REUSEADDR
has been around since Windows 95.