Skip to content

Instantly share code, notes, and snippets.

@Xophmeister
Last active July 12, 2017 11:21
Show Gist options
  • Select an option

  • Save Xophmeister/e5c5a51aa40e385529ccb8341f396b33 to your computer and use it in GitHub Desktop.

Select an option

Save Xophmeister/e5c5a51aa40e385529ccb8341f396b33 to your computer and use it in GitHub Desktop.
Minimal example of a Python event loop running in its own thread
"""
asyncio Demonstration
MIT License
Copyright (c) 2017 Genome Research Ltd.
"""
import asyncio
from threading import Lock, Thread
from typing import NamedTuple
import aiohttp
class URLResponse(NamedTuple):
""" URL response model """
status:int
headers:aiohttp.client.CIMultiDict
body:str
class URLFetcher(object):
"""
Run an asynchronous HTTP client on an event loop within its own
thread and expose synchronous and asynchronous GET methods
"""
_loop:asyncio.AbstractEventLoop
def __init__(self) -> None:
self._loop_lock = Lock()
# Start the event loop thread
self._thread = Thread(target=self._init_loop)
self._thread.start()
with self._loop_lock:
# Don't initialise the client session until the loop has
# been created in our daughter thread
self._session = aiohttp.ClientSession(loop=self._loop)
# We don't need the loop lock any more
del self._loop_lock
def _init_loop(self) -> None:
"""
Initialise and start the event loop within the policy context
(i.e., "thread", under the default policy)
@note Blocking
"""
with self._loop_lock:
self._loop = asyncio.new_event_loop()
asyncio.set_event_loop(self._loop)
self._loop.run_forever()
def __enter__(self) -> "URLFetcher":
return self
def __exit__(self, *_exc) -> bool:
self.close()
return False
def __del__(self) -> None:
self.close()
def __call__(self, url:str) -> URLResponse:
# Convenience wrapper
return self.get(url)
async def get_async(self, url:str) -> URLResponse:
"""
Get the response body from a specified URL
@note Asynchronous
@param url URL (string)
@return Response (URLResponse)
"""
async with self._session.get(url) as response:
return URLResponse(response.status,
response.headers,
await response.text())
def get(self, url:str) -> URLResponse:
"""
Get the response body from a specified URL
@note Blocking
@param url URL (string)
@return Response (URLResponse)
"""
# FIXME? Doesn't the loop have its own method to submit a coroutine
future = asyncio.run_coroutine_threadsafe(self.get_async(url), self._loop)
return future.result()
def close(self) -> None:
"""
Graceful shutdown:
* Close HTTP client session
* Stop the event loop and block until the thread terminates
@note Blocking
"""
self._session.close()
self._loop.call_soon_threadsafe(self._loop.stop)
self._thread.join()
if __name__ == "__main__":
with URLFetcher() as fetch:
# TODO Asynchronous fetch
loop = asyncio.get_event_loop()
asyncio.set_event_loop(loop)
# ???
loop.close()
# Synchronous REPL
while True:
x = input("URL> ")
if not x:
# Exit on empty input
break
try:
resp = fetch(x)
print(f"Status {resp.status}: {resp.headers['Content-Type']}")
print(resp.body)
except Exception as e:
print(f"{e.__class__.__name__}: {e}")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment