Created
April 15, 2009 18:51
-
-
Save tav/95955 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
Utility functions for OpenSSL. | |
""" | |
import sys | |
from OpenSSL import crypto | |
from OpenSSL import SSL | |
from os import listdir, getcwd | |
from os.path import join as join_path, isabs, isfile | |
from thread import allocate_lock | |
# ------------------------------------------------------------------------------ | |
# some konstants | |
# ------------------------------------------------------------------------------ | |
# Key algorithm identifiers accepted by ``create_key_pair``.
RSA = crypto.TYPE_RSA
DSA = crypto.TYPE_DSA

# Default certificate lifetime (365 days expressed in seconds) and key size.
YEAR = 365 * 24 * 60 * 60
BITSIZE = 1024

# Subject attribute names recognised by ``create_cert_request``.
CERT_ATTRIBUTES = ['C', 'ST', 'L', 'O', 'OU', 'CN']

# Module state filled in by ``setup_cert_issuer``; ``None`` until it has run.
_signing_authority = None
_unrevoked_signatures = None
_serial = None
_serial_file = None

# Held by ``create_fs_cert`` while it bumps ``_serial`` and rewrites the
# serial file.
_serial_lock = allocate_lock()
# ------------------------------------------------------------------------------ | |
# base funktions | |
# ------------------------------------------------------------------------------ | |
def create_key_pair(algorithm=RSA, bitsize=BITSIZE):
    """
    Generate a new key pair and return it as a ``crypto.PKey``.

    ``algorithm`` is one of the ``crypto.TYPE_*`` constants (``RSA`` or
    ``DSA`` in this module) and ``bitsize`` is the key length in bits.
    """
    pkey = crypto.PKey()
    pkey.generate_key(algorithm, bitsize)
    return pkey
def create_cert_request(key, digest='sha1', **properties):
    """
    Create a certificate request from the given ``key``.

    The request carries ``key`` as its public key and is signed with
    ``key`` using ``digest``.

    ``properties`` can have the following keys:

        C               Country name
        ST              State or province name
        L               Locality name
        O               Organization name
        OU              Organizational unit name
        CN              Common name
        emailAddress    E-mail address

    Any other keyword is ignored. Returns the signed ``crypto.X509Req``.
    """
    req = crypto.X509Req()
    subject = req.get_subject()

    # ``emailAddress`` is documented above but is not part of
    # CERT_ATTRIBUTES, so it used to be dropped silently. Accept it here.
    accepted = list(CERT_ATTRIBUTES)
    if 'emailAddress' not in accepted:
        accepted.append('emailAddress')

    # Walk the accepted names in a fixed order rather than in dict order so
    # that the subject's attribute order is the same on every run.
    for name in accepted:
        if name in properties:
            setattr(subject, name, properties[name])

    req.set_pubkey(key)
    req.sign(key, digest)
    return req
def create_cert_from_cert_request(
    request_cert, issuer_cert, issuer_private_key, not_before=0, not_after=YEAR,
    serial=None, digest='sha1'
):
    """
    Build and sign an X509 certificate from a certificate request.

    The subject and public key are copied from ``request_cert``; the issuer
    name is the subject of ``issuer_cert``. ``not_before`` and ``not_after``
    are offsets handed to ``gmtime_adj_notBefore`` / ``gmtime_adj_notAfter``.
    The serial number is only set when ``serial`` is not ``None``. The
    certificate is signed with ``issuer_private_key`` using ``digest``.
    """
    certificate = crypto.X509()

    # Identity: who the certificate is for and who vouches for it.
    certificate.set_subject(request_cert.get_subject())
    certificate.set_pubkey(request_cert.get_pubkey())
    certificate.set_issuer(issuer_cert.get_subject())

    # Validity window, relative to the current time.
    certificate.gmtime_adj_notBefore(not_before)
    certificate.gmtime_adj_notAfter(not_after)

    if serial is not None:
        certificate.set_serial_number(serial)

    # Signing has to come last so that it covers every field set above.
    certificate.sign(issuer_private_key, digest)
    return certificate
# ------------------------------------------------------------------------------ | |
# utility funktions | |
# ------------------------------------------------------------------------------ | |
def create_cert(
    duration=YEAR, bitsize=BITSIZE, issuer=None, serial=None, **properties
):
    """
    Generate a new key and a certificate for it; return ``(cert, key)``.

    ``issuer`` is a ``(certificate, private_key)`` pair used to sign the new
    certificate. When it is ``None`` the certificate is self-signed: the
    freshly created request and key act as the issuer. ``properties`` are
    passed on to ``create_cert_request`` as subject attributes.
    """
    key = create_key_pair(bitsize=bitsize)
    request = create_cert_request(key, **properties)

    if issuer is None:
        # Self-signing: the request supplies the issuer name, the new key signs.
        signer_cert, signer_key = request, key
    else:
        signer_cert, signer_key = issuer[0], issuer[1]

    certificate = create_cert_from_cert_request(
        request, signer_cert, signer_key, 0, duration, serial
    )
    return certificate, key
def create_cert_issuer(
    bitsize=BITSIZE, name='Certificate Authority', **properties
):
    """
    Create a self-signed CA certificate; return ``(cert, key)``.

    ``name`` is used as the common name unless ``properties`` already
    contains ``CN``. The certificate is created with serial number 0 and a
    duration of five times ``YEAR``.
    """
    # ``properties`` is always a dict here, so a single setdefault handles
    # both the empty and the non-empty case.
    properties.setdefault('CN', name)
    return create_cert(YEAR * 5, bitsize, serial=0, **properties)
# Retained as a file-level import in case other code relies on it; the
# ``verify_cert`` below does not use it.
from commands import getoutput


def verify_cert(cert, issuer_cert):
    """
    Verify that the ``cert`` has been signed by the ``issuer_cert``.

    Returns the result of ``cert.verify()`` called with the issuer's public
    key. This is the only ``verify_cert`` definition: an earlier variant
    that shelled out to ``openssl verify`` was always overridden by this one
    at import time, and built its command line by string interpolation.
    """
    # NOTE(review): relies on the certificate object exposing ``verify()``;
    # confirm that the installed pyOpenSSL provides it on X509 objects.
    return cert.verify(issuer_cert.get_pubkey())
# ------------------------------------------------------------------------------ | |
# dump/load | |
# ------------------------------------------------------------------------------ | |
def dump_key(key, stream=sys.stdout):
    """Write ``key`` to ``stream`` as a PEM-encoded private key."""
    pem = crypto.dump_privatekey(crypto.FILETYPE_PEM, key)
    stream.write(pem)
def dump_cert(cert, stream=sys.stdout):
    """Write ``cert`` to ``stream`` as a PEM-encoded certificate."""
    pem = crypto.dump_certificate(crypto.FILETYPE_PEM, cert)
    stream.write(pem)
def load_key(data):
    """Parse the PEM-encoded ``data`` and return the private key object."""
    pem_format = crypto.FILETYPE_PEM
    return crypto.load_privatekey(pem_format, data)
def load_cert(data):
    """Parse the PEM-encoded ``data`` and return the certificate object."""
    pem_format = crypto.FILETYPE_PEM
    return crypto.load_certificate(pem_format, data)
def load_key_from_file(filename):
    """
    Read ``filename`` and return the PEM-encoded private key it contains.

    The file is closed even if reading fails. Errors from opening the file
    or from parsing the key propagate to the caller.
    """
    # ``open`` in a ``with`` block replaces the legacy ``file()`` builtin and
    # guarantees the handle is released if ``read()`` raises.
    with open(filename, 'r') as stream:
        data = stream.read()
    return crypto.load_privatekey(crypto.FILETYPE_PEM, data)
def load_cert_from_file(filename):
    """
    Read ``filename`` and return the PEM-encoded certificate it contains.

    The file is closed even if reading fails. Errors from opening the file
    or from parsing the certificate propagate to the caller.
    """
    # ``open`` in a ``with`` block replaces the legacy ``file()`` builtin and
    # guarantees the handle is released if ``read()`` raises.
    with open(filename, 'r') as stream:
        data = stream.read()
    return crypto.load_certificate(crypto.FILETYPE_PEM, data)
# ------------------------------------------------------------------------------ | |
# from fs | |
# ------------------------------------------------------------------------------ | |
def setup_cert_issuer(
    location=None, cert_path='ca.cert', pkey_path='ca.pkey',
    serial_path='serial.db', unrevoked_path='issued.db'
):
    """
    Initialise the module-level signing authority from files on disk.

    Relative paths are resolved against ``location``, which defaults to the
    current working directory.

    - ``cert_path`` / ``pkey_path``: PEM files holding the CA certificate
      and its private key.
    - ``serial_path``: text file holding the last issued serial number. It
      is created if missing; unparsable content is treated as 0.
    - ``unrevoked_path``: ``shelve`` database recording issued certificates.
      It is closed via ``atexit``.

    Does nothing if the module has already been initialised. The module
    globals are only assigned once every step has succeeded, so a failed
    call can simply be retried.
    """
    global _serial, _serial_file, _signing_authority, _unrevoked_signatures

    if _serial is not None:
        return

    import atexit
    import shelve

    if location is None:
        location = getcwd()

    def resolve(path):
        # Absolute paths are used as given; relative ones live in ``location``.
        if isabs(path):
            return path
        return join_path(location, path)

    cert_path = resolve(cert_path)
    pkey_path = resolve(pkey_path)
    serial_path = resolve(serial_path)
    unrevoked_path = resolve(unrevoked_path)

    signing_authority = (
        load_cert_from_file(cert_path),
        load_key_from_file(pkey_path)
    )

    if isfile(serial_path):
        # 'r+' opens for reading and writing without truncating. The
        # previous mode string 'r+w' is not a valid mode.
        serial_file = open(serial_path, 'r+')
        try:
            serial = int(serial_file.read())
        except ValueError:
            serial = 0
    else:
        serial_file = open(serial_path, 'w')
        serial = 0

    try:
        unrevoked = shelve.open(unrevoked_path)
    except Exception:
        # Do not leak the serial file when the database cannot be opened.
        serial_file.close()
        raise
    atexit.register(unrevoked.close)

    # Publish the state in one go. ``_serial`` doubles as the "initialised"
    # flag, so it is assigned last.
    _signing_authority = signing_authority
    _serial_file = serial_file
    _unrevoked_signatures = unrevoked
    _serial = serial
def create_fs_cert(duration=YEAR, bitsize=BITSIZE, **properties):
    """
    Issue a certificate signed by the configured signing authority.

    Allocates the next serial number, writes it to the serial file, creates
    a new key and certificate via ``create_cert``, and records the
    certificate's SHA-1 digest in the issued-certificates database. The
    database key is the ``CN`` property, or the serial number as a string
    when no ``CN`` is given.

    Returns ``(cert, key)``. Raises ``RuntimeError`` if
    ``setup_cert_issuer()`` has not been called.
    """
    global _serial
    if _serial is None:
        raise RuntimeError(
            "You need to initialise the system via 'setup_cert_issuer()'."
        )

    with _serial_lock:
        _serial += 1
        # Take a private copy while the lock is held. Re-reading the global
        # after releasing it could pick up a serial allocated by a
        # concurrent call.
        serial = _serial
        _serial_file.seek(0)
        _serial_file.write(str(serial))
        _serial_file.flush()

    cert, key = create_cert(
        duration, bitsize, _signing_authority, serial, **properties
    )
    key_name = properties.get('CN', str(serial))
    _unrevoked_signatures[key_name] = cert.digest('sha1')
    return cert, key
# ------------------------------------------------------------------------------ | |
# test | |
# ------------------------------------------------------------------------------ | |
def test():
    """
    Smoke test: create a CA, issue a certificate from it and verify it.

    Writes ``espra.cert``, ``espra.pkey`` and ``test.cert`` into the current
    working directory (plus whatever ``setup_cert_issuer`` creates there),
    then prints the issued certificate's issuer and the verification result.
    """
    ca_cert, ca_key = create_cert_issuer(
        1024,
        CN='Espra Certificate Authority',
        C='GB',
        O='ESP Metanational LLP',
        OU='Espra',
        emailAddress='[email protected]'
    )

    # Close each file explicitly before it is read back, rather than relying
    # on the temporary file object being finalised in time.
    with open('espra.cert', 'w') as stream:
        dump_cert(ca_cert, stream)
    with open('espra.pkey', 'w') as stream:
        dump_key(ca_key, stream)

    loaded_ca_cert = load_cert_from_file('espra.cert')
    # Round-trip check only: the key must load back, its value is not needed.
    load_key_from_file('espra.pkey')

    setup_cert_issuer(cert_path='espra.cert', pkey_path='espra.pkey')
    issued_cert, _issued_key = create_fs_cert()
    with open('test.cert', 'w') as stream:
        dump_cert(issued_cert, stream)

    issued_cert = load_cert_from_file('test.cert')
    print(issued_cert.get_issuer())
    print("Verifying Cert %s" % (verify_cert(issued_cert, loaded_ca_cert),))
def test2():
    """
    Re-check the files written by ``test()``.

    Prints the result of verifying ``test.cert`` against the CA key, against
    a freshly generated unrelated key, and through ``verify_cert`` with the
    CA certificate.
    """
    issued = load_cert_from_file('test.cert')
    authority_cert = load_cert_from_file('espra.cert')
    authority_key = load_key_from_file('espra.pkey')
    unrelated_key = create_key_pair()

    print(issued.verify(authority_key))
    print(issued.verify(unrelated_key))
    print(verify_cert(issued, authority_cert))
# Run the smoke tests only when this file is executed as a script. Calling
# them unconditionally made every import of this module generate keys and
# write files into the current working directory.
if __name__ == '__main__':
    test()
    test2()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment