Created
November 8, 2011 05:09
-
-
Save zed/1347055 to your computer and use it in GitHub Desktop.
SSL client/server certificates verification for `urllib2`. :python:ssl:
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"""SSL client/server certificates verification for `urllib2`. | |
It works on Python 2.6, 2.7, 3.1, 3.2 | |
It also works on Python 2.4, 2.5 if `ssl` is installed (``pip install ssl``) | |
Example:: | |
>>> import urllib2, urllib2_ssl | |
>>> opener = urllib2.build_opener(urllib2_ssl.HTTPSHandler( | |
... key_file='clientkey.pem', | |
... cert_file='clientcert.pem', | |
... ca_certs='cacrt.pem')) | |
>>> opener.open('https://example.com/').read() | |
""" | |
__all__ = ['match_hostname', 'CertificateError'] | |
import sys | |
import socket | |
if not hasattr(socket, 'create_connection'): # for Python 2.4 | |
_GLOBAL_DEFAULT_TIMEOUT = getattr(socket, '_GLOBAL_DEFAULT_TIMEOUT', object()) | |
# copy-paste from stdlib's socket.py (py2.6) | |
def create_connection(address, timeout=_GLOBAL_DEFAULT_TIMEOUT, | |
source_address=None): | |
"""Connect to *address* and return the socket object. | |
Convenience function. Connect to *address* (a 2-tuple ``(host, | |
port)``) and return the socket object. Passing the optional | |
*timeout* parameter will set the timeout on the socket instance | |
before attempting to connect. If no *timeout* is supplied, the | |
global default timeout setting returned by :func:`getdefaulttimeout` | |
is used. | |
""" | |
host, port = address | |
err = None | |
for res in socket.getaddrinfo(host, port, 0, socket.SOCK_STREAM): | |
af, socktype, proto, canonname, sa = res | |
sock = None | |
try: | |
sock = socket.socket(af, socktype, proto) | |
if timeout is not _GLOBAL_DEFAULT_TIMEOUT: | |
sock.settimeout(timeout) | |
if source_address: | |
sock.bind(source_address) | |
sock.connect(sa) | |
return sock | |
except socket.error: | |
err = sys.exc_info()[1] | |
if sock is not None: | |
sock.close() | |
if err is not None: | |
raise err | |
else: | |
raise socket.error("getaddrinfo returns an empty list") | |
# monkey-patch socket module | |
socket.create_connection = create_connection | |
# copy-paste from stdlib's ssl.py (py3.2) | |
class CertificateError(ValueError): | |
pass | |
def match_hostname(cert, hostname): | |
"""Verify that *cert* (in decoded format as returned by | |
SSLSocket.getpeercert()) matches the *hostname*. RFC 2818 rules | |
are mostly followed, but IP addresses are not accepted for *hostname*. | |
CertificateError is raised on failure. On success, the function | |
returns nothing. | |
XXX this version differ from ssl.match_hostname in python 3.2 | |
it checks subject even if subjectAltName is not empty | |
""" | |
if not cert: | |
raise ValueError("empty or no certificate") | |
dnsnames = [] | |
san = cert.get('subjectAltName', ()) | |
for key, value in san: | |
if key == 'DNS': | |
if _dnsname_to_pat(value).match(hostname): | |
return | |
dnsnames.append(value) | |
if not dnsnames: | |
#XXX check subject even if subjectAltName is not empty | |
for sub in cert.get('subject', ()): | |
for key, value in sub: | |
# XXX according to RFC 2818, the most specific Common Name | |
# must be used. | |
if key == 'commonName': | |
if _dnsname_to_pat(value).match(hostname): | |
return | |
dnsnames.append(value) | |
if len(dnsnames) > 1: | |
raise CertificateError("hostname %r " | |
"doesn't match either of %s" | |
% (hostname, ', '.join(map(repr, dnsnames)))) | |
elif len(dnsnames) == 1: | |
raise CertificateError("hostname %r " | |
"doesn't match %r" | |
% (hostname, dnsnames[0])) | |
else: | |
raise CertificateError("no appropriate commonName or " | |
"subjectAltName fields were found") | |
def _dnsname_to_pat(dn): | |
pats = [] | |
for frag in dn.split(r'.'): | |
if frag == '*': | |
# When '*' is a fragment by itself, it matches a non-empty dotless | |
# fragment. | |
pats.append('[^.]+') | |
else: | |
# Otherwise, '*' matches any dotless fragment. | |
frag = re.escape(frag) | |
pats.append(frag.replace(r'\*', '[^.]*')) | |
return re.compile(r'\A' + r'\.'.join(pats) + r'\Z', re.IGNORECASE) | |
try: import ssl | |
except ImportError: | |
ssl = None | |
import warnings | |
msg = ("Can't import ssl. HTTPS won't work." | |
"Run `pip install ssl` if Python < 2.6") | |
# use python -Wd to see this warning (it is ignored by default) | |
try: | |
ImportWarning | |
except NameError: | |
warnings.warn(msg) # Python < 2.5 | |
else: | |
warnings.warn(msg, ImportWarning) | |
else: # ssl is available | |
# see http://www.muchtooscrawled.com/2010/03/https-certificate-verification-in-python-with-urllib2/ | |
try: from http import client # py3k | |
except ImportError: | |
import httplib as client # py < 3.x | |
import re | |
try: import urllib2 as request # py < 3.x | |
except ImportError: | |
from urllib import request # py3k | |
class HTTPSConnection(client.HTTPSConnection): | |
def __init__(self, host, **kwargs): | |
self.ca_certs = kwargs.pop('ca_certs', None) | |
self.checker = kwargs.pop('checker', match_hostname) | |
# for python < 2.6 | |
self.timeout = kwargs.get('timeout', socket.getdefaulttimeout()) | |
client.HTTPSConnection.__init__(self, host, **kwargs) | |
def connect(self): | |
# overrides the version in httplib so that we do | |
# certificate verification | |
args = [(self.host, self.port), self.timeout,] | |
if hasattr(self, 'source_address'): | |
args.append(self.source_address) | |
sock = socket.create_connection(*args) | |
if getattr(self, '_tunnel_host', None): | |
self.sock = sock | |
self._tunnel() | |
# wrap the socket using verification with the root | |
# certs in self.ca_certs | |
kwargs = {} | |
if self.ca_certs is not None: | |
kwargs.update( | |
cert_reqs=ssl.CERT_REQUIRED, | |
ca_certs=self.ca_certs) | |
self.sock = ssl.wrap_socket(sock, | |
keyfile=self.key_file, | |
certfile=self.cert_file, | |
**kwargs) | |
if self.checker is not None: | |
try: | |
self.checker(self.sock.getpeercert(), self.host) | |
except CertificateError: | |
self.sock.shutdown(socket.SHUT_RDWR) | |
self.sock.close() | |
raise | |
# wraps https connections with ssl certificate verification | |
class HTTPSHandler(request.HTTPSHandler): | |
# see http://www.threepillarglobal.com/soap_client_auth | |
# HTTPS Client Auth solution for urllib2, inspired by | |
# http://bugs.python.org/issue3466 and improved by David | |
# Norton of Three Pillar Software. In this implementation, | |
# we use properties passed in rather than static module | |
# fields. | |
def __init__(self, key_file=None, cert_file=None, ca_certs=None, | |
checker=match_hostname): | |
request.HTTPSHandler.__init__(self) | |
# see http://docs.python.org/library/ssl.html#certificates | |
self.key_file = key_file | |
self.cert_file = cert_file | |
self.ca_certs = ca_certs | |
self.checker = checker | |
def https_open(self, req): | |
# Rather than pass in a reference to a connection class, we pass in | |
# a reference to a function which, for all intents and purposes, | |
# will behave as a constructor | |
return self.do_open(self.getConnection, req) | |
def getConnection(self, host, **kwargs): | |
d = dict(cert_file=self.cert_file, | |
key_file=self.key_file, | |
ca_certs=self.ca_certs, | |
checker=self.checker) | |
d.update(kwargs) | |
return HTTPSConnection(host, **d) | |
__all__.append('HTTPSHandler') |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Does anyone have any samples of this backport working for SSLv3 TLS1.2?