Skip to content

Instantly share code, notes, and snippets.

@jordanorelli
Created August 9, 2025 21:57
Show Gist options
  • Select an option

  • Save jordanorelli/636847858e2955bd848c6654f8beb6c6 to your computer and use it in GitHub Desktop.

Select an option

Save jordanorelli/636847858e2955bd848c6654f8beb6c6 to your computer and use it in GitHub Desktop.
async python without asyncio lol dave beazley good job
import time
import heapq
from collections import deque
def switch():
return Awaitable()
class Awaitable:
def __await__(self):
yield
class Scheduler:
def __init__(self):
self.ready = deque() # These are generators
self.sleeping = [] # Sleeping tasks
self.current = None # Currently executing generator
self.sequence = 0
async def sleep(self, delay):
# The current coroutine wants to sleep. How?
deadline = time.time() + delay
self.sequence += 1
heapq.heappush(self.sleeping, (deadline, self.sequence, self.current))
self.current = None
await switch()
def new_task(self, gen):
self.ready.append(gen)
def run(self):
while self.ready or self.sleeping:
if not self.ready:
deadline, _, coro = heapq.heappop(self.sleeping)
delta = deadline - time.time()
if delta > 0:
time.sleep(delta)
self.ready.append(coro)
self.current = self.ready.popleft()
try:
self.current.send(None)
if self.current:
self.ready.append(self.current)
except StopIteration:
pass
async def countdown(n: int):
while n > 0:
print("Down", n)
await scheduler.sleep(4)
n -= 1
async def countup(stop: int):
x = 0
while x < stop:
print("Up", x)
await scheduler.sleep(1)
x += 1
scheduler = Scheduler()
scheduler.new_task(countdown(5))
scheduler.new_task(countup(20))
scheduler.run()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment