Skip to content

Instantly share code, notes, and snippets.

@tzechienchu
Created January 4, 2023 13:00
Show Gist options
  • Save tzechienchu/32196074e3ed57b4a565da1b6d8cffce to your computer and use it in GitHub Desktop.
Save tzechienchu/32196074e3ed57b4a565da1b6d8cffce to your computer and use it in GitHub Desktop.
Turn async functions into sync functions (and sync functions into async)
import functools
def force_async(fn):
    """Make a blocking function awaitable by running it on a worker thread.

    Each decorated function gets its own ``ThreadPoolExecutor``. Calling the
    returned wrapper submits ``fn`` to that pool immediately and hands back an
    asyncio future wrapping the pending result, which the caller can ``await``.

    Args:
        fn: The synchronous callable to offload to a thread.

    Returns:
        A wrapper carrying the metadata of ``fn`` that returns an awaitable
        future instead of the direct result.
    """
    import asyncio
    from concurrent.futures import ThreadPoolExecutor

    executor = ThreadPoolExecutor()

    def wrapper(*args, **kwargs):
        # The job starts running as soon as it is submitted, not when awaited.
        pending = executor.submit(fn, *args, **kwargs)
        # Bridge the concurrent.futures.Future into an asyncio-awaitable one.
        awaitable = asyncio.wrap_future(pending)
        return awaitable

    return functools.wraps(fn)(wrapper)
def force_sync(fn):
    """Turn an async function into a blocking, synchronous one.

    The returned wrapper calls ``fn``. If the result is a coroutine, it is
    driven to completion on the current thread's event loop and its result is
    returned; any other return value is passed through unchanged, so plain
    synchronous functions may be decorated too.

    If the current thread has no usable event loop (none was ever set, it was
    cleared by an earlier ``asyncio.run()``, or it has been closed), a new
    loop is created and installed as the thread's current loop.

    Args:
        fn: The callable to wrap, typically an ``async def`` function.

    Returns:
        A synchronous wrapper carrying the metadata of ``fn``.

    Raises:
        RuntimeError: Propagated from ``run_until_complete`` when the event
            loop of the current thread is already running.
    """
    import asyncio

    @functools.wraps(fn)
    def wrapper(*args, **kwargs):
        res = fn(*args, **kwargs)
        if not asyncio.iscoroutine(res):
            return res

        # get_event_loop() raises RuntimeError when the thread has no current
        # loop (after asyncio.run(), in non-main threads, on newer Pythons),
        # so fall back to a fresh loop instead of failing.
        try:
            loop = asyncio.get_event_loop()
        except RuntimeError:
            loop = None
        if loop is None or loop.is_closed():
            loop = asyncio.new_event_loop()
            asyncio.set_event_loop(loop)
        return loop.run_until_complete(res)

    return wrapper
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment