Created
September 24, 2018 09:04
-
-
Save vsmelov/76da4a744ec4d60b0138e4eb077fc8bc to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
async def long_coro(): | |
await asyncio.sleep(1000) | |
async def fail_coro(): | |
raise ValueError | |
async def timeout_coro(): | |
print('start') | |
await asyncio.wait_for(long_coro(), timeout=3) | |
async def wait_for_queue(): | |
queue = asyncio.Queue() | |
await queue.get() | |
class TrickyClass: | |
async def __aenter__(self): | |
return self | |
async def __aexit__(self, exc_type, exc_val, exc_tb): | |
pass | |
async def main1(): # fail immediatelly | |
await asyncio.gather(*[ | |
long_coro(), | |
fail_coro(), | |
]) | |
async def main2(): # fail in 3sec | |
await asyncio.gather(*[ | |
long_coro(), | |
timeout_coro(), | |
]) | |
async def main3(): # fail immediatelly | |
await asyncio.gather(*[ | |
fail_coro(), | |
wait_for_queue(), | |
]) | |
async def main4(): # fail in 3sec | |
await asyncio.gather(*[ | |
timeout_coro(), | |
wait_for_queue(), | |
]) | |
async def main5(): # fail in 3sec | |
async with TrickyClass() as obj: | |
await asyncio.gather(*[ | |
timeout_coro(), | |
wait_for_queue(), | |
]) | |
import asyncio | |
# asyncio.get_event_loop().run_until_complete(main1()) | |
# asyncio.get_event_loop().run_until_complete(main2()) | |
# asyncio.get_event_loop().run_until_complete(main3()) | |
# asyncio.get_event_loop().run_until_complete(main4()) | |
asyncio.get_event_loop().run_until_complete(main5()) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment