Last active
October 14, 2025 15:51
-
-
Save filipeandre/29686313a023065bfd502cfe486e17ea to your computer and use it in GitHub Desktop.
Import resources into existing stack
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| """ | |
| Create an IMPORT change set while preserving all existing parameters, | |
| with inline resource specs and optional parameter overrides. | |
| Usage examples: | |
| # Single DynamoDB import, keep all parameters as-is | |
| python import_changeset.py \ | |
| --stack-name TargetStack \ | |
| --template-url https://amzn-s3-demo-bucket.s3.us-west-2.amazonaws.com/TemplateToImport.json \ | |
| --import "ResourceType=AWS::DynamoDB::Table,LogicalResourceId=GamesTable,ResourceIdentifier.TableName=Games" | |
| # Multiple resources + parameter overrides | |
| python import_changeset.py \ | |
| --stack-name TargetStack \ | |
| --template-url https://.../TemplateToImport.json \ | |
| --import "ResourceType=AWS::S3::Bucket,LogicalResourceId=LogsBucket,ResourceIdentifier.BucketName=my-logs-bucket" \ | |
| --import "ResourceType=AWS::SNS::Topic,LogicalResourceId=AlertsTopic,ResourceIdentifier.TopicArn=arn:aws:sns:us-west-2:123456789012:alerts" \ | |
| --override Env=staging --override DBSize=large | |
| # AssumeRole for auth and set a CFN execution role | |
| python import_changeset.py \ | |
| --stack-name TargetStack \ | |
| --template-url https://.../TemplateToImport.json \ | |
| --import "ResourceType=AWS::DynamoDB::Table,LogicalResourceId=GamesTable,ResourceIdentifier.TableName=Games" \ | |
| --role-arn arn:aws:iam::111122223333:role/Admin \ | |
| --execution-role-arn arn:aws:iam::111122223333:role/CloudFormationExecutionRole | |
| Notes: | |
| - Parameter overrides only affect keys you specify; all others are preserved. | |
| - Masked/NoEcho parameters are handled via UsePreviousValue=True. | |
| - Your template must reflect current resource configuration and include DeletionPolicy on imported resources. | |
| """ | |
| import argparse | |
| import json | |
| import re | |
| import sys | |
| import time | |
| from datetime import datetime | |
| from typing import Any, Dict, List, Optional | |
| import boto3 | |
| from botocore.exceptions import ClientError | |
| # -------------------- Auth / Session -------------------- # | |
def assume_role_session(role_arn: str, region: Optional[str] = None, session_name: str = "ImportChangeSetSession"):
    """Assume ``role_arn`` through STS and return a boto3 session for it.

    Args:
        role_arn: ARN of the IAM role to assume.
        region: Region passed to both the STS client and the returned session.
        session_name: Value sent as ``RoleSessionName``.

    Returns:
        A ``boto3.Session`` built from the temporary credentials that
        ``assume_role`` returned (requested for 3600 seconds).
    """
    sts_client = boto3.client("sts", region_name=region)
    response = sts_client.assume_role(
        RoleArn=role_arn,
        RoleSessionName=session_name,
        DurationSeconds=3600,
    )
    temporary = response["Credentials"]
    return boto3.Session(
        aws_access_key_id=temporary["AccessKeyId"],
        aws_secret_access_key=temporary["SecretAccessKey"],
        aws_session_token=temporary["SessionToken"],
        region_name=region,
    )
| # -------------------- Parsing helpers -------------------- # | |
def parse_inline_import(s: str) -> Dict[str, Any]:
    """Parse one inline ``--import`` specification into a ResourcesToImport entry.

    A string such as
        'ResourceType=AWS::DynamoDB::Table,LogicalResourceId=GamesTable,ResourceIdentifier.TableName=Games'
    becomes
        {
            "ResourceType": "AWS::DynamoDB::Table",
            "LogicalResourceId": "GamesTable",
            "ResourceIdentifier": {"TableName": "Games"}
        }

    Segments are separated by commas; a literal comma inside a value is
    written as '\\,'. Whitespace around keys and values is stripped and empty
    segments are skipped.

    Args:
        s: The raw value of a single ``--import`` flag.

    Returns:
        A dict with ``ResourceType``, ``LogicalResourceId`` and a non-empty
        ``ResourceIdentifier`` mapping.

    Raises:
        ValueError: If the string is empty, a segment has no '=', a key is
            unknown or repeated, a value is empty, or a required key is missing.
    """
    if not s:
        raise ValueError("--import value is empty")

    fields: Dict[str, Any] = {}
    resource_identifier: Dict[str, str] = {}

    # Split on commas that are not escaped with a backslash.
    for raw in re.split(r'(?<!\\),', s):
        part = raw.replace(r'\,', ',').strip()
        if not part:
            continue
        if "=" not in part:
            raise ValueError(f"Invalid --import segment (missing '='): {part}")
        # Split on the first '=' only, so values may themselves contain '='.
        k, v = (piece.strip() for piece in part.split("=", 1))

        if k in ("ResourceType", "LogicalResourceId"):
            target, name = fields, k
        elif k.startswith("ResourceIdentifier."):
            name = k.split(".", 1)[1]
            if not name:
                raise ValueError(f"Invalid ResourceIdentifier key in --import: {k}")
            target = resource_identifier
        else:
            raise ValueError(f"Unknown key in --import: {k}")

        # Fail here rather than let an empty or ambiguous spec reach the API.
        if not v:
            raise ValueError(f"Empty value for '{k}' in --import")
        if name in target:
            raise ValueError(f"Duplicate key in --import: {k}")
        target[name] = v

    if "ResourceType" not in fields or "LogicalResourceId" not in fields:
        raise ValueError("Each --import must include ResourceType and LogicalResourceId")
    if not resource_identifier:
        raise ValueError("Each --import must include at least one ResourceIdentifier.X=Y")

    fields["ResourceIdentifier"] = resource_identifier
    return fields
def parse_overrides(kv_list: List[str]) -> Dict[str, str]:
    """Turn repeated ``--override KEY=VALUE`` values into a mapping.

    Each item is split on its first '='; key and value are stripped of
    surrounding whitespace. When a key is repeated, the last value wins.
    ``None`` or an empty list yields an empty dict.

    Raises:
        ValueError: If an item has no '=' or its key is empty after stripping.
    """
    result: Dict[str, str] = {}
    for entry in (kv_list if kv_list else ()):
        name, separator, value = entry.partition("=")
        if not separator:
            raise ValueError(f"Invalid --override value (expected KEY=VALUE): {entry}")
        name = name.strip()
        if not name:
            raise ValueError(f"Invalid override key in: {entry}")
        result[name] = value.strip()
    return result
| # -------------------- CloudFormation helpers -------------------- # | |
def get_stack_details(cf, stack_name: str) -> Dict[str, Any]:
    """Return the first stack entry that ``describe_stacks`` reports for ``stack_name``.

    Args:
        cf: CloudFormation client exposing ``describe_stacks``.
        stack_name: Value passed as ``StackName``.

    Raises:
        RuntimeError: If the response has no (or an empty) ``Stacks`` list.
    """
    found = cf.describe_stacks(StackName=stack_name).get("Stacks", [])
    if found:
        return found[0]
    raise RuntimeError(f"Stack {stack_name} not found")
def build_parameters_for_reuse(stack_params: List[Dict[str, str]], overrides: Dict[str, str]) -> List[Dict[str, str]]:
    """Assemble the ``Parameters`` list for ``create_change_set``.

    For every existing stack parameter, in order:
      - an explicit override supplies ``ParameterValue``;
      - otherwise a missing value or one of the mask strings
        ("****", "*****", "******") becomes ``UsePreviousValue=True``;
      - otherwise the current value is passed through unchanged.
    Overrides whose key is not among the existing parameters are appended
    at the end as new ``ParameterValue`` entries.
    """
    masked = frozenset(("****", "*****", "******"))

    def _entry(param: Dict[str, str]) -> Dict[str, Any]:
        name = param["ParameterKey"]
        if name in overrides:
            return {"ParameterKey": name, "ParameterValue": overrides[name]}
        current = param.get("ParameterValue")
        if current is not None and current not in masked:
            return {"ParameterKey": name, "ParameterValue": current}
        return {"ParameterKey": name, "UsePreviousValue": True}

    result = [_entry(param) for param in stack_params]

    known = {param["ParameterKey"] for param in stack_params}
    result.extend(
        {"ParameterKey": name, "ParameterValue": value}
        for name, value in overrides.items()
        if name not in known
    )
    return result
def wait_for_changeset(cf, stack_name: str, change_set_name: str, poll_seconds: int = 5, timeout_seconds: int = 900):
    """Poll ``describe_change_set`` until the change set is CREATE_COMPLETE or FAILED.

    Each status transition is printed once. A ``ChangeSetNotFound`` client
    error is tolerated and polling continues; any other ``ClientError`` is
    re-raised.

    Returns:
        The ``describe_change_set`` response observed at the terminal status.

    Raises:
        TimeoutError: When more than ``timeout_seconds`` have elapsed after a poll.
    """
    began = time.time()
    previous = None
    finished_states = ("CREATE_COMPLETE", "FAILED")

    while True:
        try:
            description = cf.describe_change_set(StackName=stack_name, ChangeSetName=change_set_name)
        except ClientError as err:
            # Only "not found" is treated as retryable; everything else propagates.
            if err.response["Error"]["Code"] != "ChangeSetNotFound":
                raise
        else:
            current = description.get("Status")
            if current != previous:
                print(f"[wait] ChangeSet status: {current}")
                previous = current
            if current in finished_states:
                return description

        elapsed = time.time() - began
        if elapsed > timeout_seconds:
            raise TimeoutError("Timed out waiting for change set creation.")
        time.sleep(poll_seconds)
def wait_for_stack_import(cf, stack_name: str, poll_seconds: int = 10, timeout_seconds: int = 3600):
    """Poll the stack status until the import finishes.

    Each status transition is printed once.

    NOTE(review): the first poll may still report a status left over from
    before the change set was executed — confirm this cannot yield a false
    result for stacks that start in one of the states checked here.

    Returns:
        The string "IMPORT_COMPLETE" once that status is observed.

    Raises:
        RuntimeError: When the stack reports one of the rollback/delete/failed
            statuses listed below.
        TimeoutError: When more than ``timeout_seconds`` have elapsed after a poll.
    """
    failure_states = frozenset((
        "IMPORT_ROLLBACK_COMPLETE",
        "IMPORT_ROLLBACK_FAILED",
        "ROLLBACK_COMPLETE",
        "ROLLBACK_FAILED",
        "UPDATE_ROLLBACK_COMPLETE",
        "UPDATE_ROLLBACK_FAILED",
        "DELETE_COMPLETE",
        "DELETE_FAILED",
        "IMPORT_FAILED",
    ))
    began = time.time()
    previous = None

    while True:
        described = cf.describe_stacks(StackName=stack_name)
        status = described["Stacks"][0]["StackStatus"]
        if status != previous:
            print(f"[wait] StackStatus: {status}")
            previous = status

        if status in failure_states:
            raise RuntimeError(f"Stack reached terminal failure state: {status}")
        if status == "IMPORT_COMPLETE":
            return status

        if time.time() - began > timeout_seconds:
            raise TimeoutError("Timed out waiting for stack import to complete.")
        time.sleep(poll_seconds)
def print_changeset_summary(cs: Dict[str, Any]):
    """Print a readable overview of a ``describe_change_set`` response to stdout.

    Writes the change set's identifying fields first, then one line per entry
    in ``Changes`` (numbered from 1) followed by one line per item in that
    entry's ``Details``. When there are no changes, a short notice is printed
    instead.
    """
    header_lines = [
        "\n=== Change Set Summary ===",
        f"Name: {cs.get('ChangeSetName')}",
        f"Id: {cs.get('ChangeSetId')}",
        f"Stack: {cs.get('StackName')}",
        f"Type: {cs.get('ChangeSetType')}",
        f"Status: {cs.get('Status')}",
        f"StatusReason: {cs.get('StatusReason', '')}",
        f"CreationTime: {cs.get('CreationTime')}",
    ]
    for line in header_lines:
        print(line)

    resource_changes = cs.get("Changes", [])
    if not resource_changes:
        print("\nNo resource changes reported.")
        return

    print("\n--- Resource Changes ---")
    position = 0
    for entry in resource_changes:
        position += 1
        change = entry.get("ResourceChange", {})
        print(
            f"[{position}] {change.get('Action')} {change.get('ResourceType')} "
            f"({change.get('LogicalResourceId')}) Replacement: {change.get('Replacement')}"
        )
        # "Details" may be absent or None; both are treated as no details.
        detail_items = change.get("Details", []) or []
        for item in detail_items:
            target_name = item.get("Target", {}).get("Name")
            print(
                f" - {target_name} (recreate: {item.get('RequiresRecreation')}, "
                f"source: {item.get('ChangeSource')})"
            )
| # -------------------- Main -------------------- # | |
def _wait_for_drift_detection(cf, drift_id: str, poll_seconds: int = 5, timeout_seconds: int = 900) -> None:
    """Poll a drift detection run until it finishes, printing each observed status.

    Stops with a warning (rather than raising) if the run has not reached
    DETECTION_COMPLETE or DETECTION_FAILED within ``timeout_seconds``, so a
    slow drift check cannot hang the script after a successful import.
    """
    started = time.time()
    while True:
        resp = cf.describe_stack_drift_detection_status(StackDriftDetectionId=drift_id)
        dd_status = resp["DetectionStatus"]
        print(f"[wait] Drift detection: {dd_status}")
        if dd_status in ("DETECTION_COMPLETE", "DETECTION_FAILED"):
            # Read defensively: do not crash if the overall drift status is absent.
            print(f"[info] Drift overall status: {resp.get('StackDriftStatus', 'UNKNOWN')}")
            if dd_status == "DETECTION_FAILED":
                print(f"[warn] Drift detection failed: {resp.get('DetectionStatusReason','')}")
            return
        if time.time() - started > timeout_seconds:
            print("[warn] Timed out waiting for drift detection to finish.")
            return
        time.sleep(poll_seconds)


def main():
    """Command-line entry point: create (and optionally execute) an IMPORT change set.

    Steps:
      1. Parse arguments and validate/parse ``--import`` and ``--override``
         values before any AWS call is made.
      2. Build a session (assuming ``--role-arn`` when given) and a
         CloudFormation client.
      3. Read the stack's current parameters and merge in the overrides.
      4. Create the IMPORT change set, wait for it, and print a summary.
         A FAILED change set is deleted (best effort) and the process exits 1.
      5. With ``--execute``, execute the change set and wait for the import;
         with ``--drift-check`` as well, run drift detection afterwards.

    Raises:
        ValueError: If no ``--import`` was given or an ``--import`` /
            ``--override`` value is malformed.
    """
    parser = argparse.ArgumentParser(description="Create an IMPORT change set with inline resources, preserving parameters.")
    parser.add_argument("--stack-name", required=True, help="Existing stack name to import resources into.")
    parser.add_argument("--template-url", required=True, help="S3 URL of the template that includes the to-be-imported resources.")
    parser.add_argument(
        "--import",
        dest="imports",
        action="append",
        default=[],
        help=("Repeatable. Inline resource to import. "
              "Format: 'ResourceType=AWS::Service::Type,LogicalResourceId=LogicalId,ResourceIdentifier.Key=Value[,ResourceIdentifier.Key2=Value2...]' "
              "Example: 'ResourceType=AWS::DynamoDB::Table,LogicalResourceId=GamesTable,ResourceIdentifier.TableName=Games'")
    )
    parser.add_argument("--override", dest="overrides", action="append", default=[],
                        help="Repeatable. Parameter override in KEY=VALUE form. Overrides only those keys; all others are preserved.")
    parser.add_argument("--change-set-name", default=None, help="Optional change set name (default: Import-YYYYmmddHHMMSS).")
    parser.add_argument("--region", default=None, help="AWS region (falls back to env/defaults).")
    parser.add_argument("--role-arn", default=None, help="Assume this role before calling CloudFormation.")
    parser.add_argument("--execution-role-arn", default=None,
                        help="Optional CloudFormation execution role (RoleARN) for the change set.")
    parser.add_argument("--execute", action="store_true", help="If set, execute the change set after creation.")
    parser.add_argument("--drift-check", action="store_true", help="If set, run stack drift detection after successful import.")
    args = parser.parse_args()

    # --- Validate inputs & derive structures ---
    # Done before touching AWS so malformed flags fail fast, without an
    # AssumeRole or DescribeStacks round trip.
    if not args.imports:
        raise ValueError("At least one --import is required.")
    resources_to_import = [parse_inline_import(s) for s in args.imports]
    overrides = parse_overrides(args.overrides)

    # --- Resolve session/clients ---
    session = assume_role_session(args.role_arn, args.region) if args.role_arn else boto3.Session(region_name=args.region)
    cf = session.client("cloudformation")

    # Fetch existing stack parameters to preserve
    stack = get_stack_details(cf, args.stack_name)
    stack_params = stack.get("Parameters", [])
    params = build_parameters_for_reuse(stack_params, overrides)

    # Change set name: UTC timestamp, built without the deprecated datetime.utcnow().
    cs_name = args.change_set_name or f"Import-{time.strftime('%Y%m%d%H%M%S', time.gmtime())}"

    # --- Create change set (IMPORT) ---
    create_kwargs: Dict[str, Any] = {
        "StackName": args.stack_name,
        "TemplateURL": args.template_url,
        "Parameters": params,
        "ChangeSetName": cs_name,
        "ChangeSetType": "IMPORT",
        "ResourcesToImport": resources_to_import,
    }
    # Important: RoleARN is the correct param name for CFN to assume during the operation
    if args.execution_role_arn:
        create_kwargs["RoleARN"] = args.execution_role_arn
    # Optional: If your template requires these, uncomment as needed
    # create_kwargs["Capabilities"] = ["CAPABILITY_NAMED_IAM", "CAPABILITY_AUTO_EXPAND"]

    print("[info] Creating IMPORT change set...")
    cf.create_change_set(**create_kwargs)

    # --- Wait for change set creation ---
    cs = wait_for_changeset(cf, args.stack_name, cs_name)
    print_changeset_summary(cs)

    if cs.get("Status") == "FAILED":
        reason = cs.get("StatusReason", "")
        print(f"\n[error] Change set creation failed: {reason}")
        # If a failed change set is left around, clean it up to allow reuse of the name.
        # Deliberately best-effort: a cleanup failure is reported but does not mask the real error.
        try:
            cf.delete_change_set(StackName=args.stack_name, ChangeSetName=cs["ChangeSetId"])
            print("[info] Deleted failed change set.")
        except Exception as e:
            print(f"[warn] Could not delete failed change set: {e}")
        sys.exit(1)

    # --- Optionally execute ---
    if args.execute:
        print("\n[info] Executing change set...")
        cf.execute_change_set(StackName=args.stack_name, ChangeSetName=cs["ChangeSetId"])

        # Wait for import to complete
        final_status = wait_for_stack_import(cf, args.stack_name)
        print(f"[ok] Stack import finished with status: {final_status}")

        # Optional drift check
        if args.drift_check:
            print("[info] Starting drift detection...")
            dd = cf.detect_stack_drift(StackName=args.stack_name)
            _wait_for_drift_detection(cf, dd["StackDriftDetectionId"])

    print("\n[done] Completed.")
if __name__ == "__main__":
    # Script entry point: run main() and map failures to process exit codes.
    # Ctrl+C exits with 130, any other Exception with 1; a normal return
    # (or a SystemExit raised inside main) is left untouched.
    exit_code = None
    try:
        main()
    except KeyboardInterrupt:
        print("\n[interrupt] Exiting on Ctrl+C")
        exit_code = 130
    except Exception as err:
        print(f"[fatal] {err}")
        exit_code = 1
    if exit_code is not None:
        sys.exit(exit_code)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Uh oh!
There was an error while loading. Please reload this page.