Skip to content

Instantly share code, notes, and snippets.

@tav
Created April 15, 2009 18:51
Show Gist options
  • Save tav/95955 to your computer and use it in GitHub Desktop.
Save tav/95955 to your computer and use it in GitHub Desktop.
"""
Utility functions for OpenSSL.
"""
import sys
from OpenSSL import crypto
from OpenSSL import SSL
from os import listdir, getcwd
from os.path import join as join_path, isabs, isfile
from thread import allocate_lock
# ------------------------------------------------------------------------------
# some constants
# ------------------------------------------------------------------------------
# Key algorithm identifiers, re-exported from pyOpenSSL.
RSA = crypto.TYPE_RSA
DSA = crypto.TYPE_DSA

# Default certificate lifetime, expressed in seconds (365 days).
YEAR = 60 * 60 * 24 * 365

# Default key length in bits.
# NOTE(review): 1024-bit keys are small by current standards -- consider
# raising this default once existing callers have been checked.
BITSIZE = 1024

# Subject fields that create_cert_request() copies onto a request; any other
# keyword is silently skipped.  'emailAddress' is listed because the
# create_cert_request() docstring documents it as a supported key, and without
# it the address passed by callers was dropped without notice.
CERT_ATTRIBUTES = ['C', 'ST', 'L', 'O', 'OU', 'CN', 'emailAddress']

# Module-level issuer state, populated by setup_cert_issuer().
_signing_authority = None       # (issuer certificate, issuer private key)
_unrevoked_signatures = None    # shelve mapping: name -> SHA-1 cert digest
_serial = None                  # last serial issued; None until initialised
_serial_lock = allocate_lock()  # held while the serial counter is updated
_serial_file = None             # open handle on the serial counter file
# ------------------------------------------------------------------------------
# base functions
# ------------------------------------------------------------------------------
def create_key_pair(algorithm=RSA, bitsize=BITSIZE):
    """
    Generate a new key pair and return it as a ``crypto.PKey``.

    ``algorithm`` is a pyOpenSSL key type (the module exposes ``RSA`` and
    ``DSA``); ``bitsize`` is the key length in bits.
    """
    pkey = crypto.PKey()
    pkey.generate_key(algorithm, bitsize)
    return pkey
def create_cert_request(key, digest='sha1', **properties):
    """
    Build a certificate request for ``key`` and sign it with that same key.

    ``properties`` supplies the subject fields.  The conventional names are:

        C               Country name
        ST              State or province name
        L               Locality name
        O               Organization name
        OU              Organizational unit name
        CN              Common name
        emailAddress    E-mail address

    Only names present in the module-level ``CERT_ATTRIBUTES`` list are
    applied to the subject; every other keyword is silently ignored.

    Returns the signed ``crypto.X509Req``.
    """
    request = crypto.X509Req()
    subject = request.get_subject()
    for field, value in properties.items():
        if field not in CERT_ATTRIBUTES:
            continue
        setattr(subject, field, value)
    request.set_pubkey(key)
    request.sign(key, digest)
    return request
def create_cert_from_cert_request(
    request_cert, issuer_cert, issuer_private_key, not_before=0, not_after=YEAR,
    serial=None, digest='sha1'
):
    """
    Turn a certificate request into a certificate signed by the issuer.

    The subject and public key are copied from ``request_cert``; the issuer
    name is the subject of ``issuer_cert``.  ``not_before`` and ``not_after``
    are passed to the ``gmtime_adj_*`` setters to define the validity window.
    The serial number is only set when ``serial`` is not ``None``.

    Returns the new ``crypto.X509`` signed with ``issuer_private_key``.
    """
    certificate = crypto.X509()

    # Identity comes from the request, the issuer name from the signer.
    certificate.set_subject(request_cert.get_subject())
    certificate.set_pubkey(request_cert.get_pubkey())
    certificate.set_issuer(issuer_cert.get_subject())

    # Validity window.
    certificate.gmtime_adj_notBefore(not_before)
    certificate.gmtime_adj_notAfter(not_after)

    if serial is not None:
        certificate.set_serial_number(serial)

    # Sign last, once every field is in place.
    certificate.sign(issuer_private_key, digest)
    return certificate
# ------------------------------------------------------------------------------
# utility functions
# ------------------------------------------------------------------------------
def create_cert(
    duration=YEAR, bitsize=BITSIZE, issuer=None, serial=None, **properties
):
    """
    Generate a new key pair plus a signed certificate for it.

    ``issuer`` is a ``(certificate, private key)`` pair used for signing.
    When it is ``None`` the certificate is self-signed: the freshly created
    request and key stand in for the issuer.  ``properties`` is forwarded to
    ``create_cert_request()`` as the subject fields.

    Returns a ``(certificate, key)`` tuple.
    """
    private_key = create_key_pair(bitsize=bitsize)
    request = create_cert_request(private_key, **properties)

    if issuer is None:
        # Self-signing: the request supplies the issuer name.
        signer_cert, signer_key = request, private_key
    else:
        signer_cert, signer_key = issuer[0], issuer[1]

    certificate = create_cert_from_cert_request(
        request, signer_cert, signer_key, 0, duration, serial
    )
    return certificate, private_key
def create_cert_issuer(
    bitsize=BITSIZE, name='Certificate Authority', **properties
):
    """
    Create a self-signed CA certificate valid for five times ``YEAR``.

    ``name`` is used as the common name unless ``properties`` already carries
    a ``CN``.  The certificate is issued with serial number 0.

    Returns the ``(certificate, key)`` tuple produced by ``create_cert()``.
    """
    # ``properties`` is always a dict here, so a single setdefault covers
    # both the empty and the non-empty case.
    properties.setdefault('CN', name)
    ca_lifetime = YEAR * 5
    return create_cert(ca_lifetime, bitsize, serial=0, **properties)
from commands import getoutput
def verify_cert(cert, issuer_cert):
    """
    Verify that the ``cert`` has been signed by the ``issuer_cert``.

    Both arguments are certificate objects, not file names.  The check is
    delegated to ``cert.verify()`` with the issuer's public key, and its
    result is returned unchanged.

    NOTE(review): this relies on certificate objects exposing a
    ``verify(pkey)`` method -- confirm that the pyOpenSSL build in use
    provides it on X509 objects.
    """
    return cert.verify(issuer_cert.get_pubkey())
# ------------------------------------------------------------------------------
# dump/load
# ------------------------------------------------------------------------------
def dump_key(key, stream=sys.stdout):
    """Write ``key`` to ``stream`` as a PEM-encoded private key."""
    pem = crypto.dump_privatekey(crypto.FILETYPE_PEM, key)
    stream.write(pem)
def dump_cert(cert, stream=sys.stdout):
    """Write ``cert`` to ``stream`` as a PEM-encoded certificate."""
    pem = crypto.dump_certificate(crypto.FILETYPE_PEM, cert)
    stream.write(pem)
def load_key(data):
    """Parse PEM-encoded ``data`` and return the private key object."""
    pem_format = crypto.FILETYPE_PEM
    return crypto.load_privatekey(pem_format, data)
def load_cert(data):
    """Parse PEM-encoded ``data`` and return the certificate object."""
    pem_format = crypto.FILETYPE_PEM
    return crypto.load_certificate(pem_format, data)
def load_key_from_file(filename):
    """
    Read the PEM file at ``filename`` and return the private key it holds.

    Errors from opening or reading the file, and from parsing its contents,
    propagate to the caller.
    """
    stream = open(filename, 'r')
    # try/finally (the form already used elsewhere in this module) so the
    # handle is released even when the read fails.
    try:
        data = stream.read()
    finally:
        stream.close()
    return load_key(data)
def load_cert_from_file(filename):
    """
    Read the PEM file at ``filename`` and return the certificate it holds.

    Errors from opening or reading the file, and from parsing its contents,
    propagate to the caller.
    """
    stream = open(filename, 'r')
    # try/finally (the form already used elsewhere in this module) so the
    # handle is released even when the read fails.
    try:
        data = stream.read()
    finally:
        stream.close()
    return load_cert(data)
# ------------------------------------------------------------------------------
# from fs
# ------------------------------------------------------------------------------
def setup_cert_issuer(
    location=None, cert_path='ca.cert', pkey_path='ca.pkey',
    serial_path='serial.db', unrevoked_path='issued.db'
):
    """
    Initialise the module-level issuer state used by ``create_fs_cert()``.

    Loads the issuer certificate and private key, opens (or creates) the
    serial counter file and opens the shelve that records issued
    certificates.  Relative paths are resolved against ``location``, which
    defaults to the current working directory.

    Calling this again once the module is initialised is a no-op.  The
    shelve is closed at interpreter exit via ``atexit``.
    """
    global _serial, _serial_file, _signing_authority, _unrevoked_signatures

    # ``_serial`` doubles as the "already initialised" flag.
    if _serial is not None:
        return

    import atexit
    import shelve

    if location is None:
        location = getcwd()

    def resolve(path):
        # Absolute paths are used as given; others live under ``location``.
        if isabs(path):
            return path
        return join_path(location, path)

    cert_path = resolve(cert_path)
    pkey_path = resolve(pkey_path)
    serial_path = resolve(serial_path)
    unrevoked_path = resolve(unrevoked_path)

    signing_authority = (
        load_cert_from_file(cert_path),
        load_key_from_file(pkey_path)
    )

    if isfile(serial_path):
        # 'r+' opens for reading and writing without truncating; the
        # previous 'r+w' is not a valid mode string.
        serial_file = open(serial_path, 'r+')
    else:
        serial_file = open(serial_path, 'w')

    try:
        if serial_file.mode == 'w':
            serial = 0
        else:
            try:
                serial = int(serial_file.read())
            except ValueError:
                # Empty or unparsable counter file: start from zero.
                serial = 0
        unrevoked = shelve.open(unrevoked_path)
    except Exception:
        # Do not leak the counter file if initialisation cannot complete.
        serial_file.close()
        raise

    atexit.register(unrevoked.close)

    # Publish the state only once everything has been set up, and set the
    # ``_serial`` flag last, so a failure above never leaves the module
    # half-initialised.
    _signing_authority = signing_authority
    _serial_file = serial_file
    _unrevoked_signatures = unrevoked
    _serial = serial
def create_fs_cert(duration=YEAR, bitsize=BITSIZE, **properties):
    """
    Issue a certificate signed by the authority loaded by
    ``setup_cert_issuer()``.

    The serial counter is incremented and written to the serial file, a new
    key pair and certificate are created via ``create_cert()``, and the
    certificate's SHA-1 digest is stored in the issued-certificates shelve
    under the ``CN`` property (or the serial number when no ``CN`` is given).

    Returns a ``(certificate, key)`` tuple.

    Raises ``RuntimeError`` if ``setup_cert_issuer()`` has not been called.
    """
    global _serial
    if _serial is None:
        raise RuntimeError(
            "You need to initialise the system via 'setup_cert_issuer()'."
        )
    _serial_lock.acquire()
    try:
        _serial += 1
        # Snapshot the value while the lock is held: once it is released
        # another thread may bump the global before it is used below.
        serial = _serial
        _serial_file.seek(0)
        _serial_file.write(str(serial))
        # Discard any stale bytes left beyond the freshly written number.
        _serial_file.truncate()
        _serial_file.flush()
    finally:
        _serial_lock.release()
    cert, key = create_cert(
        duration, bitsize, _signing_authority, serial, **properties
    )
    key_name = properties.get('CN', str(serial))
    _unrevoked_signatures[key_name] = cert.digest('sha1')
    return cert, key
# ------------------------------------------------------------------------------
# test
# ------------------------------------------------------------------------------
def test():
    """
    Smoke test: create a CA, issue a certificate from it and verify it.

    Writes ``espra.cert``, ``espra.pkey`` and ``test.cert`` (plus whatever
    ``setup_cert_issuer()`` creates) in the current working directory and
    prints the issuer and the verification result.
    """
    def write_pem(dump, obj, path):
        # Close explicitly so the data is on disk before it is read back,
        # rather than relying on garbage collection of the file object.
        stream = open(path, 'w')
        try:
            dump(obj, stream)
        finally:
            stream.close()

    ca_cert, ca_key = create_cert_issuer(
        1024,
        CN='Espra Certificate Authority',
        C='GB',
        O='ESP Metanational LLP',
        OU='Espra',
        emailAddress='[email protected]'
    )
    write_pem(dump_cert, ca_cert, 'espra.cert')
    write_pem(dump_key, ca_key, 'espra.pkey')

    loaded_ca_cert = load_cert_from_file('espra.cert')
    # Round-trip the key as well; only the absence of an error matters.
    load_key_from_file('espra.pkey')

    setup_cert_issuer(cert_path='espra.cert', pkey_path='espra.pkey')
    issued_cert, _issued_key = create_fs_cert()
    write_pem(dump_cert, issued_cert, 'test.cert')

    issued_cert = load_cert_from_file('test.cert')
    print(issued_cert.get_issuer())
    print("Verifying Cert %s" % (verify_cert(issued_cert, loaded_ca_cert),))
def test2():
    """
    Smoke test: check the certificate written by ``test()`` against keys.

    Loads ``test.cert``, ``espra.cert`` and ``espra.pkey`` from the current
    working directory and prints the result of verifying the issued
    certificate against the CA key, against a freshly generated unrelated
    key, and through ``verify_cert()``.
    """
    issued = load_cert_from_file('test.cert')
    authority_cert = load_cert_from_file('espra.cert')
    authority_key = load_key_from_file('espra.pkey')
    unrelated_key = create_key_pair()
    for candidate in (authority_key, unrelated_key):
        print(issued.verify(candidate))
    print(verify_cert(issued, authority_cert))
# Run the smoke tests only when executed as a script: they generate keys and
# write files into the current directory, which must not happen on import.
if __name__ == '__main__':
    test()
    test2()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment