Last active
April 24, 2025 05:27
-
-
Save JohnPreston/6c6dcd0e726219a10111182891f8547b to your computer and use it in GitHub Desktop.
Generates, encrypts and stores a password in DynamoDB
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import base64 | |
import uuid | |
import httplib | |
import urlparse | |
import json | |
import boto3 | |
import string | |
import random | |
def encrypt_password_to_b64(key_id=None, password=None):
    """
    Encrypt a password with KMS and base64-encode the resulting ciphertext

    :param key_id: Id of the KMS key used for the encryption
    :param password: plaintext password to encrypt
    :return: base64 encoding of the ciphertext blob returned by KMS
    """
    kms = boto3.client('kms')
    ciphertext = kms.encrypt(KeyId=key_id, Plaintext=password)['CiphertextBlob']
    return base64.b64encode(ciphertext)
def put_password_in_dynamodb(env, stack_name, table_name, password):
    """
    Write the encrypted password of a stack as one item of a DynamoDB table

    :param env: String for the environment name (usually dev / prod etc.)
    :param stack_name: Name of the cloudformation stack
    :param table_name: Name of the DynamoDB table the item is written to
    :param password: string of the base64 encrypted password
    """
    item = {
        'env': env,
        'stackname': stack_name,
        'passwordbase64': password,
    }
    boto3.resource('dynamodb').Table(table_name).put_item(Item=item)
def generate_random_password(password_length=20):
    """
    Generates a random password

    Characters are drawn from upper and lower case ASCII letters, digits and
    '-'. The returned password always contains at least one '-'.

    :param password_length: Length of the password in number of characters
        (an int, or any value int() accepts such as the string "20")
    :return: String of the password
    :raises ValueError: if password_length is lower than 1 or is not a number
    """
    length = int(password_length)
    if length < 1:
        # An empty password can never contain '-': the retry loop below
        # would spin forever.
        raise ValueError('password_length must be at least 1')
    char_set = string.ascii_uppercase + string.ascii_lowercase + string.digits + '-'
    # SystemRandom reads from the OS entropy source; the default Mersenne
    # Twister generator is predictable and must not be used for secrets.
    rng = random.SystemRandom()
    password = ''
    # Draw again until the mandatory '-' shows up in the candidate.
    while '-' not in password:
        password = ''.join(rng.choice(char_set) for _ in range(length))
    return password
def send_response(request, response, status=None, reason=None):
    """
    Send our response to the pre-signed URL supplied by CloudFormation

    If no ResponseURL is found in the request, there is no place to send a
    response. This may be the case if the supplied event was for testing.
    Delivery is best effort: a failure is printed and never raised.

    :param request: the event received by the Lambda function
    :param response: dict of the response, updated in place
    :param status: optional value written to response['Status']
    :param reason: optional value written to response['Reason']
    :return: response object
    """
    if status is not None:
        response['Status'] = status
    if reason is not None:
        response['Reason'] = reason

    response_url = request.get('ResponseURL')
    if not response_url:
        return response

    connection = None
    try:
        url = urlparse.urlparse(response_url)
        body = json.dumps(response)
        connection = httplib.HTTPSConnection(url.hostname, url.port)
        connection.request('PUT', url.path + '?' + url.query, body)
        # Read the reply so a rejected upload does not go unnoticed.
        reply = connection.getresponse()
        reply.read()
        if reply.status >= 300:
            print("Failed to send the response to the provdided URL: HTTP %s %s"
                  % (reply.status, reply.reason))
    except Exception as error:
        print("Failed to send the response to the provdided URL: %s" % error)
    finally:
        if connection is not None:
            connection.close()
    return response
def lambda_handler(event, context):
    """
    Core function called by Lambda

    On a Create request a password is generated, encrypted with KMS and
    stored in DynamoDB. Delete and Update requests are acknowledged without
    doing anything. Every outcome, including a failure, is reported through
    send_response.

    :param event: Lambda event data
    :param context: Lambda defined context params
    :return: the response object returned by send_response
    """
    response = {
        'StackId': event['StackId'],
        'RequestId': event['RequestId'],
        'LogicalResourceId': event['LogicalResourceId'],
        'Status': 'SUCCESS'
    }
    if 'PhysicalResourceId' in event:
        response['PhysicalResourceId'] = event['PhysicalResourceId']
    else:
        response['PhysicalResourceId'] = str(uuid.uuid4())

    # There is nothing to do for a delete or an update request
    if event['RequestType'] in ('Delete', 'Update'):
        return send_response(event, response)

    properties = event.get('ResourceProperties') or {}
    for key in ['KeyId', 'PasswordLength']:
        if not properties.get(key):
            return send_response(
                event,
                response,
                status='FAILED',
                reason='The properties KeyId and PasswordLength must not be empty'
            )
    # These are read below as well: report a missing one instead of
    # failing with a KeyError.
    for key in ['Env', 'StackName', 'TableName']:
        if not properties.get(key):
            return send_response(
                event,
                response,
                status='FAILED',
                reason='The properties Env, StackName and TableName must not be empty'
            )

    try:
        db_password = generate_random_password(properties['PasswordLength'])
        encrypted_password = encrypt_password_to_b64(properties['KeyId'], db_password)
        put_password_in_dynamodb(
            properties['Env'],
            properties['StackName'],
            properties['TableName'],
            encrypted_password
        )
    except Exception as error:
        # Report the failure so that a FAILED response is still sent.
        return send_response(
            event,
            response,
            status='FAILED',
            reason='Failed to generate and store the password: %s: %s'
                   % (type(error).__name__, error)
        )

    response['Reason'] = 'The value was successfully encrypted'
    return send_response(event, response)
I refactored this for Python 3, because httplib
exists only in Python 2:
import base64
import uuid
import http.client
import urllib.parse
import json
import boto3
import string
import random
def encrypt_password_to_b64(key_id=None, password=None):
    """
    Function to encrypt the password with KMS

    :param key_id: KMS Key Id
    :param password: password to encrypt
    :return: String of the KMS ciphertext encoded in base64
    """
    client = boto3.client('kms')
    encrypted = client.encrypt(KeyId=key_id, Plaintext=password)
    encrypted_b64 = base64.b64encode(encrypted['CiphertextBlob'])
    # b64encode returns bytes on Python 3; decode so that the documented
    # string is returned. Base64 output is pure ASCII.
    return encrypted_b64.decode('ascii')
def put_password_in_dynamodb(env, stack_name, table_name, password):
    """
    Store the base64 password of a stack as an item of a DynamoDB table

    :param env: String for the environment name (usually dev / prod etc.)
    :param stack_name: Name of the cloudformation stack
    :param table_name: Name of the DynamoDB table the item is written to
    :param password: string of the base64 encrypted password
    """
    dynamodb = boto3.resource('dynamodb')
    target_table = dynamodb.Table(table_name)
    record = dict(env=env, stackname=stack_name, passwordbase64=password)
    target_table.put_item(Item=record)
def generate_random_password(password_length=20):
    """
    Generates a random password

    Characters are drawn from upper and lower case ASCII letters, digits and
    '-'. The returned password always contains at least one '-'.

    :param password_length: Length of the password in number of characters
        (an int, or any value int() accepts such as the string "20")
    :return: String of the password
    :raises ValueError: if password_length is lower than 1 or is not a number
    """
    # Imported here because the file's top-level imports do not include it.
    import secrets

    length = int(password_length)
    if length < 1:
        # An empty password can never contain '-': the retry loop below
        # would spin forever.
        raise ValueError('password_length must be at least 1')
    char_set = string.ascii_uppercase + string.ascii_lowercase + string.digits + '-'
    password = ''
    # secrets draws from the OS entropy source, unlike the predictable
    # generator behind random.sample. Draw again until '-' is present.
    while '-' not in password:
        password = ''.join(secrets.choice(char_set) for _ in range(length))
    return password
def send_response(request, response, status=None, reason=None):
    """
    Send our response to the pre-signed URL supplied by CloudFormation

    If no ResponseURL is found in the request, there is no place to send a
    response. This may be the case if the supplied event was for testing.
    Delivery is best effort: a failure is printed and never raised.

    :param request: the event received by the Lambda function
    :param response: dict of the response, updated in place
    :param status: optional value written to response['Status']
    :param reason: optional value written to response['Reason']
    :return: response object
    """
    if status is not None:
        response['Status'] = status
    if reason is not None:
        response['Reason'] = reason

    response_url = request.get('ResponseURL')
    if not response_url:
        return response

    connection = None
    try:
        url = urllib.parse.urlparse(response_url)
        body = json.dumps(response)
        connection = http.client.HTTPSConnection(url.hostname, url.port)
        connection.request('PUT', url.path + '?' + url.query, body)
        # Read the reply so a rejected upload does not go unnoticed.
        reply = connection.getresponse()
        reply.read()
        if reply.status >= 300:
            print("Failed to send the response to the provided URL: HTTP %s %s"
                  % (reply.status, reply.reason))
    except Exception as error:
        print("Failed to send the response to the provided URL: %s" % error)
    finally:
        if connection is not None:
            connection.close()
    return response
def lambda_handler(event, context):
    """
    Core function called by Lambda

    On a Create request a password is generated, encrypted with KMS and
    stored in DynamoDB. Delete and Update requests are acknowledged without
    doing anything. Every outcome, including a failure, is reported through
    send_response.

    :param event: Lambda event data
    :param context: Lambda defined context params
    :return: the response object returned by send_response
    """
    response = {
        'StackId': event['StackId'],
        'RequestId': event['RequestId'],
        'LogicalResourceId': event['LogicalResourceId'],
        'Status': 'SUCCESS'
    }
    if 'PhysicalResourceId' in event:
        response['PhysicalResourceId'] = event['PhysicalResourceId']
    else:
        response['PhysicalResourceId'] = str(uuid.uuid4())

    # There is nothing to do for a delete or an update request
    if event['RequestType'] in ('Delete', 'Update'):
        return send_response(event, response)

    properties = event.get('ResourceProperties') or {}
    for key in ['KeyId', 'PasswordLength']:
        if not properties.get(key):
            return send_response(
                event,
                response,
                status='FAILED',
                reason='The properties KeyId and PasswordLength must not be empty')
    # These are read below as well: report a missing one instead of
    # failing with a KeyError.
    for key in ['Env', 'StackName', 'TableName']:
        if not properties.get(key):
            return send_response(
                event,
                response,
                status='FAILED',
                reason='The properties Env, StackName and TableName must not be empty')

    try:
        db_password = generate_random_password(properties['PasswordLength'])
        encrypted_password = encrypt_password_to_b64(
            properties['KeyId'], db_password)
        put_password_in_dynamodb(properties['Env'],
                                 properties['StackName'],
                                 properties['TableName'],
                                 encrypted_password)
    except Exception as error:
        # Report the failure so that a FAILED response is still sent.
        return send_response(
            event,
            response,
            status='FAILED',
            reason='Failed to generate and store the password: %s: %s'
                   % (type(error).__name__, error))

    response['Reason'] = 'The value was successfully encrypted'
    return send_response(event, response)
Wow, you think you haven't used something in years and yet!
Thanks for the refactor, happy to update the original.
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
thanks, really helpful