"Build Your Own Async" Workshop - PyCon India - October 14, 2019 - https://www.youtube.com/watch?v=Y4Gt3Xjd7G8
# aproducer.py
#
# Async Producer-consumer problem.
# Challenge: How to implement the same functionality, but no threads.
import time
from collections import deque
import heapq
class Scheduler:
    def __init__(self):
        self.ready = deque()     # Functions ready to execute
        self.sleeping = []       # Sleeping functions
        self.sequence = 0

    def call_soon(self, func):
        self.ready.append(func)

    def call_later(self, delay, func):
        self.sequence += 1
        deadline = time.time() + delay     # Expiration time
        # Priority queue
        heapq.heappush(self.sleeping, (deadline, self.sequence, func))

    def run(self):
        while self.ready or self.sleeping:
            if not self.ready:
                # Find the nearest deadline
                deadline, _, func = heapq.heappop(self.sleeping)
                delta = deadline - time.time()
                if delta > 0:
                    time.sleep(delta)
                self.ready.append(func)

            while self.ready:
                func = self.ready.popleft()
                func()

sched = Scheduler()    # Behind scenes scheduler object

# -----

class AsyncQueue:
    def __init__(self):
        self.items = deque()
        self.waiting = deque()    # All getters waiting for data

    def put(self, item):
        self.items.append(item)
        if self.waiting:
            func = self.waiting.popleft()
            # Do we call it right away? No. Schedule it to be called.
            sched.call_soon(func)

    def get(self, callback):
        # Wait until an item is available. Then return it
        if self.items:
            callback(self.items.popleft())
        else:
            self.waiting.append(lambda: self.get(callback))

def producer(q, count):
    def _run(n):
        if n < count:
            print('Producing', n)
            q.put(n)
            sched.call_later(1, lambda: _run(n+1))
        else:
            print('Producer done')
            q.put(None)
    _run(0)

def consumer(q):
    def _consume(item):
        if item is None:
            print('Consumer done')
        else:
            print('Consuming', item)
            sched.call_soon(lambda: consumer(q))
    q.get(callback=_consume)

q = AsyncQueue()
sched.call_soon(lambda: producer(q, 10))
sched.call_soon(lambda: consumer(q,))
sched.run()
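# --- Added illustration (not part of the original gist) ---
# A minimal sketch of the callback protocol used by AsyncQueue above:
# get() on an empty queue parks the callback in q.waiting, and a later
# put() hands it back to the scheduler via sched.call_soon(). Reuses the
# sched and AsyncQueue defined above; names prefixed demo_ are ours.
demo_q = AsyncQueue()
demo_q.get(callback=lambda item: print('Got item:', item))   # parks the callback
demo_q.put('hello')   # wakes the parked getter
sched.run()           # prints: Got item: hello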
# aproducer_error.py
#
# Example of returning results + errors in callback based code.
import time
from collections import deque
import heapq
class Scheduler:
    def __init__(self):
        self.ready = deque()     # Functions ready to execute
        self.sleeping = []       # Sleeping functions
        self.sequence = 0

    def call_soon(self, func):
        self.ready.append(func)

    def call_later(self, delay, func):
        self.sequence += 1
        deadline = time.time() + delay     # Expiration time
        # Priority queue
        heapq.heappush(self.sleeping, (deadline, self.sequence, func))

    def run(self):
        while self.ready or self.sleeping:
            if not self.ready:
                # Find the nearest deadline
                deadline, _, func = heapq.heappop(self.sleeping)
                delta = deadline - time.time()
                if delta > 0:
                    time.sleep(delta)
                self.ready.append(func)

            while self.ready:
                func = self.ready.popleft()
                func()

sched = Scheduler()    # Behind scenes scheduler object

# -----

# Class used to communicate either a normal value or an exception
class Result:
    def __init__(self, value=None, exc=None):
        self.value = value
        self.exc = exc

    def result(self):
        if self.exc:
            raise self.exc
        else:
            return self.value

class AsyncQueue:
    def __init__(self):
        self.items = deque()
        self.waiting = deque()    # All getters waiting for data
        self._closed = False      # Can queue be used anymore?

    def close(self):
        self._closed = True
        if self.waiting and not self.items:
            for func in self.waiting:
                sched.call_soon(func)

    def put(self, item):
        if self._closed:
            raise QueueClosed()

        self.items.append(item)
        if self.waiting:
            func = self.waiting.popleft()
            # Do we call it right away?
            sched.call_soon(func)

    def get(self, callback):
        # Wait until an item is available. Then return it
        # Question: How does a closed queue interact with get()?
        if self.items:
            callback(Result(value=self.items.popleft()))   # Good result
        else:
            # No items available (must wait)
            if self._closed:
                callback(Result(exc=QueueClosed()))        # Error result
            else:
                self.waiting.append(lambda: self.get(callback))

class QueueClosed(Exception):
    pass

def producer(q, count):
    def _run(n):
        if n < count:
            print('Producing', n)
            q.put(n)
            sched.call_later(1, lambda: _run(n+1))
        else:
            print('Producer done')
            q.close()     # Means no more items will be produced
    _run(0)

def consumer(q):
    def _consume(result):
        try:
            item = result.result()
            print('Consuming', item)
            sched.call_soon(lambda: consumer(q))
        except QueueClosed:
            print('Consumer done')
    q.get(callback=_consume)

q = AsyncQueue()
sched.call_soon(lambda: producer(q, 10))
sched.call_soon(lambda: consumer(q,))
sched.run()
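# --- Added illustration (not part of the original gist) ---
# Result packages either a value or an exception behind one callback
# signature; the receiver calls .result() and uses ordinary try/except.
ok = Result(value=42)
bad = Result(exc=QueueClosed())
print(ok.result())           # 42
try:
    bad.result()             # re-raises QueueClosed in the caller
except QueueClosed:
    print('got QueueClosed')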
# asynco.py
#
# A basic asynchronous scheduler with support for time
import time
from collections import deque
import heapq
class Scheduler:
    def __init__(self):
        self.ready = deque()     # Functions ready to execute
        self.sleeping = []       # Sleeping functions
        self.sequence = 0        # Used to break ties in priority queue

    def call_soon(self, func):
        self.ready.append(func)

    def call_later(self, delay, func):
        self.sequence += 1
        deadline = time.time() + delay     # Expiration time
        heapq.heappush(self.sleeping, (deadline, self.sequence, func))

    def run(self):
        while self.ready or self.sleeping:
            if not self.ready:
                # Find the nearest deadline
                deadline, _, func = heapq.heappop(self.sleeping)
                delta = deadline - time.time()
                if delta > 0:
                    time.sleep(delta)
                self.ready.append(func)

            while self.ready:
                func = self.ready.popleft()
                func()

sched = Scheduler()    # Behind scenes scheduler object

def countdown(n):
    if n > 0:
        print('Down', n)
        # time.sleep(4)
        sched.call_later(4, lambda: countdown(n-1))

def countup(stop):
    def _run(x):
        if x < stop:
            print('Up', x)
            # time.sleep(1)
            sched.call_later(1, lambda: _run(x+1))
    _run(0)

sched.call_soon(lambda: countdown(5))
sched.call_soon(lambda: countup(20))
sched.run()
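# --- Added illustration (not part of the original gist) ---
# Why call_later() pushes (deadline, sequence, func) and not just
# (deadline, func): if two deadlines tie, heapq compares the next tuple
# element, and functions are not orderable. The ever-increasing sequence
# is unique, so the comparison never reaches the functions themselves.
try:
    (1.0, print) < (1.0, len)    # what a tie would do without the sequence
except TypeError as e:
    print('tie-break without a sequence number fails:', e)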
# coro_callback.py
#
# An example of how to implement coroutine based concurrency layered
# on top of a callback-based scheduler.
import time
from collections import deque
import heapq
# Callback based scheduler (from earlier)
class Scheduler:
    def __init__(self):
        self.ready = deque()     # Functions ready to execute
        self.sleeping = []       # Sleeping functions
        self.sequence = 0

    def call_soon(self, func):
        self.ready.append(func)

    def call_later(self, delay, func):
        self.sequence += 1
        deadline = time.time() + delay     # Expiration time
        # Priority queue
        heapq.heappush(self.sleeping, (deadline, self.sequence, func))

    def run(self):
        while self.ready or self.sleeping:
            if not self.ready:
                # Find the nearest deadline
                deadline, _, func = heapq.heappop(self.sleeping)
                delta = deadline - time.time()
                if delta > 0:
                    time.sleep(delta)
                self.ready.append(func)

            while self.ready:
                func = self.ready.popleft()
                func()

    # Coroutine-based functions
    def new_task(self, coro):
        self.ready.append(Task(coro))    # Wrapped coroutine

    async def sleep(self, delay):
        self.call_later(delay, self.current)
        self.current = None
        await switch()   # Switch to a new task

# Class that wraps a coroutine--making it look like a callback
class Task:
    def __init__(self, coro):
        self.coro = coro      # "Wrapped coroutine"

    # Make it look like a callback
    def __call__(self):
        try:
            # Driving the coroutine as before
            sched.current = self
            self.coro.send(None)
            if sched.current:
                sched.ready.append(self)
        except StopIteration:
            pass

class Awaitable:
    def __await__(self):
        yield

def switch():
    return Awaitable()

sched = Scheduler()    # Background scheduler object

# ----------------

class AsyncQueue:
    def __init__(self):
        self.items = deque()
        self.waiting = deque()

    async def put(self, item):
        self.items.append(item)
        if self.waiting:
            sched.ready.append(self.waiting.popleft())

    async def get(self):
        if not self.items:
            self.waiting.append(sched.current)   # Put myself to sleep
            sched.current = None                 # "Disappear"
            await switch()                       # Switch to another task
        return self.items.popleft()

# Coroutine-based tasks
async def producer(q, count):
    for n in range(count):
        print('Producing', n)
        await q.put(n)
        await sched.sleep(1)

    print('Producer done')
    await q.put(None)    # "Sentinel" to shut down

async def consumer(q):
    while True:
        item = await q.get()
        if item is None:
            break
        print('Consuming', item)
    print('Consumer done')

q = AsyncQueue()
sched.new_task(producer(q, 10))
sched.new_task(consumer(q))

# Call-back based tasks
def countdown(n):
    if n > 0:
        print('Down', n)
        # time.sleep(4)     # Blocking call (nothing else can run)
        sched.call_later(4, lambda: countdown(n-1))

def countup(stop):
    def _run(x):
        if x < stop:
            print('Up', x)
            # time.sleep(1)
            sched.call_later(1, lambda: _run(x+1))
    _run(0)

sched.call_soon(lambda: countdown(5))
sched.call_soon(lambda: countup(20))
sched.run()
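# --- Added illustration (not part of the original gist) ---
# How Task adapts a coroutine to the callback scheduler: each call drives
# the coroutine to its next "await switch()" and, if the coroutine did not
# park itself (sched.current is still set), requeues it on sched.ready.
async def greeter():
    print('hello from a coroutine')
    await switch()
    print('resumed by the scheduler')

sched.new_task(greeter())
sched.run()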
# Build Your Own Async
#
# David Beazley (@dabeaz)
# https://www.dabeaz.com
#
# Originally presented at PyCon India, Chennai, October 14, 2019
import time
def countdown(n):
    while n > 0:
        print('Down', n)
        time.sleep(1)
        n -= 1

def countup(stop):
    x = 0
    while x < stop:
        print('Up', x)
        time.sleep(1)
        x += 1
# Example of sequential execution
countdown(5)
countup(5)
# Example of concurrent execution (via threads)
import threading
threading.Thread(target=countdown, args=(5,)).start()
threading.Thread(target=countup, args=(5,)).start()
# io_scheduler.py
#
# An example of implementing I/O operations in the scheduler
import time
from collections import deque
import heapq
from select import select
# Callback based scheduler (from earlier)
class Scheduler:
    def __init__(self):
        self.ready = deque()     # Functions ready to execute
        self.sleeping = []       # Sleeping functions
        self.sequence = 0
        self._read_waiting = { }
        self._write_waiting = { }

    def call_soon(self, func):
        self.ready.append(func)

    def call_later(self, delay, func):
        self.sequence += 1
        deadline = time.time() + delay     # Expiration time
        # Priority queue
        heapq.heappush(self.sleeping, (deadline, self.sequence, func))

    def read_wait(self, fileno, func):
        # Trigger func() when fileno is readable
        self._read_waiting[fileno] = func

    def write_wait(self, fileno, func):
        # Trigger func() when fileno is writeable
        self._write_waiting[fileno] = func

    def run(self):
        while (self.ready or self.sleeping or self._read_waiting or self._write_waiting):
            if not self.ready:
                # Find the nearest deadline
                if self.sleeping:
                    deadline, _, func = self.sleeping[0]
                    timeout = deadline - time.time()
                    if timeout < 0:
                        timeout = 0
                else:
                    timeout = None     # Wait forever

                # Wait for I/O (and sleep)
                can_read, can_write, _ = select(self._read_waiting,
                                                self._write_waiting, [], timeout)
                for fd in can_read:
                    self.ready.append(self._read_waiting.pop(fd))
                for fd in can_write:
                    self.ready.append(self._write_waiting.pop(fd))

                # Check for sleeping tasks
                now = time.time()
                while self.sleeping:
                    if now > self.sleeping[0][0]:
                        self.ready.append(heapq.heappop(self.sleeping)[2])
                    else:
                        break

            while self.ready:
                func = self.ready.popleft()
                func()

    def new_task(self, coro):
        self.ready.append(Task(coro))    # Wrapped coroutine

    async def sleep(self, delay):
        self.call_later(delay, self.current)
        self.current = None
        await switch()   # Switch to a new task

    async def recv(self, sock, maxbytes):
        self.read_wait(sock, self.current)
        self.current = None
        await switch()
        return sock.recv(maxbytes)

    async def send(self, sock, data):
        self.write_wait(sock, self.current)
        self.current = None
        await switch()
        return sock.send(data)

    async def accept(self, sock):
        self.read_wait(sock, self.current)
        self.current = None
        await switch()
        return sock.accept()

class Task:
    def __init__(self, coro):
        self.coro = coro      # "Wrapped coroutine"

    # Make it look like a callback
    def __call__(self):
        try:
            # Driving the coroutine as before
            sched.current = self
            self.coro.send(None)
            if sched.current:
                sched.ready.append(self)
        except StopIteration:
            pass

class Awaitable:
    def __await__(self):
        yield

def switch():
    return Awaitable()

sched = Scheduler()    # Background scheduler object

# ----------------

from socket import *

async def tcp_server(addr):
    sock = socket(AF_INET, SOCK_STREAM)
    sock.bind(addr)
    sock.listen(1)
    while True:
        client, addr = await sched.accept(sock)
        print('Connection from', addr)
        sched.new_task(echo_handler(client))

async def echo_handler(sock):
    while True:
        data = await sched.recv(sock, 10000)
        if not data:
            break
        await sched.send(sock, b'Got:' + data)
    print('Connection closed')
    sock.close()
sched.new_task(tcp_server(('', 30000)))
sched.run()
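# echo_client.py -- added illustration (not part of the original gist)
#
# A throwaway client for exercising the echo server in io_scheduler.py.
# Run it in a separate interpreter while the server is listening on port
# 30000; connecting to 'localhost' is an assumption (the server binds all
# interfaces), and the filename is ours.
from socket import socket, AF_INET, SOCK_STREAM

sock = socket(AF_INET, SOCK_STREAM)
sock.connect(('localhost', 30000))
sock.send(b'hello')
print(sock.recv(10000))   # expected: b'Got:hello'
sock.close()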
# producer.py
#
# Producer-consumer problem with threads
import queue
import threading
import time
def producer(q, count):
    for n in range(count):
        print('Producing', n)
        q.put(n)
        time.sleep(1)

    print('Producer done')
    q.put(None)    # "Sentinel" to shut down

def consumer(q):
    while True:
        item = q.get()
        if item is None:
            break
        print('Consuming', item)
    print('Consumer done')
q = queue.Queue() # Thread-safe queue
threading.Thread(target=producer, args=(q, 10)).start()
threading.Thread(target=consumer, args=(q,)).start()
# yieldo.py
#
# Example of a coroutine-based scheduler
import time
from collections import deque
import heapq
# Plumbing that makes the "await" statement work. We provide
# a single function "switch" that is used by the scheduler to
# switch tasks.
class Awaitable:
    def __await__(self):
        yield

def switch():
    return Awaitable()

class Scheduler:
    def __init__(self):
        self.ready = deque()
        self.sleeping = [ ]
        self.current = None    # Currently executing generator
        self.sequence = 0

    async def sleep(self, delay):
        deadline = time.time() + delay
        self.sequence += 1
        heapq.heappush(self.sleeping, (deadline, self.sequence, self.current))
        self.current = None    # "Disappear"
        await switch()         # Switch tasks

    def new_task(self, coro):
        self.ready.append(coro)

    def run(self):
        while self.ready or self.sleeping:
            if not self.ready:
                deadline, _, coro = heapq.heappop(self.sleeping)
                delta = deadline - time.time()
                if delta > 0:
                    time.sleep(delta)
                self.ready.append(coro)

            self.current = self.ready.popleft()
            # Drive as a generator
            try:
                self.current.send(None)   # Send to a coroutine
                if self.current:
                    self.ready.append(self.current)
            except StopIteration:
                pass

sched = Scheduler()    # Background scheduler object

# ---- Example code

async def countdown(n):
    while n > 0:
        print('Down', n)
        await sched.sleep(4)
        n -= 1

async def countup(stop):
    x = 0
    while x < stop:
        print('Up', x)
        await sched.sleep(1)
        x += 1
sched.new_task(countdown(5))
sched.new_task(countup(20))
sched.run()
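# --- Added illustration (not part of the original gist) ---
# What the scheduler's send(None) loop relies on: awaiting switch()
# suspends a coroutine at the bare "yield" inside Awaitable.__await__,
# so driving it by hand takes one send() per suspension plus a final
# send() that raises StopIteration.
async def demo():
    print('before switch')
    await switch()
    print('after switch')

c = demo()
c.send(None)          # runs up to the yield -> prints 'before switch'
try:
    c.send(None)      # resumes past the await -> prints 'after switch'
except StopIteration:
    pass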
# yproducer.py
#
# Coroutine producer-consumer problem. With async-await
import time
from collections import deque
import heapq
class Awaitable:
    def __await__(self):
        yield

def switch():
    return Awaitable()

class Scheduler:
    def __init__(self):
        self.ready = deque()
        self.sleeping = [ ]
        self.current = None    # Currently executing generator
        self.sequence = 0

    async def sleep(self, delay):
        deadline = time.time() + delay
        self.sequence += 1
        heapq.heappush(self.sleeping, (deadline, self.sequence, self.current))
        self.current = None    # "Disappear"
        await switch()         # Switch tasks

    def new_task(self, coro):
        self.ready.append(coro)

    def run(self):
        while self.ready or self.sleeping:
            if not self.ready:
                deadline, _, coro = heapq.heappop(self.sleeping)
                delta = deadline - time.time()
                if delta > 0:
                    time.sleep(delta)
                self.ready.append(coro)

            self.current = self.ready.popleft()
            # Drive as a generator
            try:
                self.current.send(None)   # Send to a coroutine
                if self.current:
                    self.ready.append(self.current)
            except StopIteration:
                pass

sched = Scheduler()    # Background scheduler object

# ----------------

class AsyncQueue:
    def __init__(self):
        self.items = deque()
        self.waiting = deque()

    async def put(self, item):
        self.items.append(item)
        if self.waiting:
            sched.ready.append(self.waiting.popleft())

    async def get(self):
        if not self.items:
            self.waiting.append(sched.current)   # Put myself to sleep
            sched.current = None                 # "Disappear"
            await switch()                       # Switch to another task
        return self.items.popleft()

async def producer(q, count):
    for n in range(count):
        print('Producing', n)
        await q.put(n)
        await sched.sleep(1)

    print('Producer done')
    await q.put(None)    # "Sentinel" to shut down

async def consumer(q):
    while True:
        item = await q.get()
        if item is None:
            break
        print('Consuming', item)
    print('Consumer done')
q = AsyncQueue()
sched.new_task(producer(q, 10))
sched.new_task(consumer(q))
sched.run()
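# --- Added illustration (not part of the original gist) ---
# The parking mechanism in AsyncQueue.get(): an empty get() stashes the
# current coroutine in q.waiting and clears sched.current so run() does
# not requeue it; a later put() pushes the parked coroutine back onto
# sched.ready. The demo_ names are ours.
async def demo_waiter(q):
    print('waiting for an item...')
    print('woke up with', await q.get())

async def demo_waker(q):
    await sched.sleep(1)
    await q.put('ping')

demo_q = AsyncQueue()
sched.new_task(demo_waiter(demo_q))
sched.new_task(demo_waker(demo_q))
sched.run()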
# yproducer_error.py
#
# Example of error handling with coroutines
import time
from collections import deque
import heapq
class Awaitable:
    def __await__(self):
        yield

def switch():
    return Awaitable()

class Scheduler:
    def __init__(self):
        self.ready = deque()
        self.sleeping = [ ]
        self.current = None    # Currently executing generator
        self.sequence = 0

    async def sleep(self, delay):
        deadline = time.time() + delay
        self.sequence += 1
        heapq.heappush(self.sleeping, (deadline, self.sequence, self.current))
        self.current = None    # "Disappear"
        await switch()         # Switch tasks

    def new_task(self, coro):
        self.ready.append(coro)

    def run(self):
        while self.ready or self.sleeping:
            if not self.ready:
                deadline, _, coro = heapq.heappop(self.sleeping)
                delta = deadline - time.time()
                if delta > 0:
                    time.sleep(delta)
                self.ready.append(coro)

            self.current = self.ready.popleft()
            # Drive as a generator
            try:
                self.current.send(None)   # Send to a coroutine
                if self.current:
                    self.ready.append(self.current)
            except StopIteration:
                pass

sched = Scheduler()    # Background scheduler object

# ----------------

class QueueClosed(Exception):
    pass

class AsyncQueue:
    def __init__(self):
        self.items = deque()
        self.waiting = deque()
        self._closed = False

    def close(self):
        self._closed = True
        if self.waiting and not self.items:
            sched.ready.append(self.waiting.popleft())   # Reschedule waiting tasks

    async def put(self, item):
        if self._closed:
            raise QueueClosed()

        self.items.append(item)
        if self.waiting:
            sched.ready.append(self.waiting.popleft())

    async def get(self):
        while not self.items:
            if self._closed:
                raise QueueClosed()
            self.waiting.append(sched.current)   # Put myself to sleep
            sched.current = None                 # "Disappear"
            await switch()                       # Switch to another task
        return self.items.popleft()

async def producer(q, count):
    for n in range(count):
        print('Producing', n)
        await q.put(n)
        await sched.sleep(1)

    print('Producer done')
    q.close()

async def consumer(q):
    try:
        while True:
            item = await q.get()
            print('Consuming', item)
    except QueueClosed:
        print('Consumer done')
q = AsyncQueue()
sched.new_task(producer(q, 10))
sched.new_task(consumer(q))
sched.run()
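# --- Added for comparison (not part of the original gist) ---
# The same producer/consumer written against the standard library's
# asyncio (assumes Python 3.7+). The workshop code above mirrors this
# machinery: AsyncQueue ~ asyncio.Queue, sched.sleep ~ asyncio.sleep,
# sched.new_task ~ asyncio.create_task, sched.run ~ asyncio.run.
import asyncio

async def aio_producer(q, count):
    for n in range(count):
        print('Producing', n)
        await q.put(n)
        await asyncio.sleep(1)
    print('Producer done')
    await q.put(None)    # sentinel, as in yproducer.py

async def aio_consumer(q):
    while True:
        item = await q.get()
        if item is None:
            break
        print('Consuming', item)
    print('Consumer done')

async def aio_main():
    q = asyncio.Queue()
    await asyncio.gather(aio_producer(q, 10), aio_consumer(q))

asyncio.run(aio_main())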