Created
March 3, 2024 19:32
-
-
Save arthur-tacca/3512c5105b2ef05ed6b1a1be73a0e8f1 to your computer and use it in GitHub Desktop.
Illustration of missed cleanup in an asyncio task group
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Illustration of missed cleanup in an asyncio task group due to the way it | |
# cancels a task that has not started by never running it at all. See issue: | |
# https://github.com/python/cpython/issues/116048 | |
# | |
# When run on my computer, I usually get the output: | |
# | |
# Total connections created: 3300 | |
# Connections left unclosed: 0 | |
# | |
# i.e., all connections that were created were cleaned up. But sometimes I get: | |
# | |
# Total connections created: 3300 | |
# Connections left unclosed: 300 | |
import asyncio | |
open_conns = set() | |
conn_count = 0 | |
class Connection: | |
def __init__(self, param): | |
self._param = param | |
open_conns.add(param) | |
global conn_count | |
conn_count += 1 | |
async def use(self): | |
await asyncio.sleep(1) | |
if self._param == 150: | |
raise RuntimeError(11) | |
def close(self): | |
open_conns.remove(self._param) | |
async def use_connection(conn): | |
try: | |
await conn.use() | |
finally: | |
conn.close() | |
async def process_data(item_iter): | |
async with asyncio.TaskGroup() as tg: | |
async for item in item_iter: | |
# I believe this bug could happen when only a single task is started | |
# between awaits. In practice, this is what triggered it on my | |
# machine (but I didn't try hard to come up with a simpler case). | |
conn = Connection(item-0.1) | |
tg.create_task(use_connection(conn)) | |
conn = Connection(item) | |
tg.create_task(use_connection(conn)) | |
conn = Connection(item+0.5) | |
tg.create_task(use_connection(conn)) | |
async def items(): | |
for i in range(100000): | |
if i % 100 == 0: | |
await asyncio.sleep(0.1) | |
yield i | |
async def main(): | |
item_iter = items() | |
try: | |
await process_data(item_iter) | |
except* RuntimeError as e: | |
print(f"Caught RuntimeError: {e!r}") | |
except* BaseException as be: | |
print(f"Caught BaseError: {be!r}") | |
print("Total connections created:", conn_count) | |
print("Connections left unclosed:", len(open_conns)) | |
asyncio.run(main()) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment