Skip to content

Instantly share code, notes, and snippets.

@duttonw
Created December 14, 2018 05:15
Show Gist options
  • Save duttonw/0e289de885d926d1f344dff92cc9e499 to your computer and use it in GitHub Desktop.
Save duttonw/0e289de885d926d1f344dff92cc9e499 to your computer and use it in GitHub Desktop.
lambda python module
import boto3
import json
import logging
import os
logger = logging.getLogger()
logger.setLevel(logging.INFO)
def lambda_handler(event, context):
"""Secrets Manager Rotation Template
This is a template for creating an AWS Secrets Manager rotation lambda
Args:
event (dict): Lambda dictionary of event parameters. These keys must include the following:
- SecretId: The secret ARN or identifier
- ClientRequestToken: The ClientRequestToken of the secret version
- Step: The rotation step (one of createSecret, setSecret, testSecret, or finishSecret)
context (LambdaContext): The Lambda runtime information
Raises:
ResourceNotFoundException: If the secret with the specified arn and stage does not exist
ValueError: If the secret is not properly configured for rotation
KeyError: If the event parameters do not contain the expected keys
"""
arn = event['SecretId']
token = event['ClientRequestToken']
step = event['Step']
# Setup the client
service_client = boto3.client('secretsmanager', endpoint_url=os.environ['SECRETS_MANAGER_ENDPOINT'])
# Make sure the version is staged correctly
metadata = service_client.describe_secret(SecretId=arn)
if not metadata['RotationEnabled']:
logger.error("Secret %s is not enabled for rotation" % arn)
raise ValueError("Secret %s is not enabled for rotation" % arn)
versions = metadata['VersionIdsToStages']
if token not in versions:
logger.error("Secret version %s has no stage for rotation of secret %s." % (token, arn))
raise ValueError("Secret version %s has no stage for rotation of secret %s." % (token, arn))
if "AWSCURRENT" in versions[token]:
logger.info("Secret version %s already set as AWSCURRENT for secret %s." % (token, arn))
return
elif "AWSPENDING" not in versions[token]:
logger.error("Secret version %s not set as AWSPENDING for rotation of secret %s." % (token, arn))
raise ValueError("Secret version %s not set as AWSPENDING for rotation of secret %s." % (token, arn))
if step == "createSecret":
create_secret(service_client, arn, token)
elif step == "setSecret":
set_secret(service_client, arn, token)
elif step == "testSecret":
test_secret(service_client, arn, token)
elif step == "finishSecret":
finish_secret(service_client, arn, token)
else:
raise ValueError("Invalid step parameter")
def create_secret(service_client, arn, token):
"""Create the secret
This method first checks for the existence of a secret for the passed in token. If one does not exist, it will generate a
new secret and put it with the passed in token.
Args:
service_client (client): The secrets manager service client
arn (string): The secret ARN or other identifier
token (string): The ClientRequestToken associated with the secret version
Raises:
ResourceNotFoundException: If the secret with the specified arn and stage does not exist
"""
# Make sure the current secret exists
current_dict = get_secret_dict(service_client, arn, "AWSCURRENT")
# Now try to get the secret version, if that fails, put a new secret
try:
get_secret_dict(service_client, arn, "AWSPENDING")
logger.info("createSecret: Successfully retrieved secret for %s." % arn)
except service_client.exceptions.ResourceNotFoundException:
iam_user = current_dict['smtpUser']
iam = boto3.client('iam')
# Remove the oldest key if there are multiple
logger.debug("Retrieving keys for user %s" % iam_user)
keys = iam.list_access_keys(UserName=iam_user)['AccessKeyMetadata']
logger.debug("Found %s key(s)" % len(keys))
if len(keys) > 1:
oldest_key = keys[0]
for key in keys:
if key['CreateDate'] < oldest_key['CreateDate']:
logger.debug("%s is older than %s" % (key, oldest_key))
oldest_key = key
key_id_to_delete = oldest_key['AccessKeyId']
key_date_to_delete = oldest_key['CreateDate']
iam.delete_access_key(UserName=iam_user, AccessKeyId=key_id_to_delete)
logger.info("Deleted old key %s from %s" % (key_id_to_delete, key_date_to_delete))
# Generate a new access key
logger.info("createSecret: Creating new access key for %s." % iam_user)
newKey = iam.create_access_key(UserName=iam_user)['AccessKey']
current_dict['smtpAccessKeyId'] = newKey['AccessKeyId']
current_dict['smtpSecretKey'] = newKey['SecretAccessKey']
current_dict['smtpUsername'] = newKey['AccessKeyId']
current_dict['smtpPassword'] = generate_ses_password(newKey['SecretAccessKey'])
# Put the secret
service_client.put_secret_value(SecretId=arn, ClientRequestToken=token, SecretString=json.dumps(current_dict), VersionStages=['AWSPENDING'])
logger.info("createSecret: Successfully put secret for ARN %s and version %s." % (arn, token))
def generate_ses_password(secret_key, charset='utf-8'):
import hmac #required to compute the HMAC key
import hashlib #required to create a SHA256 hash
import base64 #required to encode the computed key
# These variables are used when calculating the SMTP password. You shouldn't
# change them.
message = 'SendRawEmail'
version = '\x02'
# Compute an HMAC-SHA256 key from the AWS secret access key.
signatureInBytes = hmac.new(secret_key.encode(charset),message.encode(charset),hashlib.sha256).digest()
# Prepend the version number to the signature.
signatureAndVersion = version.encode(charset) + signatureInBytes
# Base64-encode the string that contains the version number and signature.
smtpPassword = base64.b64encode(signatureAndVersion)
# Decode the string and return it
return smtpPassword.decode(charset)
def set_secret(service_client, arn, token):
"""Set the secret
This method should set the AWSPENDING secret in the service that the secret belongs to. For example, if the secret is a database
credential, this method should take the value of the AWSPENDING secret and set the user's password to this value in the database.
Args:
service_client (client): The secrets manager service client
arn (string): The secret ARN or other identifier
token (string): The ClientRequestToken associated with the secret version
"""
# This is where the secret should be set in the service
# NB This is not applicable for access keys, since they are created and set in one operation
def test_secret(service_client, arn, token):
"""Test the secret
This method should validate that the AWSPENDING secret works in the service that the secret belongs to.
Args:
service_client (client): The secrets manager service client
arn (string): The secret ARN or other identifier
token (string): The ClientRequestToken associated with the secret version
"""
# This is where the secret should be tested against the service
pending_dict = get_secret_dict(service_client, arn, "AWSPENDING")
iam_user = pending_dict['smtpUser']
logger.info("Checking that the pending key is valid for user %s" % iam_user)
iam = boto3.client('iam')
logger.debug("Retrieving keys for user %s" % iam_user)
keys = iam.list_access_keys(UserName=iam_user)['AccessKeyMetadata']
logger.debug("Found %s key(s)" % len(keys))
pending_key = pending_dict['smtpAccessKeyId']
for key in keys:
if key["AccessKeyId"] == pending_key:
logger.info("Pending access key %s was found on the user" % pending_key)
return
raise ValueError("Unable to find pending access key on user %s" % iam_user)
def finish_secret(service_client, arn, token):
"""Finish the secret
This method finalizes the rotation process by marking the secret version passed in as the AWSCURRENT secret.
Args:
service_client (client): The secrets manager service client
arn (string): The secret ARN or other identifier
token (string): The ClientRequestToken associated with the secret version
Raises:
ResourceNotFoundException: If the secret with the specified arn does not exist
"""
# First describe the secret to get the current version
metadata = service_client.describe_secret(SecretId=arn)
current_version = None
for version in metadata["VersionIdsToStages"]:
if "AWSCURRENT" in metadata["VersionIdsToStages"][version]:
if version == token:
# The correct version is already marked as current, return
logger.info("finishSecret: Version %s already marked as AWSCURRENT for %s" % (version, arn))
publish_update(service_client, arn)
return
current_version = version
break
# Finalize by staging the secret version current
service_client.update_secret_version_stage(SecretId=arn, VersionStage="AWSCURRENT", MoveToVersionId=token, RemoveFromVersionId=current_version)
logger.info("finishSecret: Successfully set AWSCURRENT stage to version %s for secret %s." % (version, arn))
publish_update(service_client, arn)
def publish_update(service_client, arn):
logger.info("Notifying consumers that the secret has updated")
current_dict = get_secret_dict(service_client, arn, "AWSCURRENT")
topic_arn = current_dict['snsTopic']
if not topic_arn:
logger.warn("No topic ARN provided; unable to notify")
return
sns = boto3.client('sns')
result = sns.publish(TopicArn=topic_arn, Message='Updated %s' % arn)
logger.info("Notification sent as message %s" % result['MessageId'])
def get_secret_dict(service_client, arn, stage, token=None):
"""Gets the secret dictionary corresponding for the secret arn, stage, and token
This helper function gets credentials for the arn and stage passed in and returns the dictionary by parsing the JSON string
Args:
service_client (client): The secrets manager service client
arn (string): The secret ARN or other identifier
token (string): The ClientRequestToken associated with the secret version, or None if no validation is desired
stage (string): The stage identifying the secret version
Returns:
SecretDictionary: Secret dictionary
Raises:
ResourceNotFoundException: If the secret with the specified arn and stage does not exist
ValueError: If the secret is not valid JSON
"""
required_fields = ['smtpUser', 'smtpAccessKeyId', 'smtpSecretKey']
# Only do VersionId validation against the stage if a token is passed in
if token:
secret = service_client.get_secret_value(SecretId=arn, VersionId=token, VersionStage=stage)
else:
secret = service_client.get_secret_value(SecretId=arn, VersionStage=stage)
plaintext = secret['SecretString']
secret_dict = json.loads(plaintext)
# Run validations against the secret
for field in required_fields:
if field not in secret_dict:
raise KeyError("%s key is missing from secret JSON" % field)
# Parse and return the secret JSON string
return secret_dict
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment