Skip to content

Instantly share code, notes, and snippets.

@vbe0201
Last active November 5, 2025 14:45
Show Gist options
  • Save vbe0201/cce570c733c70e06ff62dc60597c5e6d to your computer and use it in GitHub Desktop.
Save vbe0201/cce570c733c70e06ff62dc60597c5e6d to your computer and use it in GitHub Desktop.
Minio: Tiny Python async runtime for teaching purposes
import inspect
import time
import types
from collections import deque
from heapq import heappush, heappop
from selectors import DefaultSelector, EVENT_READ, EVENT_WRITE
class Task:
def __init__(self, coro):
self.coro = coro
self.call_result = None
self.result = None
self.waiters = set()
def wake(self):
# Resume our coro until next suspension.
return self.coro.send(self.call_result)
@types.coroutine
def _runtime_call(event, value):
call_result = yield event, value
return call_result
class JoinHandle:
def __init__(self, task):
self.task = task
def __await__(self):
# "I want to wait until self.task finishes"
yield from _runtime_call("join", self.task)
return self.task.result
class TimerHeap:
def __init__(self):
self.timers = []
def __len__(self):
return len(self.timers)
def add(self, task, secs):
heappush(self.timers, (time.monotonic() + secs, task))
def next_deadline(self):
return self.timers[0][0] - time.monotonic()
def get_elapsed(self):
now = time.monotonic()
while self and self.timers[0][0] < now:
_, task = heappop(self.timers)
yield task
class Runtime:
def __init__(self):
self.handlers = {
"spawn": self.handle_spawn,
"join": self.handle_join,
"sleep": self.handle_sleep,
"io": self.handle_io,
}
self.run_queue = deque()
self.timers = TimerHeap()
self.selector = DefaultSelector()
def handle_spawn(self, spawning_task, spawn_coro):
spawn_task = Task(spawn_coro)
# Append the task for the newly spawned coroutine
# to the run queue so the event loop sees it.
self.run_queue.append(spawn_task)
# And since the spawning task is immediately ready
# to make progress again, we put it to the front
# of the run queue so it gets resumed immediately.
spawning_task.call_result = JoinHandle(spawn_task)
self.run_queue.appendleft(spawning_task)
def handle_join(self, waiting_task, join_task):
# Register interest in being notified when join_task
# completes. Until then, waiting_task will not be put
# in the run queue again.
join_task.waiters.add(waiting_task)
waiting_task.call_result = None
def handle_sleep(self, task, delay):
# Block the task for the given delay before it will
# be added to the run queue again.
self.timers.add(task, delay)
task.call_result = None
def handle_io(self, task, data):
# Register interest in the fileobj becoming readable
# or writable. We attach the waiting Task to the map
# so that we can associate every notification to the
# Task it is meant for in the event loop.
event, fileobj = data
self.selector.register(fileobj, event, task)
task.call_result = None
def run(self, coro):
# Make a Task for the initial coroutine.
root_task = Task(coro)
# Run the event loop while there's still work left.
self.run_queue.append(root_task)
while self.run_queue or self.timers or self.selector.get_map():
# When no Tasks are left, pause the thread while waiting
# for I/O or timers. We use the timer closest to expiring
# as the deadline for I/O waits.
if not self.run_queue:
to = self.timers.next_deadline() if self.timers else None
for key, _ in self.selector.select(to):
self.run_queue.append(key.data)
self.selector.unregister(key.fileobj)
# Put elapsed timers in the run queue.
for task in self.timers.get_elapsed():
self.run_queue.append(task)
task = self.run_queue.popleft()
try:
event, arg = task.wake()
except StopIteration as e:
task.result = e.value
self.run_queue.extend(task.waiters)
task.waiters.clear()
except Exception as e:
if task is root_task:
raise
else:
print(e)
else:
handler = self.handlers[event]
handler(task, arg)
# Return the result of coro when we're done.
return root_task.result
async def spawn(coro):
if not inspect.iscoroutine(coro):
raise TypeError("coro must be a coroutine")
return await _runtime_call("spawn", coro)
async def sleep(delay):
return await _runtime_call("sleep", delay)
async def wait_readable(fileobj):
return await _runtime_call("io", (EVENT_READ, fileobj))
async def wait_writable(fileobj):
return await _runtime_call("io", (EVENT_WRITE, fileobj))
def run(coro):
rt = Runtime()
return rt.run(coro)
import minio
import socket
async def serve(conn):
conn.setblocking(False)
await minio.wait_readable(conn)
data = conn.recv(100)
while len(data) > 0:
await minio.wait_writable(conn)
sent = conn.send(data)
data = data[sent:]
conn.close()
async def main():
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.bind(("127.0.0.1", 8888))
server.listen()
server.setblocking(False)
with server:
while True:
await minio.wait_readable(server)
conn, _ = server.accept()
await minio.spawn(serve(conn))
minio.run(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment