Last active
May 28, 2024 22:24
-
-
Save fsultan/95df6a7067918b6b68d21546c62f0aa5 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# import json | |
import logging | |
import os | |
import shutil | |
import boto3 | |
from datetime import timedelta | |
from cryptography import x509 | |
from cryptography.hazmat.primitives import serialization | |
from cryptography.hazmat.primitives.asymmetric import rsa | |
from cryptography.hazmat.primitives.hashes import SHA256 | |
from cryptography.hazmat.primitives import hashes | |
from cryptography.hazmat.primitives import serialization | |
from cryptography.x509.oid import NameOID | |
from botocore.exceptions import ClientError | |
from botocore.config import Config | |
# from secrets_manager_utils import SecretsManagerUtils | |
# Root logger (no name passed), with its threshold set to INFO.
logger = logging.getLogger()
logger.setLevel(logging.INFO)

# client_dict caches boto3 clients by service name (filled by get_aws_client).
# resource_dict is declared alongside it but is not used in the visible code.
client_dict = {}
resource_dict = {}

# Read once at import time; raises KeyError if AWS_REGION is not set.
current_aws_region = os.environ["AWS_REGION"]

# botocore retry policy handed to every client built by get_aws_client.
config = Config(retries={"max_attempts": 10, "mode": "standard"})
def handler(event, context):
    """Generate a self-signed key/certificate pair and store it as secrets.

    Presumably the AWS Lambda entry point (TODO confirm). Work is only done
    when ``event["is_standalone_deployment"]`` is truthy; otherwise the
    function returns without side effects.

    Args:
        event: Mapping that must contain ``is_standalone_deployment`` and,
            for standalone deployments, ``ssl_cert_generator_kms_key_arn``
            and ``ssl_cert_cname``.
        context: Unused.

    Raises:
        KeyError: A required key is missing from ``event``.
        OSError: An OS-level failure occurred while creating the certificate.
        Exception: Any other failure, re-raised after being logged.
    """
    # create_cert() looks the common name up as the module-level ``CNAME``.
    # Binding it only as a local here made that lookup fail with NameError.
    global CNAME

    try:
        # Inside the try so a missing flag is logged like any other missing key.
        if not event["is_standalone_deployment"]:
            return

        kms_key_arn = event['ssl_cert_generator_kms_key_arn']
        CNAME = event['ssl_cert_cname']

        key = generate_rsa(2048)
        cert = create_cert(key)

        private_key_secret_name = "/application/gemfire/gemfire-cluster-self-signed-private-key"
        certificate_secret_name = "/application/gemfire/gemfire-cluster-self-signed-certificate"
        trusted_certificate_secret_name = "/application/gemfire/gemfire-cluster-self-signed-trusted-certificate"

        secrets_client = SecretsManagerUtils(kms_arn=kms_key_arn)
        upload_secret(secrets_client, key, private_key_secret_name, type="private-key")
        upload_secret(secrets_client, cert, certificate_secret_name, type="certificate")
        # The certificate is self-signed, so it is also stored as the trusted one.
        upload_secret(secrets_client, cert, trusted_certificate_secret_name, type="trusted-certificate")
    except KeyError as err:
        logger.error(f"Missing key in event payload: {err}")
        raise
    except OSError as err:
        # IOError is an alias of OSError in Python 3, so the original's two
        # ``except IOError`` clauses were one clause plus dead code. Nothing
        # is zipped here, so the certificate-creation message is the one kept.
        logger.error(f"CANNOT CREATE CERTIFICATE DUE TO OS ERROR : {err}")
        raise
    except Exception as err:
        logger.error(f"CERTIFICATE SET UP FAILED DUE TO : {err}")
        raise
def upload_secret(client, secret, secret_name, type: str):
    """Serialize ``secret`` to PEM and store it under ``secret_name``.

    The secret is created first. If creation fails with a ``ClientError``,
    the existing secret is described, restored when its description carries
    a ``DeletedDate``, and then overwritten with the new value.

    Failures are logged and swallowed (best effort); nothing is raised to the
    caller for upload errors.

    Args:
        client: Object exposing ``upload_secret``, ``describe_secret``,
            ``restore_secret`` and ``put_secret_value`` with keyword
            arguments ``secret_name`` / ``secret_string``.
        secret: Private key when ``type`` is ``"private-key"`` (must provide
            ``private_bytes``); otherwise an object providing ``public_bytes``.
        secret_name: Name of the secret to create or update.
        type: Label selecting the serialization and used in log messages.
            The name shadows the builtin but is kept because callers pass it
            by keyword.
    """
    if type == "private-key":
        secret_string = secret.private_bytes(
            encoding=serialization.Encoding.PEM,
            format=serialization.PrivateFormat.PKCS8,
            encryption_algorithm=serialization.NoEncryption(),
        ).decode()
    else:
        secret_string = secret.public_bytes(
            encoding=serialization.Encoding.PEM
        ).decode()

    try:
        # The original had lost the call parentheses here, so the secret was
        # never actually created.
        client.upload_secret(secret_name=secret_name, secret_string=secret_string)
        logger.info(f"CREATED {type} SECRET")
    except ClientError:
        # Creation failed; fall back to updating whatever already exists.
        try:
            response = client.describe_secret(secret_name=secret_name)
            pending_deletion = 'DeletedDate' in response
            if pending_deletion:
                # A secret in the deleted state must be restored before it
                # can receive a new value.
                client.restore_secret(secret_name=secret_name)
            client.put_secret_value(
                secret_name=secret_name, secret_string=secret_string
            )
            if pending_deletion:
                logger.info(f"RESTORED AND UPDATED {type} SECRET")
            else:
                logger.info(f"UPDATED {type} SECRET")
        except Exception as e:
            logger.error(f"Could not upload certificate due to {e}")
    except Exception as e:
        logger.error(f"Could not upload certificate due to {e}")
def write_cert(cert, key):
    """Write the certificate and private key as PEM files under ``/tmp``.

    Produces ``/tmp/certificateChain.pem`` and ``/tmp/privateKey.pem``
    (unencrypted PKCS8), then copies the certificate file to
    ``/tmp/trustedCertificates.pem``.

    Args:
        cert: Object providing ``public_bytes``.
        key: Object providing ``private_bytes``.

    Raises:
        Exception: Any failure is logged and re-raised.
    """
    certificate_file = "/tmp/certificateChain.pem"
    private_key_file = "/tmp/privateKey.pem"
    trusted_file = "/tmp/trustedCertificates.pem"

    try:
        with open(certificate_file, "wt") as out:
            cert_pem = cert.public_bytes(encoding=serialization.Encoding.PEM)
            out.write(cert_pem.decode("utf-8"))

        with open(private_key_file, "wt") as out:
            key_pem = key.private_bytes(
                encoding=serialization.Encoding.PEM,
                format=serialization.PrivateFormat.PKCS8,
                encryption_algorithm=serialization.NoEncryption(),
            )
            out.write(key_pem.decode("utf-8"))

        # The trusted-certificates file is a copy of the certificate file.
        shutil.copy(certificate_file, trusted_file)
    except Exception as e:
        logger.error(f"COULD NOT WRITE FILE DUE TO : {e}")
        raise e
def create_cert(key, cname=None):
    """Build a self-signed X.509 certificate for ``key``.

    Subject and issuer are the same name. The certificate is valid for 365
    days from the moment of the call, carries ``cname`` as both common name
    and DNS subject alternative name, and is signed with SHA-256.

    Args:
        key: Private key providing ``public_key()``; also used for signing.
        cname: Common name / DNS name for the certificate. When omitted, the
            module-level ``CNAME`` is used if it has been set.

    Returns:
        The signed certificate.

    Raises:
        ValueError: No common name was passed and no module-level ``CNAME``
            is set.
        Exception: Any other failure is logged and re-raised.
    """
    # Imported here because the file only imports ``timedelta`` from
    # ``datetime``; the original ``datetime.datetime`` reference was a NameError.
    import datetime

    try:
        if cname is None:
            # Backward compatible with callers that only pass ``key``.
            cname = globals().get("CNAME")
        if not cname:
            raise ValueError("certificate common name (CNAME) is not set")

        # Self-signed: one name serves as both subject and issuer.
        name = x509.Name([
            x509.NameAttribute(NameOID.COUNTRY_NAME, u"US"),
            x509.NameAttribute(NameOID.STATE_OR_PROVINCE_NAME, u"New Jersey"),
            x509.NameAttribute(NameOID.LOCALITY_NAME, u"Jersey City"),
            x509.NameAttribute(NameOID.ORGANIZATION_NAME, u"JP Morgan Chase"),
            x509.NameAttribute(NameOID.ORGANIZATIONAL_UNIT_NAME, u"EFP"),
            x509.NameAttribute(NameOID.COMMON_NAME, cname),
        ])

        # Taken once so the validity window is exactly 365 days wide.
        now = datetime.datetime.now(datetime.timezone.utc)

        builder = (
            x509.CertificateBuilder()
            .subject_name(name)
            .issuer_name(name)
            .not_valid_before(now)
            .not_valid_after(now + datetime.timedelta(days=365))
            .serial_number(x509.random_serial_number())
            .public_key(key.public_key())
            .add_extension(
                x509.SubjectAlternativeName([x509.DNSName(cname)]),
                critical=False,
            )
        )
        return builder.sign(
            private_key=key,
            algorithm=hashes.SHA256(),
        )
    except Exception as e:
        logger.error(f"CERTIFICATE CREATION FAILED DUE TO {e}")
        raise
def generate_rsa(bits):
    """Generate an RSA private key with public exponent 65537.

    Args:
        bits: Key size in bits.

    Returns:
        The generated private key.

    Raises:
        Exception: Any failure is logged and re-raised.
    """
    try:
        # The original passed ``backend=default_backend()`` without importing
        # ``default_backend``, which raised NameError on every call. The
        # argument is optional in the cryptography releases this file already
        # relies on (it calls ``sign()`` without a backend), so it is omitted.
        return rsa.generate_private_key(
            public_exponent=65537,
            key_size=bits,
        )
    except Exception:
        logger.error("RSA GENERATION ERROR")
        raise
def get_endpoint_url_dict(aws_region):
    """Return the regional HTTPS endpoint URL for each supported service.

    Args:
        aws_region: Region string interpolated into every URL.

    Returns:
        Dict mapping service name to
        ``https://<host prefix>.<aws_region>.amazonaws.com``.
    """
    # (service name, host prefix) pairs; the prefix differs from the service
    # name for "emr" and "elbv2".
    host_prefix_by_service = (
        ("s3", "s3"),
        ("ssm", "ssm"),
        ("ec2", "ec2"),
        ("emr", "elasticmapreduce"),
        ("acm", "acm"),
        ("sqs", "sqs"),
        ("sns", "sns"),
        ("elbv2", "elasticloadbalancing"),
        ("lambda", "lambda"),
        ("application-autoscaling", "application-autoscaling"),
        ("secretsmanager", "secretsmanager"),
    )
    return {
        service: f"https://{prefix}.{aws_region}.amazonaws.com"
        for service, prefix in host_prefix_by_service
    }
def get_aws_client(aws_service):
    """Return the boto3 client for ``aws_service``, creating it on first use.

    Clients are kept in the module-level ``client_dict``, so repeated calls
    for the same service return the same object.

    Args:
        aws_service: boto3 service name, e.g. ``"secretsmanager"``.

    Returns:
        The cached or newly created client.
    """
    if aws_service in client_dict:
        return client_dict[aws_service]

    # ``.get`` yields None for services missing from the endpoint table.
    endpoint = get_endpoint_url_dict(current_aws_region).get(aws_service)
    client_dict[aws_service] = boto3.client(
        service_name=aws_service,
        region_name=current_aws_region,
        config=config,
        endpoint_url=endpoint,
    )
    return client_dict[aws_service]
class SecretsManagerUtils:
    """Small wrapper around a Secrets Manager client and one KMS key id."""

    def __init__(self, kms_arn: str):
        """Obtain the ``secretsmanager`` client and remember ``kms_arn``."""
        self.secretsmanager_client = get_aws_client('secretsmanager')
        self.kms_key_id = kms_arn

    def upload_secret(self, secret_name: str, secret_string: str):
        """Call ``create_secret`` with this instance's KMS key id.

        Returns the client's response unchanged.
        """
        request = {
            "Name": secret_name,
            "SecretString": secret_string,
            "KmsKeyId": self.kms_key_id,
        }
        return self.secretsmanager_client.create_secret(**request)

    def restore_secret(self, secret_name: str):
        """Call ``restore_secret`` for ``secret_name``; return the response."""
        request = {"SecretId": secret_name}
        return self.secretsmanager_client.restore_secret(**request)

    def describe_secret(self, secret_name: str):
        """Call ``describe_secret`` for ``secret_name``; return the response."""
        request = {"SecretId": secret_name}
        return self.secretsmanager_client.describe_secret(**request)

    def put_secret_value(self, secret_name: str, secret_string: str):
        """Call ``put_secret_value`` for ``secret_name``; return the response."""
        request = {
            "SecretId": secret_name,
            "SecretString": secret_string,
        }
        return self.secretsmanager_client.put_secret_value(**request)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment