Last active
September 21, 2020 17:42
-
-
Save tyrells/4667515e4d54ab1c01a48aa323a30ce5 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
import sys | |
import os.path | |
from cryptography import x509 | |
from cryptography.hazmat.backends import default_backend | |
from cryptography.hazmat.primitives import serialization | |
import hashlib | |
import random | |
from datetime import timedelta | |
from cryptography.hazmat.primitives.asymmetric import rsa, dsa, ec, ed25519, ed448 | |
# Path of the trusted root CA bundle: the Mozilla CA certificate list in PEM
# format, downloaded from https://curl.haxx.se/docs/caextract.html
CA_CERTS = "cacert.pem"

# Boolean that instructs the application to load the CA certificates named by
# CA_CERTS into the trusted CA store at start-up
INCLUDE_CA_CERTS = True

# Output file paths
CA_CERT_FNAME = "ca.pem"  # cloned certificate from the top of the chain
SERVER_CERT_FNAME = "server.pem"  # cloned server certificate chain
SERVER_KEY_FNAME = "server.key"  # private key written for the server chain

# Used internally: hash applied to the RFC 4514 string of a certificate name
# to build the lookup keys of the certificate and key stores
FINGERPRINT_ALGORITHM = hashlib.sha1
def is_binary_file(fname):
    """Return True if the beginning of a file looks like binary data.

    Only the first 1024 bytes are inspected. A byte counts as "text" when it
    is one of the control characters BEL, BS, TAB, LF, FF, CR, ESC or lies in
    the range 0x20-0xFF excluding DEL (0x7F); bytes above 0x7F are therefore
    treated as text. The file is reported as binary as soon as one byte
    outside that set is present. An empty file is reported as text.

    Args:
        fname: Path of the file to inspect.

    Returns:
        True if a non-text byte occurs in the first 1024 bytes, else False.

    Raises:
        OSError: If the file cannot be opened or read.
    """
    # Set of byte values that are accepted as text.
    text_bytes = bytearray({7, 8, 9, 10, 12, 13, 27} | set(range(0x20, 0x100)) - {0x7f})
    with open(fname, 'rb') as f:
        head = f.read(1024)
    # Deleting every text byte leaves only the suspicious ones behind.
    return bool(head.translate(None, text_bytes))
def process_der_cert(fname):
    """Yield the certificate stored in a binary DER file.

    The whole file is read and parsed as one DER-encoded X.509 certificate,
    so the generator produces exactly one item.
    """
    with open(fname, 'rb') as der_file:
        yield x509.load_der_x509_certificate(der_file.read(), default_backend())
def process_pem_cert(fname):
    """Yield every certificate found in an ASCII PEM file.

    Lines are accumulated until one containing "END CERTIFICATE" is seen; the
    accumulated block is then parsed as a PEM certificate and yielded, and
    accumulation restarts. Anything after the last "END CERTIFICATE" line is
    ignored, and a file without such a line yields nothing.

    The file is read in binary mode so that comment lines in an arbitrary
    encoding cannot raise a decoding error and no decode/encode round trip
    is needed.

    Args:
        fname: Path of the PEM file.

    Yields:
        One parsed certificate object per PEM block.

    Raises:
        OSError: If the file cannot be opened or read.
        ValueError: If an accumulated block cannot be parsed as a certificate.
    """
    with open(fname, 'rb') as f:
        pem_block = bytearray()
        for line in f:
            pem_block.extend(line)
            if b"END CERTIFICATE" in line:
                cert = x509.load_pem_x509_certificate(bytes(pem_block), default_backend())
                pem_block = bytearray()
                yield cert
def process_certificates(fname):
    """Yield the certificates of a PEM or DER formatted certificate file.

    A file is handled as PEM when its extension is ".pem" or when its content
    does not look binary; every other file is handled as DER. After all
    certificates of the file have been yielded an INFO line is written to
    stderr.

    Args:
        fname: Path of the certificate file.

    Yields:
        Parsed certificate objects.

    Raises:
        ValueError: Re-raised unless its message contains
            "Unable to load certificate"; in that case a WARNING is written
            to stderr and the file is skipped.
    """
    ext = os.path.splitext(fname)[-1].lower()
    try:
        if ext == '.pem' or not is_binary_file(fname):
            yield from process_pem_cert(fname)
        else:
            # Reaching this branch already implies the content looks binary,
            # so the file does not have to be inspected a second time.
            yield from process_der_cert(fname)
        print("INFO: Processing file: {}".format(fname), file=sys.stderr)
    except ValueError as e:
        # ValueError exceptions are related to files that cannot
        # be processed. Ignore these files and continue.
        if "Unable to load certificate" in str(e):
            print("WARNING: Wrong format, ignoring file: {}".format(fname), file=sys.stderr)
            return
        raise
def get_cert_issuer(cert, cert_store, ca_store):
    """Look up the certificate whose subject equals the issuer of `cert`.

    Candidates are considered in increasing priority: the certificate itself
    (when issuer and subject are equal), then the entries of `cert_store`,
    then the entries of `ca_store`. Within a store the last matching entry
    wins.

    Returns:
        A tuple (is_self_signed, is_ca_signed, issuer_cert) where
        is_self_signed tells whether issuer and subject of `cert` are equal,
        is_ca_signed tells whether a match was found in `ca_store`, and
        issuer_cert is the selected certificate or None if nothing matched.
    """
    self_signed = True if cert.issuer == cert.subject else False
    found = cert if self_signed else None

    # Matches among the loaded certificates override the certificate itself.
    store_matches = [c for c in cert_store.values() if cert.issuer == c.subject]
    if store_matches:
        found = store_matches[-1]

    # Matches among the trusted CAs override everything else.
    ca_matches = [c for c in ca_store.values() if cert.issuer == c.subject]
    ca_signed = bool(ca_matches)
    if ca_signed:
        found = ca_matches[-1]

    return (self_signed, ca_signed, found)
def generate_key_from_cert(cert):
    """Generate a new private key with the parameters of a certificate's key.

    The type of the certificate's public key selects the algorithm: RSA keys
    reuse the public exponent and key size, DSA keys reuse the key size, EC
    keys reuse the curve, and Ed25519/Ed448 keys have no parameters.

    Args:
        cert: Certificate whose public key parameters are copied.

    Returns:
        A freshly generated private key of the same type.

    Raises:
        ValueError: If the public key type is not one of the supported ones.
    """
    public_key = cert.public_key()
    if isinstance(public_key, rsa.RSAPublicKey):
        return rsa.generate_private_key(
            public_exponent=public_key.public_numbers().e,
            key_size=public_key.key_size,
            backend=default_backend()
        )
    if isinstance(public_key, dsa.DSAPublicKey):
        return dsa.generate_private_key(
            key_size=public_key.key_size,
            backend=default_backend()
        )
    if isinstance(public_key, ec.EllipticCurvePublicKey):
        return ec.generate_private_key(
            curve=public_key.curve,
            backend=default_backend()
        )
    if isinstance(public_key, ed25519.Ed25519PublicKey):
        return ed25519.Ed25519PrivateKey.generate()
    if isinstance(public_key, ed448.Ed448PublicKey):
        return ed448.Ed448PrivateKey.generate()
    # Fail with a clear message instead of falling through to an unbound
    # local variable.
    raise ValueError(
        "Unsupported public key type: {}".format(type(public_key).__name__)
    )
def generate_key_from_signer(cert):
    """Generate a private key resembling the key of the certificate's issuer.

    The issuer's key is not available, so its type is derived from the
    signature algorithm of `cert` and its size is guessed from the signature
    length where possible. A self-signed certificate is handled by
    generate_key_from_cert() because its own public key is the issuer key.

    Args:
        cert: Certificate whose signature describes the issuer's key.

    Returns:
        A freshly generated private key.

    Raises:
        ValueError: If the signature algorithm is unknown or unsupported.
    """
    # if certificate is self-signed, generate key from certificate
    # directly, without trying to guess issuer parameters
    if cert.issuer == cert.subject:
        return generate_key_from_cert(cert)

    # FIX: imports a private dictionary from the library to map the
    # signature algorithm OID to its name
    from cryptography.x509.oid import _OID_NAMES
    signature_algorithm_name = _OID_NAMES.get(cert.signature_algorithm_oid, "")
    signature_size = len(cert.signature)

    if signature_algorithm_name.endswith("RSAEncryption"):
        # exponent chosen according to best practices; an RSA signature is
        # as long as the modulus, which gives the key length
        return rsa.generate_private_key(
            public_exponent=65537,
            key_size=signature_size * 8,
            backend=default_backend()
        )
    if signature_algorithm_name.startswith("RSASSA"):
        # use RSA instead with default parameters
        return rsa.generate_private_key(
            public_exponent=65537,
            key_size=2048,
            backend=default_backend()
        )
    if signature_algorithm_name.startswith("ecdsa-"):
        # heuristic based on the DER-encoded signature length: about 70-72
        # bytes for P-256, 102-104 for P-384 and 137-139 for P-521
        if signature_size < 90:
            curve = ec.SECP256R1()
        elif signature_size < 120:
            curve = ec.SECP384R1()
        else:
            curve = ec.SECP521R1()
        return ec.generate_private_key(
            curve=curve,
            backend=default_backend()
        )
    if signature_algorithm_name.startswith("dsa-"):
        # chosen based on best practice, as DSA is rarely used in practice
        return dsa.generate_private_key(
            key_size=2048,
            backend=default_backend()
        )
    if signature_algorithm_name == "ed25519":
        return ed25519.Ed25519PrivateKey.generate()
    if signature_algorithm_name == "ed448":
        return ed448.Ed448PrivateKey.generate()
    raise ValueError(
        "Unsupported signature algorithm: {}".format(
            cert.signature_algorithm_oid.dotted_string
        )
    )
def clone_certificate(cert, key_store):
    """Build a copy of `cert` around a newly generated key pair.

    The new private key is recorded in `key_store` under the fingerprint of
    the certificate subject. The copy keeps issuer, subject, serial number
    and validity period of the original and is signed with the key stored
    under the fingerprint of the issuer name, so the issuer must have been
    cloned before (a self-signed certificate signs itself).

    Returns:
        A tuple (private_key, signed_certificate).
    """
    def name_digest(name):
        # Store key: hex digest of the RFC 4514 form of a certificate name.
        return FINGERPRINT_ALGORITHM(name.rfc4514_string().encode()).hexdigest()

    new_key = generate_key_from_cert(cert)
    key_store[name_digest(cert.subject)] = new_key

    builder = x509.CertificateBuilder()
    builder = builder.issuer_name(cert.issuer)
    builder = builder.subject_name(cert.subject)
    builder = builder.public_key(new_key.public_key())
    builder = builder.serial_number(cert.serial_number)
    builder = builder.not_valid_before(cert.not_valid_before)
    builder = builder.not_valid_after(cert.not_valid_after)

    # Fix: extensions of the original are not copied yet; filter for 'safe'
    # extensions and add them to the builder later.

    signing_key = key_store[name_digest(cert.issuer)]
    signed_cert = builder.sign(signing_key, cert.signature_hash_algorithm, default_backend())
    return new_key, signed_cert
def generate_issuer(cert):
    """Generate a fake issuer certificate for `cert`.

    As many details of the real issuer as are known get reproduced: the
    subject is the issuer name of `cert`, the key is guessed from the
    signature of `cert`, and the AuthorityKeyIdentifier extension of `cert`
    (when present) supplies the issuer's own issuer name, its serial number
    and its subject key identifier. Missing details fall back to the issuer
    name of `cert` (making the result self-issued) and a random 64-bit
    serial number. The validity period of `cert` is widened by five years
    on both sides. The certificate is signed with its own new key.

    Args:
        cert: Certificate whose issuer is missing.

    Returns:
        A tuple (private_key, signed_issuer_certificate).
    """
    key = generate_key_from_signer(cert)

    # Defaults used when the extension is absent or leaves a field empty.
    issuer_name = cert.issuer
    serial_number = None
    key_identifier = None
    try:
        aki = cert.extensions.get_extension_for_class(x509.AuthorityKeyIdentifier).value
    except x509.ExtensionNotFound:
        # no AuthorityKeyIdentifier extension present
        aki = None
    if aki is not None:
        # FIX: only the first directory name is used, may want to change
        # this to generate multiple certificates
        for general_name in aki.authority_cert_issuer or ():
            if isinstance(general_name, x509.DirectoryName):
                issuer_name = general_name.value
                break
        serial_number = aki.authority_cert_serial_number
        key_identifier = aki.key_identifier
    if serial_number is None:
        serial_number = random.getrandbits(64)

    issuer_cert = (
        x509.CertificateBuilder()
        .issuer_name(issuer_name)
        .subject_name(cert.issuer)
        .public_key(key.public_key())
        .serial_number(serial_number)
        .not_valid_before(cert.not_valid_before - timedelta(days=5*365))
        .not_valid_after(cert.not_valid_after + timedelta(days=5*365))
    )
    if key_identifier is not None:
        # The builder methods return a new builder, so the result has to be
        # kept for the extension to end up in the certificate.
        issuer_cert = issuer_cert.add_extension(
            x509.SubjectKeyIdentifier(key_identifier), False
        )
    return key, issuer_cert.sign(key, cert.signature_hash_algorithm, default_backend())
def _collect_input_files(args):
    """Expand command-line paths into a flat list of files, walking directories recursively."""
    in_files = []
    for arg in args:
        # process single file
        if os.path.isfile(arg):
            in_files.append(arg)
        # recursively process directory
        elif os.path.isdir(arg):
            for subdir, _dirs, files in os.walk(arg):
                in_files.extend(os.path.join(subdir, f) for f in files)
        else:
            print("ERROR: {} does not exist".format(arg), file=sys.stderr)
            sys.exit(2)
    return in_files


def _load_cert_store(fnames, duplicate_message):
    """Load all certificates of the given files into a dict keyed by subject fingerprint; the first one seen wins."""
    store = {}
    for fname in fnames:
        for c in process_certificates(fname):
            fingerprint = FINGERPRINT_ALGORITHM(c.subject.rfc4514_string().encode()).hexdigest()
            if fingerprint in store:
                print(duplicate_message.format(c.subject), file=sys.stderr)
            else:
                store[fingerprint] = c
    return store


def _build_chain(leaf_cert, cert_store, ca_store):
    """Return the chain from the chosen certificate up to a self-signed or trusted CA certificate, generating missing issuers."""
    chain = [leaf_cert]
    while True:
        c = chain[-1]
        is_self_signed, is_ca_signed, issuer_cert = get_cert_issuer(c, cert_store, ca_store)
        if is_self_signed:
            print("INFO: Self-Signed Certificate Chain created successfully", file=sys.stderr)
            break
        if is_ca_signed:
            chain.append(issuer_cert)
            print("INFO: Trusted Certificate Chain created successfully (might be able to register with same CA to bypass authentication)", file=sys.stderr)
            break
        if issuer_cert is None:
            print("WARNING: Generating missing certificate: {}".format(c.issuer.rfc4514_string()), file=sys.stderr)
            _, issuer_cert = generate_issuer(c)
        chain.append(issuer_cert)
    return chain


def _write_private_key(fname, key):
    """Write a private key to a file as unencrypted PEM."""
    if isinstance(key, (ed25519.Ed25519PrivateKey, ed448.Ed448PrivateKey)):
        # The traditional OpenSSL format only covers RSA, DSA and EC keys.
        key_format = serialization.PrivateFormat.PKCS8
    else:
        key_format = serialization.PrivateFormat.TraditionalOpenSSL
    with open(fname, "wb") as f:
        f.write(key.private_bytes(
            encoding=serialization.Encoding.PEM,
            format=key_format,
            encryption_algorithm=serialization.NoEncryption()
        ))


def main():
    """Clone a certificate chain chosen interactively from the input files.

    Loads the trusted CA bundle (if enabled) and every certificate found in
    the files/directories given on the command line, asks on stdin which
    certificate to clone, completes its chain and writes the cloned CA
    certificate, the cloned server chain and the server private key to the
    configured output files. All diagnostics go to stderr.

    Exit codes: 1 for missing arguments, 2 for a path that does not exist,
    3 for an unknown certificate choice.
    """
    if len(sys.argv) < 2:
        print("Usage: {} [input file/dir]".format(sys.argv[0]), file=sys.stderr)
        sys.exit(1)

    # allow processing multiple files/directories
    in_files = _collect_input_files(sys.argv[1:])

    # process trusted root CA certificates
    ca_store = {}
    if INCLUDE_CA_CERTS:
        print("INFO: Loading CA ceritificates", file=sys.stderr)
        if not os.path.isfile(CA_CERTS):
            print("ERROR: {} does not exist".format(CA_CERTS), file=sys.stderr)
            sys.exit(2)
        ca_store = _load_cert_store([CA_CERTS], "WARNING: Duplicate CA ceritificate detected: {}")

    print("INFO: Loading ceritificates", file=sys.stderr)
    cert_store = _load_cert_store(in_files, "WARNING: Duplicate ceritificate detected: {}")

    print("\nLoaded certificates:\nHash\t\t\t\t\t\tName\n----------------------------------------------------------------------------", file=sys.stderr)
    for k, v in cert_store.items():
        print("{}\t{}".format(k, v.subject.rfc4514_string()), file=sys.stderr)
    print("----------------------------------------------------------------------------", file=sys.stderr)

    cert_choice = input('\nEnter your certificate choice [hash]: ').strip()
    if cert_choice not in cert_store:
        print("ERROR: Unknown certificate hash: {}".format(cert_choice), file=sys.stderr)
        sys.exit(3)

    output_cert_chain = _build_chain(cert_store[cert_choice], cert_store, ca_store)

    # The top of the chain is cloned first so that its key is available for
    # signing the certificates below it.
    key_store = {}
    old_ca_cert = output_cert_chain.pop()
    print("\nINFO: Cloning CA certificate: {}".format(old_ca_cert.subject.rfc4514_string()), file=sys.stderr)
    ca_key, new_ca_cert = clone_certificate(old_ca_cert, key_store)
    print("INFO: Saving CA certificate to {}\n".format(CA_CERT_FNAME), file=sys.stderr)
    with open(CA_CERT_FNAME, "wb") as f:
        f.write(new_ca_cert.public_bytes(encoding=serialization.Encoding.PEM))

    # Clone from the top of the remaining chain down to the chosen
    # certificate, so every issuer key exists before it is needed.
    server_keys = []
    server_certs = []
    for cert in reversed(output_cert_chain):
        print("INFO: Cloning server certificate chain: {}".format(cert.subject.rfc4514_string()), file=sys.stderr)
        new_key, new_cert = clone_certificate(cert, key_store)
        server_certs.append(new_cert)
        server_keys.append(new_key)
    if not server_certs:
        # The chosen certificate was itself the top of the chain (it is
        # self-signed), so its clone doubles as the server certificate.
        server_certs.append(new_ca_cert)
        server_keys.append(ca_key)

    print("INFO: Saving server certificate chain to {}".format(SERVER_CERT_FNAME), file=sys.stderr)
    with open(SERVER_CERT_FNAME, "wb") as f:
        # written leaf first, issuers afterwards
        for cert in reversed(server_certs):
            f.write(cert.public_bytes(encoding=serialization.Encoding.PEM))

    print("INFO: Saving server private key to {}".format(SERVER_KEY_FNAME), file=sys.stderr)
    # the last cloned certificate is the chosen one; its key is the server key
    _write_private_key(SERVER_KEY_FNAME, server_keys[-1])


if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment