Last active
April 11, 2022 08:28
-
-
Save RicterZ/40d79feb4173b47c869401890eeab89f to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import requests | |
from urllib3.connectionpool import * | |
from urllib3.connectionpool import _Default, _encode_target | |
from requests.adapters import HTTPAdapter | |
class FixedHTTPConnectionPool(HTTPConnectionPool): | |
_url = '' | |
def __init__(self, *args, **kwargs): | |
self._url = kwargs.pop('_url') | |
super(FixedHTTPConnectionPool, self).__init__(*args, **kwargs) | |
def urlopen(self, method, url, body=None, headers=None, retries=None, redirect=True, | |
assert_same_host=True, timeout=_Default, pool_timeout=None, release_conn=None, | |
chunked=False, body_pos=None, **response_kw): | |
parsed_url = parse_url(url) | |
destination_scheme = parsed_url.scheme | |
if headers is None: | |
headers = self.headers | |
if not isinstance(retries, Retry): | |
retries = Retry.from_int(retries, redirect=redirect, default=self.retries) | |
if release_conn is None: | |
release_conn = response_kw.get("preload_content", True) | |
# Check host | |
if assert_same_host and not self.is_same_host(url): | |
raise HostChangedError(self, url, retries) | |
# Ensure that the URL we're connecting to is properly encoded | |
url = self._url | |
conn = None | |
release_this_conn = release_conn | |
http_tunnel_required = connection_requires_http_tunnel( | |
self.proxy, self.proxy_config, destination_scheme | |
) | |
if not http_tunnel_required: | |
headers = headers.copy() | |
headers.update(self.proxy_headers) | |
err = None | |
clean_exit = False | |
body_pos = set_file_position(body, body_pos) | |
try: | |
timeout_obj = self._get_timeout(timeout) | |
conn = self._get_conn(timeout=pool_timeout) | |
conn.timeout = timeout_obj.connect_timeout | |
is_new_proxy_conn = self.proxy is not None and not getattr( | |
conn, "sock", None | |
) | |
if is_new_proxy_conn and http_tunnel_required: | |
self._prepare_proxy(conn) | |
httplib_response = self._make_request( | |
conn, | |
method, | |
url, | |
timeout=timeout_obj, | |
body=body, | |
headers=headers, | |
chunked=chunked, | |
) | |
response_conn = conn if not release_conn else None | |
response_kw["request_method"] = method | |
response = self.ResponseCls.from_httplib( | |
httplib_response, | |
pool=self, | |
connection=response_conn, | |
retries=retries, | |
**response_kw | |
) | |
# Everything went great! | |
clean_exit = True | |
except EmptyPoolError: | |
# Didn't get a connection from the pool, no need to clean up | |
clean_exit = True | |
release_this_conn = False | |
raise | |
except ( | |
TimeoutError, | |
HTTPException, | |
SocketError, | |
ProtocolError, | |
BaseSSLError, | |
SSLError, | |
CertificateError, | |
) as e: | |
clean_exit = False | |
if isinstance(e, (BaseSSLError, CertificateError)): | |
e = SSLError(e) | |
elif isinstance(e, (SocketError, NewConnectionError)) and self.proxy: | |
e = ProxyError("Cannot connect to proxy.", e) | |
elif isinstance(e, (SocketError, HTTPException)): | |
e = ProtocolError("Connection aborted.", e) | |
retries = retries.increment( | |
method, url, error=e, _pool=self, _stacktrace=sys.exc_info()[2] | |
) | |
retries.sleep() | |
err = e | |
finally: | |
if not clean_exit: | |
conn = conn and conn.close() | |
release_this_conn = True | |
if release_this_conn: | |
self._put_conn(conn) | |
if not conn: | |
log.warning( | |
"Retrying (%r) after connection broken by '%r': %s", retries, err, url | |
) | |
return self.urlopen( | |
method, | |
url, | |
body, | |
headers, | |
retries, | |
redirect, | |
assert_same_host, | |
timeout=timeout, | |
pool_timeout=pool_timeout, | |
release_conn=release_conn, | |
chunked=chunked, | |
body_pos=body_pos, | |
**response_kw | |
) | |
redirect_location = redirect and response.get_redirect_location() | |
if redirect_location: | |
if response.status == 303: | |
method = "GET" | |
try: | |
retries = retries.increment(method, url, response=response, _pool=self) | |
except MaxRetryError: | |
if retries.raise_on_redirect: | |
response.drain_conn() | |
raise | |
return response | |
response.drain_conn() | |
retries.sleep_for_retry(response) | |
log.debug("Redirecting %s -> %s", url, redirect_location) | |
return self.urlopen( | |
method, | |
redirect_location, | |
body, | |
headers, | |
retries=retries, | |
redirect=redirect, | |
assert_same_host=assert_same_host, | |
timeout=timeout, | |
pool_timeout=pool_timeout, | |
release_conn=release_conn, | |
chunked=chunked, | |
body_pos=body_pos, | |
**response_kw | |
) | |
# Check if we should retry the HTTP response. | |
has_retry_after = bool(response.getheader("Retry-After")) | |
if retries.is_retry(method, response.status, has_retry_after): | |
try: | |
retries = retries.increment(method, url, response=response, _pool=self) | |
except MaxRetryError: | |
if retries.raise_on_status: | |
response.drain_conn() | |
raise | |
return response | |
response.drain_conn() | |
retries.sleep(response) | |
log.debug("Retry: %s", url) | |
return self.urlopen( | |
method, | |
url, | |
body, | |
headers, | |
retries=retries, | |
redirect=redirect, | |
assert_same_host=assert_same_host, | |
timeout=timeout, | |
pool_timeout=pool_timeout, | |
release_conn=release_conn, | |
chunked=chunked, | |
body_pos=body_pos, | |
**response_kw | |
) | |
return response | |
class FixedHTTPAdapter(HTTPAdapter): | |
_url = '' | |
def __init__(self, *args, **kwargs): | |
self._url = kwargs.pop('_url') | |
super(FixedHTTPAdapter, self).__init__(*args, **kwargs) | |
def get_connection(self, *args, **kwargs): | |
u = parse_url(url) | |
uri = '{}#{}'.format(u.path, u.fragment) | |
if u.scheme == 'https': | |
raise NotImplementedError | |
else: | |
return FixedHTTPConnectionPool(host=u.host, port=u.port, _url=uri) | |
def request_url(self, *args, **kwargs): | |
return self._url | |
class RequestSess(requests.Session): | |
_url = '' | |
def __init__(self, u): | |
self._url = u | |
super(RequestSess, self).__init__() | |
def get_adapter(self, *args, **kwargs): | |
return FixedHTTPAdapter(_url=self._url) | |
url = 'http://localhost:1234/#/test' | |
sess = RequestSess(url) | |
response = sess.get(url) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment