Skip to content

Instantly share code, notes, and snippets.

@jerryan999
Last active October 1, 2021 00:31
Show Gist options
  • Save jerryan999/212dbd0a0bb2c3c564e67d084357690f to your computer and use it in GitHub Desktop.
Save jerryan999/212dbd0a0bb2c3c564e67d084357690f to your computer and use it in GitHub Desktop.
Implement Producer/Consumer pattern with asyncio.Queue
import asyncio
try:
loop = asyncio.get_event_loop()
except RuntimeError as ex:
if "There is no current event loop in thread" in str(ex):
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
class AsyncioHelper(object):
'''
一个生产者,多个消费者模型
'''
def __init__(self, func, num_worker=4):
self.tasks = [
loop.create_task(self.worker(idx, func)) for idx in range(num_worker)
]
# fix for gcloud function
asyncio.set_event_loop(loop)
self.queue = asyncio.Queue()
def send_task_nowait(self, item):
self.queue.put_nowait(item)
print(f'added task: to the queue, qsize:{self.queue.qsize()}')
async def _run(self):
await self.queue.join()
for task in self.tasks:
task.cancel()
await asyncio.gather(*self.tasks, return_exceptions=True)
def run(self):
loop.run_until_complete(self._run())
async def worker(self, index, func):
while True:
item = await self.queue.get()
print(f'Consumed-{index}, value: , size:{self.queue.qsize()}')
await func(item)
self.queue.task_done()
if __name__ == "__main__":
# only for test
async def handle_task(item):
await asyncio.sleep(3)
print(f"Finished processing task:str({item})")
h = AsyncioHelper(handle_task)
for i in range(20):
h.send_task_nowait(i)
h.run()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment