Created
October 30, 2019 18:52
-
-
Save AlainODea/f4371a28be01453b6908deaa780ea601 to your computer and use it in GitHub Desktop.
Providing encrypted environment variables to a Lambda decrypted on boot
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from lib.secret_config import load_secret_config_from_env | |
def lambda_handler(event, context):
    """
    Entry point that AWS Lambda invokes for every event.

    It obtains the decrypted secret configuration and then hands over to the
    function-specific logic that follows.

    Args:
        event: The lambda event, which includes arguments from API Gateway.
        context: Info about the execution context of the lambda.
    """
    # Secrets are obtained through the lib.secret_config helper.
    config = load_secret_config_from_env()
    # the REST of your Lambda
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import argparse | |
import base64 | |
import boto3 | |
import gzip | |
import json | |
import os | |
""" | |
Decrypting a sensitive config:: | |
aws-vault exec dev -- pipenv run -- python -m lib.secret_config --decrypt \ | |
--file secret_config.dat \ | |
--context '{"arn":"arn:aws:lambda:ca-central-1:123456789012:function:some-function"}' \ | |
--key 'alias/cmk-dev' | |
Encrypting a sensitive config:: | |
aws-vault exec dev -- pipenv run -- python -m lib.secret_config --encrypt \ | |
--file example.json \ | |
--context '{"arn":"arn:aws:lambda:ca-central-1:123456789012:function:some-function"}' \ | |
--key 'alias/cmk-dev' > secret_config.dat | |
""" | |
def create_secret_config(config, context, kms_key_alias):
    """
    Turn a dict of secrets into a Base64 string that is safe to store.

    The dict is serialized to JSON, gzipped, encrypted with KMS under the
    given encryption context, and finally Base64-encoded.

    The caller of this function must have kms:Encrypt on the KMS key used.

    :param config: a dict with secrets that need protecting
    :param context: KMS encryption context in which the dict will be used
                    (ex: {'arn':
                    'arn:aws:lambda:ca-central-1:123456789012:function:some-function'})
    :param kms_key_alias: KMS key alias (or ID) with which to encrypt
    :return: Base64-encoded (bytes) KMS ciphertext of the gzipped JSON
             encoding of the input config
    """
    # NOTE(review): KMS caps the size of directly encrypted plaintext, which
    # is presumably why the JSON is compressed first -- confirm the limit.
    plaintext = gzip.compress(json.dumps(config).encode('utf-8'))
    response = boto3.client('kms').encrypt(
        KeyId=kms_key_alias,
        Plaintext=plaintext,
        EncryptionContext=context,
    )
    return base64.standard_b64encode(response['CiphertextBlob'])
# Process-wide cache so that KMS is only called once per process.
secret_config_from_env = None


def load_secret_config_from_env():
    """
    Load a secret config from the environment variables SECRET_CONFIG
    and SECRET_CONTEXT.

    SECRET_CONFIG holds the Base64-encoded KMS ciphertext and SECRET_CONTEXT
    holds the JSON-encoded KMS encryption context. The decrypted result is
    cached in the module-level ``secret_config_from_env`` so later calls in
    the same process return it without decrypting again.

    The caller of this function must have kms:Decrypt on the KMS key
    used to encrypt the input.

    :return: config as a dict
    :raises RuntimeError: if SECRET_CONFIG or SECRET_CONTEXT is unset or empty
    """
    global secret_config_from_env
    if secret_config_from_env is None:
        encoded_config = os.getenv('SECRET_CONFIG')
        encoded_context = os.getenv('SECRET_CONTEXT')
        # Fail with a message that names the problem; otherwise an unset
        # variable surfaces as an opaque TypeError from json/base64.
        missing = [
            name
            for name, value in (
                ('SECRET_CONFIG', encoded_config),
                ('SECRET_CONTEXT', encoded_context),
            )
            if not value
        ]
        if missing:
            raise RuntimeError(
                'Required environment variable(s) not set: ' + ', '.join(missing)
            )
        secret_config_from_env = load_secret_config(
            encoded_config, json.loads(encoded_context)
        )
    return secret_config_from_env
def load_secret_config(secret_config, context):
    """
    Recover a config dict from its Base64-encoded, KMS-encrypted form.

    This reverses create_secret_config: Base64-decode, KMS-decrypt under the
    given encryption context, gunzip, then parse the JSON.

    The caller of this function must have kms:Decrypt on the KMS key
    used to encrypt the input. ::

        config = {'password': 'swordfish'}
        context = {'arn':'arn:aws:lambda:ca-central-1:123456789012:function:some-function'}
        secret_config = create_secret_config(config, context, 'alias/some-cmk')
        config = load_secret_config(secret_config, context)

    :param secret_config: Base64-encoded KMS ciphertext of the gzipped JSON
                          encoding of the config
    :param context: KMS encryption context in which the dict will be used
                    (ex: {'arn':
                    'arn:aws:lambda:ca-central-1:123456789012:function:some-function'})
    :return: config as a dict
    """
    ciphertext = base64.standard_b64decode(secret_config)
    response = boto3.client('kms').decrypt(
        CiphertextBlob=ciphertext,
        EncryptionContext=context,
    )
    return json.loads(gzip.decompress(response['Plaintext']))
def process_command_line():
    """
    Process the command-line arguments and print the response.

    Exactly one of ``--encrypt`` / ``--decrypt`` must be given. With
    ``--encrypt`` the file is read as a JSON config and the Base64 secret
    config is printed; with ``--decrypt`` the file is read as a Base64 secret
    config and the decrypted JSON is printed.

    :return: None
    :raises SystemExit: (status 2, via argparse) on missing or conflicting
        options, or when ``--context`` is not valid JSON
    """
    parser = argparse.ArgumentParser(description='Store and load KMS-encrypted secret config.')
    parser.add_argument('--file', help='File to encrypt or decrypt', required=True)
    # Omitting both flags used to do nothing silently, and passing both
    # silently encrypted; make the choice explicit.
    mode = parser.add_mutually_exclusive_group(required=True)
    mode.add_argument('--encrypt', help='Encrypt', action='store_true')
    mode.add_argument('--decrypt', help='Decrypt', action='store_true')
    parser.add_argument('--context', help='Context in which config will be used', required=True)
    parser.add_argument('--key', help='KMS key alias (or ID) to use', required=True)
    args = parser.parse_args()

    try:
        context = json.loads(args.context)
    except json.JSONDecodeError as err:
        # Report as a usage error instead of a traceback.
        parser.error(f'--context is not valid JSON: {err}')

    with open(args.file, encoding='utf-8') as f:
        file_text = f.read()

    if args.encrypt:
        config = json.loads(file_text)
        print(create_secret_config(config=config, context=context, kms_key_alias=args.key).decode('utf-8'))
    else:
        print(json.dumps(load_secret_config(secret_config=file_text, context=context)))


if __name__ == '__main__':
    process_command_line()
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import pathlib | |
from lib.secret_config import * | |
from lib.setup_custom import setup_custom | |
def main():
    """
    Interactively create or replace the secret config stored in terraform.tfvars.

    Flow:

    1. Build the setup context and try to load and decrypt the existing config.
    2. If one exists, print it. With ``--check`` stop there (status 0);
       otherwise ask for confirmation before replacing it.
    3. With ``--check`` and no existing config, report that and exit with
       status 1 instead of starting the interactive setup.
    4. Ask ``setup_custom`` for the replacement config, encrypt it with the
       ``alias/cmk-<environment>`` key and write it to the tfvars file.

    :return: None
    :raises SystemExit: on ``--check``, on a declined confirmation, or when
        ``setup_custom`` returns no configuration
    """
    parser = argparse.ArgumentParser(description='Set up configuration for this Lambda.')
    parser.add_argument('--check', help='Check configuration', action='store_true')
    args = parser.parse_args()

    context = get_setup_context()
    existing_config, tfvars_lines = load_existing_config(context)

    if existing_config is not None:
        print(json.dumps(existing_config, indent=2))
        if args.check:
            raise SystemExit(0)
        response = input(
            "\n"
            "Do you want to replace the configuration?\n"
            "The configuration above exists and loads successfully.\n"
            "Only 'yes' will be accepted to approve.\n"
            "Enter a value: "
        )
        if response != 'yes':
            print("\nError: Replace cancelled.")
            raise SystemExit(0)
        print()
    elif args.check:
        # --check must never fall through into the interactive replace flow.
        print(f'\nERROR: no secret_config found in {context["tfvars_path"]}.\n')
        raise SystemExit(1)

    replacement_config = setup_custom(context, existing_config)
    if replacement_config is None:
        print('\nERROR: setup_custom did not return a configuration dict. Fix the code.\n')
        raise SystemExit(1)

    secret_config = create_secret_config(
        config=replacement_config,
        context=context['secret_context'],
        kms_key_alias=f'alias/cmk-{context["environment"]}'
    )
    secret_line = f'secret_config = "{secret_config.decode("utf-8")}"\n'

    # Build the output first so every line, including the replaced one, ends
    # with exactly one newline and the file is only opened once it is ready.
    output_lines = []
    replaced = False
    for line in tfvars_lines:
        if 'secret_config' in line:
            output_lines.append(secret_line)
            replaced = True
        else:
            output_lines.append(line if line.endswith('\n') else line + '\n')
    if not replaced:
        # No existing entry to overwrite: append, so the value is not lost.
        output_lines.append(secret_line)

    with open(context['tfvars_path'], 'w') as tfvars:
        tfvars.writelines(output_lines)
    print(f'\nSuccessfully updated secret_config in {context["tfvars_path"]}.\n')
def get_setup_context():
    """
    Collect the values that identify this Lambda and its Terraform config.

    The region comes from ``AWS_REGION`` and the environment name from
    ``AWS_VAULT`` (presumably both exported by ``aws-vault exec`` -- verify
    for your setup). The account ID comes from STS, and the repository name
    is the name of the current working directory.

    :return: dict with the keys ``aws_region``, ``environment``, ``vpc_name``,
        ``account_id``, ``repo_name``, ``function_name``, ``tfvars_path`` and
        ``secret_context`` (the KMS encryption context holding the Lambda ARN)
    :raises SystemExit: if ``AWS_REGION`` or ``AWS_VAULT`` is unset or empty
    """
    aws_region = os.getenv('AWS_REGION')
    environment = os.getenv('AWS_VAULT')
    # Checked before any AWS call: an unset variable would otherwise be
    # formatted as the literal "None" into the path, function name and ARN.
    missing = [
        name
        for name, value in (('AWS_REGION', aws_region), ('AWS_VAULT', environment))
        if not value
    ]
    if missing:
        raise SystemExit(
            'ERROR: required environment variable(s) not set: ' + ', '.join(missing)
        )

    account_id = boto3.client('sts').get_caller_identity()['Account']
    vpc_name = environment
    repo_name = pathlib.PurePath(os.getcwd()).name
    function_name = f'{environment}-{repo_name}'
    tfvars_path = f'../infrastructure-live/{environment}/{aws_region}/{vpc_name}/lambda/{repo_name}/terraform.tfvars'
    return {
        'aws_region': aws_region,
        'environment': environment,
        'vpc_name': vpc_name,
        'account_id': account_id,
        'repo_name': repo_name,
        'function_name': function_name,
        'tfvars_path': tfvars_path,
        'secret_context': {
            "arn": f'arn:aws:lambda:{aws_region}:{account_id}:function:{function_name}'
        },
    }
def load_existing_config(context):
    """
    Read terraform.tfvars and decrypt the secret config it contains, if any.

    :param context: setup context; ``tfvars_path`` locates the file and
        ``secret_context`` is the KMS encryption context used to decrypt
    :return: tuple ``(existing_config, tfvars_lines)``. ``existing_config`` is
        the decrypted dict, or None when the file is missing or has no
        non-empty ``secret_config = `` entry. ``tfvars_lines`` is the file's
        lines (with line endings), or an empty list when the file is missing.
    """
    tfvars_path = context["tfvars_path"]
    existing_config = None
    # Initialised up front so a missing file returns (None, []) rather than
    # raising UnboundLocalError at the return statement.
    tfvars_lines = []
    if pathlib.Path(tfvars_path).exists():
        with open(tfvars_path) as tfvars:
            tfvars_lines = tfvars.readlines()
        secret_config = None
        for line in tfvars_lines:
            # Take what follows the marker wherever it sits on the line, then
            # drop the surrounding whitespace and quotes. The last match wins.
            _, marker, value = line.partition('secret_config = ')
            if marker:
                secret_config = value.strip().strip('"')
        if secret_config:
            existing_config = load_secret_config(secret_config=secret_config, context=context['secret_context'])
    return existing_config, tfvars_lines


if __name__ == '__main__':
    main()
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# This is an example; replace it with your own customizations for your Lambda.
import json | |
import urllib.parse | |
def setup_custom(context, existing_config):
    """
    Interactively build the configuration dict for this Lambda.

    :param context: setup context; only ``environment`` is read here
    :param existing_config: the currently stored config dict, or None when
        there is none yet (first-time setup)
    :return: dict with a single ``google_sheet_updater`` entry holding the
        ``token``, ``sheet_id`` and ``update_range_name``
    """
    # Normalise here so the prompts below can treat "no config yet" as an
    # empty config instead of failing on None.
    existing_config = existing_config or {}
    environment = context['environment']
    if environment == 'prod':
        update_range_name = 'Raw Data Stream!A2:H'
    else:
        update_range_name = f'{environment}-data-stream!A2:H'
    creds = get_token(existing_config)
    sheet_id = get_sheet_id(existing_config)
    return {
        "google_sheet_updater": {
            "token": creds,
            "sheet_id": sheet_id,
            "update_range_name": update_range_name
        }
    }
def get_token(existing_config):
    """
    Prompt for the service account key file and return its parsed contents.

    When the existing config already holds a token, an empty answer keeps it.

    :param existing_config: the currently stored config dict, or None/empty
        when there is none yet
    :return: the parsed JSON of the chosen key file, or the existing token
        when the user chose to reuse it
    """
    # Treat a missing config (None) or section as empty so that first-time
    # setup reaches the plain prompt instead of raising TypeError.
    updater_config = (existing_config or {}).get('google_sheet_updater') or {}
    if 'token' in updater_config:
        client_secrets_path = input("Service account key file (JSON): [reuse existing config] ")
        if client_secrets_path.strip() == '':
            return updater_config['token']
    else:
        client_secrets_path = input("Service account key file (JSON): ")
    with open(client_secrets_path, encoding='utf-8') as client_secrets_file:
        return json.load(client_secrets_file)
def get_sheet_id(existing_config):
    """
    Prompt for the Google Sheet URL and return the sheet ID taken from it.

    The ID is the path segment that follows ``/spreadsheets/d/``. An empty
    answer keeps the sheet ID from the existing config (None if there is none).

    :param existing_config: the currently stored config dict, or None/empty
        when there is none yet
    :return: the sheet ID string, or the existing/default sheet ID on an
        empty answer
    :raises SystemExit: (status 1) if the URL has no ``/spreadsheets/d/<id>``
        path
    """
    # Treat a missing config (None) or section as empty so that first-time
    # setup does not raise AttributeError.
    updater_config = (existing_config or {}).get('google_sheet_updater') or {}
    default_sheet_id = updater_config.get('sheet_id', None)
    default_sheet_url = f'[https://docs.google.com/spreadsheets/d/{default_sheet_id}] ' \
        if default_sheet_id else ''
    sheet_url_text = input(f'Google Sheet to edit (URL): {default_sheet_url}').strip()
    if sheet_url_text == '':
        return default_sheet_id
    sheet_path_parts = urllib.parse.urlparse(sheet_url_text).path.split('/')
    # Require a non-empty ID segment; a URL ending at ".../spreadsheets/d"
    # used to raise IndexError here.
    has_sheet_prefix = sheet_path_parts[0:3] == ['', 'spreadsheets', 'd']
    if has_sheet_prefix and len(sheet_path_parts) > 3 and sheet_path_parts[3]:
        return sheet_path_parts[3]
    print('ERROR: invalid Google Sheet URL')
    raise SystemExit(1)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment