Skip to content

Instantly share code, notes, and snippets.

@wonderbeyond
Created November 28, 2022 06:39
Show Gist options
  • Save wonderbeyond/9379133e6eb44d4b15d7f82d4ac99201 to your computer and use it in GitHub Desktop.
Save wonderbeyond/9379133e6eb44d4b15d7f82d4ac99201 to your computer and use it in GitHub Desktop.
[asyncio] async 2 sync
import asyncio
import inspect
from functools import wraps
def get_event_loop():
    """Return the calling thread's event loop, creating one when needed.

    ``asyncio.get_event_loop()`` raises ``RuntimeError`` when it cannot
    supply a loop for the calling thread (for example in a worker thread
    that never set one). In that case a fresh loop is created, installed as
    the thread's current loop, and returned.

    Returns:
        The event loop registered for the calling thread.
    """
    try:
        return asyncio.get_event_loop()
    except RuntimeError:
        # Dispatch on the exception type, not its message: the wording of
        # the message is an implementation detail that may change.
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        # Return the loop we just installed instead of looking it up again.
        return loop
def force_async(fn):
    """Make a blocking callable awaitable by running it on a worker thread.

    A dedicated ``ThreadPoolExecutor`` is created for each decorated
    function. Calling the returned wrapper submits ``fn`` to that pool
    immediately and hands back an asyncio future for its result.
    """
    from concurrent.futures import ThreadPoolExecutor

    executor = ThreadPoolExecutor()

    def run_in_thread(*args, **kwargs):
        # wrap_future turns the concurrent.futures.Future into an awaitable.
        return asyncio.wrap_future(executor.submit(fn, *args, **kwargs))

    return wraps(fn)(run_in_thread)
def force_sync(fn):
    """Wrap ``fn`` so that calling it blocks until its result is available.

    The wrapper calls ``fn`` and, if the result is awaitable (a coroutine,
    an asyncio future/task, or any object implementing ``__await__``), runs
    it to completion on the event loop returned by ``get_event_loop()`` and
    returns the final value. Non-awaitable results are returned unchanged,
    so plain synchronous functions can be decorated too.

    The wrapper relies on ``loop.run_until_complete``, which raises
    ``RuntimeError`` if that loop is already running, so it must not be
    called from inside a running event loop.
    """
    @wraps(fn)
    def wrapper(*args, **kwargs):
        res = fn(*args, **kwargs)
        # asyncio.iscoroutine is kept so everything the previous check
        # accepted is still driven; inspect.isawaitable additionally covers
        # futures, tasks and custom awaitables, which used to be returned
        # to the caller unresolved.
        if asyncio.iscoroutine(res) or inspect.isawaitable(res):
            return get_event_loop().run_until_complete(res)
        return res

    return wrapper
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment