Created
November 14, 2013 19:47
-
-
Save gregneagle/7473150 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import httplib | |
import re | |
import socket | |
import sys | |
import urllib2 | |
import ssl | |
class InvalidCertificateException(httplib.HTTPException, urllib2.URLError): | |
def __init__(self, host, cert, reason): | |
httplib.HTTPException.__init__(self) | |
self.host = host | |
self.cert = cert | |
self.reason = reason | |
def __str__(self): | |
return ('Host %s returned an invalid certificate (%s) %s\n' % | |
(self.host, self.reason, self.cert)) | |
class CertValidatingHTTPSConnection(httplib.HTTPConnection): | |
default_port = httplib.HTTPS_PORT | |
def __init__(self, host, port=None, key_file=None, cert_file=None, | |
ca_certs=None, strict=None, **kwargs): | |
httplib.HTTPConnection.__init__(self, host, port, strict, **kwargs) | |
self.key_file = key_file | |
self.cert_file = cert_file | |
self.ca_certs = ca_certs | |
if self.ca_certs: | |
self.cert_reqs = ssl.CERT_REQUIRED | |
else: | |
self.cert_reqs = ssl.CERT_NONE | |
def _GetValidHostsForCert(self, cert): | |
if 'subjectAltName' in cert: | |
return [x[1] for x in cert['subjectAltName'] | |
if x[0].lower() == 'dns'] | |
else: | |
return [x[0][1] for x in cert['subject'] | |
if x[0][0].lower() == 'commonname'] | |
def _ValidateCertificateHostname(self, cert, hostname): | |
hosts = self._GetValidHostsForCert(cert) | |
for host in hosts: | |
host_re = host.replace('.', '\.').replace('*', '[^.]*') | |
if re.search('^%s$' % (host_re,), hostname, re.I): | |
return True | |
return False | |
def connect(self): | |
sock = socket.create_connection((self.host, self.port)) | |
self.sock = ssl.wrap_socket(sock, keyfile=self.key_file, | |
certfile=self.cert_file, | |
cert_reqs=self.cert_reqs, | |
ca_certs=self.ca_certs) | |
if self.cert_reqs & ssl.CERT_REQUIRED: | |
cert = self.sock.getpeercert() | |
hostname = self.host.split(':', 0)[0] | |
if not self._ValidateCertificateHostname(cert, hostname): | |
raise InvalidCertificateException(hostname, cert, | |
'hostname mismatch') | |
class VerifiedHTTPSHandler(urllib2.HTTPSHandler): | |
def __init__(self, **kwargs): | |
urllib2.AbstractHTTPHandler.__init__(self) | |
self._connection_args = kwargs | |
def https_open(self, req): | |
def http_class_wrapper(host, **kwargs): | |
full_kwargs = dict(self._connection_args) | |
full_kwargs.update(kwargs) | |
return CertValidatingHTTPSConnection(host, **full_kwargs) | |
try: | |
return self.do_open(http_class_wrapper, req) | |
except urllib2.URLError, e: | |
if type(e.reason) == ssl.SSLError and e.reason.args[0] == 1: | |
raise InvalidCertificateException(req.host, '', | |
e.reason.args[1]) | |
raise | |
https_request = urllib2.HTTPSHandler.do_request_ | |
if __name__ == "__main__": | |
if len(sys.argv) != 3: | |
print "usage: python %s CA_CERT URL" % sys.argv[0] | |
exit(2) | |
handler = VerifiedHTTPSHandler(ca_certs = sys.argv[1]) | |
opener = urllib2.build_opener(handler) | |
print opener.open(sys.argv[2]).read() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment