Last active
March 31, 2025 13:43
-
Star
(129)
You must be signed in to star a gist -
Fork
(18)
You must be signed in to fork a gist
-
-
Save dmfigol/3e7d5b84a16d076df02baa9f53271058 to your computer and use it in GitHub Desktop.
Python asyncio event loop in a separate thread
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
This gist shows how to run an asyncio event loop in a separate thread.
It can be useful when you want to mix synchronous and asynchronous code.
Python 3.7+ | |
""" | |
import asyncio | |
from datetime import datetime | |
from threading import Thread | |
from typing import Tuple, List, Iterable | |
import httpx | |
# Sample endpoints that the demo fetches concurrently.
URLS: List[str] = [
    "https://pypi.org",
    "https://python.org",
    "https://google.com",
    "https://amazon.com",
    "https://reddit.com",
    "https://stackoverflow.com",
    "https://ubuntu.com",
    "https://github.com",
    "https://microsoft.com",
]
def start_background_loop(loop: asyncio.AbstractEventLoop) -> None:
    """Install *loop* as the calling thread's event loop and run it.

    Intended to be used as a ``Thread`` target: the call blocks inside
    ``loop.run_forever()`` and only returns once the loop has been stopped.
    """
    # Bind the loop to this thread before it starts processing callbacks.
    asyncio.set_event_loop(loop)
    loop.run_forever()
async def fetch(url: str) -> Tuple[str, int]:
    """Issue an HTTP GET for *url* and report the outcome.

    A dedicated ``httpx.AsyncClient`` is opened for the request and closed
    again before the function returns.

    Returns:
        A ``(url, status_code)`` pair for the response that was received.
    """
    async with httpx.AsyncClient() as client:
        status = (await client.get(url)).status_code
    return url, status
async def fetch_all_urls(urls: Iterable[str]) -> List[Tuple[str, int]]:
    """Fetch every URL in *urls* concurrently.

    One task per URL is scheduled on the running loop via ``fetch``; the
    tasks are then awaited together so this coroutine yields all results
    at once, in the same order as the input.
    """
    pending = []
    for address in urls:
        pending.append(asyncio.create_task(fetch(address)))
    return await asyncio.gather(*pending)
def main() -> None:
    """Run the demo: fetch all ``URLS`` on a background event loop.

    A new event loop is started on a daemon thread, ``fetch_all_urls`` is
    submitted to it from this (synchronous) thread, and each
    ``url -> status_code`` pair is printed followed by the elapsed time.
    The background loop is always stopped, joined and closed afterwards,
    even if fetching raises.
    """
    loop = asyncio.new_event_loop()
    t = Thread(target=start_background_loop, args=(loop,), daemon=True)
    t.start()

    start_time = datetime.now()
    try:
        task = asyncio.run_coroutine_threadsafe(fetch_all_urls(URLS), loop)
        # ``task`` is a concurrent.futures.Future; result() blocks this
        # thread until the coroutine finishes on the background loop.
        for url, status_code in task.result():
            print(f"{url} -> {status_code}")

        exec_time = (datetime.now() - start_time).total_seconds()
        print(f"It took {exec_time:,.2f} seconds to run")
    finally:
        # Event loop methods are not thread-safe: calling loop.stop()
        # directly from this thread may not wake the loop. Schedule the
        # stop on the loop's own thread instead.
        loop.call_soon_threadsafe(loop.stop)
        # Wait for run_forever() to return before releasing the loop.
        t.join()
        loop.close()


if __name__ == "__main__":
    main()
I wrote some helper functions for this:
import asyncio
import concurrent.futures
from asyncio import AbstractEventLoop, Future
from threading import Thread
from typing import Coroutine, Optional
def create_event_loop_thread() -> AbstractEventLoop:
    """
    Start a brand-new asyncio event loop on its own daemon thread.

    The loop is returned immediately so that other threads can hand work
    to it (for example with ``asyncio.run_coroutine_threadsafe``); it keeps
    running until something stops it.

    From https://gist.github.com/dmfigol/3e7d5b84a16d076df02baa9f53271058
    """
    background_loop = asyncio.new_event_loop()

    def _serve() -> None:
        # Runs on the worker thread: adopt the loop there, then block in it.
        asyncio.set_event_loop(background_loop)
        background_loop.run_forever()

    Thread(target=_serve, daemon=True).start()
    return background_loop
def run_coroutine_in_thread(
    coro: Coroutine,
    eventloop: Optional[AbstractEventLoop] = None,
) -> concurrent.futures.Future:
    """
    Submit *coro* to an event loop running in another thread.

    Args:
        coro: The coroutine object to schedule.
        eventloop: The loop to run it on; it must be running in a different
            thread than the caller. When omitted, a shared background loop
            is created on first use with ``create_event_loop_thread()`` and
            reused by later calls.

    Returns:
        The ``concurrent.futures.Future`` produced by
        ``asyncio.run_coroutine_threadsafe``; call ``.result()`` on it to
        block until the coroutine completes.

    From https://gist.github.com/dmfigol/3e7d5b84a16d076df02baa9f53271058
    """
    if eventloop is None:
        # The original body read ``self.eventloop`` although this is a plain
        # function, which raised NameError. Keep the shared loop on the
        # function object instead.
        eventloop = getattr(run_coroutine_in_thread, "_shared_loop", None)
        if eventloop is None or eventloop.is_closed():
            eventloop = create_event_loop_thread()
            run_coroutine_in_thread._shared_loop = eventloop
    return asyncio.run_coroutine_threadsafe(coro, eventloop)
Thank you so much! It really works on the first try. I have been looking for this solution all this time, now my bug is solved. Thank you!!
Saved my life 😯
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Thanks so much! I was trying to get this to work for 3 days, and when I finally found this, it worked first try!