-
-
Save 1st1/f110d5e2ade94e679c4442e9b6d117e1 to your computer and use it in GitHub Desktop.
import asyncio | |
import random | |
import time | |
async def worker(name, queue):
    """Consume sleep durations from *queue* forever, sleeping for each one.

    Args:
        name: Label printed in the progress message.
        queue: An ``asyncio.Queue`` whose items are sleep durations in seconds.

    The coroutine never returns on its own; the caller cancels the task once
    ``queue.join()`` has completed.
    """
    while True:
        # Get a "work item" out of the queue.
        sleep_for = await queue.get()
        try:
            # Sleep for the "sleep_for" seconds.
            await asyncio.sleep(sleep_for)
        finally:
            # Notify the queue that the "work item" has been processed, even
            # if the sleep raised or was cancelled; otherwise queue.join()
            # would wait forever on an item nobody will ever finish.
            queue.task_done()
        print(f'{name} has slept for {sleep_for:.2f} seconds')
async def main():
    """Fill a queue with random sleep durations and drain it with workers.

    Puts 20 random durations into an ``asyncio.Queue``, starts three
    ``worker`` tasks, waits for the queue to be fully processed, cancels the
    workers, and prints the elapsed wall-clock time next to the sum of all
    durations.
    """
    num_workers = 3

    # Create a queue that we will use to store our "workload".
    queue = asyncio.Queue()

    # Generate random timings and put them into the queue.
    total_sleep_time = 0
    for _ in range(20):
        sleep_for = random.uniform(0.05, 1.0)
        total_sleep_time += sleep_for
        queue.put_nowait(sleep_for)

    # Create the worker tasks to process the queue concurrently.
    tasks = [
        asyncio.create_task(worker(f'worker-{i}', queue))
        for i in range(num_workers)
    ]

    # Wait until the queue is fully processed.
    started_at = time.monotonic()
    await queue.join()
    total_slept_for = time.monotonic() - started_at

    # Cancel our worker tasks.
    for task in tasks:
        task.cancel()
    # Wait until all worker tasks are actually cancelled, so none of them is
    # still pending when this coroutine returns.
    await asyncio.gather(*tasks, return_exceptions=True)

    print('====')
    print(f'{num_workers} workers slept in parallel for {total_slept_for:.2f} seconds')
    print(f'total expected sleep time: {total_sleep_time:.2f} seconds')


if __name__ == '__main__':
    asyncio.run(main())
👍
Thank you a lot!
This example helped me understand what a worker is.
This helped a lot!!
Thank you very much for helping
@1st1 Thanks for your help.
I changed your gist slightly to make it work with threading:
import asyncio
import random
import time
import threading
# https://gist.github.com/1st1/f110d5e2ade94e679c4442e9b6d117e1
async def worker(name, queue):
    """Take sleep durations off *queue* forever and sleep for each of them.

    Args:
        name: Label printed in the "Dequeue" message.
        queue: An ``asyncio.Queue`` whose items are sleep durations in seconds.

    Runs until the task is cancelled.
    """
    while True:
        sleep_for = await queue.get()
        try:
            await asyncio.sleep(sleep_for)
        finally:
            # Mark the item as done even when the sleep raises or is
            # cancelled, so a pending queue.join() is never left hanging.
            queue.task_done()
        print(f'Dequeue {name} for {sleep_for:.2f}')
def task_inqueue(queue, loop=None):
    """Producer: push a random sleep duration onto *queue* once per second.

    Intended to run in its own thread; it loops forever.

    Args:
        queue: The ``asyncio.Queue`` the workers consume from.
        loop: The event loop that owns *queue*. ``asyncio.Queue`` is not
            thread-safe, so when a loop is given the item is handed over with
            ``loop.call_soon_threadsafe``, which also wakes the loop up. With
            ``loop=None`` the item is inserted directly, which is only safe
            when called from the loop's own thread.
    """
    while True:
        sleep_for = random.uniform(0.05, 1.0)
        print(f"inserting queue:{sleep_for}")
        if loop is None:
            queue.put_nowait(sleep_for)
        else:
            loop.call_soon_threadsafe(queue.put_nowait, sleep_for)
        time.sleep(1)


async def task_dequeue(queue):
    """Start three ``worker`` tasks on *queue* and keep them running.

    The workers loop forever, so this coroutine only finishes if a worker
    fails or the coroutine itself is cancelled.
    """
    print("====> ENTERING DEQUEUE")
    tasks = [
        asyncio.create_task(worker(f'worker-{i}', queue))
        for i in range(3)
    ]
    # Without this await the coroutine would return immediately and
    # asyncio.run() would cancel the freshly created workers before they
    # process anything.
    await asyncio.gather(*tasks)


async def _produce_and_consume():
    """Create the queue inside the running loop, then start producer and consumers."""
    queue = asyncio.Queue()
    loop = asyncio.get_running_loop()
    # Daemon thread: the producer never terminates by itself, so it must not
    # keep the interpreter alive once the event loop has stopped.
    t_inqueue = threading.Thread(
        target=task_inqueue, args=(queue, loop), daemon=True
    )
    t_inqueue.start()
    # Let a few items pile up before the consumers start; a non-blocking sleep
    # keeps the loop free to run the producer's thread-safe callbacks.
    await asyncio.sleep(3)
    await task_dequeue(queue)


def main():
    """Run the threaded producer together with the asyncio consumers."""
    asyncio.run(_produce_and_consume())


if __name__ == '__main__':
    main()
However, it seems that task_dequeue() doesn't work as expected: the producer keeps inserting items, but the workers never print anything.
Would you mind taking a look at it?
inserting queue:0.95193975849139
inserting queue:0.4126914655983662
inserting queue:0.06469289732759037
====> ENTERING DEQUEUE
inserting queue:0.6371992807835489
inserting queue:0.5150791863419759
inserting queue:0.6435704016735537
inserting queue:0.7021951747038464
inserting queue:0.10352214837552644
inserting queue:0.9473580907504813
inserting queue:0.4135447172976028
inserting queue:0.329603710993434
inserting queue:0.07332900743730483
inserting queue:0.517951602458065
inserting queue:0.050595876540664504
inserting queue:0.1502889820422032
inserting queue:0.1674049554774545
inserting queue:0.5752381577004789
inserting queue:0.39471607398543573
inserting queue:0.2616939369576392
inserting queue:0.2534322236566886
inserting queue:0.2338269996901821
inserting queue:0.41332410609141934
inserting queue:0.8967170698534741
inserting queue:0.9536775061167974
inserting queue:0.41487047235012803
inserting queue:0.8282921670779227
inserting queue:0.4875614601997763
inserting queue:0.5328441641772085
...
With traditional multithreading and `queue.Queue`, it would work like this:
import threading
import queue
import random
import time
# Shared work queue for the threads in this script.
q = queue.Queue()
# Module-level lock available to the threads in this script.
lock = threading.Lock()
def worker(q, index):
    """Consume items from *q* forever, printing each one.

    Args:
        q: The ``queue.Queue`` to take items from.
        index: Worker number shown in the output.

    ``cnt`` counts loop iterations, including polls that found the queue
    empty.
    """
    cnt = 0
    while True:
        cnt = cnt + 1
        try:
            # A blocking get with a timeout replaces the empty()-then-get()
            # check: between those two calls another worker could take the
            # last item, leaving this one blocked in get() indefinitely.
            # queue.Queue does its own locking, so no external lock is needed.
            item = q.get(timeout=1)
        except queue.Empty:
            # Nothing arrived within one second; poll again.
            continue
        print(f'Worker {index} consume {item}, count:{cnt}')
        # Simulate 0.1 to 2.0 seconds of work on the item.
        time.sleep(random.randint(1, 20) / 10)
def inqueue():
    """Producer: put random integers on the module-level queue ``q`` forever.

    Adds one item roughly every 0.1 seconds and pauses for 100 seconds after
    every 100th item.
    """
    cnt = 0
    while True:
        cnt = cnt + 1
        i = random.randint(1, 10001)
        print(f"Enqueue {i}, count:{cnt}")
        # queue.Queue is internally synchronized, so put() needs no extra
        # lock; taking one here could stall the producer behind a consumer
        # that holds the same lock while it waits for an item.
        q.put(i)
        time.sleep(0.1)
        if cnt % 100 == 0:
            # Long pause after each batch of 100 items.
            time.sleep(100)
def main():
    """Start the producer thread, wait three seconds, then start five workers."""
    producer = threading.Thread(target=inqueue, args=())
    producer.start()

    # Give the producer a head start so the queue is not empty at first.
    time.sleep(3)

    consumers = [
        threading.Thread(target=worker, args=(q, number))
        for number in range(5)
    ]
    for consumer in consumers:
        consumer.start()


if __name__ == '__main__':
    main()
Great examples, guys. This was simple and clear to understand. Sadly, I don't think this is going to help speed up my networked API calls, as they are just limited by network speed and by Python's GIL (global interpreter lock).
Edit: so this is literally just the example from the Python documentation.
> Great examples, guys. This was simple and clear to understand. Sadly, I don't think this is going to help speed up my networked API calls, as they are just limited by network speed and by Python's GIL (global interpreter lock).
> Edit: so this is literally just the example from the Python documentation.
It's not just the example from the Python docs; it's the example written for the Python documentation. Yury is one of Python's core devs, a co-author of asyncio, and the author of the async/await syntax.
Thanks.. This helped me understand queues and tasks with asyncio