Last active
July 12, 2017 11:21
-
-
Save Xophmeister/e5c5a51aa40e385529ccb8341f396b33 to your computer and use it in GitHub Desktop.
Minimal example of a Python event loop running in its own thread
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| """ | |
| asyncio Demonstration | |
| MIT License | |
| Copyright (c) 2017 Genome Research Ltd. | |
| """ | |
| import asyncio | |
| from threading import Lock, Thread | |
| from typing import NamedTuple | |
| import aiohttp | |
| class URLResponse(NamedTuple): | |
| """ URL response model """ | |
| status:int | |
| headers:aiohttp.client.CIMultiDict | |
| body:str | |
| class URLFetcher(object): | |
| """ | |
| Run an asynchronous HTTP client on an event loop within its own | |
| thread and expose synchronous and asynchronous GET methods | |
| """ | |
| _loop:asyncio.AbstractEventLoop | |
| def __init__(self) -> None: | |
| self._loop_lock = Lock() | |
| # Start the event loop thread | |
| self._thread = Thread(target=self._init_loop) | |
| self._thread.start() | |
| with self._loop_lock: | |
| # Don't initialise the client session until the loop has | |
| # been created in our daughter thread | |
| self._session = aiohttp.ClientSession(loop=self._loop) | |
| # We don't need the loop lock any more | |
| del self._loop_lock | |
| def _init_loop(self) -> None: | |
| """ | |
| Initialise and start the event loop within the policy context | |
| (i.e., "thread", under the default policy) | |
| @note Blocking | |
| """ | |
| with self._loop_lock: | |
| self._loop = asyncio.new_event_loop() | |
| asyncio.set_event_loop(self._loop) | |
| self._loop.run_forever() | |
| def __enter__(self) -> "URLFetcher": | |
| return self | |
| def __exit__(self, *_exc) -> bool: | |
| self.close() | |
| return False | |
| def __del__(self) -> None: | |
| self.close() | |
| def __call__(self, url:str) -> URLResponse: | |
| # Convenience wrapper | |
| return self.get(url) | |
| async def get_async(self, url:str) -> URLResponse: | |
| """ | |
| Get the response body from a specified URL | |
| @note Asynchronous | |
| @param url URL (string) | |
| @return Response (URLResponse) | |
| """ | |
| async with self._session.get(url) as response: | |
| return URLResponse(response.status, | |
| response.headers, | |
| await response.text()) | |
| def get(self, url:str) -> URLResponse: | |
| """ | |
| Get the response body from a specified URL | |
| @note Blocking | |
| @param url URL (string) | |
| @return Response (URLResponse) | |
| """ | |
| # FIXME? Doesn't the loop have its own method to submit a coroutine | |
| future = asyncio.run_coroutine_threadsafe(self.get_async(url), self._loop) | |
| return future.result() | |
| def close(self) -> None: | |
| """ | |
| Graceful shutdown: | |
| * Close HTTP client session | |
| * Stop the event loop and block until the thread terminates | |
| @note Blocking | |
| """ | |
| self._session.close() | |
| self._loop.call_soon_threadsafe(self._loop.stop) | |
| self._thread.join() | |
| if __name__ == "__main__": | |
| with URLFetcher() as fetch: | |
| # TODO Asynchronous fetch | |
| loop = asyncio.get_event_loop() | |
| asyncio.set_event_loop(loop) | |
| # ??? | |
| loop.close() | |
| # Synchronous REPL | |
| while True: | |
| x = input("URL> ") | |
| if not x: | |
| # Exit on empty input | |
| break | |
| try: | |
| resp = fetch(x) | |
| print(f"Status {resp.status}: {resp.headers['Content-Type']}") | |
| print(resp.body) | |
| except Exception as e: | |
| print(f"{e.__class__.__name__}: {e}") |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment