Last active
November 5, 2025 14:45
-
-
Save vbe0201/cce570c733c70e06ff62dc60597c5e6d to your computer and use it in GitHub Desktop.
Minio: Tiny Python async runtime for teaching purposes
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import inspect | |
| import time | |
| import types | |
| from collections import deque | |
| from heapq import heappush, heappop | |
| from selectors import DefaultSelector, EVENT_READ, EVENT_WRITE | |
class Task:
    """A coroutine being driven by the runtime, plus its scheduling state.

    Attributes:
        coro: The coroutine object this task drives (anything with ``send``).
        call_result: Value delivered into the coroutine on its next resume.
        result: Return value of the coroutine; ``None`` until it is set.
        waiters: Tasks to reschedule once this task has completed.
    """

    def __init__(self, coro):
        self.waiters = set()
        self.result = self.call_result = None
        self.coro = coro

    def wake(self):
        """Resume the coroutine until its next suspension point.

        Returns whatever the coroutine yields. Anything ``send`` raises,
        including ``StopIteration`` on completion, propagates to the caller.
        """
        pending_value = self.call_result
        return self.coro.send(pending_value)
| @types.coroutine | |
| def _runtime_call(event, value): | |
| call_result = yield event, value | |
| return call_result | |
class JoinHandle:
    """Awaitable handle onto a task; awaiting it yields the task's result."""

    def __init__(self, task):
        self.task = task

    def __await__(self):
        # Ask the runtime to park the awaiting task until the target task
        # is done, then surface the value stored on the target.
        target = self.task
        yield from _runtime_call("join", target)
        return target.result
class TimerHeap:
    """Min-heap of sleeping tasks ordered by their wake-up deadline.

    Deadlines are absolute ``time.monotonic()`` timestamps.
    """

    def __init__(self):
        # Heap entries are ``(deadline, sequence, task)``. The sequence
        # number breaks ties between equal deadlines so that ``heapq``
        # never has to compare two task objects (which define no ordering
        # and would raise ``TypeError``), and keeps equal-deadline timers
        # in insertion order.
        self.timers = []
        self._sequence = 0

    def __len__(self):
        """Return the number of pending timers."""
        return len(self.timers)

    def add(self, task, secs):
        """Schedule ``task`` to become due ``secs`` seconds from now."""
        self._sequence += 1
        deadline = time.monotonic() + secs
        heappush(self.timers, (deadline, self._sequence, task))

    def next_deadline(self):
        """Return the seconds remaining until the earliest timer is due.

        The value is negative when that timer is already overdue.

        Raises:
            IndexError: If there are no pending timers.
        """
        return self.timers[0][0] - time.monotonic()

    def get_elapsed(self):
        """Pop and yield every task whose deadline has been reached.

        The current time is sampled once, when iteration starts. A timer
        whose deadline equals that time counts as elapsed.
        """
        now = time.monotonic()
        while self.timers and self.timers[0][0] <= now:
            _, _, task = heappop(self.timers)
            yield task
class Runtime:
    """Single-threaded event loop that drives tasks to completion.

    Tasks talk to the loop by yielding ``(event, arg)`` pairs; ``event``
    selects one of the ``handle_*`` methods through ``self.handlers``.
    """

    def __init__(self):
        self.handlers = {
            "spawn": self.handle_spawn,
            "join": self.handle_join,
            "sleep": self.handle_sleep,
            "io": self.handle_io,
        }

        self.run_queue = deque()
        self.timers = TimerHeap()
        self.selector = DefaultSelector()

    def handle_spawn(self, spawning_task, spawn_coro):
        """Create a task for ``spawn_coro`` and hand back a join handle."""
        spawn_task = Task(spawn_coro)

        # Append the task for the newly spawned coroutine
        # to the run queue so the event loop sees it.
        self.run_queue.append(spawn_task)

        # And since the spawning task is immediately ready
        # to make progress again, we put it to the front
        # of the run queue so it gets resumed immediately.
        spawning_task.call_result = JoinHandle(spawn_task)
        self.run_queue.appendleft(spawning_task)

    def handle_join(self, waiting_task, join_task):
        """Park ``waiting_task`` until ``join_task`` has finished."""
        waiting_task.call_result = None

        # A coroutine that already returned or raised is closed. Nobody
        # would ever wake a waiter registered on it, so the waiter is
        # rescheduled right away instead of being parked forever.
        state = inspect.getcoroutinestate(join_task.coro)
        if state == inspect.CORO_CLOSED:
            self.run_queue.append(waiting_task)
            return

        # Register interest in being notified when join_task
        # completes. Until then, waiting_task will not be put
        # in the run queue again.
        join_task.waiters.add(waiting_task)

    def handle_sleep(self, task, delay):
        """Block ``task`` for ``delay`` seconds."""
        # The task re-enters the run queue only once its timer elapsed.
        self.timers.add(task, delay)
        task.call_result = None

    def handle_io(self, task, data):
        """Park ``task`` until a file object is readable or writable.

        ``data`` is an ``(event_mask, fileobj)`` pair.
        """
        # We attach the waiting Task to the selector key so that every
        # notification can be associated with the Task it is meant for.
        # NOTE: ``register`` raises KeyError if ``fileobj`` is already
        # registered, i.e. only one task may wait on a file object at a time.
        event, fileobj = data
        self.selector.register(fileobj, event, task)
        task.call_result = None

    def _wait_for_events(self):
        """Block until I/O is ready or the nearest timer is due."""
        # We use the timer closest to expiring as the deadline for I/O waits.
        timeout = self.timers.next_deadline() if self.timers else None

        if not self.selector.get_map():
            # Nothing is registered, so only a timer can wake us. Selecting
            # on an empty set is platform-dependent (select() rejects it on
            # Windows), so just sleep until the timer is due.
            if timeout is not None and timeout > 0:
                time.sleep(timeout)
            return

        for key, _ in self.selector.select(timeout):
            self.run_queue.append(key.data)
            self.selector.unregister(key.fileobj)

    def _wake_waiters(self, task):
        """Reschedule every task that was joined on ``task``."""
        self.run_queue.extend(task.waiters)
        task.waiters.clear()

    def run(self, coro):
        """Run ``coro`` as the root task and return its result.

        The loop keeps going until no runnable task, timer, or I/O
        registration is left. An ``Exception`` raised by the root coroutine
        propagates to the caller; one raised by any other task is printed,
        and tasks joined on the failed task are resumed with ``None``.
        """
        # Make a Task for the initial coroutine.
        root_task = Task(coro)

        # Run the event loop while there's still work left.
        self.run_queue.append(root_task)
        while self.run_queue or self.timers or self.selector.get_map():
            # When no Tasks are left, pause the thread while waiting
            # for I/O or timers.
            if not self.run_queue:
                self._wait_for_events()

            # Put elapsed timers in the run queue.
            for task in self.timers.get_elapsed():
                self.run_queue.append(task)

            # The wait may return before anything is actually ready (for
            # example an early or interrupted select); go around again
            # rather than popping from an empty queue.
            if not self.run_queue:
                continue

            task = self.run_queue.popleft()
            try:
                event, arg = task.wake()
            except StopIteration as e:
                task.result = e.value
                self._wake_waiters(task)
            except Exception as e:
                if task is root_task:
                    raise
                print(e)
                # The task is finished even though it failed; do not leave
                # its joiners parked forever.
                self._wake_waiters(task)
            else:
                handler = self.handlers[event]
                handler(task, arg)

        # Return the result of coro when we're done.
        return root_task.result
async def spawn(coro):
    """Start ``coro`` as a new task and return the handle used to join it.

    Raises:
        TypeError: If ``coro`` is not a coroutine object.
    """
    if inspect.iscoroutine(coro):
        return await _runtime_call("spawn", coro)
    raise TypeError("coro must be a coroutine")
async def sleep(delay):
    """Suspend the calling task by issuing a "sleep" request for ``delay``."""
    outcome = await _runtime_call("sleep", delay)
    return outcome
async def wait_readable(fileobj):
    """Suspend the calling task with an "io" request for ``EVENT_READ``."""
    request = (EVENT_READ, fileobj)
    return await _runtime_call("io", request)
async def wait_writable(fileobj):
    """Suspend the calling task with an "io" request for ``EVENT_WRITE``."""
    request = (EVENT_WRITE, fileobj)
    return await _runtime_call("io", request)
def run(coro):
    """Drive ``coro`` on a fresh ``Runtime`` and return what its ``run`` returns."""
    return Runtime().run(coro)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import minio | |
| import socket | |
async def serve(conn):
    """Echo a single chunk (at most 100 bytes) back to the client.

    The connection is closed when the function exits, including when
    ``recv`` or ``send`` raises.
    """
    # ``with`` guarantees the socket is closed on the error path as well;
    # a trailing ``conn.close()`` would be skipped if an exception escaped.
    with conn:
        conn.setblocking(False)

        await minio.wait_readable(conn)
        data = conn.recv(100)

        # ``send`` may accept only part of the buffer, so keep going until
        # everything has been written.
        while data:
            await minio.wait_writable(conn)
            sent = conn.send(data)
            data = data[sent:]
async def main():
    """Accept connections on 127.0.0.1:8888 and spawn ``serve`` for each."""
    # Creating the socket inside ``with`` closes it even when ``bind`` or
    # ``listen`` fails (for example because the port is already in use).
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as server:
        server.bind(("127.0.0.1", 8888))
        server.listen()
        server.setblocking(False)

        while True:
            # Suspend until a connection is pending, then hand it to its
            # own task so the accept loop can continue right away.
            await minio.wait_readable(server)
            conn, _ = server.accept()
            await minio.spawn(serve(conn))
# Start the echo server only when executed as a script, so that importing
# this module does not block in the event loop.
if __name__ == "__main__":
    minio.run(main())
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment