Last active
September 10, 2024 06:46
-
-
Save ShabbirHasan1/46a9095d891f16c1b74a3642543e535f to your computer and use it in GitHub Desktop.
aiosonic HTTP Client Usage Example
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from __future__ import annotations, absolute_import | |
import os, sys, platform, asyncio, signal, logging, contextlib | |
from ssl import SSLContext | |
from http import HTTPMethod | |
from datetime import datetime as dtdt | |
from pathlib import Path | |
from threading import Thread | |
from time import sleep | |
from logging.handlers import RotatingFileHandler | |
from aiosonic import HTTPClient | |
from aiosonic.timeout import Timeouts | |
from aiosonic.connectors import TCPConnector | |
from aiosonic.resolver import AsyncResolver | |
from aiosonic.types import DataType, ParamsType | |
from typing import Any, Coroutine, List, Dict, Tuple, Optional, NoReturn | |
if sys.platform.startswith("win"): | |
from signal import SIGABRT, SIGINT, SIGTERM | |
SIGNALS = (SIGABRT, SIGINT, SIGTERM) | |
asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy()) | |
else: | |
from signal import SIGABRT, SIGINT, SIGTERM, SIGHUP | |
SIGNALS = (SIGABRT, SIGINT, SIGTERM, SIGHUP) | |
__all__ = ["FetchData", "ProgramKilled"] | |
class ProgramKilled(Exception): | |
"""ProgramKilled Checks the ProgramKilled exception""" | |
pass # type: ignore | |
class FetchData: | |
LOGGING_FORMAT: str = "[%(levelname)s]|[%(asctime)s]|[%(name)s::%(module)s::%(funcName)s::%(lineno)d]|=> %(message)s" | |
USER_AGENT: str = "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36 Edg/120.0.2210.91" | |
@staticmethod | |
def get_now_date_time_with_microseconds_string() -> str: | |
return dtdt.now().strftime("%d_%b_%Y_%H_%M_%S_%f") | |
@staticmethod | |
def is_windows() -> bool: | |
return ( | |
os.name == "nt" | |
and sys.platform == "win32" | |
and platform.system() == "Windows" | |
) | |
@staticmethod | |
def is_linux() -> bool: | |
return ( | |
os.name == "posix" | |
and platform.system() == "Linux" | |
and sys.platform in {"linux", "linux2"} | |
) | |
@staticmethod | |
def is_mac() -> bool: | |
return ( | |
os.name == "posix" | |
and sys.platform == "darwin" | |
and platform.system() == "Darwin" | |
) | |
@staticmethod | |
def get_logger(name, filename, level=logging.WARNING) -> logging.Logger: | |
logger = logging.getLogger(name) | |
logger.setLevel(level) | |
stream = logging.StreamHandler() | |
stream.setFormatter(logging.Formatter(FetchData.LOGGING_FORMAT)) | |
logger.addHandler(stream) | |
fh = RotatingFileHandler(filename, maxBytes=100 * 1024 * 1024, backupCount=25) | |
fh.setFormatter(logging.Formatter(FetchData.LOGGING_FORMAT)) | |
logger.addHandler(fh) | |
logger.propagate = False | |
return logger | |
@staticmethod | |
def start_background_loop(loop: asyncio.AbstractEventLoop) -> Optional[NoReturn]: | |
asyncio.set_event_loop(loop) | |
try: | |
loop.run_forever() | |
except (KeyboardInterrupt, SystemExit, ProgramKilled): | |
loop.run_until_complete(loop.shutdown_asyncgens()) | |
if loop.is_running(): | |
loop.stop() | |
if not loop.is_closed(): | |
loop.close() | |
@staticmethod | |
async def get_client( | |
verify_ssl: bool = True, handle_cookies: bool = True | |
) -> HTTPClient: | |
return HTTPClient( | |
connector=TCPConnector( | |
pool_size=25, | |
timeouts=Timeouts( | |
sock_connect=10.0, | |
sock_read=30.0, | |
pool_acquire=5.0, | |
request_timeout=60.0, | |
), | |
resolver=AsyncResolver(nameservers=["1.1.1.1", "1.0.0.1"]), | |
ttl_dns_cache=22_500, | |
use_dns_cache=True, | |
conn_max_requests=500, | |
), | |
handle_cookies=handle_cookies, | |
verify_ssl=verify_ssl, | |
) | |
def __init__( | |
self, | |
debug: bool = False, | |
verify_ssl: bool = True, | |
use_http2: bool = False, | |
follow_redirects: bool = True, | |
max_retries: int = 10, | |
req_res_timeout: float = 30.0, | |
) -> None: | |
self.debug = debug | |
self.max_retries = max_retries | |
self.__verify_ssl = verify_ssl | |
self.__use_http2 = use_http2 | |
self.__follow = follow_redirects | |
self.__timeout = req_res_timeout | |
if self.debug: | |
self.log_level = logging.DEBUG if self.debug else logging.INFO | |
else: | |
self.log_level = logging.WARNING | |
self.logfile = Path.cwd().joinpath( | |
f"logs/FetchData_{FetchData.get_now_date_time_with_microseconds_string()}.log" | |
) | |
os.makedirs(self.logfile.parent, exist_ok=True) | |
self.log = FetchData.get_logger( | |
"FetchData", filename=self.logfile, level=self.log_level | |
) | |
logging.basicConfig(format=FetchData.LOGGING_FORMAT, level=self.log_level) | |
logging.getLogger("aiosonic").propagate = True | |
logging.getLogger("aiosonic").setLevel(self.log_level) | |
self.__initialize_loop() | |
self._init_sessions() | |
async def __aenter__(self) -> "FetchData": | |
return self | |
async def __aexit__(self, exc_type: None, exc: None, tb: None) -> None: | |
self.__graceful_exit() | |
def __enter__(self) -> "FetchData": | |
return self | |
def __exit__(self, exc_type, exc_val, exc_tb) -> None: | |
self.__graceful_exit() | |
def __del__(self) -> None: | |
self.__graceful_exit() | |
def __delete__(self) -> None: | |
self.__graceful_exit() | |
def __graceful_exit(self) -> None: | |
self.log.info("Gracefully Exiting...") | |
try: | |
asyncio.run_coroutine_threadsafe( | |
self.__stop_req_sessions(), self.__loop | |
).result(1.0) | |
sleep(0.25) | |
asyncio.run_coroutine_threadsafe( | |
self.__loop.shutdown_asyncgens(), self.__loop | |
).result(1.0) | |
sleep(0.25) | |
if self.__loop.is_running(): | |
self.__loop.stop() | |
sleep(0.25) | |
if not self.__loop.is_closed(): | |
self.__loop.close() | |
sleep(0.25) | |
except (RuntimeError, RuntimeWarning) as err: | |
self.log.error(str(err)) | |
sys.exit(0) | |
def handle_stop_signals(self, *args, **kwargs) -> NoReturn: | |
self.log.info("Handling Stop Signal By Gracefully Exiting...") | |
try: | |
self.__graceful_exit() | |
except Exception as err: | |
self.log.error(str(err)) | |
self.log.info("Exiting Program...") | |
sys.exit(0) | |
def __initialize_loop(self) -> None: | |
self.__loop = asyncio.new_event_loop() | |
if FetchData.is_windows(): | |
with contextlib.suppress(ValueError): | |
for sig in SIGNALS: | |
signal.signal(sig, self.handle_stop_signals) | |
else: | |
with contextlib.suppress(ValueError): | |
for sig in SIGNALS: | |
self.__loop.add_signal_handler(sig, self.handle_stop_signals) | |
self._event_thread = Thread( | |
target=self.start_background_loop, | |
args=(self.__loop,), | |
name=f"{self.__class__.__name__}_event_thread", | |
daemon=True, | |
) | |
self._event_thread.start() | |
self.log.info("FetchData Event Loop has been initialized.") | |
async def __init_sessions(self) -> None: | |
self.client = await FetchData.get_client( | |
verify_ssl=self.__verify_ssl, handle_cookies=False | |
) | |
def _init_sessions(self) -> None: | |
return asyncio.run_coroutine_threadsafe( | |
self.__init_sessions(), | |
self.__loop, | |
).result(timeout=self.__timeout) | |
async def __stop_req_sessions(self) -> None: | |
if await self.client.wait_requests(timeout=5): | |
await self.client.connector.cleanup() | |
await self.client.__aexit__(None, None, None) | |
await asyncio.sleep(0.010) | |
del self.client | |
async def __run_multiple_tasks( | |
self, | |
coroutines: List[Coroutine[Any, Any, Any]], | |
maintain_order: bool = False, | |
) -> Optional[List[Optional[Dict[str, Any] | str | bytes | Any]]]: | |
if maintain_order: | |
results = [] | |
for coroutine in coroutines: | |
results.append(await coroutine) | |
await asyncio.sleep(0.0) | |
return results | |
else: | |
try: | |
async with asyncio.TaskGroup() as tg: | |
tasks = [tg.create_task(coroutine) for coroutine in coroutines] | |
except* TimeoutError as err: | |
self.log.exception( | |
"While Executing Multiple Tasks A Timeout Exception Has Occured, Cancelling Remaining Tasks, Expetion Details: %s", | |
err, | |
) | |
except* Exception as exc: | |
self.log.exception( | |
"While Executing Multiple Tasks Encountered An Exception, Cancelling Remaining Tasks, Expetion Details: %s", | |
exc, | |
) | |
else: | |
return [task.result() for task in tasks] | |
def _run_multiple_tasks( | |
self, | |
coroutines: List[Coroutine[Any, Any, Any]], | |
maintain_order: bool = False, | |
) -> Optional[List[Optional[Dict[str, Any] | str | bytes | Any]]]: | |
_timeout = self.__timeout * len(coroutines) | |
future = asyncio.run_coroutine_threadsafe( | |
self.__run_multiple_tasks(coroutines, maintain_order=maintain_order), | |
self.__loop, | |
) | |
try: | |
result = future.result(timeout=_timeout) | |
except TimeoutError: | |
self.log.error( | |
"The Request Took Longer Than The Default Timeout To Wait For The Response Along With Retries, i.e. %.2f Seconds, Cancelling The Task...", | |
_timeout, | |
) | |
future.cancel() | |
except Exception as exc: | |
self.log.exception(f"The Request Ended Up With An Exception: {exc!r}") | |
else: | |
return result | |
async def __request( | |
self, | |
url: str, | |
method: HTTPMethod, | |
data: Optional[DataType] = None, | |
json: Optional[Dict[str, Any] | List[Tuple[str, Any]]] = None, | |
params: Optional[ParamsType] = None, | |
multipart: bool = False, | |
verify: bool = True, | |
ssl: Optional[SSLContext] = None, | |
timeouts: Optional[Timeouts] = None, | |
follow: bool = False, | |
http2: bool = False, | |
sclient: bool = False, | |
limit: Optional[asyncio.Semaphore] = None, | |
) -> Optional[Dict[str, Any] | str | bytes | Any]: | |
limit = asyncio.Semaphore(2) if limit is None else limit | |
url = FetchData.ROUTES[url] if url in FetchData.ROUTES else url | |
retry_no, status_code, response_text = 0, None, None | |
headers = self.__headers | |
async with limit: | |
while retry_no < self.max_retries: | |
if retry_no > 0: | |
self.log.debug("Retry No. %d", retry_no) | |
self.log.debug( | |
"Initializing Request For Endpoint: %s, Method: %s, With Request Params: %s, Headers: %s, Data: %s, Json: %s", | |
url, | |
method, | |
params, | |
headers, | |
data, | |
json, | |
) | |
try: | |
response = await self.client._request_with_body( | |
url, | |
method.value, | |
data=data, | |
headers=headers, | |
json=json, | |
params=params, | |
multipart=multipart, | |
verify=verify, | |
ssl=ssl, | |
timeouts=timeouts, | |
follow=follow, | |
http2=http2, | |
) | |
response_text = await response.text() | |
status_code = response.status_code | |
except Exception as err: | |
self.log.error( | |
"Request For Endpoint: %s, Method: %s, With Request Params: %s, Data: %s, Json: %s; Has FAILED with Status Code: %s and Response Text: %s...Truncted To 500 Chars, Exception was: %s", | |
url, | |
method, | |
params, | |
data, | |
json, | |
status_code, | |
response_text[:500] if response_text is not None else "", | |
err, | |
) | |
self.log.debug("Going to sleep for 1 second and then retry...") | |
await asyncio.sleep(1.0) | |
retry_no += 1 | |
else: | |
if response.ok: | |
self.log.info( | |
"Request For Endpoint: %s, Method: %s, With Request Params: %s, Data: %s, Json: %s; Has SUCCEDED with Status Code: %s and Response Text: %s...Truncted To 500 Chars.", | |
url, | |
method, | |
params, | |
data, | |
json, | |
status_code, | |
response_text[:500], | |
) | |
try: | |
return await response.json() | |
except Exception: | |
return response_text | |
else: | |
self.log.error( | |
"Request For Endpoint: %s, Method: %s, With Request Params: %s, Data: %s, Json: %s; Has FAILED with Status Code: %s and Response Text: %s...Truncted To 500 Chars", | |
url, | |
method, | |
params, | |
data, | |
json, | |
status_code, | |
response_text[:500] if response_text is not None else "", | |
) | |
self.log.debug("Going to sleep for 1 second and then retry...") | |
await asyncio.sleep(1.0) | |
retry_no += 1 | |
def _request( | |
self, | |
url: str, | |
method: HTTPMethod, | |
data: Optional[DataType] = None, | |
json: Optional[Dict[str, Any] | List[Tuple[str, Any]]] = None, | |
params: Optional[ParamsType] = None, | |
multipart: bool = False, | |
verify: bool = True, | |
ssl: Optional[SSLContext] = None, | |
timeouts: Optional[Timeouts] = None, | |
follow: bool = False, | |
http2: bool = False, | |
sclient: bool = False, | |
limit: Optional[asyncio.Semaphore] = None, | |
) -> Optional[Dict[str, Any] | str | bytes | Any]: | |
future = asyncio.run_coroutine_threadsafe( | |
self.__request( | |
url, | |
method, | |
data=data, | |
json=json, | |
params=params, | |
multipart=multipart, | |
verify=verify, | |
ssl=ssl, | |
timeouts=timeouts, | |
follow=follow, | |
http2=http2, | |
sclient=sclient, | |
limit=limit, | |
), | |
self.__loop, | |
) | |
try: | |
result = future.result(timeout=self.__timeout) | |
except TimeoutError: | |
self.log.error( | |
"The Request Took Longer Than The Default Timeout To Wait For The Response Along With Retries, i.e. %.2f Seconds, Cancelling The Task...", | |
self.__timeout, | |
) | |
future.cancel() | |
except Exception as exc: | |
self.log.exception(f"The Request Ended Up With An Exception: {exc!r}") | |
else: | |
return result | |
async def __get( | |
self, | |
url: str, | |
params: Optional[ParamsType] = None, | |
sclient: bool = False, | |
limit: Optional[asyncio.Semaphore] = None, | |
) -> Optional[Dict[str, Any] | str | bytes | Any]: | |
return await self.__request( | |
url, | |
HTTPMethod.GET, | |
params=params, | |
follow=self.__follow, | |
http2=self.__use_http2, | |
sclient=sclient, | |
limit=limit, | |
) | |
def _get( | |
self, | |
url: str, | |
params: Optional[ParamsType] = None, | |
sclient: bool = False, | |
limit: Optional[asyncio.Semaphore] = None, | |
) -> Optional[Dict[str, Any] | str | bytes | Any]: | |
return self._request( | |
url, | |
HTTPMethod.GET, | |
params=params, | |
follow=self.__follow, | |
http2=self.__use_http2, | |
sclient=sclient, | |
limit=limit, | |
) | |
async def __put( | |
self, | |
url: str, | |
data: Optional[DataType] = None, | |
json: Optional[Dict[str, Any] | List[Tuple[str, Any]]] = None, | |
params: Optional[ParamsType] = None, | |
multipart: bool = False, | |
sclient: bool = False, | |
limit: Optional[asyncio.Semaphore] = None, | |
) -> Optional[Dict[str, Any] | str | bytes | Any]: | |
return await self.__request( | |
url, | |
HTTPMethod.PUT, | |
params=params, | |
data=data, | |
multipart=multipart, | |
follow=self.__follow, | |
http2=self.__use_http2, | |
sclient=sclient, | |
limit=limit, | |
) | |
def _put( | |
self, | |
url: str, | |
data: Optional[DataType] = None, | |
json: Optional[Dict[str, Any] | List[Tuple[str, Any]]] = None, | |
params: Optional[ParamsType] = None, | |
multipart: bool = False, | |
sclient: bool = False, | |
limit: Optional[asyncio.Semaphore] = None, | |
) -> Optional[Dict[str, Any] | str | bytes | Any]: | |
return self._request( | |
url, | |
HTTPMethod.PUT, | |
params=params, | |
data=data, | |
multipart=multipart, | |
follow=self.__follow, | |
http2=self.__use_http2, | |
sclient=sclient, | |
limit=limit, | |
) | |
async def __post( | |
self, | |
url: str, | |
data: Optional[DataType] = None, | |
json: Optional[Dict[str, Any] | List[Tuple[str, Any]]] = None, | |
params: Optional[ParamsType] = None, | |
multipart: bool = False, | |
sclient: bool = False, | |
limit: Optional[asyncio.Semaphore] = None, | |
) -> Optional[Dict[str, Any] | str | bytes | Any]: | |
return await self.__request( | |
url, | |
HTTPMethod.POST, | |
params=params, | |
data=data, | |
multipart=multipart, | |
follow=self.__follow, | |
http2=self.__use_http2, | |
sclient=sclient, | |
limit=limit, | |
) | |
def _post( | |
self, | |
url: str, | |
data: Optional[DataType] = None, | |
json: Optional[Dict[str, Any] | List[Tuple[str, Any]]] = None, | |
params: Optional[ParamsType] = None, | |
multipart: bool = False, | |
sclient: bool = False, | |
limit: Optional[asyncio.Semaphore] = None, | |
) -> Optional[Dict[str, Any] | str | bytes | Any]: | |
return self._request( | |
url, | |
HTTPMethod.POST, | |
params=params, | |
data=data, | |
multipart=multipart, | |
follow=self.__follow, | |
http2=self.__use_http2, | |
sclient=sclient, | |
limit=limit, | |
) | |
async def __patch( | |
self, | |
url: str, | |
data: Optional[DataType] = None, | |
json: Optional[Dict[str, Any] | List[Tuple[str, Any]]] = None, | |
params: Optional[ParamsType] = None, | |
multipart: bool = False, | |
sclient: bool = False, | |
limit: Optional[asyncio.Semaphore] = None, | |
) -> Optional[Dict[str, Any] | str | bytes | Any]: | |
return await self.__request( | |
url, | |
HTTPMethod.PATCH, | |
params=params, | |
data=data, | |
multipart=multipart, | |
follow=self.__follow, | |
http2=self.__use_http2, | |
sclient=sclient, | |
limit=limit, | |
) | |
def _patch( | |
self, | |
url: str, | |
data: Optional[DataType] = None, | |
json: Optional[Dict[str, Any] | List[Tuple[str, Any]]] = None, | |
params: Optional[ParamsType] = None, | |
multipart: bool = False, | |
sclient: bool = False, | |
limit: Optional[asyncio.Semaphore] = None, | |
) -> Optional[Dict[str, Any] | str | bytes | Any]: | |
return self._request( | |
url, | |
HTTPMethod.PATCH, | |
params=params, | |
data=data, | |
multipart=multipart, | |
follow=self.__follow, | |
http2=self.__use_http2, | |
sclient=sclient, | |
limit=limit, | |
) | |
async def __delete( | |
self, | |
url: str, | |
data: Optional[DataType] = None, | |
json: Optional[Dict[str, Any] | List[Tuple[str, Any]]] = None, | |
params: Optional[ParamsType] = None, | |
multipart: bool = False, | |
sclient: bool = False, | |
limit: Optional[asyncio.Semaphore] = None, | |
) -> Optional[Dict[str, Any] | str | bytes | Any]: | |
return await self.__request( | |
url, | |
HTTPMethod.DELETE, | |
params=params, | |
data=data, | |
multipart=multipart, | |
follow=self.__follow, | |
http2=self.__use_http2, | |
sclient=sclient, | |
limit=limit, | |
) | |
def _delete( | |
self, | |
url: str, | |
data: Optional[DataType] = None, | |
json: Optional[Dict[str, Any] | List[Tuple[str, Any]]] = None, | |
params: Optional[ParamsType] = None, | |
multipart: bool = False, | |
sclient: bool = False, | |
limit: Optional[asyncio.Semaphore] = None, | |
) -> Optional[Dict[str, Any] | str | bytes | Any]: | |
return self._request( | |
url, | |
HTTPMethod.DELETE, | |
params=params, | |
data=data, | |
multipart=multipart, | |
follow=self.__follow, | |
http2=self.__use_http2, | |
sclient=sclient, | |
limit=limit, | |
) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment