Skip to content

Instantly share code, notes, and snippets.

@AlainODea
Created October 30, 2019 18:52
Show Gist options
  • Save AlainODea/f4371a28be01453b6908deaa780ea601 to your computer and use it in GitHub Desktop.
Save AlainODea/f4371a28be01453b6908deaa780ea601 to your computer and use it in GitHub Desktop.
Providing KMS-encrypted environment variables to a Lambda and decrypting them on boot
from lib.secret_config import load_secret_config_from_env
def lambda_handler(event, context):
    """
    Entry point that Lambda invokes with the triggering event and the
    execution context. It obtains the secret configuration through
    load_secret_config_from_env() before the rest of the handler runs.

    Args:
        event: The lambda event, which includes arguments from API Gateway.
        context: Info about the execution context of the lambda.
    """
    config = load_secret_config_from_env()
    # the REST of your Lambda
import argparse
import base64
import boto3
import gzip
import json
import os
"""
Decrypting a sensitive config::
aws-vault exec dev -- pipenv run -- python -m lib.secret_config --decrypt \
--file secret_config.dat \
--context '{"arn":"arn:aws:lambda:ca-central-1:123456789012:function:some-function"}' \
--key 'alias/cmk-dev'
Encrypting a sensitive config::
aws-vault exec dev -- pipenv run -- python -m lib.secret_config --encrypt \
--file example.json \
--context '{"arn":"arn:aws:lambda:ca-central-1:123456789012:function:some-function"}' \
--key 'alias/cmk-dev' > secret_config.dat
"""
def create_secret_config(config, context, kms_key_alias):
    """
    Protect a dict of secrets by encrypting it with KMS.

    The dict is serialized to JSON, gzip-compressed, encrypted with the given
    KMS key under the given encryption context, and finally Base64-encoded.

    The caller of this function must have kms:Encrypt on the KMS key used to
    encrypt the input.

    :param config: a dict with secrets that need protecting
    :param context: context in which the dict will be used (ex: {'arn':
        'arn:aws:lambda:ca-central-1:123456789012:function:some-function'})
    :param kms_key_alias: KMS key alias (or ID) with which to encrypt the
        config.
    :return: Base64-encoded KMS ciphertext (as bytes) of the gzipped JSON
        encoding of the input config.
    """
    plaintext = gzip.compress(json.dumps(config).encode('utf-8'))
    response = boto3.client('kms').encrypt(
        KeyId=kms_key_alias,
        Plaintext=plaintext,
        EncryptionContext=context,
    )
    return base64.standard_b64encode(response['CiphertextBlob'])
# Module-level cache: holds the decrypted config once it has been loaded.
secret_config_from_env = None


def load_secret_config_from_env():
    """
    Return the secret config described by the environment variables
    SECRET_CONFIG (Base64-encoded ciphertext) and SECRET_CONTEXT (JSON
    encryption context).

    The decrypted result is stored in the module-level
    ``secret_config_from_env`` and reused while that value is not None, so
    later calls skip the decryption step.

    The caller of this function must have kms:Decrypt on the KMS key
    used to encrypt the input.

    :return: config as a dict
    """
    global secret_config_from_env
    if secret_config_from_env is not None:
        return secret_config_from_env

    ciphertext_b64 = os.getenv('SECRET_CONFIG')
    encryption_context = json.loads(os.getenv('SECRET_CONTEXT'))
    secret_config_from_env = load_secret_config(ciphertext_b64, encryption_context)
    return secret_config_from_env
def load_secret_config(secret_config, context):
    """
    Decode a secret config: Base64-decode the input, decrypt it with KMS
    under the given encryption context, gunzip the plaintext and parse it
    as JSON.

    The caller of this function must have kms:Decrypt on the KMS key
    used to encrypt the input. ::

        config = {'password': 'swordfish'}
        context = {'arn': 'arn:aws:lambda:ca-central-1:123456789012:function:some-function'}
        secret_config = create_secret_config(config, context, 'alias/some-cmk')
        config = load_secret_config(secret_config, context)

    :param secret_config: Base64-encoded KMS ciphertext of the gzipped JSON
        encoding of the config.
    :param context: context in which the dict will be used (ex: {'arn':
        'arn:aws:lambda:ca-central-1:123456789012:function:some-function'})
    :return: config as a dict
    """
    ciphertext = base64.standard_b64decode(secret_config)
    response = boto3.client('kms').decrypt(
        CiphertextBlob=ciphertext,
        EncryptionContext=context,
    )
    return json.loads(gzip.decompress(response['Plaintext']))
def process_command_line():
    """
    Process the command-line arguments and print the response.

    Exactly one of ``--encrypt`` / ``--decrypt`` must be given. With
    ``--encrypt`` the JSON file is read and its Base64 ciphertext is printed;
    with ``--decrypt`` the Base64 ciphertext file is read and the decrypted
    config is printed as JSON.

    :return: None
    :raises SystemExit: (via argparse) when arguments are missing or when
        both or neither of --encrypt/--decrypt are supplied.
    """
    parser = argparse.ArgumentParser(description='Store and load KMS-encrypted secret config.')
    parser.add_argument('--file', help='File to encrypt or decrypt', required=True)
    # The two modes are alternatives: previously passing neither silently did
    # nothing and passing both silently encrypted, so require exactly one.
    mode = parser.add_mutually_exclusive_group(required=True)
    mode.add_argument('--encrypt', help='Encrypt', action='store_true')
    mode.add_argument('--decrypt', help='Decrypt', action='store_true')
    parser.add_argument('--context', help='Context in which config will be used', required=True)
    parser.add_argument('--key', help='KMS key alias (or ID) to use', required=True)
    args = parser.parse_args()

    context = json.loads(args.context)
    if args.encrypt:
        with open(args.file) as f:
            config = json.load(f)
        print(create_secret_config(config=config, context=context, kms_key_alias=args.key).decode('utf-8'))
    else:
        with open(args.file) as f:
            secret_config = f.read()
        print(json.dumps(load_secret_config(secret_config=secret_config, context=context)))
# Run the command-line interface only when this module is executed directly
# (e.g. ``python -m lib.secret_config``), not when it is imported.
if __name__ == '__main__':
    process_command_line()
import pathlib
from lib.secret_config import *
from lib.setup_custom import setup_custom
def main():
    """
    Interactively create or replace the encrypted ``secret_config`` entry in
    the terraform.tfvars file for this Lambda.

    Steps:
      1. Build the setup context and load any existing config.
      2. If a config exists, print it; with ``--check`` stop there, otherwise
         ask for confirmation (only the answer ``yes`` proceeds).
      3. Call ``setup_custom`` to obtain the replacement config, encrypt it
         with the ``alias/cmk-<environment>`` key, and rewrite the tfvars file.

    :return: None
    :raises SystemExit: after ``--check``, when the replace is declined, or
        (status 1) when ``setup_custom`` returns None.
    """
    parser = argparse.ArgumentParser(description='Set up configuration for this Lambda.')
    parser.add_argument('--check', help='Check configuration', action='store_true')
    args = parser.parse_args()

    context = get_setup_context()
    existing_config, tfvars_lines = load_existing_config(context)
    if existing_config is not None:
        print(json.dumps(existing_config, indent=2))
        if args.check:
            raise SystemExit()
        response = input("""
Do you want to replace the configuration?
The configuration above exists and loads successfully.
Only 'yes' will be accepted to approve.
Enter a value: """)
        if response != 'yes':
            print("\nError: Replace cancelled.")
            raise SystemExit()
        print()
    # NOTE(review): with --check and no loadable existing config, execution
    # still continues into the interactive setup below - confirm this is intended.

    replacement_config = setup_custom(context, existing_config)
    if replacement_config is None:
        print('\nERROR: setup_custom did not return a configuration dict. Fix the code.\n')
        raise SystemExit(1)

    secret_config = create_secret_config(
        config=replacement_config,
        context=context['secret_context'],
        kms_key_alias=f'alias/cmk-{context["environment"]}'
    )
    secret_line = f'secret_config = "{secret_config.decode("utf-8")}"'

    replaced = False
    with open(context['tfvars_path'], 'w') as tfvars:
        for line in tfvars_lines:
            if 'secret_config' in line:
                tfvars.write(secret_line)
                replaced = True
            else:
                # Lines from readlines() keep their newline; strip it so the
                # single '\n' written below does not double the line breaks.
                tfvars.write(line.rstrip('\n'))
            tfvars.write('\n')
        if not replaced:
            # No existing secret_config entry to replace: append one so the
            # freshly encrypted config is not silently discarded.
            tfvars.write(secret_line)
            tfvars.write('\n')
    print(f'\nSuccessfully updated secret_config in {context["tfvars_path"]}.\n')
def get_setup_context():
    """
    Collect the values the setup script needs from the environment, the
    current working directory and the caller's AWS identity.

    ``AWS_REGION`` supplies the region and ``AWS_VAULT`` supplies the
    environment name (presumably both are exported by ``aws-vault exec`` -
    confirm for other launchers). The repository name is the name of the
    current working directory.

    :return: dict with keys ``aws_region``, ``environment``, ``vpc_name``,
        ``account_id``, ``repo_name``, ``function_name``, ``tfvars_path`` and
        ``secret_context`` (the KMS encryption context holding the Lambda ARN).
    :raises RuntimeError: if AWS_REGION or AWS_VAULT is unset or empty.
    """
    aws_region = os.getenv('AWS_REGION')
    environment = os.getenv('AWS_VAULT')
    # Fail fast: an unset variable would otherwise become the literal text
    # "None" inside the tfvars path, the function name and the ARN.
    missing = [
        name
        for name, value in (('AWS_REGION', aws_region), ('AWS_VAULT', environment))
        if not value
    ]
    if missing:
        raise RuntimeError(f'Missing required environment variable(s): {", ".join(missing)}')

    account_id = boto3.client('sts').get_caller_identity()['Account']
    vpc_name = environment
    repo_name = pathlib.PurePath(os.getcwd()).name
    function_name = f'{environment}-{repo_name}'
    tfvars_path = f'../infrastructure-live/{environment}/{aws_region}/{vpc_name}/lambda/{repo_name}/terraform.tfvars'
    return {
        'aws_region': aws_region,
        'environment': environment,
        'vpc_name': vpc_name,
        'account_id': account_id,
        'repo_name': repo_name,
        'function_name': function_name,
        'tfvars_path': tfvars_path,
        'secret_context': {
            "arn": f'arn:aws:lambda:{aws_region}:{account_id}:function:{function_name}'
        },
    }
def load_existing_config(context):
    """
    Read the tfvars file named by ``context['tfvars_path']`` and decrypt the
    ``secret_config`` value it contains, if any.

    :param context: setup context; ``tfvars_path`` and ``secret_context`` are
        read from it.
    :return: tuple ``(existing_config, tfvars_lines)``. ``existing_config`` is
        the decrypted dict, or None when the file is missing or has no
        ``secret_config = `` line. ``tfvars_lines`` is the list of lines read
        from the file (with newlines), or an empty list when the file does
        not exist.
    """
    marker = 'secret_config = '
    tfvars_file = pathlib.Path(context["tfvars_path"])
    if not tfvars_file.exists():
        # Previously tfvars_lines was never assigned on this path, so the
        # return statement raised UnboundLocalError.
        return None, []

    with open(tfvars_file) as tfvars:
        tfvars_lines = tfvars.readlines()

    secret_config = None
    for line in tfvars_lines:
        if marker in line:
            # Take the text after the marker wherever it occurs in the line,
            # and drop the trailing newline and surrounding quotes.
            secret_config = line.partition(marker)[2].strip().strip('"')

    if secret_config is None:
        return None, tfvars_lines

    existing_config = load_secret_config(secret_config=secret_config, context=context['secret_context'])
    return existing_config, tfvars_lines
# Run the interactive setup only when this file is executed as a script,
# not when it is imported.
if __name__ == '__main__':
    main()
# This is an example; substitute your own customizations for your Lambda.
import json
import urllib.parse
def setup_custom(context, existing_config):
    """
    Build the replacement configuration dict for the Lambda by prompting for
    the service-account token and the Google Sheet to update.

    :param context: setup context; only ``context['environment']`` is read.
    :param existing_config: previously stored config dict, or None when no
        config exists yet. Existing values are offered as defaults.
    :return: dict with a single ``google_sheet_updater`` entry holding
        ``token``, ``sheet_id`` and ``update_range_name``.
    """
    # On first-time setup the caller has no config to pass and supplies None;
    # treat that as an empty config so the helpers can look up defaults.
    if existing_config is None:
        existing_config = {}

    environment = context['environment']
    if environment == 'prod':
        update_range_name = 'Raw Data Stream!A2:H'
    else:
        update_range_name = f'{environment}-data-stream!A2:H'

    creds = get_token(existing_config)
    sheet_id = get_sheet_id(existing_config)
    return {
        "google_sheet_updater": {
            "token": creds,
            "sheet_id": sheet_id,
            "update_range_name": update_range_name
        }
    }
def get_token(existing_config):
    """
    Prompt for a service account key file and return its parsed JSON content.

    When the existing config already holds a token, an empty answer reuses
    that token instead of reading a file.

    :param existing_config: previously stored config dict, or None / empty
        when there is no existing config.
    :return: the existing token, or the JSON content of the file entered.
    :raises OSError: if the entered path cannot be opened.
    :raises json.JSONDecodeError: if the file is not valid JSON.
    """
    # Tolerate a missing config (None) or a missing/None section; the
    # original membership test raised TypeError on None.
    updater_config = (existing_config or {}).get('google_sheet_updater') or {}

    if 'token' in updater_config:
        client_secrets_path = input("Service account key file (JSON): [reuse existing config] ")
        if client_secrets_path.strip() == '':
            return updater_config['token']
    else:
        client_secrets_path = input("Service account key file (JSON): ")

    with open(client_secrets_path) as client_secrets_file:
        return json.load(client_secrets_file)
def get_sheet_id(existing_config):
    """
    Prompt for a Google Sheet URL and return the sheet id extracted from it.

    The id is the path segment following ``/spreadsheets/d/``. When the
    existing config already holds a sheet id it is shown as the default and
    returned if the answer is empty.

    :param existing_config: previously stored config dict, or None / empty
        when there is no existing config.
    :return: the sheet id from the entered URL, or the existing sheet id
        (None if there is none) when the answer is empty.
    :raises SystemExit: with status 1 when the URL does not contain a sheet id
        in the expected position.
    """
    # Tolerate a missing config (None) or a missing/None section.
    updater_config = (existing_config or {}).get('google_sheet_updater') or {}
    default_sheet_id = updater_config.get('sheet_id', None)
    default_sheet_url = f'[https://docs.google.com/spreadsheets/d/{default_sheet_id}] ' \
        if default_sheet_id else ''

    sheet_url_text = input(f'Google Sheet to edit (URL): {default_sheet_url}')
    if sheet_url_text.strip() == '':
        return default_sheet_id

    sheet_path_parts = urllib.parse.urlparse(sheet_url_text.strip()).path.split('/')
    # Require a non-empty fourth segment: a URL ending at "/spreadsheets/d"
    # used to raise IndexError and "/spreadsheets/d/" returned an empty id.
    has_sheet_prefix = sheet_path_parts[0:3] == ['', 'spreadsheets', 'd']
    if has_sheet_prefix and len(sheet_path_parts) > 3 and sheet_path_parts[3]:
        return sheet_path_parts[3]

    print('ERROR: invalid Google Sheet URL')
    raise SystemExit(1)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment