Skip to content

Instantly share code, notes, and snippets.

@vsmelov
Created September 24, 2018 09:04
Show Gist options
  • Save vsmelov/76da4a744ec4d60b0138e4eb077fc8bc to your computer and use it in GitHub Desktop.
Save vsmelov/76da4a744ec4d60b0138e4eb077fc8bc to your computer and use it in GitHub Desktop.
async def long_coro():
    """Sleep for 1000 seconds, standing in for a long-running task.

    The demo scenarios in this file use it as an awaitable that will not
    finish on its own while the experiment is running.
    """
    await asyncio.sleep(1000)
async def fail_coro():
    """Fail on the first step of execution.

    Raises:
        ValueError: always, as soon as the coroutine starts running.
    """
    raise ValueError
async def timeout_coro():
    """Print ``start`` and then time out while waiting on ``long_coro``.

    Raises:
        asyncio.TimeoutError: once the 3 second ``wait_for`` timeout
            expires, since ``long_coro`` sleeps for much longer than that.
    """
    print('start')
    await asyncio.wait_for(long_coro(), timeout=3)
async def wait_for_queue():
    """Block on ``get()`` of a freshly created, private ``asyncio.Queue``.

    The queue is local to this coroutine and nothing is ever put into it,
    so the ``get()`` call does not return by itself; the coroutine only
    ends when it is cancelled from outside.
    """
    queue = asyncio.Queue()
    await queue.get()
class TrickyClass:
    """Minimal asynchronous context manager that does nothing.

    Entering yields the instance itself; exiting performs no cleanup and
    returns ``None``, so an exception raised inside the ``async with``
    body is not suppressed.
    """

    async def __aenter__(self):
        """Return the instance so it can be bound by ``async with ... as``."""
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Do nothing on exit; the implicit ``None`` lets errors propagate."""
async def main1():
    """Scenario 1: a long sleeper gathered with an immediate failure.

    Expected outcome (per the original author's note): fails immediately.
    ``gather`` re-raises the ``ValueError`` from ``fail_coro`` to the
    awaiter without waiting for ``long_coro`` to finish.
    """
    await asyncio.gather(
        long_coro(),
        fail_coro(),
    )
async def main2():
    """Scenario 2: a long sleeper gathered with a coroutine that times out.

    Expected outcome (per the original author's note): fails in about
    3 seconds, when the ``wait_for`` inside ``timeout_coro`` expires and
    ``gather`` propagates the resulting timeout error.
    """
    await asyncio.gather(
        long_coro(),
        timeout_coro(),
    )
async def main3():
    """Scenario 3: an immediate failure gathered with a blocked queue reader.

    Expected outcome (per the original author's note): fails immediately.
    ``gather`` re-raises the ``ValueError`` from ``fail_coro`` even though
    ``wait_for_queue`` is still blocked on its empty queue.
    """
    await asyncio.gather(
        fail_coro(),
        wait_for_queue(),
    )
async def main4():
    """Scenario 4: a timing-out coroutine gathered with a blocked queue reader.

    Expected outcome (per the original author's note): fails in about
    3 seconds, when the ``wait_for`` inside ``timeout_coro`` expires,
    even though ``wait_for_queue`` is still blocked on its empty queue.
    """
    await asyncio.gather(
        timeout_coro(),
        wait_for_queue(),
    )
async def main5():
    """Scenario 5: timeout plus blocked reader, inside an ``async with``.

    Gathers ``timeout_coro`` and ``wait_for_queue`` within the body of an
    ``async with TrickyClass()`` block.

    Expected outcome (per the original author's note): fails in about
    3 seconds. ``TrickyClass.__aexit__`` returns ``None``, so the error
    leaves the ``async with`` block unchanged.
    """
    async with TrickyClass() as obj:  # ``obj`` is bound but intentionally unused
        await asyncio.gather(
            timeout_coro(),
            wait_for_queue(),
        )
import asyncio
# Entry point: runs one demo scenario. To try another one, replace
# ``main5()`` below with ``main1()``, ``main2()``, ``main3()`` or ``main4()``.
#
# ``asyncio.run`` is used instead of
# ``asyncio.get_event_loop().run_until_complete(...)`` because calling
# ``get_event_loop()`` without a running loop is deprecated and raises on
# recent Python versions. The scenario's exception still propagates to the
# caller; note that ``asyncio.run`` also cancels tasks left pending.
if __name__ == "__main__":
    asyncio.run(main5())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment