-
-
Save theseanything/1bb8add0077d3a2f5d979c12c6b9f140 to your computer and use it in GitHub Desktop.
Python script to copy AWS SecretsManager Secrets
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
import argparse | |
import os | |
import boto3 | |
from botocore.exceptions import ClientError | |
# -------------------------------------------------------------------- main --- | |
def copy_secrets(config):
    """Copy secrets from the source side of *config* to the destination side.

    Args:
        config: Mapping providing ``"global.dryrun"``,
            ``"global.excluded_secrets"``, ``"src"`` and ``"dst"``.

    Returns:
        The list of per-secret result dicts produced by ``push_secrets``.
    """
    is_dry_run = config["global.dryrun"]
    if is_dry_run:
        print("dry run enabled")

    source, destination = compile_config(config)
    old_prefix = source["ssm.prefix"]

    # pull_secrets and tranform_secrets are generators, so secrets are only
    # fetched and renamed while push_secrets iterates over them.
    fetched = pull_secrets(
        source["aws.session"], old_prefix, config["global.excluded_secrets"]
    )
    renamed = tranform_secrets(old_prefix, destination["ssm.prefix"], fetched)
    return push_secrets(destination["aws.session"], renamed, is_dry_run)
# --------------------------------------------------------------------- fns --- | |
def pull_secrets(session, prefix, excluded_secrets):
    """Lazily yield the value of each secret selected by ``list_sm_secrets``.

    Args:
        session: Session object passed through to the AWS helper functions.
        prefix: Name prefix a secret must start with to be selected.
        excluded_secrets: Collection of secret names to skip.

    Yields:
        Whatever ``get_sm_secret_value`` returns for each listed secret name
        (``None`` when that helper reports the secret as missing).
    """
    for listing in list_sm_secrets(session, prefix, excluded_secrets):
        yield get_sm_secret_value(session, listing["Name"])
def push_secret(session, secret, dry_run):
    """Create or update a single secret and report what was (or would be) done.

    Args:
        session: Session object passed through to the AWS helper functions.
        secret: Mapping with ``"Name"`` and either ``"SecretString"`` or
            ``"SecretBinary"``.
        dry_run: When true, only compute the action; make no changes.

    Returns:
        ``{"action": ..., "name": ...}`` where action is ``"created"``,
        ``"updated"`` or ``"nochange"``.
    """
    existing = get_sm_secret_value(session, secret["Name"])
    outcome = {"action": "nochange", "name": secret["Name"]}

    if existing is None:
        outcome["action"] = "created"
        if not dry_run:
            create_sm_secret(session, secret)
        return outcome

    # Compare both payload kinds with .get(): binary secrets carry
    # "SecretBinary" and no "SecretString", so indexing "SecretString"
    # directly would raise KeyError for them.
    payload_changed = any(
        existing.get(key) != secret.get(key)
        for key in ("SecretString", "SecretBinary")
    )
    if payload_changed:
        outcome["action"] = "updated"
        if not dry_run:
            put_sm_secret_value(session, secret)
    return outcome
def push_secrets(session, secrets, dry_run):
    """Push every secret in *secrets*, printing one status line per secret.

    Args:
        session: Session object passed through to ``push_secret``.
        secrets: Iterable of secret mappings.
        dry_run: Forwarded to ``push_secret``.

    Returns:
        List of the result dicts returned by ``push_secret``, in input order.
    """

    def push_and_report(item):
        # Print right after each push so progress is visible as it happens.
        outcome = push_secret(session, item, dry_run)
        print(f'{outcome["action"]:<10}{outcome["name"]}')
        return outcome

    return [push_and_report(item) for item in secrets]
def tranform_secrets(old, new, secrets):
    """Yield copies of *secrets* with the leading *old* prefix swapped for *new*.

    Only a prefix at the very start of ``"Name"`` is rewritten; names that do
    not start with *old* pass through unchanged. An empty *old* therefore
    simply prepends *new*.

    Args:
        old: Prefix to strip from the start of each secret name.
        new: Prefix to put in its place.
        secrets: Iterable of mappings that each have a ``"Name"`` key.

    Yields:
        A shallow copy of each secret with the rewritten ``"Name"``; the
        input mappings are left unmodified.
    """
    for secret in secrets:
        name = secret["Name"]
        # Slice instead of str.replace: replace() would also rewrite later
        # occurrences of `old` and, for an empty `old`, insert `new` between
        # every character of the name.
        if name.startswith(old):
            name = new + name[len(old):]
        yield {**secret, "Name": name}
# --------------------------------------------------------------------- aws --- | |
def create_sm_secret(session, secret):
    """Create a secret named ``secret["Name"]`` via the "secretsmanager" client.

    Args:
        session: Object whose ``client("secretsmanager")`` returns the client.
        secret: Mapping with ``"Name"`` and a payload under ``"SecretString"``
            or, when that key is absent, ``"SecretBinary"``.

    Returns:
        The response of the client's ``create_secret`` call.
    """
    client = session.client("secretsmanager")
    request = {"Name": secret["Name"]}
    # A string payload wins when present; otherwise the binary one is sent.
    payload_key = "SecretString" if "SecretString" in secret else "SecretBinary"
    request[payload_key] = secret[payload_key]
    return client.create_secret(**request)
def get_sm_secret_value(session, name):
    """Fetch the value of the secret *name*, or ``None`` if it is not found.

    Args:
        session: Object whose ``client("secretsmanager")`` returns the client.
        name: Identifier passed as ``SecretId`` to ``get_secret_value``.

    Returns:
        The ``get_secret_value`` response, or ``None`` when the call fails
        with error code ``ResourceNotFoundException`` or returns a falsy
        response.

    Raises:
        ClientError: For any other error code.
    """
    client = session.client("secretsmanager")
    try:
        response = client.get_secret_value(SecretId=name)
    except ClientError as err:
        if err.response["Error"]["Code"] != "ResourceNotFoundException":
            raise
        # A missing secret is an expected outcome and is reported as None.
        return None
    return response if response else None
def list_sm_secrets(session, filter_prefix, excluded_secrets):
    """Yield the ``list_secrets`` entries that should be copied.

    An entry is yielded when its name starts with *filter_prefix*, is not in
    *excluded_secrets*, and it has no ``"DeletedDate"`` value.

    Args:
        session: Object whose ``client("secretsmanager")`` returns the client.
        filter_prefix: Required name prefix (an empty string matches all).
        excluded_secrets: Collection of names to skip.

    Yields:
        The matching entries from each page's ``"SecretList"``.
    """
    client = session.client("secretsmanager")
    for page in client.get_paginator("list_secrets").paginate():
        for entry in page["SecretList"]:
            name = entry["Name"]
            wanted = (
                name.startswith(filter_prefix)
                and name not in excluded_secrets
                and entry.get("DeletedDate") is None
            )
            if wanted:
                yield entry
def put_sm_secret_value(session, secret):
    """Store a new value for the secret ``secret["Name"]``.

    Args:
        session: Object whose ``client("secretsmanager")`` returns the client.
        secret: Mapping with ``"Name"`` and a payload under ``"SecretString"``
            or, when that key is absent, ``"SecretBinary"``.

    Returns:
        The response of the client's ``put_secret_value`` call.
    """
    client = session.client("secretsmanager")
    target = secret["Name"]
    if "SecretString" in secret:
        payload = {"SecretString": secret["SecretString"]}
    else:
        payload = {"SecretBinary": secret["SecretBinary"]}
    return client.put_secret_value(SecretId=target, **payload)
# ------------------------------------------------------------------ config --- | |
def compile_config(config):
    """Validate the src/dst sections of *config* and attach a session to each.

    Each section gets an ``"aws.session"`` entry holding a ``boto3.Session``
    built from its ``"aws.region"`` and ``"aws.profile"`` values. The
    sections are modified in place.

    Args:
        config: Mapping with ``"src"`` and ``"dst"`` sub-mappings.

    Returns:
        The ``(src, dst)`` sub-mappings.

    Raises:
        Exception: If either section has no ``"ssm.prefix"`` (missing or None).
    """
    source, destination = config["src"], config["dst"]
    sides = (source, destination)

    if any(side.get("ssm.prefix") is None for side in sides):
        raise Exception("Must define source and destination namespace!")

    for side in sides:
        side["aws.session"] = boto3.Session(
            region_name=side["aws.region"],
            profile_name=side["aws.profile"],
        )
    return source, destination
def get_default_config():
    """Build the default configuration from environment variables.

    ``DRY_RUN`` is parsed with ``int()`` and dry run is enabled only when it
    equals 1 (the default); a non-integer value raises ``ValueError``.
    Regions default to ``"eu-west-1"``; profiles and prefixes default to
    ``None`` when their variable is unset.

    Returns:
        A dict with ``"global.dryrun"``, ``"global.excluded_secrets"``,
        ``"src"`` and ``"dst"`` entries.
    """
    env = os.environ.get
    dry_run = int(env("DRY_RUN", "1")) == 1

    source = {
        "aws.region": env("SRC_AWS_REGION", "eu-west-1"),
        "aws.profile": env("SRC_AWS_PROFILE"),
        "ssm.prefix": env("SRC_SSM_NAMESPACE"),
    }
    destination = {
        "aws.region": env("DST_AWS_REGION", "eu-west-1"),
        "aws.profile": env("DST_AWS_PROFILE"),
        "ssm.prefix": env("DST_SSM_NAMESPACE"),
    }
    return {
        "global.dryrun": dry_run,
        "global.excluded_secrets": [],
        "src": source,
        "dst": destination,
    }
# ---------------------------------------------------------------- handlers --- | |
def script_handler(args):
    """Run the copy using command-line arguments layered over the defaults.

    The values from *args* unconditionally replace the prefix, profile,
    dry-run and exclusion settings returned by ``get_default_config``.

    Args:
        args: Parsed arguments exposing ``src_prefix``, ``dst_prefix``,
            ``src_profile``, ``dst_profile``, ``no_dry_run`` and
            ``excluded_secret``.
    """
    config = get_default_config()
    config["src"].update(
        {"ssm.prefix": args.src_prefix, "aws.profile": args.src_profile}
    )
    config["dst"].update(
        {"ssm.prefix": args.dst_prefix, "aws.profile": args.dst_profile}
    )
    config.update(
        {
            # Dry run stays on unless --no-dry-run was passed.
            "global.dryrun": not args.no_dry_run,
            "global.excluded_secrets": args.excluded_secret,
        }
    )
    copy_secrets(config)
def lambda_handler(event, context):
    """Placeholder handler; it is not implemented.

    Args:
        event: Unused.
        context: Unused.

    Raises:
        NotImplementedError: Always. This is a subclass of ``Exception``, so
            callers catching ``Exception`` are unaffected.
    """
    raise NotImplementedError("Not implemented yet!")
if __name__ == "__main__":
    # Command-line entry point: declare the options, parse them, and hand the
    # result to script_handler.
    parser = argparse.ArgumentParser(prog="AWS Secrets Manager Copier")
    option_specs = (
        ("--no-dry-run", {"help": "disable dry run", "action": "store_true"}),
        (
            "--src-prefix",
            {"help": "prefix to filter source secrets", "type": str, "default": ""},
        ),
        (
            "--dst-prefix",
            {
                "help": "prefix to replace on destination secrets",
                "type": str,
                "default": "",
            },
        ),
        ("--src-profile", {"help": "AWS Profile to use for secret source"}),
        ("--dst-profile", {"help": "AWS Profile to use for secret destination"}),
        (
            "--excluded-secret",
            # May be given several times; each use appends one name.
            {"help": "name of secret to exclude", "action": "append", "default": []},
        ),
    )
    for flag, options in option_specs:
        parser.add_argument(flag, **options)

    args = parser.parse_args()
    script_handler(args)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment