-
-
Save hsanchez/40f6ff825ebd188229c74b2e49f94624 to your computer and use it in GitHub Desktop.
asyncio queues example
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio | |
import random | |
import time | |
async def worker(name, queue): | |
while True: | |
# Get a "work item" out of the queue. | |
sleep_for = await queue.get() | |
# Sleep for the "sleep_for" seconds. | |
await asyncio.sleep(sleep_for) | |
# Notify the queue that the "work item" has been processed. | |
queue.task_done() | |
print(f'{name} has slept for {sleep_for:.2f} seconds') | |
async def main(): | |
# Create a queue that we will use to store our "workload". | |
queue = asyncio.Queue() | |
# Generate random timings and put them into the queue. | |
total_sleep_time = 0 | |
for _ in range(20): | |
sleep_for = random.uniform(0.05, 1.0) | |
total_sleep_time += sleep_for | |
queue.put_nowait(sleep_for) | |
# Create three worker tasks to process the queue concurrently. | |
tasks = [] | |
for i in range(3): | |
task = asyncio.create_task(worker(f'worker-{i}', queue)) | |
tasks.append(task) | |
# Wait until the queue is fully processed. | |
started_at = time.monotonic() | |
await queue.join() | |
total_slept_for = time.monotonic() - started_at | |
# Cancel our worker tasks. | |
for task in tasks: | |
task.cancel() | |
print('====') | |
print(f'3 workers slept in parallel for {total_slept_for:.2f} seconds') | |
print(f'total expected sleep time: {total_sleep_time:.2f} seconds') | |
asyncio.run(main()) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment