Skip to content

Instantly share code, notes, and snippets.

@devlights
Created November 21, 2018 08:57
Show Gist options
  • Save devlights/72a50ac554e264ede8edcd762d96bdbc to your computer and use it in GitHub Desktop.
Save devlights/72a50ac554e264ede8edcd762d96bdbc to your computer and use it in GitHub Desktop.
[python][asyncio] Producer-Consumerパターンのサンプル
"""
asyncio のサンプルです。
asyncio.Queueを使った producer-consumer パターンのサンプル。
"""
import asyncio
import contextlib
import itertools
import random
async def make_producer(queue: asyncio.Queue, *, max_delay: float = 1.0):
    """Endlessly produce ``'Item <n>'`` strings and put them on *queue*.

    The coroutine never returns on its own: it counts upward forever and is
    expected to be stopped by cancelling the task that wraps it.

    Args:
        queue: Destination queue. ``put`` blocks while the queue is full.
        max_delay: Upper bound, in seconds, of the random pause taken before
            each item is produced. The default of ``1.0`` reproduces the
            original hard-coded behaviour; ``0`` removes the pause.
    """
    rnd = random.Random()
    for i in itertools.count():
        # Random pause in [0, max_delay) to simulate variable production time.
        await asyncio.sleep(rnd.random() * max_delay)
        item = f'Item {i}'
        await queue.put(item)
        print(f'[producer] put {item}')
async def make_consumer(queue: asyncio.Queue, producer: asyncio.Future, *,
                        max_delay: float = 1.0, get_timeout: float = 1.0):
    """Consume items from *queue* until *producer* is finished and it is empty.

    Args:
        queue: Queue to read items from.
        producer: Future/task of the producing side. Once it is done
            (cancelled, finished normally, or failed) and the queue has been
            drained, the consumer returns.
        max_delay: Upper bound, in seconds, of the random pause taken before
            each read. Defaults to the original ``1.0``.
        get_timeout: Seconds to wait for a single item before re-checking the
            exit condition. Defaults to the original ``1.0``.
    """
    rnd = random.Random()
    while True:
        # ``done()`` rather than ``cancelled()``: a producer that returned or
        # raised will never produce again either, so waiting on it would
        # spin forever.
        if producer.done() and queue.empty():
            break

        await asyncio.sleep(rnd.random() * max_delay)

        try:
            item = await asyncio.wait_for(queue.get(), timeout=get_timeout)
        except asyncio.TimeoutError:
            # Nothing arrived in time; loop around and re-check the exit
            # condition instead of touching an unbound/stale ``item``.
            continue

        print(f'====> [consumer] get {item}')
def all_cancel(target_loop=None):
    """Cancel every unfinished task on an event loop and wait for each one.

    Args:
        target_loop: Event loop whose tasks are cancelled. When omitted, the
            module-level ``loop`` created in the ``__main__`` section is used,
            which keeps the original zero-argument call working.
    """
    if target_loop is None:
        target_loop = loop  # module-level loop set up by the script entry point

    # ``asyncio.Task.all_tasks()`` was removed in Python 3.9;
    # ``asyncio.all_tasks()`` is its replacement.
    for task in asyncio.all_tasks(target_loop):
        task.cancel()
        # Drive the loop until the task has processed its cancellation.
        with contextlib.suppress(asyncio.CancelledError):
            target_loop.run_until_complete(task)
        print(f'task done: {task}')
if __name__ == '__main__':
    # Create and install a dedicated event loop explicitly; calling
    # asyncio.get_event_loop() with no running loop is deprecated.
    loop = asyncio.new_event_loop()  # type: asyncio.AbstractEventLoop
    asyncio.set_event_loop(loop)
    loop.set_debug(True)

    try:
        # The ``loop=`` argument of asyncio.Queue was removed in Python 3.10,
        # so the queue is created without it.
        queue = asyncio.Queue(maxsize=5)

        producer = loop.create_task(make_producer(queue))
        consumer = loop.create_task(make_consumer(queue, producer))

        # Let both sides run for 10 seconds, then stop the loop.
        loop.call_later(10, loop.stop)
        loop.run_forever()

        # Cancel the producer side and wait for the consumer to finish.
        producer.cancel()
        print('CANCEL ==> producer')

        for task in (producer, consumer):
            with contextlib.suppress(asyncio.CancelledError):
                loop.run_until_complete(task)

        # To cancel everything at once instead:
        # all_cancel()
    finally:
        # Release the loop's resources even if the run above failed.
        loop.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment