Skip to content

Instantly share code, notes, and snippets.

@kanzure
Created November 22, 2024 15:32
Show Gist options
  • Save kanzure/54e75042a9fb0e4c9503e24220ec073c to your computer and use it in GitHub Desktop.
Save kanzure/54e75042a9fb0e4c9503e24220ec073c to your computer and use it in GitHub Desktop.
python asyncio.wait_for timeout parameter is for total time, not function execution time. Is there a standard library way to wait_for with a timeout that times the awaitable instead of the overall python process?
import time
import asyncio
EXPECTED_INNOCENT_COROUTINE_DURATION = 5 # seconds
INNOCENT_COROUTINE_TIMEOUT = EXPECTED_INNOCENT_COROUTINE_DURATION + 10 # seconds
INNOCENT_COROUTINE_TOTAL_TIME = 0
async def do_nasty_work():
print("Starting a nasty, busy coroutine")
await asyncio.sleep(0)
while True:
# do real computational work, no waiting for IO here
print("nasty coroutine - doing work")
time.sleep(1)
# yield to the event loop
await asyncio.sleep(0.1)
async def innocent_coroutine():
global INNOCENT_COROUTINE_TOTAL_TIME
increment = 0.1
while INNOCENT_COROUTINE_TOTAL_TIME < EXPECTED_INNOCENT_COROUTINE_DURATION:
# "do stuff" and then yield
print("innocent coroutine: yay I'm in my runloop!")
# do stuff
time.sleep(increment)
INNOCENT_COROUTINE_TOTAL_TIME += increment
print(f"innocent coroutine: did a little bit of work for {increment} seconds")
# yield to the event loop
await asyncio.sleep(0.1)
print(f"innocent_coroutine: Done working! Took {INNOCENT_COROUTINE_TOTAL_TIME} seconds")
return True
async def innocent_coroutine_wrapper():
start_time = time.time()
print(f"innocent_coroutine_wrapper: Started @ {start_time}")
try:
return await asyncio.wait_for(innocent_coroutine(), timeout=INNOCENT_COROUTINE_TIMEOUT)
except asyncio.TimeoutError:
# :(
print(f"ERROR: innocent_coroutine_wrapper: Timeout @ {time.time()}, ran for {time.time() - start_time} seconds")
# :(
return False
async def main():
# start our innocent coroutine with a wait_for timeout
innocent_task = asyncio.create_task(innocent_coroutine_wrapper())
# simulate a scenario where other busy coroutines are busy doing work
nasty_tasks = []
for _ in range(3): # try 20 for a high-contention server
nasty_task = asyncio.create_task(do_nasty_work())
nasty_tasks.append(nasty_task)
while True:
if innocent_task.done() and innocent_task.result() == False and INNOCENT_COROUTINE_TOTAL_TIME < EXPECTED_INNOCENT_COROUTINE_DURATION:
print(f"ERROR: innocent coroutine was canceled early, total function time was {INNOCENT_COROUTINE_TOTAL_TIME} seconds")
break
await asyncio.sleep(0)
# cancel all nasty tasks
for task in nasty_tasks:
task.cancel()
if __name__ == "__main__":
asyncio.run(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment