Skip to content

Instantly share code, notes, and snippets.

@markshannon
Last active July 19, 2025 08:52
Show Gist options
  • Save markshannon/110b5cc6b071ca98026a34756ca50ffc to your computer and use it in GitHub Desktop.
Save markshannon/110b5cc6b071ca98026a34756ca50ffc to your computer and use it in GitHub Desktop.
# Minimal virtual threads implementation
# Requires continuations: https://github.com/markshannon/cpython/tree/continuations
# The only blocking call implemented is `sleep`
from continuations import Continuation
from collections import deque
import heapq, time
from functools import total_ordering
# Module-wide Scheduler instance. Stays None until sleep() or
# VirtualThread.start() creates it on first use.
scheduler = None
class Scheduler:
    """Cooperative scheduler that drives virtual threads from one continuation.

    Attributes:
        runnables: FIFO ``deque`` of virtual threads that are ready to be
            resumed.
        sleepers: ``heapq``-ordered list of ``(wake_time, vthread)`` tuples,
            where ``wake_time`` is a ``time.monotonic()`` deadline.
        continuation: the ``Continuation`` wrapping ``run_threads``.
    """

    def __init__(self):
        # Build the queues *before* handing ``self`` to the continuation so
        # that ``run_threads`` can never observe a half-initialised scheduler.
        self.runnables = deque()
        self.sleepers = []
        self.continuation = Continuation(Scheduler.run_threads)
        self.continuation.start(self)

    def run_threads(self):
        """Scheduler main loop.

        Each pass first resumes every runnable thread in FIFO order, then
        takes the sleeper with the earliest deadline, waits for that deadline
        with a real ``time.sleep`` if it is still in the future, and makes the
        sleeper runnable again.

        NOTE(review): the loop has no exit; when both queues are empty it
        keeps spinning. How control returns to a joiner depends on
        ``Continuation`` semantics that are not visible in this file.
        """
        global current_vthread
        while True:
            while self.runnables:
                vt = self.runnables.popleft()
                # Published so the module-level sleep() knows who is calling.
                current_vthread = vt
                if vt.continuation.started:
                    vt.continuation.send(None)
                else:
                    vt.continuation.start()
            if self.sleepers:
                wake_time, sleeper = heapq.heappop(self.sleepers)
                delta = wake_time - time.monotonic()
                if delta > 0:
                    # Record the interrupt and act on it after the handler has
                    # finished, so the throw into the sleeper's continuation
                    # does not happen while an exception is being handled.
                    interrupted = False
                    try:
                        time.sleep(delta)
                    except KeyboardInterrupt:
                        interrupted = True
                    if interrupted:
                        sleeper.continuation.throw(KeyboardInterrupt)
                    else:
                        self.runnables.append(sleeper)
                else:
                    # Deadline already passed: runnable immediately.
                    self.runnables.append(sleeper)

    def sleep(self, vt, t):
        """Park virtual thread ``vt`` for ``t`` seconds.

        A non-positive ``t`` returns immediately without touching the sleeper
        heap. Otherwise the deadline is pushed onto ``sleepers`` and
        ``send(None)`` is called on the scheduler continuation (presumably
        transferring control to ``run_threads`` -- confirm against the
        Continuation API).
        """
        if t <= 0:
            return
        wake_time = time.monotonic() + t
        # Entries with equal deadlines fall back to comparing the threads
        # themselves, which VirtualThread supports via its ordering methods.
        heapq.heappush(self.sleepers, (wake_time, vt))
        self.continuation.send(None)
# NOTE(review): repeats the `scheduler = None` assignment that already appears
# above the Scheduler class; it rebinds the same name to the same value.
scheduler = None
# The VirtualThread most recently resumed by Scheduler.run_threads, or None if
# the scheduler has not resumed one yet.
current_vthread = None
def sleep(t):
    """Sleep for ``t`` seconds, cooperatively when inside a virtual thread.

    When no virtual thread is current this is a plain blocking
    ``time.sleep(t)``. Otherwise the request is handed to the scheduler
    (created on demand) on behalf of the current virtual thread.
    """
    global scheduler
    # Not running under the scheduler: block the OS thread directly.
    if current_vthread is None:
        time.sleep(t)
        return
    if scheduler is None:
        scheduler = Scheduler()
    scheduler.sleep(current_vthread, t)
# Next identifier to hand out; VirtualThread.__init__ reads and increments it.
next_id = 1
@total_ordering
class VirtualThread:
    """A cooperatively scheduled thread backed by a ``Continuation``.

    The constructor parameters mirror those of ``threading.Thread``.
    Instances are ordered, compared and hashed by their creation ``id``, which
    lets them act as tie-breakers inside the scheduler's
    ``(wake_time, thread)`` heap entries and be stored in sets and dicts.
    """

    def __init__(self, group=None, target=None, name=None, args=(), kwargs=None):
        """Create a virtual thread; it does not run until ``start()``.

        Args:
            group: must be None.
            target: callable invoked by ``run()``; None means "do nothing".
            name: optional label stored on the instance.
            args: positional arguments for ``target``.
            kwargs: keyword arguments for ``target`` (None means empty).

        Raises:
            ValueError: if ``group`` is not None.
        """
        global next_id
        # Validate first so a rejected construction does not consume an id.
        if group is not None:
            raise ValueError("group must be None")
        self.id = next_id
        next_id += 1
        self.target = target
        self.args = args
        self.kwargs = {} if kwargs is None else kwargs
        self.name = name
        self.started = False
        # Create the continuation last, once every attribute it could read
        # through ``self`` is in place.
        self.continuation = Continuation(VirtualThread.run)
        self.continuation.start(self)

    def run(self):
        """Invoke ``target(*args, **kwargs)``; a None target is a no-op."""
        if self.target is not None:
            self.target(*self.args, **self.kwargs)

    def start(self):
        """Queue this thread on the scheduler, creating the scheduler if needed.

        Raises:
            RuntimeError: if the thread was already started.
        """
        global scheduler
        if self.started:
            raise RuntimeError("Virtual thread has already been started")
        self.started = True
        if scheduler is None:
            scheduler = Scheduler()
        scheduler.runnables.append(self)

    def join(self, timeout=None):
        """Run the scheduler's continuation on behalf of the caller.

        NOTE(review): the call below does not reference ``self``; presumably
        it returns once the scheduler yields back rather than precisely when
        this thread finishes -- confirm against the Continuation API.

        Raises:
            RuntimeError: if the thread has not been started.
            NotImplementedError: if ``timeout`` is not None.
        """
        if not self.started:
            raise RuntimeError("Virtual thread has not been started")
        # start() always creates the scheduler before setting ``started``.
        assert scheduler is not None
        if timeout is not None:
            raise NotImplementedError("Timeouts not yet implemented")
        scheduler.continuation.run(None)

    def __lt__(self, other):
        # Defer to the other operand (and ultimately TypeError) for foreign
        # types instead of failing with AttributeError on ``other.id``.
        if not isinstance(other, VirtualThread):
            return NotImplemented
        return self.id < other.id

    def __eq__(self, other):
        if not isinstance(other, VirtualThread):
            return NotImplemented
        return self.id == other.id

    def __hash__(self):
        # Defining __eq__ alone would make instances unhashable; hash by the
        # same key that equality uses.
        return hash(self.id)
class VirtualThreadGroup:
    """Context manager that joins every thread it created when the block ends.

    Threads made through ``create_thread`` are only recorded here; the caller
    is still responsible for calling ``start()`` on each of them.
    """

    def __init__(self):
        # Threads created through this group, in creation order.
        self.threads = []

    def __enter__(self):
        return self

    def __exit__(self, *args):
        # Iterate the live list (not a snapshot) so threads registered while
        # earlier joins are running are joined as well.
        for member in self.threads:
            member.join()
        self.threads = []

    def create_thread(self, group=None, target=None, name=None, args=(), kwargs=None):
        """Build a ``VirtualThread``, remember it for ``__exit__``, and return it."""
        created = VirtualThread(
            group=group,
            target=target,
            name=name,
            args=args,
            kwargs=kwargs,
        )
        self.threads.append(created)
        return created
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment