Last active
October 12, 2023 11:33
-
-
Save leplatrem/2165f81da1b54001c4a6d47ea390b6f9 to your computer and use it in GitHub Desktop.
Batched producer/consumer
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio | |
import async_timeout | |
import concurrent.futures | |
import random | |
import time | |
async def produce(queue, n): | |
for x in range(n): | |
# produce an item | |
print('producing {}/{}'.format(x, n)) | |
# simulate i/o operation using sleep | |
await asyncio.sleep(random.random()) | |
item = str(x) | |
# put the item in the queue | |
await queue.put(item) | |
async def consume(loop, queue, executor): | |
def markdone(queue, n): | |
return lambda fut: [queue.task_done() for _ in range(n)] | |
while True: | |
# wait for an item from the producer | |
items = [] | |
try: | |
with async_timeout.timeout(2): | |
while len(items) < 5: | |
item = await queue.get() | |
print('consuming {}...'.format(item)) | |
items.append(item) | |
except asyncio.TimeoutError: | |
print("Producer done or not fast enough, proceed.") | |
pass | |
if items: | |
print('→ batch {}...'.format(len(items))) | |
task = loop.run_in_executor(executor, time.sleep, random.random()) | |
task.add_done_callback(markdone(queue, len(items))) | |
async def run(loop, n): | |
executor = concurrent.futures.ThreadPoolExecutor(max_workers=3) | |
queue = asyncio.Queue() | |
# schedule the consumer | |
consumer_coro = consume(loop, queue, executor) | |
consumer = asyncio.ensure_future(consumer_coro) | |
# run the producer and wait for completion | |
await produce(queue, n) | |
# wait until the consumer has processed all items | |
await queue.join() | |
# the consumer is still awaiting for an item, cancel it | |
consumer.cancel() | |
if __name__ == "__main__": | |
loop = asyncio.get_event_loop() | |
loop.run_until_complete(run(loop, 13)) | |
loop.close() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment