Skip to content

Instantly share code, notes, and snippets.

@1st1
Last active October 20, 2024 19:56
Show Gist options
  • Save 1st1/f110d5e2ade94e679c4442e9b6d117e1 to your computer and use it in GitHub Desktop.
Save 1st1/f110d5e2ade94e679c4442e9b6d117e1 to your computer and use it in GitHub Desktop.
asyncio queues example
import asyncio
import random
import time
async def worker(name, queue):
    """Consume sleep durations from *queue* forever, sleeping for each one.

    Args:
        name: Label used in the progress line printed after each item.
        queue: An ``asyncio.Queue`` whose items are delays in seconds.

    The loop never returns on its own; the caller is expected to cancel the
    task once ``queue.join()`` has completed.
    """
    while True:
        # Get a "work item" out of the queue.
        sleep_for = await queue.get()
        try:
            # Sleep for the "sleep_for" seconds.
            await asyncio.sleep(sleep_for)
        finally:
            # Always notify the queue that the "work item" has been handled,
            # even if the sleep failed or was cancelled; otherwise
            # queue.join() would wait forever for this item.
            queue.task_done()
        print(f'{name} has slept for {sleep_for:.2f} seconds')
async def main():
    """Fill a queue with random sleep times and drain it with three workers.

    Prints the wall-clock time the workers needed and the sum of all
    generated sleep times for comparison.
    """
    # Create a queue that we will use to store our "workload".
    queue = asyncio.Queue()

    # Generate random timings and put them into the queue.
    total_sleep_time = 0
    for _ in range(20):
        sleep_for = random.uniform(0.05, 1.0)
        total_sleep_time += sleep_for
        queue.put_nowait(sleep_for)

    # Create three worker tasks to process the queue concurrently.
    tasks = []
    for i in range(3):
        task = asyncio.create_task(worker(f'worker-{i}', queue))
        tasks.append(task)

    # Wait until the queue is fully processed.
    started_at = time.monotonic()
    await queue.join()
    total_slept_for = time.monotonic() - started_at

    # Cancel our worker tasks.
    for task in tasks:
        task.cancel()
    # Wait until all worker tasks have actually finished cancelling, so none
    # is left pending when main() returns.
    await asyncio.gather(*tasks, return_exceptions=True)

    print('====')
    print(f'3 workers slept in parallel for {total_slept_for:.2f} seconds')
    print(f'total expected sleep time: {total_sleep_time:.2f} seconds')
# Run the demo only when executed as a script, not when imported.
if __name__ == '__main__':
    asyncio.run(main())
@impvd
Copy link

impvd commented Mar 26, 2021

With traditional multithreading and a queue, it would work like this:

import threading
import queue
import random
import time

# Shared work queue: filled by inqueue() and drained by the worker() threads.
q = queue.Queue()
# Lock shared by all threads; held around q.put() in inqueue() and around the
# workers' print call.
lock = threading.Lock()


def worker(q, index):
    """Consume items from *q* forever, printing each one.

    Args:
        q: The ``queue.Queue`` to take items from.
        index: Number identifying this worker in the printed output.

    ``cnt`` counts loop iterations, including those in which no item
    arrived within the one-second wait.
    """
    cnt = 0
    while True:
        cnt = cnt + 1
        try:
            # Take the item with a timeout and WITHOUT holding the lock.
            # Checking q.empty() first and then calling a blocking q.get()
            # under the lock is racy: if another worker drains the queue in
            # between, this thread would block while holding the lock the
            # producer needs for put(), deadlocking every thread.
            item = q.get(timeout=1)
        except queue.Empty:
            # Nothing arrived within a second; try again.
            continue
        # The lock only serializes the print call across workers.
        with lock:
            print(f'Worker {index} consume {item}, count:{cnt}')
        # Simulate 0.1-2.0 seconds of processing time.
        time.sleep(random.randint(1, 20)/10)


def inqueue():
    """Produce random integers into the shared queue ``q`` forever.

    One item is enqueued roughly every 0.1 seconds; after every 100th item
    the producer additionally pauses for 100 seconds.
    """
    produced = 0
    while True:
        produced += 1
        value = random.randint(1, 10001)
        print(f"Enqueue {value}, count:{produced}")
        # The put is performed while holding the shared lock.
        lock.acquire()
        try:
            q.put(value)
        finally:
            lock.release()
        time.sleep(0.1)
        # Long pause after each batch of 100 items.
        if not produced % 100:
            time.sleep(100)


def main():
    """Start the producer thread, wait three seconds, then start five workers."""
    # Producer first, so that items are already queued when workers start.
    producer = threading.Thread(target=inqueue, args=())
    producer.start()

    time.sleep(3)

    worker_id = 0
    while worker_id < 5:
        threading.Thread(target=worker, args=(q, worker_id)).start()
        worker_id += 1


# Start the demo only when this file is executed as a script, not on import.
if __name__ == '__main__':
    main()

@androslee
Copy link

androslee commented Nov 19, 2022

great examples guys. this was simple and clear to understand. sadly, i don't think this is going to help speed up my networked API calls as they are just limited by network speed and the GIL, global interpreter lock, of python.

edit: so this is literally just the example from the python documentation

https://docs.python.org/3/library/asyncio-queue.html

@zaemiel
Copy link

zaemiel commented Dec 27, 2022

great examples guys. this was simple and clear to understand. sadly, i don't think this is going to help speed up my networked API calls as they are just limited by network speed and the GIL, global interpreter lock, of python.

edit: so this is literally just the example from the python documentation

https://docs.python.org/3/library/asyncio-queue.html

It's not just the example from the Python docs. It's the example for the Python documentation. Yury is one of Python's core devs, a co-author of asyncio, and the author of the async/await syntax.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment