Skip to content

Instantly share code, notes, and snippets.

@pavan538
Created July 15, 2019 07:11
Show Gist options
  • Save pavan538/334a04b838a891a9ff7f0b311d109311 to your computer and use it in GitHub Desktop.
Save pavan538/334a04b838a891a9ff7f0b311d109311 to your computer and use it in GitHub Desktop.
Asynchronous aiohttp responses.
import asyncio # only here for debugging purposes
import aiohttp
# Browser-style User-Agent string; default_headers() sends it as the
# 'User-Agent' header when a caller supplies no headers of its own.
USER_AGENT = 'Mozilla/5.0 (Windows NT 6.1; WOW64; rv:61.0) Gecko/20100101 Firefox/61.1'
def default_headers():
    """Build the header mapping used when a caller passes no headers.

    Returns:
        A fresh dict with a single 'User-Agent' entry set to USER_AGENT.
    """
    return {'User-Agent': USER_AGENT}
class WebSession(object):
    """Holder for a single aiohttp.ClientSession shared via a class attribute.

    The session is stored on the class itself, so everything that goes
    through ``WebSession.session`` uses the same client session.
    """

    # The shared session: None until create() runs, and None again after
    # close() has been awaited.
    session = None

    @classmethod
    def create(cls):
        """Create a new aiohttp.ClientSession, store it on the class, return it.

        NOTE: any session already stored is simply replaced, not closed.
        """
        cls.session = aiohttp.ClientSession()
        return cls.session

    @classmethod
    async def close(cls):
        """Close the shared session, if any, and forget it.

        Calling this always yields an awaitable, so ``await WebSession.close()``
        is safe even when no session was ever created (previously that path
        returned None and the ``await`` raised TypeError).
        """
        # Detach the session first so nobody can pick up a session that is
        # closed or in the middle of closing; the next create() starts fresh.
        session, cls.session = cls.session, None
        if session is not None:
            await session.close()
async def request(method, url, **kwargs):
    """Send an HTTP request through the shared WebSession session.

    Args:
        method: HTTP method name, e.g. 'GET' or 'POST'.
        url: Target URL.
        **kwargs: Forwarded to the session's ``request()``. If 'headers' is
            missing or None, default_headers() is used instead.

    Returns:
        The result of awaiting ``session.request(...)``.
    """
    if kwargs.get('headers') is None:
        kwargs['headers'] = default_headers()

    session = WebSession.session
    # A session that has already been closed cannot send requests (aiohttp
    # raises "Session is closed"), so treat it like a missing one and
    # create a replacement.
    if session is None or session.closed:
        session = WebSession.create()

    return await session.request(method=method, url=url, **kwargs)
async def get(url, **kwargs):
    """Issue a GET request for *url* via request(), forwarding keyword args."""
    response = await request('GET', url=url, **kwargs)
    return response
async def post(url, **kwargs):
    """Issue a POST request for *url* via request(), forwarding keyword args."""
    response = await request('POST', url=url, **kwargs)
    return response
async def get_url():
    """Fetch https://httpbin.org/get and print the response headers."""
    res = await get('https://httpbin.org/get')
    try:
        print(f'Headers: {res.headers}')
    finally:
        # The body is never read here, so release the response explicitly;
        # otherwise its connection is not handed back to the session's pool.
        res.release()
async def close():
    """Close the shared WebSession session; run this before the app ends.

    Safe to call when no session was ever created.
    """
    pending = WebSession.close()
    # WebSession.close() may return None when there is no session to close;
    # awaiting None would raise TypeError, so only await a real awaitable.
    if pending is not None:
        await pending
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment