Last active
July 19, 2025 08:52
-
-
Save markshannon/110b5cc6b071ca98026a34756ca50ffc to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| # Minimal virtual threads implementation | |
| # Requires continuations: https://github.com/markshannon/cpython/tree/continuations | |
| # The only blocking call implemented is `sleep` | |
| from continuations import Continuation | |
| from collections import deque | |
| import heapq, time | |
| from functools import total_ordering | |
# The module-wide Scheduler instance. It starts as None and is created lazily
# by sleep() or VirtualThread.start() the first time one is needed.
scheduler = None
class Scheduler:
    """Cooperative scheduler that runs virtual threads on top of continuations.

    Attributes:
        continuation: ``Continuation`` wrapping ``Scheduler.run_threads``.
        runnables: FIFO ``deque`` of virtual threads that are ready to run.
        sleepers: ``heapq``-ordered list of ``(wake_time, vthread)`` pairs,
            where ``wake_time`` is a ``time.monotonic()`` deadline.

    NOTE(review): ``Continuation`` is imported from an external module; the
    exact semantics of ``start``/``send``/``throw``/``started`` are not
    visible in this file and should be confirmed against that API.
    """

    def __init__(self):
        """Create the scheduler continuation and the empty run/sleep queues."""
        self.continuation = Continuation(Scheduler.run_threads)
        # NOTE(review): start() is called before `runnables` and `sleepers`
        # exist; presumably it only binds `self` as the argument and does not
        # run the loop yet -- TODO confirm.
        self.continuation.start(self)
        self.runnables = deque()
        self.sleepers = []

    def run_threads(self):
        """Scheduler main loop: run ready threads, then wake the next sleeper.

        There is no ``break`` or ``return`` in this loop, so control can only
        leave it through continuation switches or an exception.
        """
        global current_vthread
        while True:
            # Drain everything that is currently runnable, in FIFO order.
            while self.runnables:
                vt = self.runnables.popleft()
                # Publish the thread being resumed so the module-level sleep()
                # knows which virtual thread is calling it.
                current_vthread = vt
                if vt.continuation.started:
                    vt.continuation.send(None)
                else:
                    vt.continuation.start()
            if self.sleepers:
                # Nothing is runnable: take the sleeper with the earliest deadline.
                t, sleeper = heapq.heappop(self.sleepers)
                delta = t - time.monotonic()
                if delta > 0:
                    # Deadline is still in the future: block the real thread
                    # until then.
                    throw = None
                    try:
                        time.sleep(delta)
                    except KeyboardInterrupt:
                        throw = KeyboardInterrupt
                    if throw is None:
                        self.runnables.append(sleeper)
                    else:
                        # A Ctrl-C during the wait is forwarded into the
                        # sleeping thread's continuation.
                        sleeper.continuation.throw(KeyboardInterrupt)
                else:
                    # Deadline already passed: the sleeper is runnable right away.
                    self.runnables.append(sleeper)

    def sleep(self, vt, t):
        """Park virtual thread ``vt`` for ``t`` seconds.

        A non-positive ``t`` returns immediately without yielding.
        """
        if t <= 0:
            return
        now = time.monotonic()
        then = now + t
        # Entries are (deadline, thread) tuples; when two deadlines are equal
        # the tuple comparison falls back to comparing the thread objects.
        heapq.heappush(self.sleepers, (then, vt))
        # NOTE(review): presumably this switches to the scheduler's
        # continuation so other threads can run -- TODO confirm.
        self.continuation.send(None)
# NOTE(review): `scheduler` is already initialised to None near the top of the
# module; this second assignment is redundant.
scheduler = None
# The virtual thread most recently resumed by Scheduler.run_threads. While it
# is None, sleep() falls back to a plain blocking time.sleep().
current_vthread = None
def sleep(t):
    """Sleep for ``t`` seconds, cooperatively when a virtual thread is current.

    If no virtual thread has been recorded in ``current_vthread`` this is a
    plain blocking ``time.sleep(t)``. Otherwise the request is handed to the
    module-wide scheduler (created on demand) on behalf of that thread.
    """
    global scheduler
    # No virtual thread recorded: block the real thread.
    if current_vthread is None:
        time.sleep(t)
        return
    # Make sure there is a scheduler to park the current virtual thread in.
    if scheduler is None:
        scheduler = Scheduler()
    scheduler.sleep(current_vthread, t)
# Next identifier to hand out; VirtualThread.__init__ reads it and then
# increments it.
next_id = 1
@total_ordering
class VirtualThread:
    """A cooperatively scheduled thread backed by a ``Continuation``.

    The constructor takes the same arguments as ``threading.Thread``
    (``group`` must be ``None``). Instances are compared, ordered and hashed
    by their numeric ``id``, which is allocated from the module-level
    ``next_id`` counter.

    NOTE(review): ``Continuation`` is imported from an external module; its
    ``start``/``run`` semantics are not visible here -- confirm against that API.
    """

    def __init__(self, group=None, target=None, name=None, args=(), kwargs=None):
        """Create an unstarted virtual thread.

        Args:
            group: Must be ``None``.
            target: Callable invoked by ``run()``; ``None`` means "do nothing".
            name: Optional name, stored as-is.
            args: Positional arguments for ``target``.
            kwargs: Keyword arguments for ``target`` (``None`` means ``{}``).

        Raises:
            ValueError: If ``group`` is not ``None``.
        """
        global next_id
        self.id = next_id
        next_id += 1
        if kwargs is None:
            kwargs = {}
        if group is not None:
            raise ValueError("group must be None")
        self.target = target
        self.args = args
        self.kwargs = kwargs
        self.continuation = Continuation(VirtualThread.run)
        # NOTE(review): presumably this only binds `self` as the argument; the
        # body runs once the scheduler resumes the continuation -- TODO confirm.
        self.continuation.start(self)
        self.started = False
        self.name = name

    def run(self):
        """Invoke ``target`` with the stored arguments.

        A ``None`` target is a no-op, as in ``threading.Thread.run``, instead
        of failing with ``TypeError``.
        """
        if self.target is not None:
            self.target(*self.args, **self.kwargs)

    def start(self):
        """Mark the thread as started and queue it on the module-wide scheduler.

        Raises:
            RuntimeError: If the thread has already been started.
        """
        global scheduler
        if self.started:
            raise RuntimeError("Virtual thread has already been started")
        self.started = True
        # The scheduler is created lazily by whoever needs it first.
        if scheduler is None:
            scheduler = Scheduler()
        scheduler.runnables.append(self)

    def join(self, timeout=None):
        """Run the scheduler on behalf of a caller waiting for this thread.

        Raises:
            RuntimeError: If the thread has not been started.
            NotImplementedError: If ``timeout`` is not ``None``.
        """
        if not self.started:
            raise RuntimeError("Virtual thread has not been started")
        # start() always creates the scheduler, so a started thread implies
        # that one exists.
        assert scheduler is not None
        if timeout is not None:
            raise NotImplementedError("Timeouts not yet implemented")
        scheduler.continuation.run(None)

    def __lt__(self, other):
        # Return NotImplemented for foreign types so Python can try the
        # reflected operation or raise a proper TypeError.
        if not isinstance(other, VirtualThread):
            return NotImplemented
        return self.id < other.id

    def __eq__(self, other):
        if not isinstance(other, VirtualThread):
            return NotImplemented
        return self.id == other.id

    def __hash__(self):
        # Defining __eq__ alone would make instances unhashable; hash on the
        # same key that __eq__ compares.
        return hash(self.id)
class VirtualThreadGroup:
    """Context manager that joins every thread it created when the block exits."""

    def __init__(self):
        # Threads created through create_thread(), in creation order.
        self.threads = []

    def __enter__(self):
        return self

    def __exit__(self, *args):
        # Join in creation order; the list is only reset after every join
        # has returned.
        members = self.threads
        for member in members:
            member.join()
        self.threads = []

    def create_thread(self, group=None, target=None, name=None, args=(), kwargs=None):
        """Build a ``VirtualThread``, remember it for joining, and return it.

        The thread is not started here.
        """
        created = VirtualThread(
            group=group, target=target, name=name, args=args, kwargs=kwargs
        )
        self.threads.append(created)
        return created
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment