Skip to content

Instantly share code, notes, and snippets.

@filipeandre
Last active October 14, 2025 15:51
Show Gist options
  • Save filipeandre/29686313a023065bfd502cfe486e17ea to your computer and use it in GitHub Desktop.
Import resources into existing stack
#!/usr/bin/env python3
"""
Create an IMPORT change set while preserving all existing parameters,
with inline resource specs and optional parameter overrides.
Usage examples:
# Single DynamoDB import, keep all parameters as-is
python import_changeset.py \
--stack-name TargetStack \
--template-url https://amzn-s3-demo-bucket.s3.us-west-2.amazonaws.com/TemplateToImport.json \
--import "ResourceType=AWS::DynamoDB::Table,LogicalResourceId=GamesTable,ResourceIdentifier.TableName=Games"
# Multiple resources + parameter overrides
python import_changeset.py \
--stack-name TargetStack \
--template-url https://.../TemplateToImport.json \
--import "ResourceType=AWS::S3::Bucket,LogicalResourceId=LogsBucket,ResourceIdentifier.BucketName=my-logs-bucket" \
--import "ResourceType=AWS::SNS::Topic,LogicalResourceId=AlertsTopic,ResourceIdentifier.TopicArn=arn:aws:sns:us-west-2:123456789012:alerts" \
--override Env=staging --override DBSize=large
# AssumeRole for auth and set a CFN execution role
python import_changeset.py \
--stack-name TargetStack \
--template-url https://.../TemplateToImport.json \
--import "ResourceType=AWS::DynamoDB::Table,LogicalResourceId=GamesTable,ResourceIdentifier.TableName=Games" \
--role-arn arn:aws:iam::111122223333:role/Admin \
--execution-role-arn arn:aws:iam::111122223333:role/CloudFormationExecutionRole
Notes:
- Parameter overrides only affect keys you specify; all others are preserved.
- Masked/NoEcho parameters are handled via UsePreviousValue=True.
- Your template must reflect current resource configuration and include DeletionPolicy on imported resources.
"""
import argparse
import json
import re
import sys
import time
from datetime import datetime
from typing import Any, Dict, List, Optional
import boto3
from botocore.exceptions import ClientError
# -------------------- Auth / Session -------------------- #
def assume_role_session(role_arn: str, region: Optional[str] = None, session_name: str = "ImportChangeSetSession"):
    """
    Assume *role_arn* via STS and return a boto3 Session backed by the
    resulting temporary credentials (valid for one hour).
    """
    sts_client = boto3.client("sts", region_name=region)
    response = sts_client.assume_role(
        RoleArn=role_arn,
        RoleSessionName=session_name,
        DurationSeconds=3600,
    )
    credentials = response["Credentials"]
    return boto3.Session(
        aws_access_key_id=credentials["AccessKeyId"],
        aws_secret_access_key=credentials["SecretAccessKey"],
        aws_session_token=credentials["SessionToken"],
        region_name=region,
    )
# -------------------- Parsing helpers -------------------- #
def parse_inline_import(s: str) -> Dict[str, Any]:
    """
    Convert an inline --import spec into the structure CloudFormation
    expects in ResourcesToImport.

    A spec such as:
        'ResourceType=AWS::DynamoDB::Table,LogicalResourceId=GamesTable,ResourceIdentifier.TableName=Games'
    becomes:
        {
            "ResourceType": "AWS::DynamoDB::Table",
            "LogicalResourceId": "GamesTable",
            "ResourceIdentifier": {"TableName": "Games"},
        }

    Commas inside values may be escaped as '\\,'.

    Raises:
        ValueError: on an empty spec, a malformed segment, an unknown key,
            or when a required key is missing.
    """
    if not s:
        raise ValueError("--import value is empty")
    fields: Dict[str, Any] = {}
    identifier: Dict[str, str] = {}
    # Split only on commas that are NOT preceded by a backslash.
    for segment in re.split(r'(?<!\\),', s):
        segment = segment.replace(r'\,', ',').strip()
        if not segment:
            continue
        if "=" not in segment:
            raise ValueError(f"Invalid --import segment (missing '='): {segment}")
        key, value = (token.strip() for token in segment.split("=", 1))
        if key in ("ResourceType", "LogicalResourceId"):
            fields[key] = value
        elif key.startswith("ResourceIdentifier."):
            sub_key = key.split(".", 1)[1]
            if not sub_key:
                raise ValueError(f"Invalid ResourceIdentifier key in --import: {key}")
            identifier[sub_key] = value
        else:
            raise ValueError(f"Unknown key in --import: {key}")
    if "ResourceType" not in fields or "LogicalResourceId" not in fields:
        raise ValueError("Each --import must include ResourceType and LogicalResourceId")
    if not identifier:
        raise ValueError("Each --import must include at least one ResourceIdentifier.X=Y")
    fields["ResourceIdentifier"] = identifier
    return fields
def parse_overrides(kv_list: List[str]) -> Dict[str, str]:
    """
    Turn repeated '--override KEY=VALUE' flags into a {key: value} dict.

    Raises:
        ValueError: if an entry lacks '=' or its key is empty.
    """
    result: Dict[str, str] = {}
    for entry in kv_list or []:
        if "=" not in entry:
            raise ValueError(f"Invalid --override value (expected KEY=VALUE): {entry}")
        key, _, value = entry.partition("=")
        key, value = key.strip(), value.strip()
        if not key:
            raise ValueError(f"Invalid override key in: {entry}")
        result[key] = value
    return result
# -------------------- CloudFormation helpers -------------------- #
def get_stack_details(cf, stack_name: str) -> Dict[str, Any]:
    """Return the description of *stack_name*, raising if it does not exist."""
    stacks = cf.describe_stacks(StackName=stack_name).get("Stacks", [])
    if not stacks:
        raise RuntimeError(f"Stack {stack_name} not found")
    return stacks[0]
def build_parameters_for_reuse(stack_params: List[Dict[str, str]], overrides: Dict[str, str]) -> List[Dict[str, str]]:
    """
    Assemble the Parameters list for create_change_set.

    - Every existing key is preserved; values that come back masked (NoEcho)
      or absent are sent with UsePreviousValue=True.
    - Keys present in *overrides* get the override value instead (an explicit
      override wins over UsePreviousValue).
    - Override keys not already on the stack are appended as new parameters.
    """
    # Masked renderings used by describe_stacks for NoEcho parameters.
    masked = {"****", "*****", "******"}
    result: List[Dict[str, str]] = []
    for param in stack_params:
        key = param["ParameterKey"]
        if key in overrides:
            result.append({"ParameterKey": key, "ParameterValue": overrides[key]})
            continue
        value = param.get("ParameterValue")
        if value is None or value in masked:
            result.append({"ParameterKey": key, "UsePreviousValue": True})
        else:
            result.append({"ParameterKey": key, "ParameterValue": value})
    seen = {param["ParameterKey"] for param in stack_params}
    result.extend(
        {"ParameterKey": key, "ParameterValue": value}
        for key, value in overrides.items()
        if key not in seen
    )
    return result
def wait_for_changeset(cf, stack_name: str, change_set_name: str, poll_seconds: int = 5, timeout_seconds: int = 900):
    """
    Poll describe_change_set until the change set reaches CREATE_COMPLETE or
    FAILED, printing each status transition.

    A ChangeSetNotFound error is tolerated (the change set may not be visible
    yet immediately after creation); any other ClientError propagates.

    Raises:
        TimeoutError: if no terminal status appears within *timeout_seconds*.
    """
    started_at = time.time()
    previous = None
    while True:
        try:
            description = cf.describe_change_set(StackName=stack_name, ChangeSetName=change_set_name)
        except ClientError as e:
            if e.response["Error"]["Code"] != "ChangeSetNotFound":
                raise
        else:
            status = description.get("Status")
            if status != previous:
                print(f"[wait] ChangeSet status: {status}")
                previous = status
            if status in ("CREATE_COMPLETE", "FAILED"):
                return description
        if time.time() - started_at > timeout_seconds:
            raise TimeoutError("Timed out waiting for change set creation.")
        time.sleep(poll_seconds)
def wait_for_stack_import(cf, stack_name: str, poll_seconds: int = 10, timeout_seconds: int = 3600):
    """
    Block until *stack_name* reaches IMPORT_COMPLETE, printing each status
    transition along the way.

    Returns:
        The final stack status string ("IMPORT_COMPLETE").

    Raises:
        RuntimeError: if the stack lands in a rollback/failure/delete state.
        TimeoutError: if no terminal state is reached within *timeout_seconds*.
    """
    failure_states = frozenset((
        "IMPORT_ROLLBACK_COMPLETE",
        "IMPORT_ROLLBACK_FAILED",
        "ROLLBACK_COMPLETE",
        "ROLLBACK_FAILED",
        "UPDATE_ROLLBACK_COMPLETE",
        "UPDATE_ROLLBACK_FAILED",
        "DELETE_COMPLETE",
        "DELETE_FAILED",
        "IMPORT_FAILED",
    ))
    started_at = time.time()
    previous = None
    while True:
        status = cf.describe_stacks(StackName=stack_name)["Stacks"][0]["StackStatus"]
        if status != previous:
            print(f"[wait] StackStatus: {status}")
            previous = status
        if status == "IMPORT_COMPLETE":
            return status
        if status in failure_states:
            raise RuntimeError(f"Stack reached terminal failure state: {status}")
        if time.time() - started_at > timeout_seconds:
            raise TimeoutError("Timed out waiting for stack import to complete.")
        time.sleep(poll_seconds)
def print_changeset_summary(cs: Dict[str, Any]):
    """Pretty-print the change set header plus each proposed resource change."""
    print("\n=== Change Set Summary ===")
    for label, key in (
        ("Name", "ChangeSetName"),
        ("Id", "ChangeSetId"),
        ("Stack", "StackName"),
        ("Type", "ChangeSetType"),
        ("Status", "Status"),
    ):
        print(f"{label}: {cs.get(key)}")
    print(f"StatusReason: {cs.get('StatusReason', '')}")
    print(f"CreationTime: {cs.get('CreationTime')}")
    changes = cs.get("Changes", [])
    if not changes:
        print("\nNo resource changes reported.")
        return
    print("\n--- Resource Changes ---")
    for index, change in enumerate(changes, 1):
        rc = change.get("ResourceChange", {})
        print(
            f"[{index}] {rc.get('Action')} {rc.get('ResourceType')} "
            f"({rc.get('LogicalResourceId')}) Replacement: {rc.get('Replacement')}"
        )
        for detail in rc.get("Details", []) or []:
            target_name = detail.get("Target", {}).get("Name")
            print(
                f" - {target_name} (recreate: {detail.get('RequiresRecreation')}, "
                f"source: {detail.get('ChangeSource')})"
            )
# -------------------- Main -------------------- #
def _poll_drift_detection(cf, drift_id: str, poll_seconds: int = 5):
    """Poll one drift detection run until it completes or fails, printing progress."""
    while True:
        resp = cf.describe_stack_drift_detection_status(StackDriftDetectionId=drift_id)
        dd_status = resp["DetectionStatus"]
        print(f"[wait] Drift detection: {dd_status}")
        if dd_status in ("DETECTION_COMPLETE", "DETECTION_FAILED"):
            print(f"[info] Drift overall status: {resp['StackDriftStatus']}")
            if dd_status == "DETECTION_FAILED":
                print(f"[warn] Drift detection failed: {resp.get('DetectionStatusReason','')}")
            return
        time.sleep(poll_seconds)


def main():
    """
    CLI entry point: create an IMPORT change set for an existing stack while
    preserving its parameters, then optionally execute it and run drift
    detection.

    Exits non-zero (via sys.exit) when change set creation fails.
    """
    parser = argparse.ArgumentParser(description="Create an IMPORT change set with inline resources, preserving parameters.")
    parser.add_argument("--stack-name", required=True, help="Existing stack name to import resources into.")
    parser.add_argument("--template-url", required=True, help="S3 URL of the template that includes the to-be-imported resources.")
    parser.add_argument(
        "--import",
        dest="imports",
        action="append",
        default=[],
        help=("Repeatable. Inline resource to import. "
              "Format: 'ResourceType=AWS::Service::Type,LogicalResourceId=LogicalId,ResourceIdentifier.Key=Value[,ResourceIdentifier.Key2=Value2...]' "
              "Example: 'ResourceType=AWS::DynamoDB::Table,LogicalResourceId=GamesTable,ResourceIdentifier.TableName=Games'")
    )
    parser.add_argument("--override", dest="overrides", action="append", default=[],
                        help="Repeatable. Parameter override in KEY=VALUE form. Overrides only those keys; all others are preserved.")
    parser.add_argument("--change-set-name", default=None, help="Optional change set name (default: Import-YYYYmmddHHMMSS).")
    parser.add_argument("--region", default=None, help="AWS region (falls back to env/defaults).")
    parser.add_argument("--role-arn", default=None, help="Assume this role before calling CloudFormation.")
    parser.add_argument("--execution-role-arn", default=None,
                        help="Optional CloudFormation execution role (RoleARN) for the change set.")
    parser.add_argument("--execute", action="store_true", help="If set, execute the change set after creation.")
    parser.add_argument("--drift-check", action="store_true", help="If set, run stack drift detection after successful import.")
    args = parser.parse_args()

    # --- Resolve session/clients ---
    session = assume_role_session(args.role_arn, args.region) if args.role_arn else boto3.Session(region_name=args.region)
    cf = session.client("cloudformation")

    # --- Validate inputs & derive structures ---
    if not args.imports:
        raise ValueError("At least one --import is required.")
    resources_to_import = [parse_inline_import(s) for s in args.imports]
    overrides = parse_overrides(args.overrides)

    # Fetch existing stack parameters so unspecified ones are preserved.
    stack = get_stack_details(cf, args.stack_name)
    stack_params = stack.get("Parameters", [])
    params = build_parameters_for_reuse(stack_params, overrides)

    # Default change set name: UTC timestamp. time.gmtime() is used instead
    # of the deprecated datetime.utcnow() (deprecated since Python 3.12);
    # the rendered name is identical.
    cs_name = args.change_set_name or f"Import-{time.strftime('%Y%m%d%H%M%S', time.gmtime())}"

    # --- Create change set (IMPORT) ---
    create_kwargs: Dict[str, Any] = {
        "StackName": args.stack_name,
        "TemplateURL": args.template_url,
        "Parameters": params,
        "ChangeSetName": cs_name,
        "ChangeSetType": "IMPORT",
        "ResourcesToImport": resources_to_import,
    }
    # Important: RoleARN is the correct param name for CFN to assume during the operation
    if args.execution_role_arn:
        create_kwargs["RoleARN"] = args.execution_role_arn
    # Optional: If your template requires these, uncomment as needed
    # create_kwargs["Capabilities"] = ["CAPABILITY_NAMED_IAM", "CAPABILITY_AUTO_EXPAND"]

    print("[info] Creating IMPORT change set...")
    cf.create_change_set(**create_kwargs)

    # --- Wait for change set creation ---
    cs = wait_for_changeset(cf, args.stack_name, cs_name)
    print_changeset_summary(cs)

    if cs.get("Status") == "FAILED":
        reason = cs.get("StatusReason", "")
        print(f"\n[error] Change set creation failed: {reason}")
        # If a failed change set is left around, clean it up to allow reuse of the name
        try:
            cf.delete_change_set(StackName=args.stack_name, ChangeSetName=cs["ChangeSetId"])
            print("[info] Deleted failed change set.")
        except Exception as e:
            print(f"[warn] Could not delete failed change set: {e}")
        sys.exit(1)

    # --- Optionally execute ---
    if args.execute:
        print("\n[info] Executing change set...")
        cf.execute_change_set(StackName=args.stack_name, ChangeSetName=cs["ChangeSetId"])
        # Wait for import to complete
        final_status = wait_for_stack_import(cf, args.stack_name)
        print(f"[ok] Stack import finished with status: {final_status}")

        # Optional drift check (only meaningful after an executed import).
        if args.drift_check:
            print("[info] Starting drift detection...")
            dd = cf.detect_stack_drift(StackName=args.stack_name)
            _poll_drift_detection(cf, dd["StackDriftDetectionId"])

    print("\n[done] Completed.")
if __name__ == "__main__":
    try:
        main()
    except KeyboardInterrupt:
        # 130 is the conventional exit code for termination by SIGINT (128 + 2).
        print("\n[interrupt] Exiting on Ctrl+C")
        sys.exit(130)
    except Exception as e:
        # Top-level boundary: report any unhandled error and exit non-zero.
        print(f"[fatal] {e}")
        sys.exit(1)
@filipeandre
Copy link
Author

filipeandre commented Oct 14, 2025

# clone or update
REPO_FOLDER=change_set
REPO_URL=https://gist.github.com/filipeandre/29686313a023065bfd502cfe486e17ea.git
[ -d .git ] && git pull --rebase || { [ -d $REPO_FOLDER/.git ] && git -C $REPO_FOLDER pull --rebase || git clone $REPO_URL $REPO_FOLDER; cd $REPO_FOLDER 2>/dev/null || true; }

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment