Last active: April 6, 2017 18:14
Save rohansb/8fa125ec34d23a319e647993507d327c to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"""
Helper module for spawning multiple asynchronous HTTP requests using co-routines & futures,
powered by the `asyncio` & `aiohttp` libraries; compatible only with Python 3.5 and above.

Terminology:
    task: an individual unit of work that awaits the response of one HTTP request
    job:  the combined unit of all the tasks, which gathers all responses together
"""
import asyncio
import logging
import time
from abc import abstractmethod

import aiohttp
from werkzeug.exceptions import HTTPException

logging.basicConfig(format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
logger.setLevel(level='DEBUG')
class AsyncHttpRequestsBase(object):
    """
    Spawns a batch of asynchronous HTTP GET requests (one "job") for a list of
    network-bound URLs, where each URL fetch is an individual "task".
    """

    def __init__(self, urls_list=None):
        # Fix: default to an empty list so `urls_count` / `run_job` don't raise
        # TypeError (len(None)) when no URLs are supplied.
        self.urls_list = urls_list if urls_list is not None else []
        # Populated by `wait_all_get_tasks` with the set of finished asyncio.Task objects.
        self.job_response = []

    @property
    def urls_count(self):
        """Number of URLs in this job."""
        return len(self.urls_list)

    @staticmethod
    async def get_task(index_url, url):
        """
        Async procedure that performs one I/O-bound GET and awaits its response.

        :param index_url: 1-based task id, used only for logging
        :param url: URL for which the async call is being done
        :return: the aiohttp response object, or None if the request failed
        """
        # note execution start time
        start = time.time()
        logger.info('started task: {} '.format(index_url))
        aio_response = None
        try:
            # I/O-bound GET over HTTP; the coroutine suspends here until a response arrives
            aio_response = await aiohttp.request('GET', url)
            # TODO: you could process the response here, as soon as it is available
            # self.process_task_response(aio_response)
            # json_response = await aio_response.json()
        except (HTTPException, aiohttp.ClientError) as e:
            # Fix: aiohttp raises `aiohttp.ClientError` subclasses, never werkzeug's
            # HTTPException; catch both so network failures are actually handled
            # while preserving the original (never-firing) werkzeug clause.
            logger.debug('External server error: ' + str(e))
        else:
            # tidbit: the `try-else` clause runs only when `try` raised no exception;
            # anything that was awaited is closed out here.
            # NOTE(review): closing releases the connection/payload, so reading the
            # body later (e.g. `.json()` in a subclass) may fail on some aiohttp
            # versions — confirm against the aiohttp version in use.
            aio_response.close()
            logger.info('finished task: {} , process took: {} '.format(index_url, time.time() - start))
        return aio_response

    async def wait_all_get_tasks(self):
        """
        Await completion of all the tasks in this job.

        Sets `self.job_response` to the (unordered) set of finished
        asyncio.Task objects upon completion.
        """
        # note when execution started for all the tasks
        start_all = time.time()
        # one future per URL; each sub-task is itself an async procedure
        waited_tasks = [
            asyncio.ensure_future(self.get_task(index_url=i + 1, url=self.urls_list[i]))
            for i in range(self.urls_count)
        ]
        # asyncio.wait returns a (done, pending) tuple; only the finished set is kept
        self.job_response, _ = await asyncio.wait(waited_tasks)
        logger.info('finished all tasks!! Entire process took: {} '.format(time.time() - start_all))

    def process_task_response(self, aio_response):
        # TODO: implement a method (async) to process an individual URL response
        pass

    @abstractmethod
    async def process_job_response(self, get_raw=False):
        """
        Abstract coroutine that derived classes must override to implement how all
        of the gathered responses are processed. Declared `async` for consistency:
        `run_job` schedules it on the event loop, so overrides must be coroutines.

        Recommended: handle exceptions while processing the response in derived classes.

        :param get_raw: return the raw response without processing, in which case
            you process the response outside the class, e.g.::

                response_obj = self.job_response
                if get_raw:
                    return response_obj
                # let's fetch the Date from the response header
                processed_response = response_obj.headers.get('Date')
                return processed_response
        """
        raise NotImplementedError

    def run_job(self):
        """
        Run the whole job on the event loop and return the processed response
        produced by `process_job_response`, or a message when there are no URLs.
        """
        if not self.urls_count:
            logger.debug('Attribute self.urls_list is empty')
            return 'URLs list is empty'
        # create an event loop for the job
        io_event_loop = asyncio.get_event_loop()
        # run & wait until all tasks finish
        io_event_loop.run_until_complete(future=self.wait_all_get_tasks())
        # Fix: capture and return the processed result — the original discarded it
        # and always returned None despite the comment promising a result.
        processed_response = io_event_loop.run_until_complete(future=self.process_job_response())
        # close the event loop
        io_event_loop.close()
        return processed_response
class AsyncRequestsDatatank(AsyncHttpRequestsBase):
    """Concrete job that decodes and logs the JSON body of every completed task."""

    async def process_job_response(self, get_raw=False):
        """
        Decode each task's JSON payload and log it.

        :param get_raw: unused here; kept to match the base-class signature
        :return: None
        """
        if not self.job_response:
            logger.debug('Job response empty, nothing to be processed')
            return None
        # upon successful execution, self.job_response is a set of asyncio.Task objects (futures)
        # https://docs.python.org/3/library/asyncio-task.html#asyncio.Task
        try:
            for task in self.job_response:
                # task.result() returns an object of type aiohttp.ClientResponse
                # http://aiohttp.readthedocs.io/en/stable/client_reference.html#aiohttp.ClientResponse
                task_result = task.result()
                # logger.info('task_result : ' + str(task_result))
                # Fix: renamed misleading local — `.json()` yields a decoded object
                # (dict/list), not a generator.
                # NOTE(review): get_task() closes the response before this point;
                # reading `.json()` from a closed response may fail on some aiohttp
                # versions — confirm against the aiohttp version in use.
                task_json = await task_result.json(content_type='application/json')
                logger.info('task_result_generator : ' + str(task_json))
        except aiohttp.ClientError as e:
            # Fix: catch the common base class — ClientConnectionError alone missed
            # ContentTypeError and other client errors `.json()` can raise.
            logger.debug('Failed to process the job response: ' + str(e))
if __name__ == '__main__':
    # Demo: fire eight concurrent GETs at the JSONPlaceholder fake REST API
    # (todos 1 through 8) and run them as a single job.
    url_template = 'https://jsonplaceholder.typicode.com/todos/{}'
    demo_urls = [url_template.format(todo_id) for todo_id in range(1, 9)]
    demo_job = AsyncRequestsDatatank(urls_list=demo_urls)
    demo_job.run_job()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment.