Skip to content

Instantly share code, notes, and snippets.

@devlights
Last active April 25, 2020 00:54
Show Gist options
  • Select an option

  • Save devlights/0d4baba7155f769d433930fee59b7132 to your computer and use it in GitHub Desktop.

Select an option

Save devlights/0d4baba7155f769d433930fee59b7132 to your computer and use it in GitHub Desktop.
[python][asyncio] 基本的な使い方サンプル
"""
asyncio のサンプル
"""
import asyncio
import random
from datetime import datetime
from functools import partial
async def async_helloworld():
print('start::helloworld')
await asyncio.sleep(2)
print('end::helloworld')
def display_date(end_time: float, ev_loop: asyncio.events.AbstractEventLoop):
print(datetime.now())
if (ev_loop.time() + 1.0) < end_time:
ev_loop.call_later(1, display_date, end_time, ev_loop)
else:
ev_loop.stop()
async def slow_operation(future: asyncio.Future):
print('start::slow_operation')
await asyncio.sleep(3)
future.set_result('future is done.')
def got_result(ev_loop, future):
print(future.result())
ev_loop.stop()
async def random_sleep(rnd: random.Random, name: str) -> str:
sleep_seconds = rnd.randint(1, 10)
for i in range(2):
print(f'[{name}-{i+1}] random value: {sleep_seconds}')
await asyncio.sleep(sleep_seconds)
return f'{name} is done'
async def wait_got_result(return_when=asyncio.ALL_COMPLETED) -> tuple:
rnd = random.Random()
coros = [random_sleep(rnd, f'wgr{i}') for i in range(5)]
done, pending = await asyncio.wait(coros, return_when=return_when)
return done, pending
if __name__ == '__main__':
# まず、最初にイベントループの取得
loop = asyncio.get_event_loop() # type: asyncio.events.AbstractEventLoop
# 必要であればデバッグモードの設定
loop.set_debug(True)
# 最も基本的なパターン
loop.run_until_complete(async_helloworld())
# call_soonを使ったパターン
loop.call_soon(print, 'helloworld')
loop.call_soon(loop.stop)
loop.run_forever()
# call_laterとcall_soonを使ったパターン
end_time = loop.time() + 3.0
loop.call_soon(display_date, end_time, loop)
loop.run_forever()
# Futureを自前で作って操作するパターン
future = loop.create_future() # type: asyncio.Future
asyncio.ensure_future(slow_operation(future))
loop.run_until_complete(future)
print(future.result())
# Futureとadd_done_callbackとrun_foreverを使ったパターン
future = loop.create_future() # type: asyncio.Future
future.add_done_callback(partial(got_result, loop))
asyncio.ensure_future(slow_operation(future))
loop.run_forever()
# 複数のタスクを同時進行
# asyncio.gatherは指定したCoroutine or Futureの結果を一つにまとめたFutureを返す
# 結果はgatherに指定したCoroutine or Futureの順で設定される
rnd = random.Random() # type: random.Random
coros = (random_sleep(rnd, f'proc{i+1}') for i in range(5))
gather_future = asyncio.gather(*coros)
loop.run_until_complete(gather_future)
print(gather_future.result())
# 複数のタスクを同時進行して待ち合わせ
# asyncio.waitは指定したCoroutine or Futureの完了を待機するFutureを返す (タイムリミットの指定も可能)
# waitの引数 return_when にモードを指定することで、どのタイミングで結果を返すのかを制御できる
# 以下は、return_when=ALL_COMPLETED を指定した場合。つまり全てが完了するまで待機。
# (ALL_COMPLETEDがデフォルトの値)
done, pending = loop.run_until_complete(wait_got_result())
print(f'done={len(done)}, pending={len(pending)}')
# waitでreturn_when=FIRST_COMPLETEDを指定した場合
# この場合、完了していないタスクも存在するので、それの処置が必要となる
done, pending = loop.run_until_complete(
wait_got_result(return_when=asyncio.FIRST_COMPLETED)
)
print(f'done={len(done)}, pending={len(pending)}')
if pending:
# waitの時点で完了しなかったタスクを処理
loop.run_until_complete(asyncio.gather(*pending))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment