Skip to content

Instantly share code, notes, and snippets.

@Suor
Last active January 19, 2024 16:19
Show Gist options
  • Save Suor/b1bea32395bbc371973b40abddf79fac to your computer and use it in GitHub Desktop.
Save Suor/b1bea32395bbc371973b40abddf79fac to your computer and use it in GitHub Desktop.
A way to enforce cantok on any coroutine
import asyncio
class AbstractToken:
    """Stand-in for a cancellation token that can bound any awaitable.

    ``wait()`` completes once the token is considered cancelled; ``wrap()``
    runs an awaitable and cancels it as soon as ``wait()`` completes.
    """

    class CancellationError(Exception):
        """Default exception raised by ``wrap()`` when the task is cancelled."""

    # Exception raised by wrap() when the wrapped awaitable ends up cancelled.
    # Placeholder default so that `raise self.exception` has something to
    # raise; subclasses may override it with their own exception.
    exception = CancellationError

    async def wait(self, is_async=True):  # A placeholder for the real thing
        """Return once the token is cancelled (here: after a fixed 0.2 s).

        ``is_async`` is accepted for interface compatibility and is not used
        by this placeholder.
        """
        await asyncio.sleep(0.2)

    async def wrap(self, task):
        """Run ``task`` under this token and return its result.

        Args:
            task: A coroutine or other awaitable; it is scheduled with
                ``asyncio.ensure_future``.

        Returns:
            Whatever ``task`` returns, if it finishes normally.

        Raises:
            self.exception: If ``task`` ended up cancelled.
            Exception: Any exception raised by ``task`` itself, or by
                ``wait()`` while ``task`` was still running, propagates
                unchanged.
        """
        future = asyncio.ensure_future(task)
        limiter = asyncio.ensure_future(self._make_limiter(future))
        try:
            await asyncio.wait(
                [future, limiter], return_when=asyncio.FIRST_COMPLETED
            )
            if not future.done():
                # Only the limiter has finished. Surface a failure of wait()
                # instead of hiding it, then let the task finish unwinding:
                # cancel() is only a request, and a task that awaits during
                # its cleanup is not done yet at this point.
                limiter.result()
                await asyncio.wait([future])
            if future.cancelled():
                raise self.exception
            return future.result()
        finally:
            # Neither helper may outlive this call, including when wrap()
            # itself is cancelled from outside. cancel() is a no-op on
            # futures that are already done.
            limiter.cancel()
            future.cancel()

    async def _make_limiter(self, future):
        """Cancel ``future`` once the token's ``wait()`` completes."""
        await self.wait(is_async=True)  # Should be .wait_async() IMHO
        future.cancel()
async def some_task(delay=1):
    """Demo workload: sleep for ``delay`` seconds, then return 42.

    Args:
        delay: Seconds to sleep before returning. Defaults to 1, the
            duration the demo originally hard-coded.

    Returns:
        The integer 42.
    """
    await asyncio.sleep(delay)
    return 42
async def main():
    """Run ``some_task()`` under a fresh ``AbstractToken`` and print the result."""
    token = AbstractToken()
    outcome = await token.wrap(some_task())
    print(outcome)
if __name__ == "__main__":
    # Run the demo only when executed as a script, not when imported.
    asyncio.run(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment