Last active
July 30, 2023 23:50
-
-
Save darkerego/ddbb0bbb6ccfed81704d11cf275a3c34 to your computer and use it in GitHub Desktop.
AioHttp Client Skeleton Class
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio
import json
from typing import Any, Dict, Optional, Tuple

import aiohttp
class AsyncHttpClient:
    """
    A skeletal asynchronous HTTP client built on ``aiohttp``.

    The ``aiohttp.ClientSession`` is not created by ``__init__``. It is created by
    awaiting ``__ainit__`` (``get`` and ``post`` do this on demand) and released
    again with ``close``.
    """

    # Verbs that ``request`` may dispatch to. Each entry must name a coroutine
    # method returning ``(status, resp)``; subclasses adding verbs extend this.
    _SUPPORTED_METHODS = ('get', 'post')

    def __init__(self, _headers: Optional[Dict[str, str]] = None, base_url: Optional[str] = None,
                 timeout: Optional[float] = 60):
        """
        A skeletal asynchronous HTTP class. Because I found myself writing this same code
        hundreds of times, I decided to just write a reusable module.

        :param _headers: headers sent with every request; defaults to an empty dict
        :param base_url: optional base URL handed to the client session
        :param timeout: total timeout in seconds; each connect/read phase gets a third of it
        """
        # The original tested the unrelated name ``headers`` here, which raised
        # NameError unless a module-level ``headers`` happened to exist.
        if _headers is None:
            _headers = {}
        self.timeout_secs: Optional[float] = timeout
        self.base_url: Optional[str] = base_url
        self._session: Optional["aiohttp.ClientSession"] = None
        self.is_a_initialized: bool = False
        self.global_headers: Dict[str, str] = _headers

    async def __ainit__(self):
        """
        async __init__ , this must be awaited to create the client session.

        Awaiting it again while a session is still open is a no-op, so the open
        session is not orphaned.

        :return: None
        """
        if self._session is not None and not self._session.closed:
            self.is_a_initialized = True
            return
        # ``None`` means "no limit" for aiohttp.ClientTimeout, so pass it through
        # instead of dividing it.
        phase_secs = None if self.timeout_secs is None else self.timeout_secs / 3
        timeout = aiohttp.ClientTimeout(total=self.timeout_secs, connect=phase_secs,
                                        sock_connect=phase_secs, sock_read=phase_secs)
        self._session = aiohttp.ClientSession(headers=self.global_headers,
                                              base_url=self.base_url,
                                              timeout=timeout)
        self.is_a_initialized = True

    async def close(self):
        """
        Close the client session (if any) and mark the client as uninitialized so
        that a later call creates a fresh session instead of reusing a closed one.

        :return: None
        """
        if self._session is not None and not self._session.closed:
            await self._session.close()
        self._session = None
        self.is_a_initialized = False

    async def _ensure_session(self) -> "aiohttp.ClientSession":
        """Return an open session, creating one first when necessary."""
        if self._session is None or self._session.closed:
            await self.__ainit__()
        return self._session

    @staticmethod
    def _ssl_kwargs(verify_ssl: bool) -> Dict[str, Any]:
        """Translate the ``verify_ssl`` flag into aiohttp request keyword arguments."""
        # aiohttp deprecated the ``verify_ssl`` request argument in favour of ``ssl``.
        # ``ssl=False`` disables certificate checks; passing nothing keeps the
        # library's default verification on every aiohttp version.
        return {} if verify_ssl else {'ssl': False}

    async def parse_response(self, response: "aiohttp.ClientResponse") -> Tuple[int, Any]:
        """
        Turn a response into ``(status, body)``.

        :param response: client response object
        :return: status code, (either the decoded json, OR bytes if the status was
                 not 200 or the content was not json)
        """
        status = response.status
        if status == 200:
            try:
                # content_type=None skips aiohttp's Content-Type check.
                return status, await response.json(content_type=None)
            except (json.JSONDecodeError, UnicodeDecodeError):
                # Not JSON (or not decodable text): fall through to the raw bytes.
                pass
        return status, await response.read()

    async def post(self, path: str, data=None, verify_ssl: bool = False) -> Tuple[int, Any]:
        """
        HTTP post request; ``data`` is sent as a JSON body.

        :param path: URL
        :param data: payload (defaults to an empty dict)
        :param verify_ssl: verify the server certificate; NOTE the default ``False``
                           disables verification
        :return: status, resp
        """
        if data is None:
            data = {}
        session = await self._ensure_session()
        async with session.post(url=path, json=data, **self._ssl_kwargs(verify_ssl)) as response:
            return await self.parse_response(response)

    async def get(self, path: str, params=None, verify_ssl: bool = False) -> Tuple[int, Any]:
        """
        HTTP GET request

        :param path: URL
        :param params: query parameters (i.e. ``?param=value``)
        :param verify_ssl: verify the server certificate; NOTE the default ``False``
                           disables verification
        :return: status, resp
        """
        if params is None:
            params = {}
        session = await self._ensure_session()
        async with session.get(url=path, params=params, **self._ssl_kwargs(verify_ssl)) as response:
            return await self.parse_response(response)

    async def request(self, method: str, *args, **kwargs) -> Tuple[int, Any]:
        """
        wrapper function for making requests; network errors are reported on
        stdout and turned into the ``(0, '')`` fallback instead of being raised.

        :param method: get, or post (case-insensitive)
        :param args: arguments forwarded to the verb method
        :param kwargs: keyword arguments forwarded to the verb method
        :return: status, resp -- ``(0, '')`` when the request failed or the
                 method is not supported
        """
        status, resp = 0, ''
        verb = method.lower()
        if verb not in self._SUPPORTED_METHODS:
            # Always hand back a tuple so callers can unpack the result.
            print('[!] unsupported method: %s' % method)
            return status, resp
        handler = getattr(self, verb)
        # No ``return`` inside ``finally`` here: that would silently discard every
        # exception, including task cancellation.
        try:
            status, resp = await handler(*args, **kwargs)
        except aiohttp.ClientConnectorSSLError as err:
            # Must precede the generic handler: it subclasses ClientConnectorError.
            print('[!] ssl error %s with: ' % err)
        except asyncio.TimeoutError:
            print('[!] Timed out ... ')
        except ValueError as err:
            print('[!] invalid ?: %s' % err)
        except aiohttp.ClientError as err:
            print('[!] HTTP Request error %s' % err)
        return status, resp

    async def demo(self, url_path: str, method: str = 'get'):
        """
        Just do one HTTP request and close the session afterwards.

        :param url_path: URL
        :param method: get, or post
        :return: the decoded text when a 200 response body is bytes, otherwise
                 whatever ``request`` returned as the body
        """
        if not self.is_a_initialized:
            await self.__ainit__()
        try:
            status, body = await self.request(method, path=url_path)
        finally:
            # Resets the initialized flag too, so demo() can be called again.
            await self.close()
        if status == 200 and isinstance(body, bytes):
            return body.decode()
        return body
if __name__ == '__main__':
    # Script entry point: fetch a plain-text page once and print what came back.
    headers = {'content-type': 'application/json'}
    client = AsyncHttpClient(_headers=headers)
    body = asyncio.run(client.demo(url_path='https://ipecho.net/plain', method='get'))
    print(body)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment