Last active
January 2, 2020 00:03
-
-
Save HacKanCuBa/28e7c08b967422fe04f2d057086216c8 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"""Wrappers around Python 3 Requests library. | |

This lib will log errors, warnings and request duration instead of raising on
request failures such as connection errors or timeouts: in such error cases,
no response data is returned (None). To identify, if necessary, that there
were errors, check the exception flag of the returned value: the methods
return a named tuple in the form of
(response_data: any, exception: bool, response_status: int or None).

If there's any response expected from the endpoint, it will be returned
JSON-converted as-is, which means it's either valid JSON (string, number,
list, dict) or None (default response value, used when there was nothing
that could be decoded).
""" | |
import asyncio | |
import functools | |
import logging | |
import typing | |
from time import time | |
import requests | |
from requests.adapters import HTTPAdapter | |
from urllib3.util.retry import Retry | |
__version__ = '0.7.3'
__author__ = 'HacKan (https://hackan.net)'
__license__ = 'GPL-3+'
__url__ = 'https://gist.github.com/HacKanCuBa/28e7c08b967422fe04f2d057086216c8'

# Module-level logger used by every wrapper in this file.
logger = logging.getLogger(__name__)

# Type of a JSON-decoded response body. JSON numbers may decode to float and
# JSON true/false decode to bool, so both are part of the union; None stands
# for "no data" (nothing was decoded).
TJSONData = typing.Optional[typing.Union[dict, list, str, int, float, bool]]
def retry_connector(
        *,
        retries: int = 3,
        backoff_factor: float = 0.3,
        status_forcelist: typing.Sequence[int] = (500, 502, 504),
        session: typing.Optional[requests.Session] = None,
) -> requests.Session:
    """Build (or extend) a session whose HTTP(S) adapters retry requests.

    :Author: Peter Bengtsson (https://www.peterbe.com)
    :URL: https://www.peterbe.com/plog/best-practice-with-retries-with-requests

    :param retries: Value used for the total, read and connect retry limits.
    :param backoff_factor: Backoff factor handed to the retry policy.
    :param status_forcelist: Status codes handed to the retry policy as its
                             forced-retry list.
    :param session: Session to mount the retrying adapter on; a new one is
                    created when none is given.
    :return: The session with the retrying adapter mounted for both the
             http:// and https:// prefixes.
    """
    if not session:
        session = requests.Session()

    retrying_adapter = HTTPAdapter(
        max_retries=Retry(
            total=retries,
            read=retries,
            connect=retries,
            backoff_factor=backoff_factor,
            status_forcelist=status_forcelist,
        ),
    )
    # The very same adapter instance serves both schemes.
    for prefix in ('http://', 'https://'):
        session.mount(prefix, retrying_adapter)

    return session
class SimpleResponse(typing.NamedTuple):
    """Simple response class.

    Pairs the raw response of a request (if any) with a flag that tells
    whether an exception occurred while making that request.
    """

    # Response object of the request; None when no response was obtained.
    response: typing.Optional[requests.Response]
    # True when an exception occurred while making the request.
    exception: bool
class SimpleDataResponse(typing.NamedTuple):
    """Simple data response class.

    Groups the JSON-decoded body of a response (if any), a flag that tells
    whether an exception occurred while making the request, and the HTTP
    status code of the response (if any).
    """

    # JSON-decoded response body; None when there is nothing decoded.
    response_data: TJSONData
    # True when an exception occurred while making the request.
    exception: bool
    # HTTP status code of the response; None when there is no response.
    response_status: typing.Optional[int]
class SimpleRequest:
    """Wrapper over requests lib that catches and logs errors and connection times.

    Failures raised by the `requests` library are not propagated to the
    caller: they are logged and reported through the `exception` flag of the
    returned `SimpleResponse`.
    """

    # Default value for the `verify` argument of `requests`, used whenever
    # the caller does not pass `verify` explicitly.
    VERIFY_SSL: bool = True

    @classmethod
    def request(
            cls,
            method: str,
            url: str,
            *,
            timeout: typing.Union[int, float],
            retries: typing.Optional[int] = None,
            backoff_factor: float = 0.3,
            status_forcelist: typing.Optional[typing.Sequence[int]] = None,
            **kwargs,
    ) -> SimpleResponse:
        """Make a request to an endpoint, return the response if any.

        Request time, errors and exceptions are all logged using the standard
        logger.

        :param method: The request method as string, such as GET, POST, PUT, etc.
        :param url: Endpoint URL as string.
        :param timeout: Connection timeout in seconds (0 for inf).
        :param retries: Any number bigger than 0 implies usage of a retry strategy.
        :param backoff_factor: Factor to apply to wait time for retries.
        :param status_forcelist: Set of status codes over 500 that you want to
                                 forcibly retry.
        :param kwargs: Additional arguments for `requests`.
        :return: An object with the response (if any) and a bool representing
                 the occurrence of an exception.
        """
        exception = False
        response = None

        # A timeout of 0 (or less) means "no timeout": nothing is passed on.
        if timeout > 0:
            kwargs['timeout'] = timeout
        kwargs.setdefault('verify', cls.VERIFY_SSL)

        # A dedicated session is created only when a retry strategy is asked
        # for; it is owned by this call and must be closed when done.
        session = None
        if retries:
            session = retry_connector(
                retries=retries,
                backoff_factor=backoff_factor,
                status_forcelist=status_forcelist,
            )
        connector = requests if session is None else session

        request_time_start = time()
        try:
            response = connector.request(method, url, **kwargs)
        except requests.exceptions.RequestException:
            # RequestException is the base of every error raised by requests
            # (connection errors, timeouts, exhausted retries, redirects...),
            # so none of them escapes to the caller.
            request_time_end = time()
            # NOTE(review): kwargs are logged verbatim and may carry
            # credentials (headers, auth, body) - confirm this is acceptable.
            logger.exception(
                'Error [%s]ing data (kwargs: %s) to/from the endpoint (url: %s)',
                method,
                str(kwargs),
                url,
            )
            exception = True
        else:
            request_time_end = time()
            if not response.ok:
                logger.warning(
                    'Response from endpoint %s (kwargs: %s) is NOT OK: %d %s',
                    url,
                    str(kwargs),
                    response.status_code,
                    response.text,
                )
        finally:
            # Release the connection pools of the per-call retry session.
            if session is not None:
                session.close()

        logger.debug(
            'Request to endpoint %s took %.2f seconds',
            url,
            request_time_end - request_time_start,
        )

        return SimpleResponse(response, exception)

    @classmethod
    async def aiorequest(
            cls,
            method: str,
            url: str,
            *,
            timeout: typing.Union[int, float],
            retries: typing.Optional[int] = None,
            backoff_factor: float = 0.3,
            status_forcelist: typing.Optional[typing.Sequence[int]] = None,
            **kwargs,
    ) -> SimpleResponse:
        """Asynchronously make a request to an endpoint, return the response if any.

        The blocking `request` method is run in the default executor of the
        running event loop.

        Request time, errors and exceptions are all logged using the standard
        logger.

        :param method: The request method as string, such as GET, POST, PUT, etc.
        :param url: Endpoint URL as string.
        :param timeout: Connection timeout in seconds (0 for inf).
        :param retries: Any number bigger than 0 implies usage of a retry strategy.
        :param backoff_factor: Factor to apply to wait time for retries.
        :param status_forcelist: Set of status codes over 500 that you want to
                                 forcibly retry.
        :param kwargs: Additional arguments for `requests`.
        :return: An object with the response (if any) and a bool representing
                 the occurrence of an exception.
        """
        blocking_call = functools.partial(
            cls.request,
            method,
            url,
            timeout=timeout,
            retries=retries,
            backoff_factor=backoff_factor,
            status_forcelist=status_forcelist,
            **kwargs,
        )
        loop = asyncio.get_running_loop()
        # `request` already returns a SimpleResponse, so it is handed back as is.
        return await loop.run_in_executor(None, blocking_call)
class JSONConnector:
    """Generic requests wrapper class to handle JSON endpoints."""

    @staticmethod
    def request(
            method: str,
            url: str,
            *,
            timeout: typing.Union[int, float],
            retries: typing.Optional[int] = None,
            backoff_factor: float = 0.3,
            status_forcelist: typing.Optional[typing.Sequence[int]] = None,
            **kwargs,
    ) -> SimpleDataResponse:
        """Make a request to a JSON endpoint, return the JSON converted response if any.

        Request time, errors and exceptions are all logged using the standard
        logger.

        Note that the type of the returned response depends on the endpoint,
        but it will always be some valid JSON, or None when there is nothing
        that could be decoded.

        To know whether an exception occurred or not check the exception property
        of the return value.

        :param method: The request method as string, such as GET, POST, PUT, etc.
        :param url: Endpoint URL as string.
        :param timeout: Connection timeout in seconds (0 for inf).
        :param retries: Any number bigger than 0 implies usage of a retry strategy.
        :param backoff_factor: Factor to apply to wait time for retries.
        :param status_forcelist: Set of status codes over 500 that you want to
                                 forcibly retry.
        :param kwargs: Additional arguments for `requests`.
        :return: An object with the response data (if any), a bool representing
                 the occurrence of an exception and the response HTTP status code.
        """
        response_data = None

        # Work on a copy so that the headers mapping owned by the caller is
        # never modified when the content type is forced to JSON.
        headers = dict(kwargs.get('headers') or {})
        headers['content-type'] = 'application/json'
        kwargs['headers'] = headers

        response, exception = SimpleRequest.request(
            method,
            url,
            timeout=timeout,
            retries=retries,
            backoff_factor=backoff_factor,
            status_forcelist=status_forcelist,
            **kwargs,
        )

        if exception:
            status_code = None
        else:
            status_code = response.status_code
            try:
                response_data = response.json()
            except ValueError:
                if response.text:  # Could be an empty response
                    logger.warning(
                        'Response from endpoint %s is not valid JSON: %d %s',
                        url,
                        response.status_code,
                        response.text,
                    )

        return SimpleDataResponse(response_data, exception, status_code)

    @classmethod
    def get(
            cls,
            url: str,
            *,
            timeout: typing.Union[int, float],
            retries: typing.Optional[int] = None,
            backoff_factor: float = 0.3,
            status_forcelist: typing.Optional[typing.Sequence[int]] = None,
            **kwargs,
    ) -> SimpleDataResponse:
        """Retrieve data from a JSON endpoint, return the JSON converted response if any.

        Request time, errors and exceptions are all logged using the standard
        logger.

        Note that the type of the returned response depends on the endpoint,
        but it will always be some valid JSON, or None when there is nothing
        that could be decoded.

        To know whether an exception occurred or not check the exception property
        of the return value.

        :param url: Endpoint URL as string.
        :param timeout: Connection timeout in seconds (0 for inf).
        :param retries: Any number bigger than 0 implies usage of a retry strategy.
        :param backoff_factor: Factor to apply to wait time for retries.
        :param status_forcelist: Set of status codes over 500 that you want to
                                 forcibly retry.
        :param kwargs: Additional arguments for `requests`.
        :return: An object with the response data (if any), a bool representing
                 the occurrence of an exception and the response HTTP status code.
        """
        return cls.request(
            'GET',
            url,
            timeout=timeout,
            retries=retries,
            backoff_factor=backoff_factor,
            status_forcelist=status_forcelist,
            **kwargs,
        )

    @classmethod
    def post(
            cls,
            url: str,
            data: typing.Union[str, dict, bytes, list, tuple],
            *,
            timeout: typing.Union[int, float],
            retries: typing.Optional[int] = None,
            backoff_factor: float = 0.3,
            status_forcelist: typing.Optional[typing.Sequence[int]] = None,
            **kwargs,
    ) -> SimpleDataResponse:
        """Post data to a JSON endpoint, return the JSON converted response if any.

        If given data is a string, it will be previously encoded as if it were UTF-8.
        It is recommended to not send strings but encoded ones as bytes.

        Request time, errors and exceptions are all logged using the standard
        logger.

        Note that the type of the returned response depends on the endpoint,
        but it will always be some valid JSON, or None when there is nothing
        that could be decoded.

        To know whether an exception occurred or not check the exception property
        of the return value.

        :param url: Endpoint URL as string.
        :param data: Data to post: a dictionary, list or tuple is sent as JSON;
                     a string is encoded to bytes first; anything else is sent
                     as the request body as-is.
        :param timeout: Connection timeout in seconds (0 for inf).
        :param retries: Any number bigger than 0 implies usage of a retry strategy.
        :param backoff_factor: Factor to apply to wait time for retries.
        :param status_forcelist: Set of status codes over 500 that you want to
                                 forcibly retry.
        :param kwargs: Additional arguments for `requests`.
        :return: An object with the response data (if any), a bool representing
                 the occurrence of an exception and the response HTTP status code.
        """
        if isinstance(data, (dict, list, tuple)):
            # Let requests serialize the structure as JSON.
            payload = {'json': data}
        elif isinstance(data, str):
            payload = {'data': data.encode()}
        else:
            payload = {'data': data}

        # Unpacking both mappings keeps the TypeError raised when the caller
        # also passes `json`/`data` through kwargs.
        return cls.request(
            'POST',
            url,
            timeout=timeout,
            retries=retries,
            backoff_factor=backoff_factor,
            status_forcelist=status_forcelist,
            **payload,
            **kwargs,
        )
class JSONConnectorAsync:
    """Generic requests wrapper class to handle JSON endpoints asynchronously."""

    @staticmethod
    async def request(
            method: str,
            url: str,
            *,
            timeout: typing.Union[int, float],
            retries: typing.Optional[int] = None,
            backoff_factor: float = 0.3,
            status_forcelist: typing.Optional[typing.Sequence[int]] = None,
            **kwargs,
    ) -> SimpleDataResponse:
        """Make a request to a JSON endpoint, return the JSON converted response if any.

        Request time, errors and exceptions are all logged using the standard
        logger.

        Note that the type of the returned response depends on the endpoint,
        but it will always be some valid JSON, or None when there is nothing
        that could be decoded.

        To know whether an exception occurred or not check the exception property
        of the return value.

        :param method: The request method as string, such as GET, POST, PUT, etc.
        :param url: Endpoint URL as string.
        :param timeout: Connection timeout in seconds (0 for inf).
        :param retries: Any number bigger than 0 implies usage of a retry strategy.
        :param backoff_factor: Factor to apply to wait time for retries.
        :param status_forcelist: Set of status codes over 500 that you want to
                                 forcibly retry.
        :param kwargs: Additional arguments for `requests`.
        :return: An object with the response data (if any), a bool representing
                 the occurrence of an exception and the response HTTP status code.
        """
        response_data = None

        # Work on a copy so that the headers mapping owned by the caller is
        # never modified when the content type is forced to JSON.
        headers = dict(kwargs.get('headers') or {})
        headers['content-type'] = 'application/json'
        kwargs['headers'] = headers

        response, exception = await SimpleRequest.aiorequest(
            method,
            url,
            timeout=timeout,
            retries=retries,
            backoff_factor=backoff_factor,
            status_forcelist=status_forcelist,
            **kwargs,
        )

        if exception:
            status_code = None
        else:
            status_code = response.status_code
            try:
                response_data = response.json()
            except ValueError:
                if response.text:  # Could be an empty response
                    logger.warning(
                        'Response from endpoint %s is not valid JSON: %d %s',
                        url,
                        response.status_code,
                        response.text,
                    )

        return SimpleDataResponse(response_data, exception, status_code)

    @classmethod
    async def get(
            cls,
            url: str,
            *,
            timeout: typing.Union[int, float],
            retries: typing.Optional[int] = None,
            backoff_factor: float = 0.3,
            status_forcelist: typing.Optional[typing.Sequence[int]] = None,
            **kwargs,
    ) -> SimpleDataResponse:
        """Retrieve data from a JSON endpoint, return the JSON converted response if any.

        Request time, errors and exceptions are all logged using the standard
        logger.

        Note that the type of the returned response depends on the endpoint,
        but it will always be some valid JSON, or None when there is nothing
        that could be decoded.

        To know whether an exception occurred or not check the exception property
        of the return value.

        :param url: Endpoint URL as string.
        :param timeout: Connection timeout in seconds (0 for inf).
        :param retries: Any number bigger than 0 implies usage of a retry strategy.
        :param backoff_factor: Factor to apply to wait time for retries.
        :param status_forcelist: Set of status codes over 500 that you want to
                                 forcibly retry.
        :param kwargs: Additional arguments for `requests`.
        :return: An object with the response data (if any), a bool representing
                 the occurrence of an exception and the response HTTP status code.
        """
        return await cls.request(
            'GET',
            url,
            timeout=timeout,
            retries=retries,
            backoff_factor=backoff_factor,
            status_forcelist=status_forcelist,
            **kwargs,
        )

    # NOTE: unlike the other methods, `retries`, `backoff_factor` and
    # `status_forcelist` are accepted positionally here; the signature is
    # kept as is so that existing callers keep working.
    @classmethod
    async def post(
            cls,
            url: str,
            data: typing.Union[str, dict, bytes, list, tuple],
            retries: typing.Optional[int] = None,
            backoff_factor: float = 0.3,
            status_forcelist: typing.Optional[typing.Sequence[int]] = None,
            *,
            timeout: typing.Union[int, float],
            **kwargs,
    ) -> SimpleDataResponse:
        """Post data to a JSON endpoint, return the JSON converted response if any.

        If given data is a string, it will be previously encoded as if it were UTF-8.
        It is recommended to not send strings but encoded ones as bytes.

        Request time, errors and exceptions are all logged using the standard
        logger.

        Note that the type of the returned response depends on the endpoint,
        but it will always be some valid JSON, or None when there is nothing
        that could be decoded.

        To know whether an exception occurred or not check the exception property
        of the return value.

        :param url: Endpoint URL as string.
        :param data: Data to post: a dictionary, list or tuple is sent as JSON;
                     a string is encoded to bytes first; anything else is sent
                     as the request body as-is.
        :param retries: Any number bigger than 0 implies usage of a retry strategy.
        :param backoff_factor: Factor to apply to wait time for retries.
        :param status_forcelist: Set of status codes over 500 that you want to
                                 forcibly retry.
        :param timeout: Connection timeout in seconds (0 for inf).
        :param kwargs: Additional arguments for `requests`.
        :return: An object with the response data (if any), a bool representing
                 the occurrence of an exception and the response HTTP status code.
        """
        if isinstance(data, (dict, list, tuple)):
            # Let requests serialize the structure as JSON.
            payload = {'json': data}
        elif isinstance(data, str):
            payload = {'data': data.encode()}
        else:
            payload = {'data': data}

        # Unpacking both mappings keeps the TypeError raised when the caller
        # also passes `json`/`data` through kwargs.
        return await cls.request(
            'POST',
            url,
            timeout=timeout,
            retries=retries,
            backoff_factor=backoff_factor,
            status_forcelist=status_forcelist,
            **payload,
            **kwargs,
        )
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment