Created
December 14, 2018 05:15
-
-
Save duttonw/0e289de885d926d1f344dff92cc9e499 to your computer and use it in GitHub Desktop.
lambda python module
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import boto3 | |
import json | |
import logging | |
import os | |
logger = logging.getLogger() | |
logger.setLevel(logging.INFO) | |
def lambda_handler(event, context): | |
"""Secrets Manager Rotation Template | |
This is a template for creating an AWS Secrets Manager rotation lambda | |
Args: | |
event (dict): Lambda dictionary of event parameters. These keys must include the following: | |
- SecretId: The secret ARN or identifier | |
- ClientRequestToken: The ClientRequestToken of the secret version | |
- Step: The rotation step (one of createSecret, setSecret, testSecret, or finishSecret) | |
context (LambdaContext): The Lambda runtime information | |
Raises: | |
ResourceNotFoundException: If the secret with the specified arn and stage does not exist | |
ValueError: If the secret is not properly configured for rotation | |
KeyError: If the event parameters do not contain the expected keys | |
""" | |
arn = event['SecretId'] | |
token = event['ClientRequestToken'] | |
step = event['Step'] | |
# Setup the client | |
service_client = boto3.client('secretsmanager', endpoint_url=os.environ['SECRETS_MANAGER_ENDPOINT']) | |
# Make sure the version is staged correctly | |
metadata = service_client.describe_secret(SecretId=arn) | |
if not metadata['RotationEnabled']: | |
logger.error("Secret %s is not enabled for rotation" % arn) | |
raise ValueError("Secret %s is not enabled for rotation" % arn) | |
versions = metadata['VersionIdsToStages'] | |
if token not in versions: | |
logger.error("Secret version %s has no stage for rotation of secret %s." % (token, arn)) | |
raise ValueError("Secret version %s has no stage for rotation of secret %s." % (token, arn)) | |
if "AWSCURRENT" in versions[token]: | |
logger.info("Secret version %s already set as AWSCURRENT for secret %s." % (token, arn)) | |
return | |
elif "AWSPENDING" not in versions[token]: | |
logger.error("Secret version %s not set as AWSPENDING for rotation of secret %s." % (token, arn)) | |
raise ValueError("Secret version %s not set as AWSPENDING for rotation of secret %s." % (token, arn)) | |
if step == "createSecret": | |
create_secret(service_client, arn, token) | |
elif step == "setSecret": | |
set_secret(service_client, arn, token) | |
elif step == "testSecret": | |
test_secret(service_client, arn, token) | |
elif step == "finishSecret": | |
finish_secret(service_client, arn, token) | |
else: | |
raise ValueError("Invalid step parameter") | |
def create_secret(service_client, arn, token): | |
"""Create the secret | |
This method first checks for the existence of a secret for the passed in token. If one does not exist, it will generate a | |
new secret and put it with the passed in token. | |
Args: | |
service_client (client): The secrets manager service client | |
arn (string): The secret ARN or other identifier | |
token (string): The ClientRequestToken associated with the secret version | |
Raises: | |
ResourceNotFoundException: If the secret with the specified arn and stage does not exist | |
""" | |
# Make sure the current secret exists | |
current_dict = get_secret_dict(service_client, arn, "AWSCURRENT") | |
# Now try to get the secret version, if that fails, put a new secret | |
try: | |
get_secret_dict(service_client, arn, "AWSPENDING") | |
logger.info("createSecret: Successfully retrieved secret for %s." % arn) | |
except service_client.exceptions.ResourceNotFoundException: | |
iam_user = current_dict['smtpUser'] | |
iam = boto3.client('iam') | |
# Remove the oldest key if there are multiple | |
logger.debug("Retrieving keys for user %s" % iam_user) | |
keys = iam.list_access_keys(UserName=iam_user)['AccessKeyMetadata'] | |
logger.debug("Found %s key(s)" % len(keys)) | |
if len(keys) > 1: | |
oldest_key = keys[0] | |
for key in keys: | |
if key['CreateDate'] < oldest_key['CreateDate']: | |
logger.debug("%s is older than %s" % (key, oldest_key)) | |
oldest_key = key | |
key_id_to_delete = oldest_key['AccessKeyId'] | |
key_date_to_delete = oldest_key['CreateDate'] | |
iam.delete_access_key(UserName=iam_user, AccessKeyId=key_id_to_delete) | |
logger.info("Deleted old key %s from %s" % (key_id_to_delete, key_date_to_delete)) | |
# Generate a new access key | |
logger.info("createSecret: Creating new access key for %s." % iam_user) | |
newKey = iam.create_access_key(UserName=iam_user)['AccessKey'] | |
current_dict['smtpAccessKeyId'] = newKey['AccessKeyId'] | |
current_dict['smtpSecretKey'] = newKey['SecretAccessKey'] | |
current_dict['smtpUsername'] = newKey['AccessKeyId'] | |
current_dict['smtpPassword'] = generate_ses_password(newKey['SecretAccessKey']) | |
# Put the secret | |
service_client.put_secret_value(SecretId=arn, ClientRequestToken=token, SecretString=json.dumps(current_dict), VersionStages=['AWSPENDING']) | |
logger.info("createSecret: Successfully put secret for ARN %s and version %s." % (arn, token)) | |
def generate_ses_password(secret_key, charset='utf-8'): | |
import hmac #required to compute the HMAC key | |
import hashlib #required to create a SHA256 hash | |
import base64 #required to encode the computed key | |
# These variables are used when calculating the SMTP password. You shouldn't | |
# change them. | |
message = 'SendRawEmail' | |
version = '\x02' | |
# Compute an HMAC-SHA256 key from the AWS secret access key. | |
signatureInBytes = hmac.new(secret_key.encode(charset),message.encode(charset),hashlib.sha256).digest() | |
# Prepend the version number to the signature. | |
signatureAndVersion = version.encode(charset) + signatureInBytes | |
# Base64-encode the string that contains the version number and signature. | |
smtpPassword = base64.b64encode(signatureAndVersion) | |
# Decode the string and return it | |
return smtpPassword.decode(charset) | |
def set_secret(service_client, arn, token): | |
"""Set the secret | |
This method should set the AWSPENDING secret in the service that the secret belongs to. For example, if the secret is a database | |
credential, this method should take the value of the AWSPENDING secret and set the user's password to this value in the database. | |
Args: | |
service_client (client): The secrets manager service client | |
arn (string): The secret ARN or other identifier | |
token (string): The ClientRequestToken associated with the secret version | |
""" | |
# This is where the secret should be set in the service | |
# NB This is not applicable for access keys, since they are created and set in one operation | |
def test_secret(service_client, arn, token): | |
"""Test the secret | |
This method should validate that the AWSPENDING secret works in the service that the secret belongs to. | |
Args: | |
service_client (client): The secrets manager service client | |
arn (string): The secret ARN or other identifier | |
token (string): The ClientRequestToken associated with the secret version | |
""" | |
# This is where the secret should be tested against the service | |
pending_dict = get_secret_dict(service_client, arn, "AWSPENDING") | |
iam_user = pending_dict['smtpUser'] | |
logger.info("Checking that the pending key is valid for user %s" % iam_user) | |
iam = boto3.client('iam') | |
logger.debug("Retrieving keys for user %s" % iam_user) | |
keys = iam.list_access_keys(UserName=iam_user)['AccessKeyMetadata'] | |
logger.debug("Found %s key(s)" % len(keys)) | |
pending_key = pending_dict['smtpAccessKeyId'] | |
for key in keys: | |
if key["AccessKeyId"] == pending_key: | |
logger.info("Pending access key %s was found on the user" % pending_key) | |
return | |
raise ValueError("Unable to find pending access key on user %s" % iam_user) | |
def finish_secret(service_client, arn, token): | |
"""Finish the secret | |
This method finalizes the rotation process by marking the secret version passed in as the AWSCURRENT secret. | |
Args: | |
service_client (client): The secrets manager service client | |
arn (string): The secret ARN or other identifier | |
token (string): The ClientRequestToken associated with the secret version | |
Raises: | |
ResourceNotFoundException: If the secret with the specified arn does not exist | |
""" | |
# First describe the secret to get the current version | |
metadata = service_client.describe_secret(SecretId=arn) | |
current_version = None | |
for version in metadata["VersionIdsToStages"]: | |
if "AWSCURRENT" in metadata["VersionIdsToStages"][version]: | |
if version == token: | |
# The correct version is already marked as current, return | |
logger.info("finishSecret: Version %s already marked as AWSCURRENT for %s" % (version, arn)) | |
publish_update(service_client, arn) | |
return | |
current_version = version | |
break | |
# Finalize by staging the secret version current | |
service_client.update_secret_version_stage(SecretId=arn, VersionStage="AWSCURRENT", MoveToVersionId=token, RemoveFromVersionId=current_version) | |
logger.info("finishSecret: Successfully set AWSCURRENT stage to version %s for secret %s." % (version, arn)) | |
publish_update(service_client, arn) | |
def publish_update(service_client, arn): | |
logger.info("Notifying consumers that the secret has updated") | |
current_dict = get_secret_dict(service_client, arn, "AWSCURRENT") | |
topic_arn = current_dict['snsTopic'] | |
if not topic_arn: | |
logger.warn("No topic ARN provided; unable to notify") | |
return | |
sns = boto3.client('sns') | |
result = sns.publish(TopicArn=topic_arn, Message='Updated %s' % arn) | |
logger.info("Notification sent as message %s" % result['MessageId']) | |
def get_secret_dict(service_client, arn, stage, token=None): | |
"""Gets the secret dictionary corresponding for the secret arn, stage, and token | |
This helper function gets credentials for the arn and stage passed in and returns the dictionary by parsing the JSON string | |
Args: | |
service_client (client): The secrets manager service client | |
arn (string): The secret ARN or other identifier | |
token (string): The ClientRequestToken associated with the secret version, or None if no validation is desired | |
stage (string): The stage identifying the secret version | |
Returns: | |
SecretDictionary: Secret dictionary | |
Raises: | |
ResourceNotFoundException: If the secret with the specified arn and stage does not exist | |
ValueError: If the secret is not valid JSON | |
""" | |
required_fields = ['smtpUser', 'smtpAccessKeyId', 'smtpSecretKey'] | |
# Only do VersionId validation against the stage if a token is passed in | |
if token: | |
secret = service_client.get_secret_value(SecretId=arn, VersionId=token, VersionStage=stage) | |
else: | |
secret = service_client.get_secret_value(SecretId=arn, VersionStage=stage) | |
plaintext = secret['SecretString'] | |
secret_dict = json.loads(plaintext) | |
# Run validations against the secret | |
for field in required_fields: | |
if field not in secret_dict: | |
raise KeyError("%s key is missing from secret JSON" % field) | |
# Parse and return the secret JSON string | |
return secret_dict |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment