Skip to content

Instantly share code, notes, and snippets.

@nitori
Created June 17, 2025 09:08
Show Gist options
  • Save nitori/ab88a0dd1453989817ec136daa99e678 to your computer and use it in GitHub Desktop.
Save nitori/ab88a0dd1453989817ec136daa99e678 to your computer and use it in GitHub Desktop.
Very simple select-based multi-client server; no threads.
import heapq
import math
import select
import socket
import time
from typing import Any, Callable, NamedTuple
class Schedule(NamedTuple):
    """A callback paired with the time at which it becomes due.

    Instances are kept in a ``heapq`` min-heap, which orders its items with
    ``<``.  Plain tuple comparison would fall through to comparing the
    callbacks whenever two entries share the same ``when``, and callables do
    not support ``<`` (``TypeError``).  ``__lt__`` is therefore overridden to
    look at ``when`` only.
    """

    # Time at which the callback becomes due (same clock as the scheduler).
    when: float
    # Zero-argument callable invoked once the entry is due.
    callback: Callable[[], Any]

    def __lt__(self, other):
        """Order two schedules by due time only; never compare callbacks."""
        if isinstance(other, Schedule):
            return self.when < other.when
        # Anything else keeps the ordinary tuple semantics.
        return tuple.__lt__(self, other)
class Client:
sock: socket.socket
addr: tuple[str, int]
loop: 'Loop'
def __init__(self, sock, addr, loop: 'Loop'):
self.sock = sock
self.addr = addr
self.loop = loop
self._buf = b''
def handle_read(self) -> bool:
try:
chunk = self.sock.recv(1 << 12)
except socket.error as exc:
if exc.errno in (socket.EAGAIN, socket.EWOULDBLOCK):
return False
raise
if not chunk:
return True
self._buf += chunk
*lines, self._buf = self._buf.split(b'\n')
for line in lines:
line: bytes
sline = line.decode('utf-8')
print(f'< {sline}')
match sline.split():
case ['sched', timeout]:
print(f'scheduling callback to run in {timeout} seconds.')
self.loop.call_at(self.callback_test, float(timeout))
return False
def callback_test(self):
print('*** callback called ****')
class Loop:
    """Single-threaded event loop built on ``select.select``.

    It accepts TCP clients on 127.0.0.1:12345, forwards readable sockets to
    their ``Client`` and runs scheduled callbacks once they are due.
    """

    def __init__(self):
        # Sockets passed to select() for read / write / exceptional events.
        self.read_fds: list[socket.socket] = []
        self.write_fds: list[socket.socket] = []
        self.error_fds: list[socket.socket] = []
        # Min-heap of pending callbacks ordered by due time.  An empty list
        # is already a valid heap, so no heapify() call is required.
        self.scheduled: list[Schedule] = []
        self.clients: dict[socket.socket, Client] = {}

    def call_at(self, callback: Callable[[], Any], seconds: float | int):
        """Schedule *callback* to run *seconds* from now.

        Despite the method name, *seconds* is a relative delay, not an
        absolute timestamp.
        """
        # The monotonic clock is immune to wall-clock adjustments (NTP, DST,
        # manual changes) that would otherwise shift pending timers.
        when = time.monotonic() + seconds
        heapq.heappush(self.scheduled, Schedule(when, callback))

    def run(self):
        """Serve clients forever; all sockets are closed when the loop exits."""
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as ssock:
            ssock.bind(('127.0.0.1', 12345))
            ssock.listen(5)
            ssock.setblocking(False)
            self.read_fds.append(ssock)
            self.error_fds.append(ssock)
            try:
                while True:
                    self._run_once(ssock)
            finally:
                # Reached on any exception (e.g. KeyboardInterrupt): release
                # every client and forget the listening socket, which the
                # ``with`` block closes.
                for sock in list(self.clients):
                    self._drop_client(sock)
                self.read_fds.remove(ssock)
                self.error_fds.remove(ssock)

    def _run_once(self, ssock: socket.socket) -> None:
        """Wait for activity or the next timer, then dispatch one round."""
        timeout = None
        if self.scheduled:
            timeout = max(0.0, self.scheduled[0].when - time.monotonic())
        rlist, _, elist = select.select(
            self.read_fds, self.write_fds, self.error_fds, timeout)
        print('... processing')
        self._run_due_callbacks()

        # Sockets reported with an exceptional condition are dropped instead
        # of being silently ignored.
        for sock in elist:
            client = self.clients.get(sock)
            if client is not None:
                print(f'Disconnected {client.addr}')
                self._drop_client(sock)

        for sock in rlist:
            if sock is ssock:
                self._accept(ssock)
                continue
            client = self.clients.get(sock)
            if client is None:
                # Already dropped earlier in this round.
                continue
            try:
                disconnected = client.handle_read()
            except OSError as exc:
                # One broken connection must not stop the whole server.
                print(f'Read error from {client.addr}: {exc}')
                disconnected = True
            if disconnected:
                print(f'Disconnected {client.addr}')
                self._drop_client(sock)

    def _run_due_callbacks(self) -> None:
        """Pop and invoke every scheduled callback whose time has come."""
        while self.scheduled and self.scheduled[0].when <= time.monotonic():
            _, callback = heapq.heappop(self.scheduled)
            callback()

    def _accept(self, ssock: socket.socket) -> None:
        """Accept one pending connection and register it for reading."""
        try:
            csock, addr = ssock.accept()
        except (BlockingIOError, ConnectionAbortedError):
            # The connection vanished between select() and accept().
            return
        print(f'Connection from {addr}')
        csock.setblocking(False)
        self.clients[csock] = Client(csock, addr, self)
        self.read_fds.append(csock)
        self.error_fds.append(csock)

    def _drop_client(self, sock: socket.socket) -> None:
        """Unregister *sock* everywhere and close it."""
        self.read_fds.remove(sock)
        self.error_fds.remove(sock)
        del self.clients[sock]
        sock.close()
def main():
    """Create a Loop and start it."""
    Loop().run()


# Start the server only when this file is executed as a script.
if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment