Skip to content

Instantly share code, notes, and snippets.

@jsbueno
Last active February 20, 2026 15:47
Show Gist options
  • Select an option

  • Save jsbueno/a51b96e7344babedf5860b86a64d24bd to your computer and use it in GitHub Desktop.

Select an option

Save jsbueno/a51b96e7344babedf5860b86a64d24bd to your computer and use it in GitHub Desktop.
Exploring exception handling and task resumption with asyncio.
import random, asyncio, time, os
async def a(n):
    """Print ``n`` forever, pausing a slightly random interval between prints.

    When it starts running, the task executing this coroutine stores itself
    in the module-level ``last_task`` global, so the most recently started
    task can be reached from outside the event loop.

    Args:
        n: Value printed on every iteration.

    The coroutine never returns on its own; it ends only when its task is
    cancelled.
    """
    global last_task
    # Every task overwrites the global, so it always names the task that
    # began running most recently.
    last_task = asyncio.current_task()
    while True:
        print(n)
        # 0.5 s base delay plus up to ~0.33 s of random jitter.
        await asyncio.sleep(0.5 + random.random() / 3)
async def main(tasks, n):
    """Start ``n`` new ``a`` tasks and wait on every live task in ``tasks``.

    Args:
        tasks: Mutable set of asyncio tasks. It is updated in place: new
            tasks are added and already-cancelled ones are removed.
        n: Number of new ``a(i)`` tasks to create; ``0`` creates none and
            only waits on the tasks already present in ``tasks``.

    A ``CancelledError`` surfacing from the ``gather`` call is caught and
    reported with a message instead of propagating.
    """
    loop = asyncio.get_running_loop()
    # NOTE(review): `xis` is not defined in the visible source; it has to be
    # supplied elsewhere, otherwise this line raises NameError - confirm.
    loop.set_exception_handler(xis)
    tasks.update(asyncio.create_task(a(i)) for i in range(n))
    # Drop tasks that are already cancelled so they are not awaited again.
    # The list is built first because the set cannot change while iterated.
    tasks.difference_update([t for t in tasks if t.cancelled()])
    try:
        await asyncio.gather(*tasks)
    except asyncio.CancelledError:
        print("xi. tentaram cancelar uma aqui")
# NOTE(review): this call appears before `manager` is defined, and a second
# call, `manager(5)`, follows the definition. Indentation was lost in this
# copy, so confirm whether this is a stray module-level call (which would
# raise NameError when the script runs top to bottom) or belonged elsewhere.
manager(4)
def manager(n):
    """Run ``main`` repeatedly on one event loop, resuming after each Ctrl+C.

    Each pass runs ``main(tasks, n)`` until it returns or a
    ``KeyboardInterrupt`` escapes the loop. On interrupt, the task recorded
    in the module-level ``last_task`` global (if any) is asked to cancel.
    Cancelled tasks are then removed from ``tasks``; the function stops once
    the set is empty, otherwise it sleeps one second and runs ``main`` again
    without creating new tasks.

    Args:
        n: Number of tasks ``main`` should create on the first pass.
    """
    tasks = set()
    loop = asyncio.new_event_loop()
    try:
        while True:
            try:
                loop.run_until_complete(main(tasks, n))
            except KeyboardInterrupt:
                # NOTE(review): the cancel/print statements are assumed to
                # belong to this branch; the original indentation was lost.
                print(f"fora do loop. Tasks: {tasks}")
                # `last_task` exists only after some task has started
                # running; look it up defensively to avoid a NameError when
                # Ctrl+C arrives before that.
                victim = globals().get("last_task")
                if victim is not None:
                    victim.cancel()
                print(tasks)
            # Forget tasks whose cancellation has completed. The list is
            # built first because the set cannot change while iterated.
            tasks.difference_update([t for t in tasks if t.cancelled()])
            if not tasks:
                # No tasks left.
                break
            print(f"{len(tasks)} restantes!")
            time.sleep(1)
            # On later passes only resume the surviving tasks.
            n = 0
    finally:
        # Release the loop's resources on every exit path.
        loop.close()
# Script entry point: run the manager, asking for five tasks on the first pass.
manager(5)
import signal, asyncio
async def a():
    """Print "starting", wait three seconds, then print "finishing"."""
    print("starting")
    await asyncio.sleep(3)
    print("finishing")
def handler(loop):
    """Print a notice that Ctrl+C was detected.

    Args:
        loop: Object interpolated into the message; the caller in this file
            passes the event loop the handler is registered on.
    """
    print(f"ctrl + c detected in task in {loop}")
def main():
    """Run ``a()`` to completion on a new event loop with a SIGINT handler.

    ``handler`` is registered for ``SIGINT`` on the loop, so Ctrl+C calls
    ``handler(loop)`` while the loop is running. The loop is closed on every
    exit path.

    Raises:
        NotImplementedError: On platforms whose event loop does not support
            ``add_signal_handler`` (per the asyncio docs it is Unix-only).
    """
    loop = asyncio.new_event_loop()
    try:
        loop.add_signal_handler(signal.SIGINT, handler, loop)
        loop.run_until_complete(a())
    finally:
        # Close the loop even if registration or the run itself raises.
        loop.close()
# Script entry point: run the SIGINT-handler demo.
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment