Created
February 3, 2018 00:03
-
-
Save KyleJamesWalker/d7f71a6ed13d855da4e37f4adb180e2a to your computer and use it in GitHub Desktop.
Quickly Edit KMS Documents
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
import argparse | |
import hashlib | |
import os | |
import tempfile | |
import sys | |
def query_yes_no(question, default="yes"): | |
"""Ask a yes/no question via raw_input() and return their answer. | |
"question" is a string that is presented to the user. | |
"default" is the presumed answer if the user just hits <Enter>. | |
It must be "yes" (the default), "no" or None (meaning | |
an answer is required of the user). | |
The "answer" return value is True for "yes" or False for "no". | |
""" | |
valid = {"yes": True, "y": True, "ye": True, | |
"no": False, "n": False} | |
if default is None: | |
prompt = " [y/n] " | |
elif default == "yes": | |
prompt = " [Y/n] " | |
elif default == "no": | |
prompt = " [y/N] " | |
else: | |
raise ValueError("invalid default answer: '%s'" % default) | |
while True: | |
sys.stdout.write(question + prompt) | |
choice = input().lower() | |
if default is not None and choice == '': | |
return valid[default] | |
elif choice in valid: | |
return valid[choice] | |
else: | |
sys.stdout.write("Please respond with 'yes' or 'no' " | |
"(or 'y' or 'n').\n") | |
def get_s3(env: str, app_name: str) -> str: | |
"""Get the KMS S3 Location""" | |
return "s3://zefr-secrets/{env}/{app_name}/config.env".format( | |
env=env, | |
app_name=app_name, | |
) | |
def file_empty(filename): | |
"""Is file empty""" | |
return not bool(open(filename).read().strip()) | |
def run_kms(env: str, src: str, dest: str): | |
"""Run the KMS Command""" | |
os.system("{cmd} {alias} {src} {dest}".format( | |
cmd="aws --quiet s3 cp --sse aws:kms --sse-kms-key-id", | |
alias="alias/secrets-{env}".format(env=env), | |
src=src, | |
dest=dest, | |
)) | |
def main(): | |
parser = argparse.ArgumentParser("ZEFR Secrets") | |
parser.add_argument( | |
"app", type=lambda x: x.replace('_', '-').lower(), | |
) | |
parser.add_argument( | |
"env", type=lambda x: x.lower(), | |
choices=['base', 'qa', 'stage', 'prod'], | |
help="Environment to generate token", | |
) | |
parser.add_argument( | |
"-e", "--editor", default="$EDITOR" | |
) | |
parser.add_argument( | |
"-v", "--view", action='store_true' | |
) | |
args = parser.parse_args() | |
s3_path = get_s3(args.env, args.app) | |
with tempfile.NamedTemporaryFile() as creds_file: | |
# Get the secrets file | |
run_kms(args.env, s3_path, creds_file.name) | |
org_hash = hashlib.md5(open(creds_file.name, 'rb').read()).hexdigest() | |
# Exit if just viewing contents | |
if args.view: | |
print(open(creds_file.name).read(), end='') | |
sys.exit(0) | |
# Edit secrets file | |
os.system("{} {}".format(args.editor, creds_file.name)) | |
new_hash = hashlib.md5(open(creds_file.name, 'rb').read()).hexdigest() | |
# Verify contents | |
empty = file_empty(creds_file.name) | |
if empty and not query_yes_no("Empty file detected upload?"): | |
print("Aborting upload: Empty file") | |
sys.exit(0) | |
if not empty and org_hash == new_hash: | |
print("Aborting upload: No changes detected") | |
sys.exit(0) | |
# Upload secrets | |
print("Uploading Secrets") | |
run_kms(args.env, creds_file.name, s3_path) | |
if __name__ == '__main__': | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment