Last active
October 1, 2021 00:31
-
-
Save jerryan999/212dbd0a0bb2c3c564e67d084357690f to your computer and use it in GitHub Desktop.
Implement Producer/Consumer pattern with asyncio.Queue
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio | |
# Module-level event loop used by AsyncioHelper to create and run its tasks.
try:
    loop = asyncio.get_event_loop()
except RuntimeError:
    # get_event_loop() raises RuntimeError when the current thread has no
    # event loop. Fall back to a fresh loop for any such error rather than
    # matching on the message text, so `loop` is always bound afterwards.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
class AsyncioHelper(object):
    """One-producer / multiple-consumer model built on ``asyncio.Queue``.

    The caller enqueues items with :meth:`send_task_nowait`. A fixed pool of
    worker tasks pulls items off the queue and awaits ``func(item)`` for each
    one. :meth:`run` blocks until every queued item has been processed and
    then cancels the workers.

    The helper uses the module-level event loop ``loop`` to create and run
    its tasks.

    Args:
        func: Coroutine function awaited once per item as ``func(item)``.
        num_worker: Number of worker tasks to start.
    """

    def __init__(self, func, num_worker=4):
        # fix for gcloud function: make the module-level loop the current one
        # before the queue is constructed.
        asyncio.set_event_loop(loop)
        # Build the queue before the workers so that every worker task is
        # created against a fully initialised helper.
        self.queue = asyncio.Queue()
        self.tasks = [
            loop.create_task(self.worker(idx, func)) for idx in range(num_worker)
        ]

    def send_task_nowait(self, item):
        """Enqueue ``item`` without waiting and print the new queue size."""
        self.queue.put_nowait(item)
        print(f'added task: to the queue, qsize:{self.queue.qsize()}')

    async def _run(self):
        """Wait until the queue is fully processed, then stop the workers."""
        await self.queue.join()
        # The workers loop forever; once the queue is drained they are only
        # waiting on queue.get(), so cancel them explicitly.
        for task in self.tasks:
            task.cancel()
        # return_exceptions=True keeps the CancelledError of each worker from
        # propagating out of gather().
        await asyncio.gather(*self.tasks, return_exceptions=True)

    def run(self):
        """Block until all queued items are processed and workers are stopped.

        The workers are cancelled when this returns and are only created in
        ``__init__``, so items queued afterwards are not consumed.
        """
        loop.run_until_complete(self._run())

    async def worker(self, index, func):
        """Consume items forever, awaiting ``func(item)`` for each one.

        A failure in ``func`` is reported and the worker keeps running; the
        item is always marked done so that ``queue.join()`` cannot hang on an
        item whose handler raised.

        Args:
            index: Worker number, used only in the printed messages.
            func: Coroutine function awaited with each dequeued item.
        """
        while True:
            item = await self.queue.get()
            print(f'Consumed-{index}, value: , size:{self.queue.qsize()}')
            try:
                await func(item)
            except asyncio.CancelledError:
                # Never swallow cancellation (it is an Exception subclass on
                # older Python versions).
                raise
            except Exception as exc:
                print(f'Worker-{index} failed on item {item!r}: {exc!r}')
            finally:
                self.queue.task_done()
if __name__ == "__main__":
    # Manual demo only: push 20 integers through the helper's default
    # worker pool, each handled by a coroutine that sleeps for 3 seconds.

    async def handle_task(item):
        """Pretend to work on ``item`` for three seconds, then report it."""
        await asyncio.sleep(3)
        print(f"Finished processing task:str({item})")

    helper = AsyncioHelper(handle_task)
    for number in range(20):
        helper.send_task_nowait(number)
    # Blocks until every queued number has been handled.
    helper.run()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment