Skip to content

Instantly share code, notes, and snippets.

@1st1
Last active October 20, 2024 19:56
Show Gist options
  • Save 1st1/f110d5e2ade94e679c4442e9b6d117e1 to your computer and use it in GitHub Desktop.
Save 1st1/f110d5e2ade94e679c4442e9b6d117e1 to your computer and use it in GitHub Desktop.
asyncio queues example
import asyncio
import random
import time
async def worker(name, queue):
    """Consume sleep durations from ``queue`` forever, sleeping for each one.

    Args:
        name: Label used in the progress message printed after each item.
        queue: Queue whose items are sleep durations in seconds; it must
            provide an awaitable ``get()`` and a ``task_done()`` method.

    The loop has no exit condition, so the coroutine only stops when its
    task is cancelled or when processing an item raises.
    """
    while True:
        # Get a "work item" out of the queue.
        sleep_for = await queue.get()
        try:
            # Sleep for the "sleep_for" seconds.
            await asyncio.sleep(sleep_for)
        finally:
            # Notify the queue that the "work item" has been processed, even
            # if the sleep failed or was cancelled; otherwise queue.join()
            # would wait forever for an item nobody is going to finish.
            queue.task_done()

        print(f'{name} has slept for {sleep_for:.2f} seconds')
async def main():
    """Fill a queue with random sleep times and drain it with three workers.

    Prints the wall-clock time the workers needed next to the sum of all
    queued sleep durations, which shows the effect of running the sleeps
    concurrently.
    """
    # Create a queue that we will use to store our "workload".
    queue = asyncio.Queue()

    # Generate random timings and put them into the queue.
    total_sleep_time = 0
    for _ in range(20):
        sleep_for = random.uniform(0.05, 1.0)
        total_sleep_time += sleep_for
        queue.put_nowait(sleep_for)

    # Create three worker tasks to process the queue concurrently.
    tasks = [
        asyncio.create_task(worker(f'worker-{i}', queue))
        for i in range(3)
    ]

    # Wait until the queue is fully processed.
    started_at = time.monotonic()
    await queue.join()
    total_slept_for = time.monotonic() - started_at

    # Cancel our worker tasks.
    for task in tasks:
        task.cancel()
    # Wait until all worker tasks are actually cancelled, so none of them is
    # still pending when the event loop shuts down.
    await asyncio.gather(*tasks, return_exceptions=True)

    print('====')
    print(f'3 workers slept in parallel for {total_slept_for:.2f} seconds')
    print(f'total expected sleep time: {total_sleep_time:.2f} seconds')
# Script entry point: run main() to completion.
asyncio.run(main())
@snavruzov
Copy link

👍

@kunansy
Copy link

kunansy commented Dec 24, 2020

Thank you a lot!
This example helped me understand what a worker is.

@prabhatkgupta
Copy link

This helped a lot!!
Thank you very much for helping

@impvd
Copy link

impvd commented Mar 26, 2021

@1st1 Thanks for your help.

I changed your gist slightly to make it work with threading:

import asyncio
import random
import time
import threading


# https://gist.github.com/1st1/f110d5e2ade94e679c4442e9b6d117e1
async def worker(name, queue):
    """Consume sleep durations from ``queue`` forever, sleeping for each one.

    Args:
        name: Label used in the message printed after each item.
        queue: Queue whose items are sleep durations in seconds; it must
            provide an awaitable ``get()`` and a ``task_done()`` method.

    The loop has no exit condition, so the coroutine only stops when its
    task is cancelled or when processing an item raises.
    """
    while True:
        sleep_for = await queue.get()
        try:
            await asyncio.sleep(sleep_for)
        finally:
            # Always balance the get() above, even if the sleep failed or was
            # cancelled; otherwise a queue.join() waiter would never wake up.
            queue.task_done()
        print(f'Dequeue {name} for {sleep_for:.2f}')


def task_inqueue(queue):
    """Producer loop: push one random delay per second onto ``queue``.

    Runs forever. Each delay is drawn with ``random.uniform(0.05, 1.0)``,
    announced on stdout and then handed to ``queue.put_nowait``.

    NOTE(review): this is meant to run in its own thread; if ``queue`` is an
    ``asyncio.Queue``, its ``put_nowait`` is not thread-safe -- confirm the
    caller arranges a thread-safe hand-off.
    """
    while True:
        delay = random.uniform(0.05, 1.0)
        announcement = f"inserting queue:{delay}"
        print(announcement)
        queue.put_nowait(delay)
        # Pace the producer at roughly one item per second.
        time.sleep(1)


async def task_dequeue(queue):
    """Start three workers on ``queue`` and keep running while they work.

    Args:
        queue: The queue handed to every ``worker`` coroutine.

    Raises:
        Whatever the first failing worker raises (propagated by ``gather``).
    """
    print("====> ENTERING DEQUEUE")
    tasks = [
        asyncio.create_task(worker(f'worker-{i}', queue))
        for i in range(3)
    ]
    # Creating the tasks is not enough: if this coroutine returned here,
    # asyncio.run() would cancel the still-pending workers and close the
    # loop, so nothing would ever be dequeued. The workers loop forever, so
    # awaiting them keeps the event loop (and the workers) alive.
    await asyncio.gather(*tasks)


class _ThreadSafeQueueProxy:
    """Let a foreign thread feed an ``asyncio.Queue`` via ``put_nowait``."""

    def __init__(self, loop, queue):
        self._loop = loop
        self._queue = queue

    def put_nowait(self, item):
        # asyncio.Queue is not thread-safe: a direct put_nowait() from another
        # thread does not wake the event loop. Scheduling the call on the
        # loop's own thread does.
        self._loop.call_soon_threadsafe(self._queue.put_nowait, item)


def main():
    """Run the producer in a thread and the asyncio workers in the main thread.

    The queue is created inside the running event loop, and the producer
    thread only ever sees a proxy that forwards its ``put_nowait`` calls onto
    the loop thread. The three-second head start of the producer is kept, but
    as a non-blocking ``asyncio.sleep`` so the forwarded puts can be processed
    while waiting.
    """

    async def run():
        queue = asyncio.Queue()
        proxy = _ThreadSafeQueueProxy(asyncio.get_running_loop(), queue)
        # Daemon thread: the endless producer must not keep the process alive
        # once the event loop has been stopped (e.g. by Ctrl-C).
        t_inqueue = threading.Thread(
            target=task_inqueue, args=(proxy,), daemon=True
        )
        t_inqueue.start()
        await asyncio.sleep(3)
        await task_dequeue(queue)
        # Keep the loop running even if task_dequeue() returns right after
        # starting its workers; otherwise asyncio.run() would cancel them.
        await asyncio.Event().wait()

    asyncio.run(run())


# Start the demo only when the file is executed as a script.
if __name__ == '__main__':
    main()

However, it seems that task_dequeue() doesn't work as expected. Would you help take a look at it?

inserting queue:0.95193975849139
inserting queue:0.4126914655983662
inserting queue:0.06469289732759037
====> ENTERING DEQUEUE
inserting queue:0.6371992807835489
inserting queue:0.5150791863419759
inserting queue:0.6435704016735537
inserting queue:0.7021951747038464
inserting queue:0.10352214837552644
inserting queue:0.9473580907504813
inserting queue:0.4135447172976028
inserting queue:0.329603710993434
inserting queue:0.07332900743730483
inserting queue:0.517951602458065
inserting queue:0.050595876540664504
inserting queue:0.1502889820422032
inserting queue:0.1674049554774545
inserting queue:0.5752381577004789
inserting queue:0.39471607398543573
inserting queue:0.2616939369576392
inserting queue:0.2534322236566886
inserting queue:0.2338269996901821
inserting queue:0.41332410609141934
inserting queue:0.8967170698534741
inserting queue:0.9536775061167974
inserting queue:0.41487047235012803
inserting queue:0.8282921670779227
inserting queue:0.4875614601997763
inserting queue:0.5328441641772085
...

@impvd
Copy link

impvd commented Mar 26, 2021

For traditional multithreading + queue, it would work like this:

import threading
import queue
import random
import time

# Work queue shared by the producer thread (inqueue) and the worker threads.
q = queue.Queue()
# Lock shared by worker() and inqueue().
lock = threading.Lock()


def worker(q, index):
    """Consume items from ``q`` forever, printing each one.

    Args:
        q: Queue to take items from; ``get(timeout=...)`` must raise
            ``queue.Empty`` when nothing arrives in time.
        index: Worker number shown in the printed message.

    ``cnt`` counts loop iterations (including the ones that timed out
    waiting), not consumed items.
    """
    cnt = 0
    while True:
        cnt = cnt + 1
        try:
            # Wait up to a second for an item instead of testing q.empty()
            # first. With the empty() check, two workers could both see the
            # same last item; the loser would then block in q.get() while
            # holding the lock, which also blocks the producer -> deadlock.
            # queue.Queue does its own locking, so get() needs no extra lock.
            item = q.get(timeout=1)
        except queue.Empty:
            continue
        # The lock now only keeps the workers' output lines from interleaving.
        with lock:
            print(f'Worker {index} consume {item}, count:{cnt}')
        time.sleep(random.randint(1, 20) / 10)


def inqueue():
    """Producer loop: push random integers onto the module-level queue ``q``.

    Runs forever. Every iteration draws ``random.randint(1, 10001)``,
    announces it on stdout together with the running count, and puts it on
    ``q`` while holding ``lock``. Items are spaced 0.1 s apart, with an extra
    100 s pause after every 100th item.
    """
    produced = 0
    while True:
        produced += 1
        value = random.randint(1, 10001)
        print(f"Enqueue {value}, count:{produced}")
        with lock:
            q.put(value)
        time.sleep(0.1)
        # Long break after each batch of 100 items.
        if not produced % 100:
            time.sleep(100)


def main():
    """Start the producer thread, wait three seconds, then start five workers.

    The threads are started and never joined; they keep running after this
    function returns.
    """
    producer = threading.Thread(target=inqueue)
    producer.start()

    # Give the producer a three-second head start before consuming.
    time.sleep(3)

    for index in range(5):
        threading.Thread(target=worker, args=(q, index)).start()


# Start the demo only when the file is executed as a script.
if __name__ == '__main__':
    main()

@androslee
Copy link

androslee commented Nov 19, 2022

great examples guys. this was simple and clear to understand. sadly, i don't think this is going to help speed up my networked API calls as they are just limited by network speed and the GIL, global interpreter lock, of python.

edit: so this is literally just the example from the python documentation

https://docs.python.org/3/library/asyncio-queue.html

@zaemiel
Copy link

zaemiel commented Dec 27, 2022

great examples guys. this was simple and clear to understand. sadly, i don't think this is going to help speed up my networked API calls as they are just limited by network speed and the GIL, global interpreter lock, of python.

edit: so this is literally just the example from the python documentation

https://docs.python.org/3/library/asyncio-queue.html

It's not just the example from the Python docs. It's the example for the Python documentation. Yury is one of Python's core devs, a co-author of asyncio, and the author of the async/await syntax.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment