Last active
July 15, 2024 07:11
-
-
Save t41372/b1068432ac7f1f0624f9f1f03ceb7e7e to your computer and use it in GitHub Desktop.
Producer-consumer model in python with asyncio
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio | |
import random | |
import time | |
# emulate a time consuming task | |
async def task_A(char): | |
time_to_sleep = random.randint(1, 3) | |
id = f'Product_{char}[{time.strftime("%M:%S", time.localtime())}, {time_to_sleep}s]' | |
print("Producing: " + id) | |
await asyncio.to_thread( | |
time.sleep, time_to_sleep | |
) # a non-async task that takes 1-3 seconds | |
return id | |
# play the result of task_A | |
async def play_result_A(result_A): | |
print( | |
f"At {time.strftime('%M:%S', time.localtime())}, Start Playing: {result_A} for 4s" | |
) | |
await asyncio.to_thread(time.sleep, 4) # takes 4 seconds to play | |
print(f"At {time.strftime('%M:%S', time.localtime())} Finish_Playing: {result_A}") | |
async def main(chat_completions): | |
async def workerA(queue): | |
for char in chat_completions: | |
result = await task_A(char) | |
await queue.put(result) | |
async def player(queue): | |
while True: | |
result = await queue.get() | |
await play_result_A(result) | |
queue.task_done() | |
queue = asyncio.Queue() | |
producer = asyncio.create_task(workerA(queue)) | |
consumer = asyncio.create_task(player(queue)) | |
await producer | |
await queue.join() # wait until all tasks are done | |
consumer.cancel() | |
# usage | |
if __name__ == "__main__": | |
chat_completions = iter("ABCDEFGHIJ") # emulate chat_completions iterator | |
asyncio.run(main(chat_completions)) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Console output: