Skip to content

Instantly share code, notes, and snippets.

@ShabbirHasan1
Last active September 10, 2024 06:46
Show Gist options
  • Save ShabbirHasan1/46a9095d891f16c1b74a3642543e535f to your computer and use it in GitHub Desktop.
Save ShabbirHasan1/46a9095d891f16c1b74a3642543e535f to your computer and use it in GitHub Desktop.
aiosonic HTTP Client Usage Example
from __future__ import annotations, absolute_import
import os, sys, platform, asyncio, signal, logging, contextlib
from ssl import SSLContext
from http import HTTPMethod
from datetime import datetime as dtdt
from pathlib import Path
from threading import Thread
from time import sleep
from logging.handlers import RotatingFileHandler
from aiosonic import HTTPClient
from aiosonic.timeout import Timeouts
from aiosonic.connectors import TCPConnector
from aiosonic.resolver import AsyncResolver
from aiosonic.types import DataType, ParamsType
from typing import Any, Coroutine, List, Dict, Tuple, Optional, NoReturn
# Signals that should trigger a graceful shutdown. SIGHUP does not exist on
# Windows, so it is only imported and registered on other platforms.
from signal import SIGABRT, SIGINT, SIGTERM

if sys.platform.startswith("win"):
    SIGNALS = (SIGABRT, SIGINT, SIGTERM)
    # Force the selector-based event loop on Windows instead of the default.
    asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
else:
    from signal import SIGHUP

    SIGNALS = (SIGABRT, SIGINT, SIGTERM, SIGHUP)
# Names exported by ``from <module> import *``.
__all__ = ["FetchData", "ProgramKilled"]
class ProgramKilled(Exception):
    """Signal that the program is being terminated.

    ``FetchData.start_background_loop`` catches this alongside
    ``KeyboardInterrupt`` and ``SystemExit`` to shut the event loop down.
    """
class FetchData:
LOGGING_FORMAT: str = "[%(levelname)s]|[%(asctime)s]|[%(name)s::%(module)s::%(funcName)s::%(lineno)d]|=> %(message)s"
USER_AGENT: str = "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36 Edg/120.0.2210.91"
@staticmethod
def get_now_date_time_with_microseconds_string() -> str:
return dtdt.now().strftime("%d_%b_%Y_%H_%M_%S_%f")
@staticmethod
def is_windows() -> bool:
return (
os.name == "nt"
and sys.platform == "win32"
and platform.system() == "Windows"
)
@staticmethod
def is_linux() -> bool:
return (
os.name == "posix"
and platform.system() == "Linux"
and sys.platform in {"linux", "linux2"}
)
@staticmethod
def is_mac() -> bool:
return (
os.name == "posix"
and sys.platform == "darwin"
and platform.system() == "Darwin"
)
@staticmethod
def get_logger(name, filename, level=logging.WARNING) -> logging.Logger:
logger = logging.getLogger(name)
logger.setLevel(level)
stream = logging.StreamHandler()
stream.setFormatter(logging.Formatter(FetchData.LOGGING_FORMAT))
logger.addHandler(stream)
fh = RotatingFileHandler(filename, maxBytes=100 * 1024 * 1024, backupCount=25)
fh.setFormatter(logging.Formatter(FetchData.LOGGING_FORMAT))
logger.addHandler(fh)
logger.propagate = False
return logger
@staticmethod
def start_background_loop(loop: asyncio.AbstractEventLoop) -> Optional[NoReturn]:
asyncio.set_event_loop(loop)
try:
loop.run_forever()
except (KeyboardInterrupt, SystemExit, ProgramKilled):
loop.run_until_complete(loop.shutdown_asyncgens())
if loop.is_running():
loop.stop()
if not loop.is_closed():
loop.close()
@staticmethod
async def get_client(
verify_ssl: bool = True, handle_cookies: bool = True
) -> HTTPClient:
return HTTPClient(
connector=TCPConnector(
pool_size=25,
timeouts=Timeouts(
sock_connect=10.0,
sock_read=30.0,
pool_acquire=5.0,
request_timeout=60.0,
),
resolver=AsyncResolver(nameservers=["1.1.1.1", "1.0.0.1"]),
ttl_dns_cache=22_500,
use_dns_cache=True,
conn_max_requests=500,
),
handle_cookies=handle_cookies,
verify_ssl=verify_ssl,
)
def __init__(
self,
debug: bool = False,
verify_ssl: bool = True,
use_http2: bool = False,
follow_redirects: bool = True,
max_retries: int = 10,
req_res_timeout: float = 30.0,
) -> None:
self.debug = debug
self.max_retries = max_retries
self.__verify_ssl = verify_ssl
self.__use_http2 = use_http2
self.__follow = follow_redirects
self.__timeout = req_res_timeout
if self.debug:
self.log_level = logging.DEBUG if self.debug else logging.INFO
else:
self.log_level = logging.WARNING
self.logfile = Path.cwd().joinpath(
f"logs/FetchData_{FetchData.get_now_date_time_with_microseconds_string()}.log"
)
os.makedirs(self.logfile.parent, exist_ok=True)
self.log = FetchData.get_logger(
"FetchData", filename=self.logfile, level=self.log_level
)
logging.basicConfig(format=FetchData.LOGGING_FORMAT, level=self.log_level)
logging.getLogger("aiosonic").propagate = True
logging.getLogger("aiosonic").setLevel(self.log_level)
self.__initialize_loop()
self._init_sessions()
async def __aenter__(self) -> "FetchData":
return self
async def __aexit__(self, exc_type: None, exc: None, tb: None) -> None:
self.__graceful_exit()
def __enter__(self) -> "FetchData":
return self
def __exit__(self, exc_type, exc_val, exc_tb) -> None:
self.__graceful_exit()
def __del__(self) -> None:
self.__graceful_exit()
def __delete__(self) -> None:
self.__graceful_exit()
def __graceful_exit(self) -> None:
self.log.info("Gracefully Exiting...")
try:
asyncio.run_coroutine_threadsafe(
self.__stop_req_sessions(), self.__loop
).result(1.0)
sleep(0.25)
asyncio.run_coroutine_threadsafe(
self.__loop.shutdown_asyncgens(), self.__loop
).result(1.0)
sleep(0.25)
if self.__loop.is_running():
self.__loop.stop()
sleep(0.25)
if not self.__loop.is_closed():
self.__loop.close()
sleep(0.25)
except (RuntimeError, RuntimeWarning) as err:
self.log.error(str(err))
sys.exit(0)
def handle_stop_signals(self, *args, **kwargs) -> NoReturn:
self.log.info("Handling Stop Signal By Gracefully Exiting...")
try:
self.__graceful_exit()
except Exception as err:
self.log.error(str(err))
self.log.info("Exiting Program...")
sys.exit(0)
def __initialize_loop(self) -> None:
self.__loop = asyncio.new_event_loop()
if FetchData.is_windows():
with contextlib.suppress(ValueError):
for sig in SIGNALS:
signal.signal(sig, self.handle_stop_signals)
else:
with contextlib.suppress(ValueError):
for sig in SIGNALS:
self.__loop.add_signal_handler(sig, self.handle_stop_signals)
self._event_thread = Thread(
target=self.start_background_loop,
args=(self.__loop,),
name=f"{self.__class__.__name__}_event_thread",
daemon=True,
)
self._event_thread.start()
self.log.info("FetchData Event Loop has been initialized.")
async def __init_sessions(self) -> None:
self.client = await FetchData.get_client(
verify_ssl=self.__verify_ssl, handle_cookies=False
)
def _init_sessions(self) -> None:
return asyncio.run_coroutine_threadsafe(
self.__init_sessions(),
self.__loop,
).result(timeout=self.__timeout)
async def __stop_req_sessions(self) -> None:
if await self.client.wait_requests(timeout=5):
await self.client.connector.cleanup()
await self.client.__aexit__(None, None, None)
await asyncio.sleep(0.010)
del self.client
async def __run_multiple_tasks(
self,
coroutines: List[Coroutine[Any, Any, Any]],
maintain_order: bool = False,
) -> Optional[List[Optional[Dict[str, Any] | str | bytes | Any]]]:
if maintain_order:
results = []
for coroutine in coroutines:
results.append(await coroutine)
await asyncio.sleep(0.0)
return results
else:
try:
async with asyncio.TaskGroup() as tg:
tasks = [tg.create_task(coroutine) for coroutine in coroutines]
except* TimeoutError as err:
self.log.exception(
"While Executing Multiple Tasks A Timeout Exception Has Occured, Cancelling Remaining Tasks, Expetion Details: %s",
err,
)
except* Exception as exc:
self.log.exception(
"While Executing Multiple Tasks Encountered An Exception, Cancelling Remaining Tasks, Expetion Details: %s",
exc,
)
else:
return [task.result() for task in tasks]
def _run_multiple_tasks(
self,
coroutines: List[Coroutine[Any, Any, Any]],
maintain_order: bool = False,
) -> Optional[List[Optional[Dict[str, Any] | str | bytes | Any]]]:
_timeout = self.__timeout * len(coroutines)
future = asyncio.run_coroutine_threadsafe(
self.__run_multiple_tasks(coroutines, maintain_order=maintain_order),
self.__loop,
)
try:
result = future.result(timeout=_timeout)
except TimeoutError:
self.log.error(
"The Request Took Longer Than The Default Timeout To Wait For The Response Along With Retries, i.e. %.2f Seconds, Cancelling The Task...",
_timeout,
)
future.cancel()
except Exception as exc:
self.log.exception(f"The Request Ended Up With An Exception: {exc!r}")
else:
return result
async def __request(
self,
url: str,
method: HTTPMethod,
data: Optional[DataType] = None,
json: Optional[Dict[str, Any] | List[Tuple[str, Any]]] = None,
params: Optional[ParamsType] = None,
multipart: bool = False,
verify: bool = True,
ssl: Optional[SSLContext] = None,
timeouts: Optional[Timeouts] = None,
follow: bool = False,
http2: bool = False,
sclient: bool = False,
limit: Optional[asyncio.Semaphore] = None,
) -> Optional[Dict[str, Any] | str | bytes | Any]:
limit = asyncio.Semaphore(2) if limit is None else limit
url = FetchData.ROUTES[url] if url in FetchData.ROUTES else url
retry_no, status_code, response_text = 0, None, None
headers = self.__headers
async with limit:
while retry_no < self.max_retries:
if retry_no > 0:
self.log.debug("Retry No. %d", retry_no)
self.log.debug(
"Initializing Request For Endpoint: %s, Method: %s, With Request Params: %s, Headers: %s, Data: %s, Json: %s",
url,
method,
params,
headers,
data,
json,
)
try:
response = await self.client._request_with_body(
url,
method.value,
data=data,
headers=headers,
json=json,
params=params,
multipart=multipart,
verify=verify,
ssl=ssl,
timeouts=timeouts,
follow=follow,
http2=http2,
)
response_text = await response.text()
status_code = response.status_code
except Exception as err:
self.log.error(
"Request For Endpoint: %s, Method: %s, With Request Params: %s, Data: %s, Json: %s; Has FAILED with Status Code: %s and Response Text: %s...Truncted To 500 Chars, Exception was: %s",
url,
method,
params,
data,
json,
status_code,
response_text[:500] if response_text is not None else "",
err,
)
self.log.debug("Going to sleep for 1 second and then retry...")
await asyncio.sleep(1.0)
retry_no += 1
else:
if response.ok:
self.log.info(
"Request For Endpoint: %s, Method: %s, With Request Params: %s, Data: %s, Json: %s; Has SUCCEDED with Status Code: %s and Response Text: %s...Truncted To 500 Chars.",
url,
method,
params,
data,
json,
status_code,
response_text[:500],
)
try:
return await response.json()
except Exception:
return response_text
else:
self.log.error(
"Request For Endpoint: %s, Method: %s, With Request Params: %s, Data: %s, Json: %s; Has FAILED with Status Code: %s and Response Text: %s...Truncted To 500 Chars",
url,
method,
params,
data,
json,
status_code,
response_text[:500] if response_text is not None else "",
)
self.log.debug("Going to sleep for 1 second and then retry...")
await asyncio.sleep(1.0)
retry_no += 1
def _request(
self,
url: str,
method: HTTPMethod,
data: Optional[DataType] = None,
json: Optional[Dict[str, Any] | List[Tuple[str, Any]]] = None,
params: Optional[ParamsType] = None,
multipart: bool = False,
verify: bool = True,
ssl: Optional[SSLContext] = None,
timeouts: Optional[Timeouts] = None,
follow: bool = False,
http2: bool = False,
sclient: bool = False,
limit: Optional[asyncio.Semaphore] = None,
) -> Optional[Dict[str, Any] | str | bytes | Any]:
future = asyncio.run_coroutine_threadsafe(
self.__request(
url,
method,
data=data,
json=json,
params=params,
multipart=multipart,
verify=verify,
ssl=ssl,
timeouts=timeouts,
follow=follow,
http2=http2,
sclient=sclient,
limit=limit,
),
self.__loop,
)
try:
result = future.result(timeout=self.__timeout)
except TimeoutError:
self.log.error(
"The Request Took Longer Than The Default Timeout To Wait For The Response Along With Retries, i.e. %.2f Seconds, Cancelling The Task...",
self.__timeout,
)
future.cancel()
except Exception as exc:
self.log.exception(f"The Request Ended Up With An Exception: {exc!r}")
else:
return result
async def __get(
self,
url: str,
params: Optional[ParamsType] = None,
sclient: bool = False,
limit: Optional[asyncio.Semaphore] = None,
) -> Optional[Dict[str, Any] | str | bytes | Any]:
return await self.__request(
url,
HTTPMethod.GET,
params=params,
follow=self.__follow,
http2=self.__use_http2,
sclient=sclient,
limit=limit,
)
def _get(
self,
url: str,
params: Optional[ParamsType] = None,
sclient: bool = False,
limit: Optional[asyncio.Semaphore] = None,
) -> Optional[Dict[str, Any] | str | bytes | Any]:
return self._request(
url,
HTTPMethod.GET,
params=params,
follow=self.__follow,
http2=self.__use_http2,
sclient=sclient,
limit=limit,
)
async def __put(
self,
url: str,
data: Optional[DataType] = None,
json: Optional[Dict[str, Any] | List[Tuple[str, Any]]] = None,
params: Optional[ParamsType] = None,
multipart: bool = False,
sclient: bool = False,
limit: Optional[asyncio.Semaphore] = None,
) -> Optional[Dict[str, Any] | str | bytes | Any]:
return await self.__request(
url,
HTTPMethod.PUT,
params=params,
data=data,
multipart=multipart,
follow=self.__follow,
http2=self.__use_http2,
sclient=sclient,
limit=limit,
)
def _put(
self,
url: str,
data: Optional[DataType] = None,
json: Optional[Dict[str, Any] | List[Tuple[str, Any]]] = None,
params: Optional[ParamsType] = None,
multipart: bool = False,
sclient: bool = False,
limit: Optional[asyncio.Semaphore] = None,
) -> Optional[Dict[str, Any] | str | bytes | Any]:
return self._request(
url,
HTTPMethod.PUT,
params=params,
data=data,
multipart=multipart,
follow=self.__follow,
http2=self.__use_http2,
sclient=sclient,
limit=limit,
)
async def __post(
self,
url: str,
data: Optional[DataType] = None,
json: Optional[Dict[str, Any] | List[Tuple[str, Any]]] = None,
params: Optional[ParamsType] = None,
multipart: bool = False,
sclient: bool = False,
limit: Optional[asyncio.Semaphore] = None,
) -> Optional[Dict[str, Any] | str | bytes | Any]:
return await self.__request(
url,
HTTPMethod.POST,
params=params,
data=data,
multipart=multipart,
follow=self.__follow,
http2=self.__use_http2,
sclient=sclient,
limit=limit,
)
def _post(
self,
url: str,
data: Optional[DataType] = None,
json: Optional[Dict[str, Any] | List[Tuple[str, Any]]] = None,
params: Optional[ParamsType] = None,
multipart: bool = False,
sclient: bool = False,
limit: Optional[asyncio.Semaphore] = None,
) -> Optional[Dict[str, Any] | str | bytes | Any]:
return self._request(
url,
HTTPMethod.POST,
params=params,
data=data,
multipart=multipart,
follow=self.__follow,
http2=self.__use_http2,
sclient=sclient,
limit=limit,
)
async def __patch(
self,
url: str,
data: Optional[DataType] = None,
json: Optional[Dict[str, Any] | List[Tuple[str, Any]]] = None,
params: Optional[ParamsType] = None,
multipart: bool = False,
sclient: bool = False,
limit: Optional[asyncio.Semaphore] = None,
) -> Optional[Dict[str, Any] | str | bytes | Any]:
return await self.__request(
url,
HTTPMethod.PATCH,
params=params,
data=data,
multipart=multipart,
follow=self.__follow,
http2=self.__use_http2,
sclient=sclient,
limit=limit,
)
def _patch(
self,
url: str,
data: Optional[DataType] = None,
json: Optional[Dict[str, Any] | List[Tuple[str, Any]]] = None,
params: Optional[ParamsType] = None,
multipart: bool = False,
sclient: bool = False,
limit: Optional[asyncio.Semaphore] = None,
) -> Optional[Dict[str, Any] | str | bytes | Any]:
return self._request(
url,
HTTPMethod.PATCH,
params=params,
data=data,
multipart=multipart,
follow=self.__follow,
http2=self.__use_http2,
sclient=sclient,
limit=limit,
)
async def __delete(
self,
url: str,
data: Optional[DataType] = None,
json: Optional[Dict[str, Any] | List[Tuple[str, Any]]] = None,
params: Optional[ParamsType] = None,
multipart: bool = False,
sclient: bool = False,
limit: Optional[asyncio.Semaphore] = None,
) -> Optional[Dict[str, Any] | str | bytes | Any]:
return await self.__request(
url,
HTTPMethod.DELETE,
params=params,
data=data,
multipart=multipart,
follow=self.__follow,
http2=self.__use_http2,
sclient=sclient,
limit=limit,
)
def _delete(
self,
url: str,
data: Optional[DataType] = None,
json: Optional[Dict[str, Any] | List[Tuple[str, Any]]] = None,
params: Optional[ParamsType] = None,
multipart: bool = False,
sclient: bool = False,
limit: Optional[asyncio.Semaphore] = None,
) -> Optional[Dict[str, Any] | str | bytes | Any]:
return self._request(
url,
HTTPMethod.DELETE,
params=params,
data=data,
multipart=multipart,
follow=self.__follow,
http2=self.__use_http2,
sclient=sclient,
limit=limit,
)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment