Created
November 14, 2019 18:15
-
-
Save pruthvi6767/b0bad4894110042e3c768e9ee69fa7a0 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
import abc | |
import sys | |
import urllib3 | |
from http.client import HTTPSConnection | |
import json | |
import asyncio | |
import time | |
import logging | |
import aiohttp.client as async_http_client | |
# Silence urllib3's InsecureRequestWarning for the whole process.
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

# NOTE: the shared ``conn`` pool manager is created further down, next to
# ``people``; the throw-away two-pool instance that used to be built here was
# rebound before anything could use it.

# Root logger: emit every record from DEBUG upwards on stdout.
logs = logging.getLogger()
logs.setLevel(logging.DEBUG)
logs.addHandler(logging.StreamHandler(stream=sys.stdout))
class MyError(Exception):
    """Application-specific error that carries a message.

    Attributes:
        msg: The message supplied by the raiser.
    """

    def __init__(self, msg):
        # Hand the message to Exception explicitly so that ``str(err)`` and
        # ``err.args`` contain it even when ``msg`` is passed by keyword
        # (a bare ``super()`` expression calls nothing).
        super().__init__(msg)
        self.msg = msg
# Module-level list that collects the processed person records.
people = []

# Shared urllib3 pool manager (up to 20 per-host pools) used by ``main`` for
# the paginated listing requests.  No default timeout is configured here;
# the caller passes one per request.
conn = urllib3.PoolManager(num_pools=20)
async def get_ships_and_planets_for_person(person):
    """Resolve a person's starship and homeworld URLs into fetched data.

    ``person`` is updated in place: a non-empty ``"starships"`` value is
    replaced by whatever ``get_urls`` returns for it, and a non-empty
    ``"homeworld"`` URL is replaced by the ``"name"`` field of the fetched
    document.  When the homeworld cannot be fetched the URL is left as is.

    Args:
        person: Mapping with ``"name"``, ``"starships"`` and ``"homeworld"``
            keys.

    Returns:
        A new dict with the person's ``"name"``, ``"homeworld"`` and
        ``"starships"`` values.
    """
    print("calling ships/person")
    if person["starships"]:
        person["starships"] = await get_urls(person["starships"])
    if person["homeworld"]:
        home = await get_starwars_data(person["homeworld"])
        # get_starwars_data yields None when the request fails; keep the URL
        # in that case rather than crashing on ``None["name"]``.
        if home is not None:
            person["homeworld"] = home["name"]
    return {
        "name": person["name"],
        "homeworld": person["homeworld"],
        "starships": person["starships"],
    }
async def get_starwars_data(url):
    """Fetch ``url`` with aiohttp and return its decoded JSON body.

    A fresh client session is opened for every call.  Any failure -- a
    non-200 status (signalled internally with ``MyError``) or any other
    exception -- is reported on stdout and through ``logs`` at INFO level,
    and ``None`` is returned instead of raising.
    """
    try:
        async with async_http_client.ClientSession() as http:
            async with http.get(url) as reply:
                # Guard clause: anything but 200 is treated as an error and
                # handled by the except branch below.
                if reply.status != 200:
                    raise MyError(msg="error-custom")
                return await reply.json()
    except Exception as err:
        print("Error reading from {} ".format(url))
        logs.log(logging.INFO, err)
    return None
async def main(start_url):
    """Walk the paginated people listing and collect every person record.

    Each page is fetched through the shared ``conn`` pool manager.  All
    people on a page are resolved concurrently with
    ``get_ships_and_planets_for_person`` and the successful results are
    appended to the module-level ``people`` list in page order.  After the
    last page (``"next"`` is ``None``) the process time, the collected
    records and their count are printed and the coroutine returns.

    A page that does not answer with HTTP 200 is logged and ends the crawl,
    instead of being requested again forever.

    Args:
        start_url: URL of the first listing page; a falsy value makes the
            coroutine return immediately.
    """
    while start_url:
        # Blocking urllib3 call; no person tasks are pending at this point,
        # so nothing else is waiting on the event loop meanwhile.
        response = conn.request('GET', start_url, headers={
            "Content-Type": "Application/JSON"}, timeout=60)
        if response.status != 200:
            logs.error("Unexpected status %s from %s; stopping",
                       response.status, start_url)
            return

        people_response = json.loads(response.data.decode('utf-8'))

        # One task per person, gathered together so the whole page is
        # resolved concurrently.
        tasks = [
            asyncio.create_task(get_ships_and_planets_for_person(person))
            for person in people_response['results']
        ]
        # return_exceptions=True: one failing person must not discard the
        # rest of the page.
        outcomes = await asyncio.gather(*tasks, return_exceptions=True)
        for outcome in outcomes:
            if isinstance(outcome, BaseException):
                logs.error("Could not resolve a person: %r", outcome)
            else:
                people.append(outcome)

        start_url = people_response['next']
        if start_url is None:
            print(time.process_time())
            print(people)
            print(len(people))
            # Return normally rather than calling sys.exit() from inside a
            # coroutine; the caller decides what happens after the crawl.
            return
def extend_people_result(person):
    """Done-callback: store a finished person task's result in ``people``.

    Args:
        person: The completed asyncio task/future handed over by
            ``add_done_callback``.  Cancelled or failed tasks are skipped
            (failures are logged).
    """
    if person.cancelled():
        return
    error = person.exception()
    if error is not None:
        logs.error("Person task failed: %r", error)
        return
    # The callback receives the task object, not its result; extending the
    # list with the task itself would add nothing, so append the result.
    people.append(person.result())
async def get_urls(urls) -> list:
    """Fetch every URL in ``urls`` concurrently.

    Each entry of ``urls`` is replaced, in place and at the same position,
    by the value ``get_starwars_data`` produced for it.

    Args:
        urls: Mutable list of URLs to fetch.

    Returns:
        The same ``urls`` list object, now holding the fetched values.
    """
    # gather() preserves argument order, so results line up with the URLs.
    urls[:] = await asyncio.gather(*(get_starwars_data(uri) for uri in urls))
    return urls
if __name__ == '__main__':
    # NOTE(review): confirm this host still serves the API before relying
    # on it.
    start_url = 'https://swapi.co/api/people/'
    # asyncio.run() creates the event loop, runs the coroutine to completion
    # and closes the loop afterwards.
    asyncio.run(main(start_url))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment