-
-
Save cfangmeier/c6e1b61cfd01a71fa55aba1c22d56267 to your computer and use it in GitHub Desktop.
SSL client/server certificates verification for `urllib2`. :python:ssl:
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"""SSL client/server certificates verification for `urllib2`. | |
see: https://gist.github.com/zed/1347055 | |
It works on Python 2.6, 2.7, 3.1, 3.2 | |
Example:: | |
>>> import urllib2, urllib2_ssl | |
>>> opener = urllib2.build_opener(urllib2_ssl.HTTPSHandler( | |
... key_file='clientkey.pem', | |
... cert_file='clientcert.pem', | |
... ca_certs='cacrt.pem')) | |
>>> opener.open('https://example.com/').read() | |
""" | |
__all__ = ['match_hostname', 'CertificateError'] | |
import re | |
import ssl | |
import socket | |
class CertificateError(ValueError): | |
pass | |
def match_hostname(cert, hostname): | |
"""Verify that *cert* (in decoded format as returned by | |
SSLSocket.getpeercert()) matches the *hostname*. RFC 2818 rules | |
are mostly followed, but IP addresses are not accepted for *hostname*. | |
CertificateError is raised on failure. On success, the function | |
returns nothing. | |
XXX this version differ from ssl.match_hostname in python 3.2 | |
it checks subject even if subjectAltName is not empty | |
""" | |
if not cert: | |
raise ValueError("empty or no certificate") | |
dnsnames = [] | |
san = cert.get('subjectAltName', ()) | |
for key, value in san: | |
if key == 'DNS': | |
if _dnsname_to_pat(value).match(hostname): | |
return | |
dnsnames.append(value) | |
if not dnsnames: | |
# XXX check subject even if subjectAltName is not empty | |
for sub in cert.get('subject', ()): | |
for key, value in sub: | |
# XXX according to RFC 2818, the most specific Common Name | |
# must be used. | |
if key == 'commonName': | |
if _dnsname_to_pat(value).match(hostname): | |
return | |
dnsnames.append(value) | |
if len(dnsnames) > 1: | |
raise CertificateError("hostname %r doesn't match either of %s" | |
% (hostname, ', '.join(map(repr, dnsnames)))) | |
elif len(dnsnames) == 1: | |
raise CertificateError("hostname %r doesn't match %r" | |
% (hostname, dnsnames[0])) | |
else: | |
raise CertificateError("no appropriate commonName or " | |
"subjectAltName fields were found") | |
def _dnsname_to_pat(dn): | |
pats = [] | |
for frag in dn.split(r'.'): | |
if frag == '*': | |
# When '*' is a fragment by itself, it matches a non-empty dotless | |
# fragment. | |
pats.append('[^.]+') | |
else: | |
# Otherwise, '*' matches any dotless fragment. | |
frag = re.escape(frag) | |
pats.append(frag.replace(r'\*', '[^.]*')) | |
return re.compile(r'\A' + r'\.'.join(pats) + r'\Z', re.IGNORECASE) | |
try: | |
from http import client # py3k | |
except ImportError: | |
import httplib as client # py < 3.x | |
try: | |
import urllib2 as request # py < 3.x | |
except ImportError: | |
from urllib import request # py3k | |
class HTTPSConnection(client.HTTPSConnection): | |
def __init__(self, host, **kwargs): | |
self.ca_certs = kwargs.pop('ca_certs', None) | |
self.checker = kwargs.pop('checker', match_hostname) | |
# for python < 2.6 | |
self.timeout = kwargs.get('timeout', socket.getdefaulttimeout()) | |
client.HTTPSConnection.__init__(self, host, **kwargs) | |
def connect(self): | |
# overrides the version in httplib so that we do | |
# certificate verification | |
args = [(self.host, self.port), self.timeout] | |
if hasattr(self, 'source_address'): | |
args.append(self.source_address) | |
sock = socket.create_connection(*args) | |
if getattr(self, '_tunnel_host', None): | |
self.sock = sock | |
self._tunnel() | |
# wrap the socket using verification with the root | |
# certs in self.ca_certs | |
kwargs = {} | |
if self.ca_certs is not None: | |
kwargs.update( | |
cert_reqs=ssl.CERT_REQUIRED, | |
ca_certs=self.ca_certs) | |
self.sock = ssl.wrap_socket(sock, | |
keyfile=self.key_file, | |
certfile=self.cert_file, | |
**kwargs) | |
if self.checker is not None: | |
try: | |
self.checker(self.sock.getpeercert(), self.host) | |
except CertificateError: | |
self.sock.shutdown(socket.SHUT_RDWR) | |
self.sock.close() | |
raise | |
class HTTPSHandler(request.HTTPSHandler): | |
""" Wraps https connections with ssl certificate verification | |
""" | |
def __init__(self, key_file=None, cert_file=None, ca_certs=None, | |
checker=match_hostname): | |
request.HTTPSHandler.__init__(self) | |
self.key_file = key_file | |
self.cert_file = cert_file | |
self.ca_certs = ca_certs | |
self.checker = checker | |
def https_open(self, req): | |
return self.do_open(self.getConnection, req) | |
def getConnection(self, host, **kwargs): | |
d = dict(cert_file=self.cert_file, | |
key_file=self.key_file, | |
ca_certs=self.ca_certs, | |
checker=self.checker) | |
d.update(kwargs) | |
return HTTPSConnection(host, **d) | |
__all__.append('HTTPSHandler') |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment