Skip to content

Instantly share code, notes, and snippets.

@ygkn
Last active October 7, 2021 10:28
Show Gist options
  • Save ygkn/644e7d12793ecebc7ed2b506c2dd7707 to your computer and use it in GitHub Desktop.
Save ygkn/644e7d12793ecebc7ed2b506c2dd7707 to your computer and use it in GitHub Desktop.
import asyncio
"""
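Expected output: "hello" once per second for the first three seconds, then
"bye" after 20 seconds, when wait_exit() finishes and main() cancels the rest.

hello
hello
hello
bye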
"""
# interval_event: set by interval() on every tick, consumed by wait_event().
# exit_event: awaited by interval() after its last tick; nothing in the
# visible code ever sets it.
interval_event, exit_event = asyncio.Event(), asyncio.Event()
async def wait_event():
    """Block until ``interval_event`` is set, then reset it."""
    signal = interval_event
    await signal.wait()
    # Clear right away so the next call blocks until a fresh set().
    signal.clear()
async def listen_event():
    """Print "hello" each time ``wait_event()`` returns; never exits on its own."""
    greeting = "hello"
    while True:
        await wait_event()
        print(greeting)
async def interval():
    """Set ``interval_event`` three times, one second apart, then wait on ``exit_event``."""
    ticks_left = 3
    while ticks_left > 0:
        await asyncio.sleep(1)
        interval_event.set()
        ticks_left -= 1
    # Nothing in the visible code sets exit_event, so this keeps the
    # coroutine suspended until it is cancelled from outside.
    await exit_event.wait()
async def not_exit():
    """Run ``listen_event()`` and ``interval()`` concurrently and wait for both."""
    workers = (listen_event(), interval())
    await asyncio.gather(*workers)
async def wait_exit(delay: float = 20) -> None:
    """Sleep for *delay* seconds, then print "bye".

    Args:
        delay: Seconds to wait before printing. Defaults to 20, the value
            that used to be hard-coded, so a bare ``wait_exit()`` call
            behaves exactly as before.
    """
    await asyncio.sleep(delay)
    print("bye")
async def main():
    """Run ``not_exit()`` and ``wait_exit()`` concurrently until one finishes.

    As soon as the first task completes, every task still pending is
    cancelled and then awaited, so the cancellation is fully processed
    before this coroutine returns.
    """
    tasks = [asyncio.create_task(coro) for coro in (not_exit(), wait_exit())]
    _, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
    for task in pending:
        task.cancel()
    # cancel() only *requests* cancellation. Awaiting the tasks lets them
    # (and any child tasks they are gathering) actually unwind; otherwise
    # the loop can stop while they are still pending. return_exceptions=True
    # keeps the resulting CancelledError from propagating out of main().
    await asyncio.gather(*pending, return_exceptions=True)
# asyncio.run() is deliberately not used here: it always creates a new event
# loop, and on Python < 3.10 the module-level asyncio.Event objects are bound
# to the default loop at construction time, so they must run on that loop.
loop = asyncio.get_event_loop()
try:
    loop.run_until_complete(main())
finally:
    # Release the loop's resources once the script is done.
    loop.close()
import asyncio
"""
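Expected output: "hello" three times, one second apart. "bye" is never
printed, because not_exit() finishes first and main() cancels wait_exit().

hello
hello
hello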
"""
async def interval_event():
    """Async generator: yield "hello" three times, sleeping one second after each."""
    remaining = 3
    while remaining > 0:
        yield "hello"
        await asyncio.sleep(1)
        remaining -= 1
async def listen_event():
    """Print "hello" once for every item produced by ``interval_event()``."""
    ticks = interval_event()
    # The yielded value itself is ignored; each item is only a trigger.
    async for _tick in ticks:
        print("hello")
async def not_exit():
    """Run ``listen_event()`` through ``asyncio.gather`` and wait for it to finish."""
    workers = (listen_event(),)
    await asyncio.gather(*workers)
async def wait_exit(delay: float = 20) -> None:
    """Sleep for *delay* seconds, then print "bye".

    Args:
        delay: Seconds to wait before printing. Defaults to 20, the value
            that used to be hard-coded, so a bare ``wait_exit()`` call
            behaves exactly as before.
    """
    await asyncio.sleep(delay)
    print("bye")
async def main():
    """Run ``not_exit()`` and ``wait_exit()`` concurrently until one finishes.

    As soon as the first task completes, every task still pending is
    cancelled and then awaited, so the cancellation is fully processed
    before this coroutine returns.
    """
    tasks = [asyncio.create_task(coro) for coro in (not_exit(), wait_exit())]
    _, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
    for task in pending:
        task.cancel()
    # cancel() only *requests* cancellation. Awaiting the tasks lets them
    # actually unwind; otherwise the loop can stop while they are still
    # pending. return_exceptions=True keeps the resulting CancelledError
    # from propagating out of main().
    await asyncio.gather(*pending, return_exceptions=True)
# asyncio.run() creates a fresh event loop, runs main() to completion and
# closes the loop afterwards. It replaces asyncio.get_event_loop(), which is
# deprecated when no loop is running and left the loop unclosed.
asyncio.run(main())
@ygkn
Copy link
Author

ygkn commented Oct 7, 2021

hello
hello
hello
bye

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment