Skip to content

Instantly share code, notes, and snippets.

@victory-sokolov
Created April 21, 2025 08:37
Show Gist options
  • Save victory-sokolov/f29920aed820db2bfdf14606fd08e3c9 to your computer and use it in GitHub Desktop.
Save victory-sokolov/f29920aed820db2bfdf14606fd08e3c9 to your computer and use it in GitHub Desktop.
Run synch code in the executor
def async_wrap(
loop: Optional[asyncio.BaseEventLoop] = None, executor: Optional[Executor] = None
) -> Callable:
def _async_wrap(func: Callable) -> Callable:
@wraps(func)
async def run(*args, loop=loop, executor=executor, **kwargs):
if loop is None:
loop = asyncio.get_event_loop()
pfunc = partial(func, *args, **kwargs)
return await loop.run_in_executor(executor, pfunc)
return run
return _async_wrap
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment