Created
May 6, 2016 17:38
-
-
Save hoov/c633db9609b27c4a44dc4069673c70bb to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
import argparse | |
import logging | |
import ssl | |
import select | |
import urllib3.contrib.pyopenssl | |
from OpenSSL import SSL | |
from ndg.httpsclient.https import HTTPSConnection | |
from ndg.httpsclient.ssl_peer_verification import ServerSSLCertVerification | |
from urllib3.connection import HTTPSConnection as HTTPSConnection_ | |
from urllib3.contrib.pyopenssl import WrappedSocket, _openssl_verify, _verify_callback, DEFAULT_SSL_CIPHER_LIST | |
# Name the logger after the module when imported, or after the file when run
# as a script. The comparison uses != because identity (`is not`) against a
# string literal is implementation-dependent and warns on newer interpreters.
logger = logging.getLogger(__name__ if __name__ != "__main__" else __file__)
# This was shamefully lifted from urllib3.connection, but we need it to do slightly different things.
def ssl_wrap_socket(sock, keyfile=None, certfile=None, cert_reqs=ssl.CERT_NONE,
                    ca_certs=None, server_hostname=None,
                    ssl_version=None, ca_cert_dir=None):
    """Wrap ``sock`` in a pyOpenSSL client connection and run the TLS handshake.

    Args:
        sock: Connected socket to wrap.
        keyfile: Private key file; falls back to ``certfile`` when that is given.
        certfile: Client certificate file.
        cert_reqs: One of the ``ssl.CERT_*`` constants. Anything other than
            ``ssl.CERT_NONE`` installs the verify callback.
        ca_certs: CA bundle file passed to ``load_verify_locations``.
        server_hostname: Hostname sent via SNI when truthy.
        ssl_version: Method constant handed to ``SSL.Context``.
        ca_cert_dir: CA directory passed to ``load_verify_locations``.

    Returns:
        A ``WrappedSocket`` around the handshaken connection and ``sock``.

    Raises:
        ssl.SSLError: If the CA locations cannot be loaded or the handshake fails.
        socket.timeout: If waiting for handshake data exceeds the socket timeout.
    """
    # Imported here because the module never imports it at file level; the
    # handshake loop below previously raised NameError instead of a timeout.
    from socket import timeout as socket_timeout

    ctx = SSL.Context(ssl_version)
    if certfile:
        keyfile = keyfile or certfile  # Match behaviour of the normal python ssl library
        ctx.use_certificate_file(certfile)
    if keyfile:
        ctx.use_privatekey_file(keyfile)
    if cert_reqs != ssl.CERT_NONE:
        ctx.set_verify(_openssl_verify[cert_reqs], _verify_callback)
    if ca_certs or ca_cert_dir:
        try:
            ctx.load_verify_locations(ca_certs, ca_cert_dir)
        except SSL.Error as e:
            raise ssl.SSLError('bad ca_certs: %r' % ca_certs, e)
    else:
        ctx.set_default_verify_paths()

    # Disable TLS compression to mitigate CRIME attack (issue #309)
    OP_NO_COMPRESSION = 0x20000
    ctx.set_options(OP_NO_COMPRESSION)

    # Set list of supported ciphersuites.
    ctx.set_cipher_list(DEFAULT_SSL_CIPHER_LIST)

    cnx = SSL.Connection(ctx, sock)
    if server_hostname:
        server_hostname = server_hostname.encode('utf-8')
        cnx.set_tlsext_host_name(server_hostname)
    cnx.set_connect_state()

    # Retry the handshake until it completes; on WantReadError wait (bounded by
    # the socket's own timeout) for the socket to become readable.
    while True:
        try:
            cnx.do_handshake()
        except SSL.WantReadError:
            rd, _, _ = select.select([sock], [], [], sock.gettimeout())
            if not rd:
                raise socket_timeout('select timed out')
            continue
        except SSL.Error as e:
            raise ssl.SSLError('bad handshake: %r' % e)
        break

    return WrappedSocket(cnx, sock)
class OurHTTPSConnection(HTTPSConnection_):
    """HTTPS connection whose TLS layer is set up by ``ssl_wrap_socket``.

    The handshake is requested with ``ssl.CERT_NONE`` and the TLS 1.2 method.
    """

    def connect(self):
        """Create the underlying connection, prepare it, and wrap it in TLS."""
        raw_sock = self._new_conn()
        self._prepare_conn(raw_sock)
        self.sock = ssl_wrap_socket(
            raw_sock,
            keyfile=self.key_file,
            certfile=self.cert_file,
            cert_reqs=ssl.CERT_NONE,
            ssl_version=SSL.TLSv1_2_METHOD,
        )
def make_ssl_request_urllib3(timeout=None, verify=False):
    """Issue one ``GET /login/`` over TLS and log the response status.

    Args:
        timeout: Socket timeout in seconds. Applied to the connection when
            not ``None``; otherwise the connection's default is left in place.
        verify: Accepted for interface compatibility; not used by this function.
    """
    HOSTNAME = "thoover.dev.insightsquared.com"
    logger.info("Making SSL request with timeout %r", timeout)

    conn_kwargs = {"port": 9443}
    if timeout is not None:
        # Previously the timeout was only logged and never reached the socket.
        conn_kwargs["timeout"] = timeout
    conn = OurHTTPSConnection(HOSTNAME, **conn_kwargs)

    try:
        # connect() is inside the try so a failed connect/handshake still
        # reaches the cleanup in ``finally``.
        conn.connect()
        logger.info("Socket class is %r", conn.sock.socket.__module__)
        conn.request("GET", "/login/")
        resp = conn.getresponse()
        logger.info("Got a %d %s back", resp.status, resp.reason)
        resp.read()
    finally:
        conn.close()
def gevent_main():
    """Monkey-patch the standard library with gevent, then run ``main``."""
    # Imported inside the function so patching only happens in this mode.
    import gevent.monkey

    gevent.monkey.patch_all()
    main()
def main():
    """Run the urllib3-based SSL request once."""
    logger.info("Trying urllib3")
    make_ssl_request_urllib3(timeout=2000)
if __name__ == "__main__":
    # Command line: exactly one sub-command, "normal" or "gevent".
    parser = argparse.ArgumentParser()
    subparser = parser.add_subparsers(dest="mode")
    # Python 3 treats sub-commands as optional by default, which would leave
    # args.mode as None and make the script exit without doing anything.
    subparser.required = True
    subparser.add_parser("gevent")
    subparser.add_parser("normal")
    args = parser.parse_args()

    logging.basicConfig(level=logging.DEBUG)
    logging.captureWarnings(True)

    if args.mode == "normal":
        main()
    elif args.mode == "gevent":
        gevent_main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment