S3 encryption migration tool (SSE-S3 -> SSE-KMS or ensure SSE-KMS)
s3_reencrypt.py
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
S3 encryption migration tool (SSE-S3 -> SSE-KMS or ensure SSE-KMS).

- Generates a manifest of current objects with SSE/KMS info + drift status
- Ensures bucket default encryption (optional)
- Re-encrypts objects in place to a target KMS key (idempotent, resumable)

Usage:
    # Build manifest only (drift vs bucket default or explicit KMS key)
    python s3_reencrypt.py manifest --bucket playground-bucket --region us-east-1 --out data/s3_manifest.csv
    python s3_reencrypt.py manifest --bucket playground-bucket --region us-east-1 --out data/s3_manifest.csv \
        --kms-key-arn arn:aws:kms:us-east-1:123456789012:key/abcd-...-efgh --head-workers 48 --head-max-rps 250

    # Ensure default encryption = SSE-KMS with a specific key (or AES256 if omitted)
    python s3_reencrypt.py ensure-bucket-encryption --bucket playground-bucket --region us-east-1 \
        --kms-key-arn arn:aws:kms:us-east-1:123456789012:key/abcd-...-efgh

    # Migrate encryption in-place using the manifest (resumable; uses manifest Target_* and DriftStatus)
    python s3_reencrypt.py migrate --manifest data/s3_manifest.csv --region us-east-1 --workers 32 --max-retries 7 \
        --max-rps 150 --verify-sample 0.02
"""

import time
import re
import argparse
import csv
import os
import sys
import signal
import random
import threading
from datetime import datetime, timezone
from typing import Any, Dict, Iterable, Optional, Tuple, List
from concurrent.futures import ThreadPoolExecutor, as_completed

import boto3
from botocore.config import Config
from botocore.exceptions import ClientError

# ---------------------------
# Utilities
# ---------------------------
ISO8601 = "%Y-%m-%dT%H:%M:%S%z"

def utcnow() -> str:
    return datetime.now(timezone.utc).strftime(ISO8601)

def ensure_dir(path: str):
    d = os.path.dirname(os.path.abspath(path))
    if d:
        os.makedirs(d, exist_ok=True)

def safe_int(v, default=0):
    try:
        return int(v)
    except Exception:
        return default

def log(msg: str):
    print(f"[{utcnow()}] {msg}", flush=True)

def backoff_sleep(attempt: int) -> float:
    # Exponential backoff with full jitter
    base = min(2 ** attempt, 64)  # cap growth
    return random.uniform(0.5, base)
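
# For example: attempt=3 draws the sleep from [0.5, 8.0) seconds; from attempt=6 onward
# the cap holds the upper bound at 64 seconds, so retries never back off for more than
# about a minute per attempt.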

# ---------------------------
# Simple rate limiter (token bucket)
# ---------------------------
class RateLimiter:
    def __init__(self, rate_per_sec: float):
        self.rate = max(0.0, float(rate_per_sec or 0.0))
        self.tokens = self.rate
        self.last = time.monotonic()
        self.lock = threading.Lock()

    def acquire(self):
        if self.rate <= 0.0:
            return
        with self.lock:
            now = time.monotonic()
            elapsed = now - self.last
            self.tokens = min(self.rate, self.tokens + elapsed * self.rate)
            self.last = now
            if self.tokens < 1.0:
                need = 1.0 - self.tokens
                time.sleep(need / self.rate)
                self.tokens = 0.0
            else:
                self.tokens -= 1.0
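
# Illustrative usage (not part of the CLI): share a single limiter across worker threads
# so the aggregate call rate stays under the cap.
#   limiter = RateLimiter(150)   # ~150 acquisitions/sec across all threads
#   limiter.acquire()            # blocks briefly when the bucket is empty
#   s3.copy_object(...)          # the rate-limited call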

# ---------------------------
# S3 helpers
# ---------------------------
def s3_client(region: Optional[str], max_pool_connections: int = 200) -> Any:
    session = boto3.session.Session(region_name=region)
    cfg = Config(
        retries={"mode": "adaptive", "max_attempts": 10},
        connect_timeout=10,
        read_timeout=120,
        max_pool_connections=max_pool_connections,
        tcp_keepalive=True,
    )
    return session.client("s3", config=cfg)

def bucket_exists(s3, bucket: str) -> bool:
    try:
        s3.head_bucket(Bucket=bucket)
        return True
    except ClientError as e:
        code = int(e.response.get("ResponseMetadata", {}).get("HTTPStatusCode", 0))
        if code == 403:
            # Bucket exists but the caller has no HEAD access
            return True
        return False

def create_bucket_if_needed(s3, bucket: str, region: Optional[str]):
    if bucket_exists(s3, bucket):
        log(f"Bucket exists: s3://{bucket}")
        return
    if region in (None, "", "us-east-1"):
        s3.create_bucket(Bucket=bucket)
    else:
        s3.create_bucket(
            Bucket=bucket,
            CreateBucketConfiguration={"LocationConstraint": region},
        )
    log(f"Created bucket: s3://{bucket} (region={region or 'us-east-1'})")

def set_default_encryption_sse_s3(s3, bucket: str):
    s3.put_bucket_encryption(
        Bucket=bucket,
        ServerSideEncryptionConfiguration={
            "Rules": [
                {
                    "ApplyServerSideEncryptionByDefault": {
                        "SSEAlgorithm": "AES256"
                    },
                    "BucketKeyEnabled": False,
                }
            ]
        },
    )

def set_default_encryption_kms(s3, bucket: str, kms_arn: str, bucket_key_enabled: bool = True):
    s3.put_bucket_encryption(
        Bucket=bucket,
        ServerSideEncryptionConfiguration={
            "Rules": [
                {
                    "ApplyServerSideEncryptionByDefault": {
                        "SSEAlgorithm": "aws:kms",
                        "KMSMasterKeyID": kms_arn,
                    },
                    "BucketKeyEnabled": bucket_key_enabled,
                }
            ]
        },
    )

def get_bucket_default_encryption(s3, bucket: str) -> Dict:
    try:
        resp = s3.get_bucket_encryption(Bucket=bucket)
        cfg = resp.get("ServerSideEncryptionConfiguration", {})
        rules = cfg.get("Rules", [])
        if rules:
            return rules[0].get("ApplyServerSideEncryptionByDefault", {})
        return {}
    except ClientError as e:
        if e.response.get("Error", {}).get("Code") == "ServerSideEncryptionConfigurationNotFoundError":
            return {}
        raise

def list_objects_stream(s3, bucket: str, prefix: str = "") -> Iterable[Dict]:
    paginator = s3.get_paginator("list_objects_v2")
    kwargs = {"Bucket": bucket}
    if prefix:
        kwargs["Prefix"] = prefix
    # Use the larger page size for efficiency on big buckets
    for page in paginator.paginate(**kwargs, PaginationConfig={"PageSize": 1000}):
        for obj in page.get("Contents", []) or []:
            yield obj
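
# ListObjectsV2 does not report per-object encryption metadata, which is why the manifest
# builder below issues one HeadObject per key; that HEAD fan-out is the expensive part on
# large buckets and is what the --head-workers / --head-max-rps knobs control.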

def head_object_encryption(s3, bucket: str, key: str) -> Dict:
    """Single-attempt HEAD wrapper (kept for compatibility). Prefer head_object_encryption_retry in hot paths."""
    try:
        resp = s3.head_object(Bucket=bucket, Key=key)
        return {
            "ServerSideEncryption": resp.get("ServerSideEncryption", ""),  # 'AES256' or 'aws:kms'
            "SSEKMSKeyId": resp.get("SSEKMSKeyId", ""),
            "BucketKeyEnabled": resp.get("BucketKeyEnabled", ""),
            "ETag": resp.get("ETag"),
            "ContentLength": resp.get("ContentLength"),
            "LastModified": resp.get("LastModified").strftime(ISO8601) if resp.get("LastModified") else "",
        }
    except ClientError as e:
        return {
            "ServerSideEncryption": "",
            "SSEKMSKeyId": "",
            "BucketKeyEnabled": "",
            "ETag": "",
            "ContentLength": "",
            "LastModified": "",
            "Error": f"{e.response.get('Error', {}).get('Code')}: {e.response.get('Error', {}).get('Message')}",
        }

# ---- Retriable HEAD (Tier-1 improvement #1)
RETRIABLE_S3_ERRORS = {
    "SlowDown", "Throttling", "ThrottlingException", "RequestTimeout",
    "InternalError", "ServiceUnavailable", "ExpiredToken", "ExpiredTokenException",
}

def head_object_encryption_retry(s3, bucket: str, key: str, max_retries: int = 5) -> Dict:
    """HEAD with jittered retries on transient errors; returns the same dict shape as head_object_encryption."""
    attempt = 0
    while True:
        try:
            resp = s3.head_object(Bucket=bucket, Key=key)
            return {
                "ServerSideEncryption": resp.get("ServerSideEncryption", ""),
                "SSEKMSKeyId": resp.get("SSEKMSKeyId", ""),
                "BucketKeyEnabled": resp.get("BucketKeyEnabled", ""),
                "ETag": resp.get("ETag"),
                "ContentLength": resp.get("ContentLength"),
                "LastModified": resp.get("LastModified").strftime(ISO8601) if resp.get("LastModified") else "",
            }
        except ClientError as e:
            code = e.response.get("Error", {}).get("Code", "")
            attempt += 1
            if code not in RETRIABLE_S3_ERRORS or attempt >= max_retries:
                return {
                    "ServerSideEncryption": "",
                    "SSEKMSKeyId": "",
                    "BucketKeyEnabled": "",
                    "ETag": "",
                    "ContentLength": "",
                    "LastModified": "",
                    "Error": f"{code}: {e.response.get('Error', {}).get('Message')}",
                }
            time.sleep(backoff_sleep(attempt))
        except Exception as e:
            # Non-ClientError: return an error payload instead of raising
            return {
                "ServerSideEncryption": "",
                "SSEKMSKeyId": "",
                "BucketKeyEnabled": "",
                "ETag": "",
                "ContentLength": "",
                "LastModified": "",
                "Error": f"Exception: {e}",
            }

def _kms_key_id(s: str) -> str:
    """Normalize a KMS identifier to its bare key-id so ARN vs key-id compare equal."""
    if not s:
        return ""
    # Key ARNs delimit the resource with ':' (arn:aws:kms:...:key/<id>), so accept ':' as
    # well as '/' before "key/"; the original pattern missed the ARN form entirely.
    m = re.search(r"(?:^|[:/])key/([0-9a-fA-F-]{8,})$", s)
    return m.group(1) if m else s
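
# For example, arn:aws:kms:us-east-1:123456789012:key/1234abcd-12ab-34cd-56ef-1234567890ab
# and the bare key-id 1234abcd-12ab-34cd-56ef-1234567890ab both normalize to the bare key-id,
# so drift comparison works no matter which form HeadObject or the bucket default returns.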

def read_bucket_target_policy(s3, bucket: str) -> Dict[str, str]:
    """
    Return the desired per-object encryption policy derived from the bucket default:
        {"algo": "AES256"}  or  {"algo": "aws:kms", "kms_key_id": "<key-id>"}
    If no bucket default is set, fall back to AES256.
    """
    cfg = get_bucket_default_encryption(s3, bucket) or {}
    algo = cfg.get("SSEAlgorithm", "")
    if algo == "aws:kms":
        return {"algo": "aws:kms", "kms_key_id": _kms_key_id(cfg.get("KMSMasterKeyID", ""))}
    # treat missing/unknown as an AES256 target
    return {"algo": "AES256"}

def _target_from_args_or_bucket(s3, bucket: str, kms_key_arn: Optional[str]) -> Dict[str, str]:
    """Pick a stable target-policy snapshot for the manifest."""
    if kms_key_arn:
        return {"algo": "aws:kms", "kms_key_id": _kms_key_id(kms_key_arn)}
    return read_bucket_target_policy(s3, bucket)

def needs_migration(row: Dict, target_policy: Dict[str, str]) -> bool:
    algo = target_policy.get("algo")
    if algo == "AES256":
        # the object must be explicitly AES256; anything else gets migrated
        return row.get("ServerSideEncryption") != "AES256"
    if algo == "aws:kms":
        return not (
            row.get("ServerSideEncryption") == "aws:kms"
            and _kms_key_id(row.get("SSEKMSKeyId", "")) == target_policy.get("kms_key_id", "")
        )
    # default: migrate
    return True

def copy_in_place_to_policy(
    s3,
    bucket: str,
    key: str,
    target_policy: Dict[str, str],
    max_retries: int = 3,
    limiter: Optional[RateLimiter] = None,
) -> None:
    """Re-encrypt an existing S3 object in place according to target_policy.
    If the object is archived or otherwise invalid, abort without retrying.
    """
    attempt = 0
    while True:
        try:
            if limiter is not None:
                limiter.acquire()
            kwargs = {
                "Bucket": bucket,
                "Key": key,
                "CopySource": {"Bucket": bucket, "Key": key},
                "MetadataDirective": "COPY",
                "TaggingDirective": "COPY",
            }
            if target_policy.get("algo") == "aws:kms":
                kwargs.update({
                    "ServerSideEncryption": "aws:kms",
                    "SSEKMSKeyId": target_policy["kms_key_id"],
                    "BucketKeyEnabled": True,  # enable S3 Bucket Keys on the copy for cost savings
                })
            else:
                kwargs.update({"ServerSideEncryption": "AES256"})
            s3.copy_object(**kwargs)
            return
        except ClientError as e:
            code = e.response.get("Error", {}).get("Code", "")
            attempt += 1
            # Skip immediately if the key is invalid or inaccessible for KMS
            if code == "KMS.ValidationException":
                log(f"KMS key validation failed for s3://{bucket}/{key} — skipping object.")
                e.operation = "skip_object"
                raise e
            # Abort immediately if the object is archived or not retrievable
            if code in ("InvalidObjectState", "ObjectNotInActiveTierError"):
                log(f"Object s3://{bucket}/{key} is archived or not accessible ({code}), skipping.")
                e.operation = "invalid_file_status"
                raise e
            if attempt >= max_retries:
                raise
            sleep_s = backoff_sleep(attempt)
            log(
                f"Retry {attempt}/{max_retries} for s3://{bucket}/{key}: "
                f"{code or 'UnknownError'} — sleeping {sleep_s:.1f}s"
            )
            time.sleep(sleep_s)

# ---- Planner helpers ----
def aes256_target() -> Dict[str, str]:
    """Return a manifest target that forces S3-managed encryption (AES256)."""
    return {"algo": "AES256"}

def kms_target(kms_key_arn_or_id: str) -> Dict[str, str]:
    """Return a manifest target for SSE-KMS with a normalized key id."""
    return {"algo": "aws:kms", "kms_key_id": _kms_key_id(kms_key_arn_or_id)}

def needs_migration_from_head(head: Dict, target_policy: Dict[str, str]) -> bool:
    """Adapter: compute drift using a head_object dict."""
    return needs_migration(
        {"ServerSideEncryption": head.get("ServerSideEncryption", ""),
         "SSEKMSKeyId": head.get("SSEKMSKeyId", "")},
        target_policy
    )

def build_manifest_row(bucket: str, obj: Dict, head: Dict, target: Dict[str, str],
                       preserve_from: Optional[Dict] = None) -> Dict:
    """
    Build a single manifest row from head() + target.
    If preserve_from carries a final state (MIGRATED/SKIPPED), keep it.
    """
    if head.get("Error"):
        drift = "UNKNOWN"
    else:
        drift = "DRIFTED" if needs_migration_from_head(head, target) else "IN_SYNC"
    row = {
        "Bucket": bucket,
        "Key": obj["Key"],
        "Size": obj.get("Size"),
        "ETag": obj.get("ETag", ""),
        "LastModified": obj.get("LastModified").strftime(ISO8601) if obj.get("LastModified") else "",
        "StorageClass": obj.get("StorageClass", ""),
        "Pre_ServerSideEncryption": head.get("ServerSideEncryption", ""),
        "Pre_SSEKMSKeyId": head.get("SSEKMSKeyId", ""),
        "Pre_BucketKeyEnabled": head.get("BucketKeyEnabled", ""),
        "Target_Algo": target.get("algo", ""),
        "Target_KMSKeyId": target.get("kms_key_id", ""),
        "DriftStatus": drift,
        "Status": "PENDING",
        "Attempts": 0,
        "LastError": "",
        "UpdatedAt": utcnow(),
    }
    if preserve_from and preserve_from.get("Status") in ("MIGRATED", "SKIPPED"):
        row["Status"] = preserve_from["Status"]
        row["Attempts"] = safe_int(preserve_from.get("Attempts", 0))
        row["LastError"] = preserve_from.get("LastError", "")
    return row

def list_buckets_in_region(s3, region: str) -> List[str]:
    """List bucket names in the given region (S3 reports us-east-1 as a None LocationConstraint)."""
    resp = s3.list_buckets()
    names = [b["Name"] for b in resp.get("Buckets", [])]
    out: List[str] = []
    for b in names:
        try:
            loc = s3.get_bucket_location(Bucket=b).get("LocationConstraint")
            if (loc or "us-east-1") == region:
                out.append(b)
        except Exception:
            continue
    return out

def bucket_default_is_sse_s3(s3, bucket: str) -> Optional[bool]:
    """
    True  -> default is explicitly SSE-S3 (AES256)
    False -> default is something else (none/KMS/unknown)
    None  -> could not determine (permissions/other error)
    """
    try:
        resp = s3.get_bucket_encryption(Bucket=bucket)
        cfg = resp.get("ServerSideEncryptionConfiguration", {}) or {}
        for rule in cfg.get("Rules", []):
            by_default = rule.get("ApplyServerSideEncryptionByDefault", {}) or {}
            algo = by_default.get("SSEAlgorithm")
            kms = by_default.get("KMSMasterKeyID") or by_default.get("KMSMasterKeyId")
            return (algo == "AES256") and (not kms)
        return False
    except ClientError as e:
        code = (e.response or {}).get("Error", {}).get("Code", "Unknown")
        if code in ("ServerSideEncryptionConfigurationNotFoundError", "NoSuchEncryptionConfiguration"):
            return False
        return None
    except Exception:
        return None

def kms_is_usable(region: str, kms_key_arn_or_id: str) -> bool:
    """
    Preflight: the key exists, is enabled, and the caller can Encrypt with it.
    Returns False on any KMS/permission error.
    """
    if not kms_key_arn_or_id:
        return False
    kms = boto3.client("kms", region_name=region)
    try:
        desc = kms.describe_key(KeyId=kms_key_arn_or_id)
        meta = desc.get("KeyMetadata", {}) or {}
        if meta.get("KeyState") != "Enabled":
            return False
        kms.encrypt(KeyId=kms_key_arn_or_id, Plaintext=b"\x00", EncryptionAlgorithm="SYMMETRIC_DEFAULT")
        return True
    except ClientError:
        return False
    except Exception:
        return False

# ---- End helpers ----

# ---------------------------
# Manifest management
# ---------------------------
MANIFEST_FIELDS = [
    "Bucket", "Key", "Size", "ETag", "LastModified", "StorageClass",
    "Pre_ServerSideEncryption", "Pre_SSEKMSKeyId", "Pre_BucketKeyEnabled",
    "Target_Algo", "Target_KMSKeyId", "DriftStatus",
    "Status", "Attempts", "LastError", "UpdatedAt",
]

def write_manifest_header(path: str):
    ensure_dir(path)
    if not os.path.exists(path):
        with open(path, "w", newline="", encoding="utf-8") as f:
            csv.DictWriter(f, fieldnames=MANIFEST_FIELDS).writeheader()

def append_manifest_rows(path: str, rows: List[Dict]):
    ensure_dir(path)
    with open(path, "a", newline="", encoding="utf-8") as f:
        w = csv.DictWriter(f, fieldnames=MANIFEST_FIELDS)
        for r in rows:
            w.writerow(r)

def load_manifest(path: str) -> List[Dict]:
    with open(path, newline="", encoding="utf-8") as f:
        rdr = csv.DictReader(f)
        return list(rdr)

def atomically_update_manifest(path: str, rows: List[Dict]):
    tmp = f"{path}.tmp"
    with open(tmp, "w", newline="", encoding="utf-8") as f:
        w = csv.DictWriter(f, fieldnames=MANIFEST_FIELDS)
        w.writeheader()
        for r in rows:
            w.writerow(r)
    os.replace(tmp, path)

def iter_manifest_pending(path: str) -> Iterable[Dict]:
    with open(path, newline="", encoding="utf-8") as f:
        rdr = csv.DictReader(f)
        for row in rdr:
            if row.get("Status") in ("", "PENDING", "ERROR"):
                yield row

# ---------------------------
# Commands
# ---------------------------
def cmd_manifest(args):
    """
    Build/refresh a drift-aware manifest for a bucket.
    Improvements:
      - parallel HEADs (non-resume and resume/upsert) with an optional RPS limiter
      - retrying HEADs on transient errors
      - resume/upsert (--resume) to avoid duplicates and pick up after a cancel
      - optional refresh of existing rows' Pre_* and DriftStatus (--refresh-existing)
      - per-page checkpoint (continuation token) for resumable listings
      - unified heartbeat logging
      - graceful Ctrl+C with checkpoint + early stop
    """
    import json

    # ---- helpers ----
    def _load_manifest_index(path: str) -> Dict[str, Dict]:
        if not os.path.exists(path):
            return {}
        rows = load_manifest(path)
        return {r["Key"]: r for r in rows if r.get("Key")}

    def _save_ckpt(ckpt_path: str, page_no: int, token: Optional[str]):
        with open(ckpt_path, "w") as f:
            json.dump({"page_no": page_no, "next_token": token}, f)

    # ---- setup ----
    resume = getattr(args, "resume", False)
    refresh_existing = getattr(args, "refresh_existing", False)
    log_every = safe_int(getattr(args, "log_every", 500), 500)
    progress_every = safe_int(getattr(args, "progress_every_rows", 20000), 20000)
    list_max_seconds = safe_int(getattr(args, "list_max_seconds", 0), 0)
    # Graceful Ctrl+C for manifest (uses _handle_sigint defined below)
    signal.signal(signal.SIGINT, _handle_sigint)
    # Pool sizing for S3 HTTP
    desired_pool = 200
    if getattr(args, "head_workers", 0):
        desired_pool = max(200, int(args.head_workers) * 4)
    s3 = s3_client(args.region, max_pool_connections=desired_pool)
    if not bucket_exists(s3, args.bucket):
        if getattr(args, "create_bucket", False):
            create_bucket_if_needed(s3, args.bucket, args.region)
            set_default_encryption_sse_s3(s3, args.bucket)
            log("Enabled default bucket encryption: SSE-S3 (AES256)")
        else:
            log(f"Bucket does not exist: s3://{args.bucket}")
            sys.exit(2)
    write_manifest_header(args.out)
    target = _target_from_args_or_bucket(s3, args.bucket, getattr(args, "kms_key_arn", None))
    log(
        f"Manifest target policy: {target['algo']}"
        + (f" key={target.get('kms_key_id')}" if target.get('kms_key_id') else "")
    )
    # ---- non-resume (fresh) ----
    if not resume:
        objs = list(list_objects_stream(s3, args.bucket, getattr(args, "prefix", "") or ""))
        pool_size = int(getattr(args, "head_workers", 0)) or min(64, max(8, int(desired_pool * 0.8)))
        head_max_rps = float(getattr(args, "head_max_rps", 0.0) or 0.0)
        limiter = RateLimiter(head_max_rps) if head_max_rps > 0.0 else None
        log(
            f"Listing yielded {len(objs)} objects; HEAD concurrency={pool_size}"
            + (f"; head-max-rps={int(head_max_rps)}" if head_max_rps > 0 else "")
        )
        objs_by_key = {o["Key"]: o for o in objs}

        def _head(k: str) -> Tuple[str, Dict]:
            if limiter:
                limiter.acquire()
            return k, head_object_encryption_retry(s3, args.bucket, k)

        count, batch = 0, []
        with ThreadPoolExecutor(max_workers=pool_size) as ex:
            futures = [ex.submit(_head, k) for k in objs_by_key.keys()]
            for fut in as_completed(futures):
                k, head = fut.result()
                obj = objs_by_key[k]
                row = build_manifest_row(args.bucket, obj, head, target)
                batch.append(row)
                count += 1
                if len(batch) >= progress_every:
                    append_manifest_rows(args.out, batch)
                    batch.clear()
                    log(f"Manifested {count} objects...")
        if batch:
            append_manifest_rows(args.out, batch)
        log(f"Manifest written: {args.out} (total {count} keys)")
        return
    # ---- resume/upsert (parallel per page) ----
    index = _load_manifest_index(args.out)
    processed, added, refreshed, dirty_since_save = 0, 0, 0, 0
    start_ts = time.time()
    ckpt_path = f"{args.out}.ckpt"
    ckpt = None
    if os.path.exists(ckpt_path):
        try:
            with open(ckpt_path) as f:
                ckpt = json.load(f)
            log(f"Resuming listing from checkpoint: page={ckpt.get('page_no')} next={ckpt.get('next_token')}")
        except Exception:
            ckpt = None
    paginator = s3.get_paginator("list_objects_v2")
    kwargs = {"Bucket": args.bucket}
    prefix = getattr(args, "prefix", "") or ""
    if prefix:
        kwargs["Prefix"] = prefix
    # Resume via the paginator's documented StartingToken mechanism (rather than passing
    # ContinuationToken as an operation kwarg, which the paginator manages itself)
    pagination_config = {"PageSize": 1000}
    if ckpt and ckpt.get("next_token"):
        pagination_config["StartingToken"] = ckpt["next_token"]
    # Concurrency knobs for HEADs
    pool_size = int(getattr(args, "head_workers", 0)) or 64
    head_max_rps = float(getattr(args, "head_max_rps", 0.0) or 0.0)
    limiter = RateLimiter(head_max_rps) if head_max_rps > 0.0 else None

    def _head(k: str) -> Tuple[str, Dict]:
        if limiter:
            limiter.acquire()
        return k, head_object_encryption_retry(s3, args.bucket, k)
    page_no = ckpt.get("page_no", 0) if ckpt else 0
    for page in paginator.paginate(**kwargs, PaginationConfig=pagination_config):
        page_no += 1
        objs = page.get("Contents", []) or []
        dur = time.time() - start_ts
        log(f"Listing page {page_no} (n={len(objs)}) in {dur:.2f}s")
        start_ts = time.time()
        if not objs:
            _save_ckpt(ckpt_path, page_no, page.get("NextContinuationToken"))
            if not page.get("IsTruncated"):
                break
            continue
        objs_by_key = {o["Key"]: o for o in objs}
        page_added = 0
        page_refreshed = 0
        # Early stop before scheduling any work
        if _stop_requested:
            log("Stop flag observed before scheduling page work; saving checkpoint and exiting.")
            _save_ckpt(ckpt_path, page_no, page.get("NextContinuationToken"))
            break
        with ThreadPoolExecutor(max_workers=pool_size) as ex:
            futures = [ex.submit(_head, k) for k in objs_by_key.keys()]
            try:
                for fut in as_completed(futures):
                    if _stop_requested:
                        for f in futures:
                            f.cancel()
                        # Python 3.9+: fast cancel of queued tasks
                        ex.shutdown(wait=False, cancel_futures=True)
                        atomically_update_manifest(args.out, list(index.values()))
                        log("Progress saved after stop. Exiting page early.")
                        break
                    k, head = fut.result()
                    obj = objs_by_key[k]
                    prev = index.get(k)
                    if prev and not refresh_existing:
                        processed += 1
                        if processed % log_every == 0:
                            log(f"Upsert progress: processed={processed}, total_rows={len(index)}")
                        continue
                    drift = "UNKNOWN" if head.get("Error") else (
                        "DRIFTED" if needs_migration_from_head(head, target) else "IN_SYNC"
                    )
                    row = prev.copy() if prev else {
                        "Bucket": args.bucket,
                        "Key": k,
                        "Status": "PENDING",
                        "Attempts": 0,
                        "LastError": "",
                    }
                    row.update({
                        "Size": obj.get("Size"),
                        "ETag": obj.get("ETag", ""),
                        "LastModified": obj.get("LastModified").strftime(ISO8601) if obj.get("LastModified") else "",
                        "StorageClass": obj.get("StorageClass", ""),
                        "Pre_ServerSideEncryption": head.get("ServerSideEncryption", ""),
                        "Pre_SSEKMSKeyId": head.get("SSEKMSKeyId", ""),
                        "Pre_BucketKeyEnabled": head.get("BucketKeyEnabled", ""),
                        "Target_Algo": target.get("algo", ""),
                        "Target_KMSKeyId": target.get("kms_key_id", ""),
                        "DriftStatus": drift,
                        "UpdatedAt": utcnow(),
                    })
                    index[k] = row
                    processed += 1
                    if prev:
                        page_refreshed += 1
                    else:
                        page_added += 1
                    dirty_since_save += 1
                    if processed % log_every == 0:
                        log(
                            f"Upsert progress: processed={processed}, total_rows={len(index)} "
                            f"(added={added + page_added}, refreshed={refreshed + page_refreshed})"
                        )
                    if dirty_since_save >= progress_every:
                        atomically_update_manifest(args.out, list(index.values()))
                        log(
                            f"Checkpoint save: processed={processed}, added={added + page_added}, "
                            f"refreshed={refreshed + page_refreshed}, total_rows={len(index)}"
                        )
                        dirty_since_save = 0
                    # Optional per-page time budget
                    if list_max_seconds and (time.time() - start_ts) > list_max_seconds:
                        log(f"Reached time budget (--list-max-seconds={list_max_seconds}). Stopping early.")
                        for f in futures:
                            f.cancel()
                        ex.shutdown(wait=False, cancel_futures=True)
                        break
            finally:
                if _stop_requested:
                    _save_ckpt(ckpt_path, page_no, page.get("NextContinuationToken"))
                    break
        added += page_added
        refreshed += page_refreshed
        # Save the page checkpoint
        _save_ckpt(ckpt_path, page_no, page.get("NextContinuationToken"))
        # Stop before fetching the next page if requested
        if _stop_requested:
            log("Stop flag observed after page completion; exiting before next page.")
            break
        if not page.get("IsTruncated"):
            break
    # Final write
    atomically_update_manifest(args.out, list(index.values()))
    log(
        f"Upsert complete: processed={processed}, added={added}, refreshed={refreshed}, "
        f"total_rows={len(index)} (file: {args.out})"
    )

def cmd_ensure_bucket_encryption(args):
    s3 = s3_client(args.region)
    if not bucket_exists(s3, args.bucket):
        log(f"Bucket does not exist: s3://{args.bucket}")
        sys.exit(2)
    current = get_bucket_default_encryption(s3, args.bucket)
    algo = current.get("SSEAlgorithm", "None")
    kms = _kms_key_id(current.get("KMSMasterKeyID", ""))  # normalize
    if args.kms_key_arn:
        if algo == "aws:kms" and kms == _kms_key_id(args.kms_key_arn):
            log(f"Bucket already uses SSE-KMS with the provided key:\n {args.kms_key_arn}")
        else:
            set_default_encryption_kms(s3, args.bucket, args.kms_key_arn, bucket_key_enabled=not args.no_bucket_key)
            log(
                f"Updated bucket default encryption to SSE-KMS:\n {args.kms_key_arn} "
                f"(BucketKeyEnabled={not args.no_bucket_key})"
            )
    else:
        if algo == "AES256":
            log("Bucket already uses SSE-S3 (AES256)")
        else:
            set_default_encryption_sse_s3(s3, args.bucket)
            log("Updated bucket default encryption to SSE-S3 (AES256)")

def _migrate_one(s3, row: Dict, target_policy: Dict[str, str], max_retries: int,
                 verify_mode: Dict[str, float], limiter: Optional[RateLimiter]) -> Tuple[str, Dict]:
    bucket = row["Bucket"]
    key = row["Key"]

    def _ok(v: Dict) -> bool:
        if target_policy.get("algo") == "AES256":
            return v.get("ServerSideEncryption") == "AES256"
        return (
            v.get("ServerSideEncryption") == "aws:kms"
            and _kms_key_id(v.get("SSEKMSKeyId", "")) == target_policy.get("kms_key_id", "")
        )

    # Re-HEAD (the object might have changed since the manifest) — with retries
    head = head_object_encryption_retry(s3, bucket, key)
    if not needs_migration(
        {"ServerSideEncryption": head.get("ServerSideEncryption", ""),
         "SSEKMSKeyId": head.get("SSEKMSKeyId", "")},
        target_policy
    ):
        row["Status"] = "SKIPPED"
        row["LastError"] = ""
        row["UpdatedAt"] = utcnow()
        row["DriftStatus"] = "IN_SYNC"
        return "SKIPPED", row
    # Copy + (optional) verify; refresh the client once on ExpiredToken
    try:
        copy_in_place_to_policy(s3, bucket, key, target_policy, max_retries=max_retries, limiter=limiter)
        do_verify = False
        if verify_mode.get("mode") == "all":
            do_verify = True
        elif verify_mode.get("mode") == "sample":
            do_verify = (random.random() < float(verify_mode.get("sample", 0.0)))
        if do_verify:
            v = head_object_encryption_retry(s3, bucket, key)
            row["Status"] = "MIGRATED" if _ok(v) else "ERROR"
            row["LastError"] = "" if row["Status"] == "MIGRATED" else "Post-copy verification failed"
            row["DriftStatus"] = "IN_SYNC" if row["Status"] == "MIGRATED" else "DRIFTED"
        else:
            row["Status"] = "MIGRATED"
            row["DriftStatus"] = "IN_SYNC"
            row["LastError"] = ""
    except ClientError as e:
        code = e.response.get("Error", {}).get("Code")
        if getattr(e, "operation", "") == "skip_object" or code == "KMS.ValidationException":
            row["Status"] = "SKIPPED"
            row["LastError"] = "KMS.ValidationException — skipping object."
            row["DriftStatus"] = "DRIFTED"
            return "SKIPPED", row
        if code in ("ExpiredToken", "ExpiredTokenException"):
            try:
                fresh = s3_client(getattr(s3.meta, "region_name", None), max_pool_connections=200)
                copy_in_place_to_policy(fresh, bucket, key, target_policy, max_retries=max_retries, limiter=limiter)
                if verify_mode.get("mode") == "all" or (
                    verify_mode.get("mode") == "sample" and random.random() < float(verify_mode.get("sample", 0.0))
                ):
                    v = head_object_encryption_retry(fresh, bucket, key)
                    row["Status"] = "MIGRATED" if _ok(v) else "ERROR"
                    row["LastError"] = "" if row["Status"] == "MIGRATED" else "Post-copy verification failed (after refresh)"
                    row["DriftStatus"] = "IN_SYNC" if row["Status"] == "MIGRATED" else "DRIFTED"
                else:
                    row["Status"] = "MIGRATED"
                    row["DriftStatus"] = "IN_SYNC"
                    row["LastError"] = ""
            except Exception as e2:
                row["Status"] = "ERROR"
                row["LastError"] = str(e2)
                row["DriftStatus"] = "DRIFTED"
        else:
            row["Status"] = "ERROR"
            row["LastError"] = str(e)
            row["DriftStatus"] = "DRIFTED"
    except Exception as e:
        row["Status"] = "ERROR"
        row["LastError"] = str(e)
        row["DriftStatus"] = "DRIFTED"
    row["Attempts"] = safe_int(row.get("Attempts", 0)) + 1
    row["UpdatedAt"] = utcnow()
    return row["Status"], row

# Escalating SIGINT (triple Ctrl+C)
_stop_requested = False
_sigint_count = 0

def _handle_sigint(sig, frame):
    global _stop_requested, _sigint_count
    _sigint_count += 1
    if _sigint_count == 1:
        _stop_requested = True
        log("Stop requested (Ctrl+C). Finishing in-flight tasks before writing progress... (press Ctrl+C again to force abort)")
    elif _sigint_count == 2:
        log("Force abort requested. Raising KeyboardInterrupt...")
        raise KeyboardInterrupt
    else:
        log("Hard abort. Exiting immediately.")
        os._exit(130)

def cmd_migrate(args):
    signal.signal(signal.SIGINT, _handle_sigint)
    # Scale the S3 HTTP pool with workers (Tier-1 improvement #3)
    s3 = s3_client(args.region, max_pool_connections=max(args.workers * 4, 200))
    rows = load_manifest(args.manifest)
    if not rows:
        log("Manifest is empty; nothing to do.")
        return
    # Determine whether the manifest has target/drift columns
    manifest_has_target = "Target_Algo" in rows[0]
    manifest_has_drift = "DriftStatus" in rows[0]
    total = len(rows)
    log(f"Loaded manifest with {total} rows")
    # Worklist selection
    if args.only_pending:
        # truly ignore DriftStatus when requested
        work = [r for r in rows if r.get("Status") in ("", "PENDING")]
    else:
        if manifest_has_drift:
            # Process explicit drift + previous errors by default
            work = [r for r in rows if (r.get("DriftStatus") == "DRIFTED") or (r.get("Status") == "ERROR")]
        else:
            work = [r for r in rows if r.get("Status") in ("", "PENDING", "ERROR", "WOULD_MIGRATE")]
    if not work:
        log("No rows to process.")
        return
    log(f"Processing {len(work)} rows (workers={args.workers})")
    # In-memory index for quick updates
    by_key = {(r["Bucket"], r["Key"]): r for r in rows}
    completed = 0
    migrated = 0
    skipped = 0
    errored = 0

    def _save_progress(prefix: str = "Progress saved"):
        atomically_update_manifest(args.manifest, list(by_key.values()))
        log(f"{prefix}: {completed}/{len(work)} (migrated={migrated}, skipped={skipped}, errors={errored})")

    def _target_for_row(r: Dict) -> Dict[str, str]:
        if manifest_has_target:
            if r.get("Target_Algo") == "aws:kms":
                return {"algo": "aws:kms", "kms_key_id": _kms_key_id(r.get("Target_KMSKeyId", ""))}
            return {"algo": "AES256"}
        # Fall back to the CLI arg or the bucket default (legacy behavior)
        if getattr(args, "kms_key_arn", None):
            return {"algo": "aws:kms", "kms_key_id": _kms_key_id(args.kms_key_arn)}
        return read_bucket_target_policy(s3, r["Bucket"])

    # Verify mode
    if getattr(args, "verify", False):
        verify_mode = {"mode": "all", "sample": 0.0}
    elif float(getattr(args, "verify_sample", 0.0) or 0.0) > 0.0:
        verify_mode = {"mode": "sample", "sample": float(args.verify_sample)}
    else:
        verify_mode = {"mode": "none", "sample": 0.0}
    limiter = RateLimiter(float(getattr(args, "max_rps", 0.0) or 0.0))
    try:
        with ThreadPoolExecutor(max_workers=args.workers) as ex:
            futures = []
            for r in work:
                if _stop_requested:
                    break
                futures.append(ex.submit(_migrate_one, s3, dict(r), _target_for_row(r),
                                         args.max_retries, verify_mode, limiter))
            for fut in as_completed(futures):
                try:
                    status, updated_row = fut.result()
                except Exception as e:
                    status, updated_row = "ERROR", {
                        "Bucket": "?", "Key": "?", "Status": "ERROR",
                        "LastError": str(e), "UpdatedAt": utcnow(), "DriftStatus": "DRIFTED",
                    }
                # merge the result
                bk = (updated_row["Bucket"], updated_row["Key"])
                by_key[bk] = updated_row
                completed += 1
                if status == "MIGRATED":
                    migrated += 1
                elif status == "SKIPPED":
                    skipped += 1
                else:
                    errored += 1
                if (completed % args.progress_every == 0) or _stop_requested:
                    _save_progress("Progress saved")
                if _stop_requested:
                    log("Stop flag observed; exiting after saving progress.")
                    break
    except KeyboardInterrupt:
        _save_progress("Progress saved (interrupt)")
        raise
    finally:
        _save_progress("Final save")
    log(
        f"Done. Processed {completed}/{len(work)} "
        f"(migrated={migrated}, skipped={skipped}, errors={errored})."
    )

# ---------------------------
# Main / CLI
# ---------------------------
def main():
    p = argparse.ArgumentParser(
        description="S3 encryption migration (SSE-S3 -> SSE-KMS) with drift-aware manifest, resume & verification."
    )
    sub = p.add_subparsers(dest="cmd", required=True)
    # manifest
    sp = sub.add_parser("manifest", help="Create a manifest CSV of current objects + SSE/KMS info, including drift.")
    sp.add_argument("--bucket", required=True)
    sp.add_argument("--region", default=None)
    sp.add_argument("--prefix", default="", help="Optional prefix to restrict scanning.")
    sp.add_argument("--out", required=True)
    sp.add_argument("--create-bucket", action="store_true", help="Create the bucket if missing and set SSE-S3.")
    sp.add_argument("--kms-key-arn", default=None, help="If provided, compute drift against this KMS key; else use the bucket default (or AES256 if none).")
    sp.add_argument("--resume", action="store_true", help="Merge into an existing manifest instead of re-appending duplicates.")
    sp.add_argument("--refresh-existing", action="store_true", help="When used with --resume, refresh Pre_* and DriftStatus for keys already in the manifest.")
    # Tier-1 knobs (#2, #7)
    sp.add_argument("--head-max-rps", type=float, default=0.0,
                    help="Max HEAD requests per second during manifest build (0 = unlimited).")
    sp.add_argument("--head-workers", type=int, default=0,
                    help="Override the parallel HEAD worker count during manifest (0 = auto).")
    sp.set_defaults(func=cmd_manifest)
    # ensure-bucket-encryption
    sp = sub.add_parser("ensure-bucket-encryption", help="Ensure bucket default encryption (SSE-S3 or SSE-KMS).")
    sp.add_argument("--bucket", required=True)
    sp.add_argument("--region", default=None)
    sp.add_argument("--kms-key-arn", default=None, help="If provided, set the default to SSE-KMS with this key; otherwise use SSE-S3.")
    sp.add_argument("--no-bucket-key", action="store_true", help="Disable S3 Bucket Keys when using SSE-KMS.")
    sp.set_defaults(func=cmd_ensure_bucket_encryption)
    # migrate
    sp = sub.add_parser("migrate", help="Re-encrypt objects in place using the manifest's target and drift snapshot (resumable).")
    sp.add_argument("--region", default=None)
    sp.add_argument("--manifest", required=True)
    sp.add_argument("--kms-key-arn", default=None)
    sp.add_argument("--workers", type=int, default=32)
    sp.add_argument("--max-retries", type=int, default=7)
    sp.add_argument("--progress-every", type=int, default=1000, help="Write manifest progress every N completions.")
    sp.add_argument("--only-pending", action="store_true", help="Process only rows with Status in '', PENDING (ignores DriftStatus entirely).")
    sp.add_argument("--verify", action="store_true", help="HEAD-verify after copy for every object (slower, safer).")
    sp.add_argument("--verify-sample", type=float, default=0.0, help="Fraction (0..1) to verify randomly after copy (e.g., 0.02).")
    sp.add_argument("--max-rps", type=float, default=0.0, help="Max copy() operations per second (token bucket). 0 disables limiting.")
    sp.set_defaults(func=cmd_migrate)
    args = p.parse_args()
    args.func(args)

if __name__ == "__main__":
    try:
        main()
    except KeyboardInterrupt:
        sys.exit(130)
Companion script (multi-bucket planner; imports s3_reencrypt.py)

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Multi-bucket S3 encryption planner.

- Lists region buckets (via s3r.list_buckets_in_region)
- Builds/refreshes per-bucket *drift-aware* manifests using s3r.cmd_manifest (delegated)
- Records explicit Target_*:
    * --kms-key-arn -> SSE-KMS with a specific key (forced)
    * (omitted)     -> force reset to SSE-S3 (AES256) by post-adjusting the manifest
- Ctrl+C responsive loops
- KMS preflight (s3r.kms_is_usable) to skip buckets before migrate
- Invokes s3r.cmd_migrate per chosen bucket
"""
from __future__ import annotations

import argparse
import os
import sys
import textwrap
import signal
from typing import Dict, List, Optional, Tuple

from botocore.exceptions import ClientError

try:
    import s3_reencrypt as s3r
except Exception as e:
    print("ERROR: Could not import s3_reencrypt.py. Ensure it's alongside this file or on PYTHONPATH.")
    print(f"Import error: {e}")
    sys.exit(2)

# ---------------------------
# Ctrl+C handling
# ---------------------------
_STOP = False
_SIGINT_COUNT = 0

def _on_sigint(sig, frame):
    global _STOP, _SIGINT_COUNT
    _SIGINT_COUNT += 1
    if _SIGINT_COUNT == 1:
        _STOP = True
        print(f"[{s3r.utcnow()}] Stop requested (Ctrl+C). Finishing in-flight tasks and saving progress... (press Ctrl+C again to force abort)", flush=True)
    elif _SIGINT_COUNT == 2:
        print(f"[{s3r.utcnow()}] Force abort requested. Raising KeyboardInterrupt...", flush=True)
        raise KeyboardInterrupt
    else:
        print(f"[{s3r.utcnow()}] Hard abort. Exiting immediately.", flush=True)
        os._exit(130)

signal.signal(signal.SIGINT, _on_sigint)
signal.siginterrupt(signal.SIGINT, True)  # let KeyboardInterrupt break blocking calls

# ---------------------------
# Manifest building (delegates to s3r.cmd_manifest)
# ---------------------------
def build_manifest_for_bucket(
    s3,
    bucket: str,
    manifest_path: str,
    target: Dict[str, str],
    prefix: str = "",
    workers: int = 32,           # unused in this delegated implementation (kept for CLI compatibility)
    progress_every: int = 2000,  # unused in this delegated implementation (kept for CLI compatibility)
    resume: bool = False,
) -> Tuple[int, int]:
    """
    Build/refresh a manifest for `bucket` by delegating to s3r.cmd_manifest (which already supports resume/upsert).
    Behavior:
      - If target["algo"] == "aws:kms": pass --kms-key-arn to cmd_manifest so that Target_* is set to that key.
      - If target["algo"] == "AES256": call cmd_manifest WITHOUT --kms-key-arn, then FORCE Target_* to AES256
        in a post-pass to ensure an explicit target snapshot regardless of the bucket default.
    Returns: (total_objects_in_manifest, drifted_count)
    """
    # Ensure the header exists for the first run
    s3r.write_manifest_header(manifest_path)

    class _Args:
        pass

    args = _Args()
    args.bucket = bucket
    args.region = getattr(s3.meta, "region_name", None) or "us-east-1"
    args.prefix = prefix or ""
    args.out = manifest_path
    args.create_bucket = False
    args.resume = resume
    # When resuming, don't refresh
    args.refresh_existing = not resume
    # If the desired target is KMS, pass the key id/arn through to cmd_manifest
    if target.get("algo") == "aws:kms":
        # cmd_manifest will normalize an ARN or key-id via _kms_key_id
        args.kms_key_arn = target.get("kms_key_id")
    else:
        # We want to *force* AES256 as the explicit Target_*, regardless of the bucket default.
        # cmd_manifest has no flag for that, so we:
        #   1) run cmd_manifest without a kms-key (uses the bucket default, or AES256 if none)
        #   2) post-adjust all rows' Target_* to AES256 and recompute DriftStatus from Pre_*.
        args.kms_key_arn = None
    # Run the delegated manifest command (handles true upsert, dedupe, atomic saves)
    try:
        s3r.cmd_manifest(args)
    except KeyboardInterrupt:
        print(f"[{s3r.utcnow()}] Stop observed during manifest build for {bucket}. Partial progress preserved.")
        raise  # bubble up to stop the whole run cleanly
    # Post-adjust: if the target is AES256, force explicit Target_* = AES256 and recompute DriftStatus
    if target.get("algo") == "AES256":
        try:
            rows = s3r.load_manifest(manifest_path)
        except Exception:
            rows = []
        forced_target = {"algo": "AES256"}
        for r in rows:
            r["Target_Algo"] = "AES256"
            r["Target_KMSKeyId"] = ""
            # recompute drift from the Pre_* snapshot against AES256
            pre = {
                "ServerSideEncryption": r.get("Pre_ServerSideEncryption", ""),
                "SSEKMSKeyId": r.get("Pre_SSEKMSKeyId", ""),
            }
            r["DriftStatus"] = "DRIFTED" if s3r.needs_migration(pre, forced_target) else "IN_SYNC"
            r["UpdatedAt"] = s3r.utcnow()
        s3r.atomically_update_manifest(manifest_path, rows)
    # Summarize for return/logging
    rows = s3r.load_manifest(manifest_path)
    total = len(rows)
    drifted = sum(1 for r in rows if r.get("DriftStatus") == "DRIFTED")
    print(f"[{s3r.utcnow()}] {bucket}: Manifest written: {manifest_path} (total {total}, drifted {drifted})")
    return total, drifted

# ---------------------------
# Summaries & UI
# ---------------------------
def summarize_manifests(manifest_paths: Dict[str, str]) -> Dict[str, Dict[str, int]]:
    """
    Returns a per-bucket summary:
        { bucket: {"total": N, "drifted": M, "errors": E, "in_sync": K} }
    """
    summary: Dict[str, Dict[str, int]] = {}
    for bucket, path in manifest_paths.items():
        rows = s3r.load_manifest(path)
        total = len(rows)
        drifted = sum(1 for r in rows if r.get("DriftStatus") == "DRIFTED")
        errors = sum(1 for r in rows if r.get("Status") == "ERROR" or r.get("DriftStatus") == "UNKNOWN")
        in_sync = total - drifted
        summary[bucket] = {"total": total, "drifted": drifted, "errors": errors, "in_sync": in_sync}
    return summary

def human_after(target: Dict[str, str]) -> str:
    return "SSE-S3 (AES256)" if target.get("algo") == "AES256" else f"SSE-KMS (key-id={target.get('kms_key_id')})"

def print_plan_table(summary: Dict[str, Dict[str, int]], after_desc_by_bucket: Dict[str, str]):
    headers = ["#", "Bucket", "Total", "Drifted", "Errors/Unknown", "After"]
    rows: List[List[str]] = []
    for i, (bucket, stats) in enumerate(sorted(summary.items(), key=lambda x: x[0]), start=1):
        rows.append([
            str(i),
            bucket,
            str(stats["total"]),
            str(stats["drifted"]),
            str(stats["errors"]),
            after_desc_by_bucket.get(bucket, "?"),
        ])
    colw = [max(len(h), *(len(r[idx]) for r in rows)) for idx, h in enumerate(headers)]
    fmt = " ".join("{:<" + str(w) + "}" for w in colw)
    print()
    print(fmt.format(*headers))
    print(fmt.format(*["-" * w for w in colw]))
    for r in rows:
        print(fmt.format(*r))
    print()

def ask_selection(buckets: List[str]) -> List[str]:
    print("Select buckets to migrate:")
    for idx, b in enumerate(buckets, start=1):
        print(f" {idx}) {b}")
    print("Enter numbers (comma-separated), ranges (e.g., 1-3), or '*' for all. Example: 1,3,5-7")
    try:
        raw = input("> ").strip()
    except KeyboardInterrupt:
        print("\nAborted by user.")
        return []
    if raw == "*":
        return buckets[:]
    chosen: List[str] = []
    parts = [p.strip() for p in raw.split(",") if p.strip()]
    idx_map = {str(i): b for i, b in enumerate(buckets, start=1)}
    for p in parts:
        if "-" in p:
            a, b = p.split("-", 1)
            try:
                start = int(a)
                end = int(b)
                for i in range(start, end + 1):
                    key = str(i)
                    if key in idx_map and idx_map[key] not in chosen:
                        chosen.append(idx_map[key])
            except ValueError:
                pass
        else:
            if p in idx_map and idx_map[p] not in chosen:
                chosen.append(idx_map[p])
    return chosen
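
# For example, with five buckets listed, the input "1,3-4" selects buckets #1, #3 and #4;
# duplicates are dropped and out-of-range or non-numeric entries are silently ignored.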

def confirm(prompt: str) -> bool:
    try:
        ans = input(f"{prompt} [y/N]: ").strip().lower()
    except KeyboardInterrupt:
        print("\nAborted by user.")
        return False
    return ans in ("y", "yes")

# ---------------------------
# Migration launcher
# ---------------------------
def run_migrations(region: Optional[str],
                   manifest_paths: Dict[str, str],
                   target_by_bucket: Dict[str, Dict[str, str]]):
    """
    For each selected bucket's manifest, run migrate only if the KMS target is usable (when applicable).
    """
    for bucket, path in manifest_paths.items():
        target = target_by_bucket.get(bucket, {}) or {}
        if target.get("algo") == "aws:kms":
            key_id = target.get("kms_key_id")
            if not s3r.kms_is_usable(region or "us-east-1", key_id):
                print(f"[{s3r.utcnow()}] Skipping {bucket}: KMS key not usable in {region} (or insufficient permissions).")
                continue
        print(f"\n=== Migrating bucket: {bucket} ===")

        class _Args:  # simple namespace mirroring the s3r CLI
            pass

        args = _Args()
        args.region = region
        args.manifest = path
        args.kms_key_arn = None  # the manifest Target_* drives the per-row policy
        args.workers = 16
        args.max_retries = 3
        args.progress_every = 500
        args.only_pending = False
        try:
            s3r.cmd_migrate(args)
        except ClientError as e:
            code = (e.response or {}).get("Error", {}).get("Code", "Unknown")
            if code.startswith("KMS.") or code in ("AccessDeniedException", "ValidationException"):
                print(f"[{s3r.utcnow()}] Skipping {bucket} due to KMS error: {code}")
                continue
            raise
        print(f"=== Completed: {bucket} ===\n")

# ---------------------------
# Main
# ---------------------------
def main():
    p = argparse.ArgumentParser(
        description="Plan and migrate SSE across multiple S3 buckets using drift-aware manifests."
    )
    p.add_argument("--region", required=True, help="Region to scope buckets to (e.g., us-east-1).")
    p.add_argument("--exclude-substr", action="append", default=[],
                   help="Exclude buckets whose name contains this substring. Can be given multiple times.")
    p.add_argument("--kms-key-arn", default=None,
                   help="If provided, plan/migrate to this KMS key (SSE-KMS). If omitted, plan to reset to SSE-S3 (AES256).")
    p.add_argument("--manifests-dir", default="data/manifests",
                   help="Directory to store per-bucket manifest CSVs.")
    p.add_argument("--prefix", default="", help="Optional prefix filter for object listing.")
    p.add_argument("--ignore-default-sse-s3", action="store_true",
                   help="Exclude buckets whose bucket default encryption is SSE-S3 (AES256).")
    p.add_argument("--resume", action="store_true",
                   help="Upsert into existing per-bucket manifests; safe to cancel & rerun (final rows retained).")
    p.add_argument("--manifest-workers", type=int, default=32,
                   help="(Unused in delegated mode) kept for CLI compatibility.")
    args = p.parse_args()
    # S3 client via the companion module (a bigger connection pool helps the HEAD fan-out)
    s3 = s3r.s3_client(args.region, max_pool_connections=256)
    # Bucket discovery + filtering (via companion helpers)
    all_buckets = s3r.list_buckets_in_region(s3, args.region)
    print(f"[{s3r.utcnow()}] Found {len(all_buckets)} buckets in region {args.region}")
    excludes = [x.lower() for x in args.exclude_substr]
    buckets: List[str] = []
    for b in all_buckets:
        if _STOP:
            print(f"[{s3r.utcnow()}] Stop observed; exiting before scanning manifests.")
            return
        low = b.lower()
        if any(ex in low for ex in excludes):
            continue
        if not s3r.bucket_exists(s3, b):
            continue
        if args.ignore_default_sse_s3:
            is_aes256 = s3r.bucket_default_is_sse_s3(s3, b)
            if is_aes256 is True:
                print(f"[{s3r.utcnow()}] Skipping {b}: default encryption is already SSE-S3 (AES256).")
                continue
        buckets.append(b)
    if not buckets:
        print("No candidate buckets after filtering.")
        return
    # Choose the explicit per-bucket target (force a policy snapshot in the manifest)
    if args.kms_key_arn:
        target_template = s3r.kms_target(args.kms_key_arn)
    else:
        target_template = s3r.aes256_target()
    manifests: Dict[str, str] = {}
    after_desc_by_bucket: Dict[str, str] = {}
    target_by_bucket: Dict[str, Dict[str, str]] = {}
    for b in buckets:
        if _STOP:
            print(f"[{s3r.utcnow()}] Stop observed; exiting before processing bucket {b}.")
            return
        target = dict(target_template)  # same policy for all; copied for clarity
        target_by_bucket[b] = target
        after_desc_by_bucket[b] = human_after(target)
        manifest_path = os.path.join(args.manifests_dir, f"{b}.csv")
        if not args.resume and os.path.exists(manifest_path):
            os.remove(manifest_path)
        total, drifted = build_manifest_for_bucket(
            s3, b, manifest_path, target,
            prefix=args.prefix, workers=args.manifest_workers, progress_every=2000, resume=args.resume
        )
        manifests[b] = manifest_path
    # Summarize & present the plan
    summary = summarize_manifests(manifests)
    print_plan_table(summary, after_desc_by_bucket)
    drift_buckets = [b for b, s in summary.items() if s["drifted"] > 0]
    if not drift_buckets:
        print("No drift detected. Nothing to migrate.")
        return
    print(textwrap.dedent("""
        The buckets above show how many objects differ from the target:
        Target after migration: (varies per bucket; see the 'After' column)
        Tip: you usually only want to migrate buckets with DRIFTED > 0.
    """).strip())
    chosen = ask_selection([b for b in buckets if summary[b]["drifted"] > 0])
    if not chosen:
        print("No buckets selected. Exiting.")
        return
    print("\nYou chose to migrate these buckets:")
    for b in chosen:
        stats = summary[b]
        print(f" - {b}: {stats['drifted']} / {stats['total']} objects will be updated → {after_desc_by_bucket[b]}")
    if not confirm("Proceed with migration?"):
        print("Aborted by user.")
        return
    # KMS preflight + run migrations
    run_migrations(args.region, {b: manifests[b] for b in chosen}, target_by_bucket)
    print("\nAll selected migrations completed.")
    print("Reminder: if you were moving to SSE-KMS, consider setting the bucket default encryption accordingly:")
    print(f" python s3_reencrypt.py ensure-bucket-encryption --bucket <bucket> --region {args.region} --kms-key-arn <key-arn>")
    print("Or to reset to S3-managed (AES256):")
    print(f" python s3_reencrypt.py ensure-bucket-encryption --bucket <bucket> --region {args.region}")

if __name__ == "__main__":
    try:
        main()
    except KeyboardInterrupt:
        # exit code 130 is conventional for SIGINT
        sys.exit(130)