Skip to content

Instantly share code, notes, and snippets.

@lsevero
Forked from ddelange/executor.py
Created September 23, 2021 02:38
Show Gist options
  • Save lsevero/2e63179a4e1a832b16e3e1227234e6ee to your computer and use it in GitHub Desktop.
Save lsevero/2e63179a4e1a832b16e3e1227234e6ee to your computer and use it in GitHub Desktop.
Make a sync function async
import asyncio
from functools import wraps, partial
def run_in_executor(fn=None, *, executor=None):
"""Make a sync function async. By default uses ThreadPoolExecutor."""
if fn is None:
# allow using this decorator with brackets, e.g.
# @run_in_executor(executor=ThreadPoolExecutor(1))
return partial(run_in_executor, executor=executor)
@wraps(fn)
async def wrapped(*args, **kwargs):
"""Wrap function in a run_in_executor."""
_fn = partial(fn, *args, **kwargs)
if hasattr(executor, "coro_apply"):
# support aioprocessing.pool.AioPool
fut = executor.coro_apply(_fn)
else:
fut = asyncio.get_running_loop().run_in_executor(executor, _fn)
return await fut
return wrapped
# without brackets
@run_in_executor
def test1():
print(1)
await test1()
# with brackets
@run_in_executor()
def test2():
print(2)
await test2()
# with explicit ThreadPoolExecutor
from concurrent.futures import ThreadPoolExecutor
@run_in_executor(executor=ThreadPoolExecutor(4))
def test3():
print(3)
await test3()
# with explicit AioPool, patched using multiprocess (uses dill for universal pickling)
from aioprocessing.pool import AioPool
from multiprocess import Pool
setattr(AioPool, "delegate", Pool)
@run_in_executor(executor=AioPool(4))
def test4():
print(4)
await test4()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment