Last active
February 9, 2024 19:16
-
-
Save arizvisa/6e0c3cb1051587ca366e7a7b321b91ea to your computer and use it in GitHub Desktop.
shirtz
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# gen_key generates an RSA private key, returning the key object and its PEM encoding as bytes
# load_key reads a private key from PEM bytes (decrypting if a password is provided, otherwise asking the user for one), returning the key object and its PEM encoding as bytes
# gen_certificate takes your private key object and some keywords, returning a cert and pubkey
# load_certificate takes a private key object and a PEM-encoded cert as bytes, returning a cert and pubkey
# setup_ssl is just example code showing how to use those things
def gen_key(e=65537, bits=1024):
    """Generate a fresh RSA private key.

    ``e`` is the public exponent and ``bits`` the modulus size handed to the
    key generator. Returns a ``(key, pem)`` tuple: the private key object and
    its unencrypted PEM serialization (traditional OpenSSL format) as bytes.
    """
    from cryptography.hazmat.backends import default_backend
    from cryptography.hazmat.primitives import serialization
    from cryptography.hazmat.primitives.asymmetric import rsa

    plural = '' if bits == 1 else 's'
    print("generating an RSA key of {:d}-bit{:s} using e={:d}.".format(bits, plural, e))

    private = rsa.generate_private_key(
        public_exponent=e,
        key_size=bits,
        backend=default_backend(),
    )
    serialized = private.private_bytes(
        encoding=serialization.Encoding.PEM,
        format=serialization.PrivateFormat.TraditionalOpenSSL,
        encryption_algorithm=serialization.NoEncryption(),
    )
    return private, serialized
def load_key(content, password=None):
    """Load a PEM-encoded private key, asking for a password when one is needed.

    ``content`` is the PEM data as bytes. ``password`` may be ``None``, bytes,
    or text (text is encoded as UTF-8 because the loader only accepts bytes).

    Returns a ``(key, pem)`` tuple: the private key object and its unencrypted
    PEM serialization as bytes. If the data cannot be decoded (which includes
    a wrong password), or the user aborts the password prompt, a temporary key
    from ``gen_key()`` is returned instead.
    """
    import getpass
    import cryptography.hazmat.primitives.serialization as chps
    import cryptography.hazmat.backends as chb

    def as_bytes(secret):
        # The loader raises TypeError for a text password, which would
        # otherwise look exactly like "no password given" and prompt forever.
        if secret is None or isinstance(secret, (bytes, bytearray)):
            return secret
        return secret.encode('utf-8')

    print("loading RSA key from {:d} bytes worth of file.".format(len(content)))
    password = as_bytes(password)

    while True:
        try:
            key = chps.load_pem_private_key(data=content, password=password, backend=chb.default_backend())
        except ValueError:
            print('critical: error while decoding key, generating a temporary one instead.\n')
            return gen_key()
        except TypeError:
            # TypeError means the password does not fit the key's encryption
            # state. If we did supply one, the key is not encrypted at all, so
            # retry without it rather than asking the user for a password.
            if password:
                password = None
                continue
        else:
            pem = key.private_bytes(encoding=chps.Encoding.PEM, format=chps.PrivateFormat.TraditionalOpenSSL, encryption_algorithm=chps.NoEncryption())
            return key, pem

        # The key is encrypted and we have no usable password yet. getpass
        # keeps the password off the screen (and, unlike Python 2's input(),
        # never evaluates what was typed).
        try:
            password = as_bytes(getpass.getpass("key is encrypted, please type in your password (ctrl+c to give up): "))
        except (KeyboardInterrupt, EOFError):
            print("warning: user aborted key decryption, generating a temporary one instead.\n")
            return gen_key()
def gen_certificate(private, **params):
    """Build and sign an X.509 certificate for the private key ``private``.

    Every keyword in ``params`` is applied to a ``CertificateBuilder`` by
    calling the builder method of the same name with the given value (strings
    made only of digits are converted to ``int`` first). Defaults are supplied
    for ``serial_number``, ``not_valid_before``, ``not_valid_after`` (42 days
    after ``not_valid_before``) and ``issuer_name`` (same as the subject, i.e.
    self-signed). ``issuer_name`` may be a plain string, which is used as the
    common name. The special keyword ``hashAlgorithm`` names the signature hash
    and defaults to ``sha256``.

    The subject common name comes from the ``HOSTNAME`` environment variable
    (``localhost`` if unset). The subject alternative names come from the
    module-level ``host`` global.

    Keywords that do not correspond to a builder method are reported and
    dropped, and an unusable hash algorithm is replaced by the default; in
    both cases generation is retried once the problem has been removed.

    Returns a ``(certificate, public_key)`` tuple.
    """
    import datetime
    import ipaddress
    import os

    import cryptography.x509 as X
    import cryptography.hazmat.backends as chb
    import cryptography.hazmat.primitives.hashes as hashes

    default_hash = 'sha256'

    now = datetime.datetime.utcnow()
    params.setdefault('serial_number', 1024)
    params.setdefault('not_valid_before', now)
    params.setdefault('not_valid_after', params['not_valid_before'] + datetime.timedelta(days=42))
    params.setdefault('hashAlgorithm', default_hash)

    hostname = os.environ.get('HOSTNAME', 'localhost')
    cn = X.Name([X.NameAttribute(X.oid.NameOID.COMMON_NAME, hostname)])
    params['subject_name'] = cn
    params.setdefault('issuer_name', cn)

    # ``host`` is a module-level global. It is always listed as a DNS name,
    # and additionally as an IP address when it parses as one; a hostname
    # must not abort certificate generation.
    alts = [X.DNSName(host)]
    try:
        address = ipaddress.ip_address(host)
    except ValueError:
        address = None
    if address is not None:
        alts.append(X.IPAddress(address))
    an = X.SubjectAlternativeName(alts)
    bc = X.BasicConstraints(ca=True, path_length=0)

    # Collect every concrete hash algorithm class the library exposes, keyed
    # by the algorithm's name.
    algorithms = {}
    for attribute in dir(hashes):
        candidate = getattr(hashes, attribute)
        if not isinstance(candidate, type):
            continue
        if issubclass(candidate, hashes.HashAlgorithm) and candidate is not hashes.HashAlgorithm:
            algorithms[candidate.name] = candidate

    suggestion = params.pop('hashAlgorithm')
    if suggestion in algorithms:
        hashAlgorithm = algorithms[suggestion]
    else:
        print("critical: suggested hash algorithm ({:s}) was not found in the available algoritms ({:s}).".format(suggestion, ', '.join(sorted(algorithms))))
        # Prefer the default over the legacy digests; sha1/md5 stay in the
        # list only as a last resort before taking whatever is available.
        preferences = [default_hash, 'sha1', 'md5'] + list(algorithms)
        hashAlgorithm = algorithms[next(name for name in preferences if name in algorithms)]
        print("warning: ended up falling back to an alternative one ({:s}).\n".format(hashAlgorithm.name))

    if isinstance(params['issuer_name'], six.string_types):
        params['issuer_name'] = X.Name([X.NameAttribute(X.oid.NameOID.COMMON_NAME, params['issuer_name'])])

    print("generating {:s}certificate issued by {:s} for {:s} ({:s}).".format('self-signed ' if params['issuer_name'] == params['subject_name'] else '', params['issuer_name'].rfc4514_string(), params['subject_name'].rfc4514_string(), ', '.join(map("{!s}".format, an))))

    # Each parameter names a builder method; every call returns a new builder.
    x509 = X.CertificateBuilder()
    try:
        for attribute, value in params.items():
            if isinstance(value, six.string_types) and value.isdigit():
                value = int(value)
            x509 = getattr(x509, attribute)(value)
    except AttributeError:
        available = {attribute for attribute in dir(X.CertificateBuilder) if not attribute.startswith('_')} | {'hashAlgorithm'}
        misses = set(params) - available
        if not misses:
            # Nothing to remove, so retrying would fail the same way forever.
            raise
        print("critical: unable to generate certificate due to the explicitly given parameters ({:s}) not being within the ones available ({:s}).".format(', '.join(sorted(misses)), ', '.join(sorted(available))))
        print('trying again without the invalid parameters.\n')
        for attribute in misses:
            params.pop(attribute)
        params['hashAlgorithm'] = hashAlgorithm.name
        return gen_certificate(private, **params)

    print('adding necessary extensions to certificate and signing it.')
    extended = x509.add_extension(bc, False).add_extension(an, False).public_key(private.public_key())

    try:
        # Instantiating the algorithm is inside the try on purpose: some
        # algorithm classes cannot be constructed without arguments.
        certificate = extended.sign(private_key=private, algorithm=hashAlgorithm(), backend=chb.default_backend())
    except (ValueError, TypeError):
        print("critical: error signing certificate likely due to the hashAlgorithm ({:s}) not being viable.".format(hashAlgorithm.name))
        if hashAlgorithm.name == default_hash:
            # The retry would select this very algorithm again.
            raise
        print('trying again using a default algorithm.\n')
        # 'hashAlgorithm' was popped above, so the retry uses the default.
        return gen_certificate(private, **params)

    assert isinstance(certificate, X.Certificate)
    return certificate, certificate.public_key()
def load_certificate(private, content):
    """Load a PEM-encoded X.509 certificate and check it belongs to ``private``.

    ``private`` is the private key object and ``content`` the certificate as
    PEM bytes. Returns a ``(certificate, public_key)`` tuple. If the PEM data
    cannot be decoded, or the certificate's public key is not the public half
    of ``private``, a new certificate from ``gen_certificate(private)`` is
    returned instead.
    """
    import sys

    import cryptography.x509 as X
    import cryptography.hazmat.backends as chb
    import cryptography.hazmat.primitives.serialization as chps

    print("reading an X509 certificate from {:d} bytes worth of PEM.".format(len(content)))
    try:
        certificate = X.load_pem_x509_certificate(data=content, backend=chb.default_backend())
    except ValueError:
        print("critical: error while decoding certificate, generating one instead.\n")
        return gen_certificate(private)

    def public_pem(public):
        return public.public_bytes(encoding=chps.Encoding.PEM, format=chps.PublicFormat.SubjectPublicKeyInfo)

    expected = public_pem(certificate.public_key())
    print('verifying the private key matches the following public key from the certificate.\n')
    print(expected.decode(sys.getdefaultencoding()))

    # Compare the public keys themselves. Verifying the certificate's
    # signature with our key would only succeed for self-signed certificates,
    # because the signature is made by the issuer's key, not the subject's.
    if public_pem(private.public_key()) != expected:
        print("critical: the certificate's public key does not match the private key, generating a new certificate instead.\n")
        return gen_certificate(private)

    print('which definitely seems to be the case.')
    return certificate, certificate.public_key()
def hardlink(src, dst):
    """Create a hard link at ``dst`` pointing to ``src``.

    A regular file already at ``dst`` is removed first (with a warning).
    Anything else already at ``dst`` is left alone and the link is not made.
    Returns ``None`` either way.
    """
    target_is_file = os.path.isfile(dst)

    # Guard: something is there, but it is not a plain file we may replace.
    if not target_is_file and os.path.exists(dst):
        print("critical: refusing to overwrite target path ({:s}) due to target not being a file.".format(dst))
        return None

    if target_is_file:
        print("warning: removing file at the specified target path ({:s}).".format(dst))
        os.unlink(dst)

    return os.link(src, dst)
def setup_ssl(socket, arguments):
    """Wrap ``socket`` in server-side TLS using a loaded or generated key/cert.

    Attributes read from ``arguments``:
      keyfile          path of a PEM private key to load (otherwise a key of
                       ``keysize`` bits is generated)
      keysize          size in bits for a generated key
      certificate      path of a PEM certificate to load (otherwise one is
                       generated)
      parameters       iterable of ``(name, value)`` pairs passed to
                       ``gen_certificate`` as keywords; ignored when a
                       certificate file is loaded
      keypath          optional path that receives a hard link to the key
      certificatepath  optional path that receives a hard link to the cert

    Returns the wrapped socket, or ``socket`` unchanged when the required
    libraries cannot be imported.
    """
    try:
        import cryptography
        import ssl
        import cryptography.hazmat.primitives.serialization as chps
    except ImportError:
        print('warning: ignoring request for SSL support due to an error importing the necessary libraries.')
        return socket

    import os
    import sys
    import tempfile

    if arguments.keyfile:
        with open(arguments.keyfile, 'rb') as infile:
            content = infile.read()
        key, keydata = load_key(content)
    else:
        key, keydata = gen_key(bits=arguments.keysize)
    print("using the following {:d}-bit key.\n".format(key.key_size))
    print(keydata.decode(sys.getdefaultencoding()))

    parameters = {attribute: value for attribute, value in arguments.parameters or []}
    if arguments.certificate:
        if parameters:
            print('warning: ignoring the provided certificate parameters due to being asked to load certificate from file.\n')
        with open(arguments.certificate, 'rb') as infile:
            content = infile.read()
        cert, _ = load_certificate(key, content)
    else:
        cert, _ = gen_certificate(key, **parameters)

    sig = bytearray(cert.fingerprint(algorithm=cert.signature_hash_algorithm))
    certdata = cert.public_bytes(encoding=chps.Encoding.PEM)
    print("\nusing certificate with a {:d}-bit {:s} ({:s})\n{:s}\n".format(8 * len(sig), cert.signature_hash_algorithm.name, cert.signature_algorithm_oid.dotted_string, ':'.join(map("{:02x}".format, sig))))
    print(certdata.decode(sys.getdefaultencoding()))

    # The ssl module only takes file names, so the PEM data goes through
    # temporary files. With WIN32 set the files are created with delete=False
    # and closed before ssl opens them by name, so they are removed by hand.
    with tempfile.NamedTemporaryFile(prefix='poc', delete=not WIN32) as keyfile, tempfile.NamedTemporaryFile(prefix='poc', delete=not WIN32) as certfile:
        try:
            keyfile.write(keydata)
            # Flush so the data is really in the file before anything else
            # (the hard link, the ssl module) reads it by name.
            keyfile.flush()
            if arguments.keypath:
                hardlink(keyfile.name, arguments.keypath)
                print("wrote key data to {:s}.".format(arguments.keypath))

            certfile.write(certdata)
            certfile.flush()
            if arguments.certificatepath:
                hardlink(certfile.name, arguments.certificatepath)
                print("wrote certificate data to {:s}.".format(arguments.certificatepath))

            if WIN32:
                keyfile.close()
                certfile.close()

            if hasattr(ssl, 'PROTOCOL_TLS_SERVER'):
                # ssl.wrap_socket() no longer exists on current Python.
                context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
                context.load_cert_chain(certfile=certfile.name, keyfile=keyfile.name)
                wrapped_socket = context.wrap_socket(socket, server_side=True)
            else:
                wrapped_socket = ssl.wrap_socket(socket, server_side=True, keyfile=keyfile.name, certfile=certfile.name)
        finally:
            if WIN32:
                # delete=False was used, so nothing else removes these files,
                # one of which holds the private key.
                for leftover in (keyfile, certfile):
                    leftover.close()
                    try:
                        os.unlink(leftover.name)
                    except OSError:
                        print("warning: unable to remove temporary file ({:s}).".format(leftover.name))
    return wrapped_socket
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment