Skip to content

Instantly share code, notes, and snippets.

@chiragjn
Last active April 4, 2020 15:48
Show Gist options
  • Save chiragjn/9a4f7755f6810cdf22b3863ba7f023f0 to your computer and use it in GitHub Desktop.
Save chiragjn/9a4f7755f6810cdf22b3863ba7f023f0 to your computer and use it in GitHub Desktop.
Just some thoughts around a template to handle various requests errors
import abc
import json
from typing import Dict, Any, Optional
import requests
class JsonResponseRequest(abc.ABC):
    """Template for making an HTTP request that is expected to return JSON.

    Subclasses implement :meth:`call` to perform the request and one hook per
    outcome. Invoking the instance (``instance(*args, **kwargs)``) runs
    :meth:`call`, classifies the outcome and returns whatever the matching
    hook returns.

    Every hook accepts ``**kwargs`` for forward compatibility; ``__call__``
    currently passes only the keyword arguments documented on each hook.
    """

    @abc.abstractmethod
    def response_has_errors(self, response_json: Dict[str, Any], response: requests.Response) -> bool:
        """Return True when a 2xx response body still signals a failure.

        Abstract, but ships with a default that subclasses may reuse through
        ``super().response_has_errors(...)``: the body is considered failed
        when its ``success`` field is falsy or its ``error`` field is truthy.
        A missing ``success`` counts as success; a missing ``error`` counts
        as no error.

        Args:
            response_json: Decoded JSON body of the response.
            response: The response object the body was decoded from.
        """
        success = response_json.get('success', True)
        error = response_json.get('error', None)
        # An error is present when `error` is truthy. The previous check
        # (`not error`) was inverted and flagged every error-free body.
        return not success or bool(error)

    @abc.abstractmethod
    def call(self, *args, **kwargs) -> requests.Response:
        """Perform the request and return the response.

        Receives the positional and keyword arguments given to ``__call__``.
        """
        raise NotImplementedError

    # Network Failures
    @abc.abstractmethod
    def on_connection_error(self, exc, **kwargs) -> Optional[Any]:
        """Handle a ``requests.exceptions.ConnectionError`` raised by the request."""
        raise NotImplementedError

    # No Response Failures
    @abc.abstractmethod
    def on_timeout_error(self, timeout_exc: requests.exceptions.Timeout, **kwargs) -> Optional[Any]:
        """Handle a ``requests.exceptions.Timeout`` raised by the request."""
        raise NotImplementedError

    # HTTP Errors - non 200
    @abc.abstractmethod
    def on_http_error_json_ok(self, http_error_exc: requests.exceptions.HTTPError,
                              response_json: Dict[str, Any], response: requests.Response,
                              **kwargs) -> Optional[Any]:
        """Handle an HTTP error status whose body decoded as JSON."""
        raise NotImplementedError

    @abc.abstractmethod
    def on_http_error_bad_json(self, http_error_exc: requests.exceptions.HTTPError,
                               json_exc: json.JSONDecodeError, response: requests.Response,
                               **kwargs) -> Optional[Any]:
        """Handle an HTTP error status whose body could not be decoded as JSON.

        ``json_exc`` is the decoding error; it is caught as ``ValueError`` so
        it may be a ``ValueError`` subclass other than
        ``json.JSONDecodeError``.
        """
        raise NotImplementedError

    # General exception
    @abc.abstractmethod
    def on_other_exceptions(self, exc: Exception, **kwargs) -> Optional[Any]:
        """Handle any exception not covered by a more specific hook.

        Abstract, but ships with a default that re-raises ``exc``; subclasses
        may reuse it through ``super().on_other_exceptions(...)``.
        """
        raise exc

    # HTTP OK scenarios
    @abc.abstractmethod
    def on_http_ok_bad_json(self, json_exc: json.JSONDecodeError, response: requests.Response,
                            **kwargs) -> Optional[Any]:
        """Handle a successful status whose body could not be decoded as JSON.

        ``json_exc`` is the decoding error; it is caught as ``ValueError`` so
        it may be a ``ValueError`` subclass other than
        ``json.JSONDecodeError``.
        """
        raise NotImplementedError

    @abc.abstractmethod
    def on_http_ok_response_has_errors(self, response_json: Dict[str, Any], response: requests.Response,
                                       **kwargs) -> Optional[Any]:
        """Handle a successful status whose JSON body reports an error."""
        raise NotImplementedError

    @abc.abstractmethod
    def on_http_ok_response_ok(self, response_json: Dict[str, Any], response: requests.Response,
                               **kwargs) -> Optional[Any]:
        """Handle a successful status with an error-free JSON body."""
        raise NotImplementedError

    def __call__(self, *args, **kwargs) -> Optional[Any]:
        """Run :meth:`call` and dispatch the outcome to the matching hook.

        Args:
            *args: Forwarded to :meth:`call`.
            **kwargs: Forwarded to :meth:`call`.

        Returns:
            Whatever the selected hook returns.

        Raises:
            Whatever the selected hook raises. Exceptions raised by the
            ``on_http_ok_*`` hooks are themselves routed through the error
            hooks below; exceptions raised by an error hook propagate.
        """
        # Pre-bound so the HTTPError branch can tell whether `call()` got as
        # far as returning a response.
        response = None
        try:
            response = self.call(*args, **kwargs)
            response.raise_for_status()
            # Only the decode itself is guarded, so a ValueError raised by
            # `response_has_errors` or by a hook is not mistaken for a bad body.
            try:
                response_json = response.json()
            except ValueError as je:
                return self.on_http_ok_bad_json(json_exc=je, response=response)
            if self.response_has_errors(response_json=response_json, response=response):
                return self.on_http_ok_response_has_errors(response_json=response_json, response=response)
            return self.on_http_ok_response_ok(response_json=response_json, response=response)
        # ConnectionError is tested before Timeout, so an exception deriving
        # from both (requests' ConnectTimeout) is reported as a connection error.
        except requests.exceptions.ConnectionError as ce:
            return self.on_connection_error(exc=ce)
        except requests.exceptions.Timeout as te:
            return self.on_timeout_error(timeout_exc=te)
        except requests.exceptions.HTTPError as he:
            # `call()` may raise HTTPError itself, before `response` is bound;
            # fall back to the response carried by the exception.
            error_response = response if response is not None else he.response
            if error_response is None:
                # No response to inspect at all: treat as a generic failure.
                return self.on_other_exceptions(exc=he)
            try:
                response_json = error_response.json()
            except ValueError as je:
                return self.on_http_error_bad_json(http_error_exc=he, json_exc=je, response=error_response)
            return self.on_http_error_json_ok(http_error_exc=he, response_json=response_json,
                                              response=error_response)
        except Exception as e:
            return self.on_other_exceptions(exc=e)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment