S3 encryption migration tool (SSE-S3 -> SSE-KMS or ensure SSE-KMS)

s3_reencrypt.py
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
S3 encryption migration tool (SSE-S3 -> SSE-KMS or ensure SSE-KMS).
- Generates manifest of current objects with SSE/KMS info + drift status
- Ensures bucket default encryption (optional)
- Re-encrypts objects in place to a target KMS key (idempotent, resumable)
Usage:
# Build manifest only (drift vs bucket default or explicit KMS key)
python s3_reencrypt.py manifest --bucket playground-bucket --region us-east-1 --out data/s3_manifest.csv
python s3_reencrypt.py manifest --bucket playground-bucket --region us-east-1 --out data/s3_manifest.csv \
--kms-key-arn arn:aws:kms:us-east-1:123456789012:key/abcd-...-efgh --head-workers 48 --head-max-rps 250
# Ensure default encryption = SSE-KMS with a specific key (or AES256 if omitted)
python s3_reencrypt.py ensure-bucket-encryption --bucket playground-bucket --region us-east-1 \
--kms-key-arn arn:aws:kms:us-east-1:123456789012:key/abcd-...-efgh
# Migrate encryption in-place using the manifest (resumable; uses manifest Target_* and DriftStatus)
python s3_reencrypt.py migrate --manifest data/s3_manifest.csv --region us-east-1 --workers 32 --max-retries 7 \
--max-rps 150 --verify-sample 0.02
"""
import time
import re
import argparse
import csv
import os
import sys
import signal
import random
import threading
from datetime import datetime, timezone
from typing import Any, Dict, Iterable, Optional, Tuple, List
from concurrent.futures import ThreadPoolExecutor, as_completed
import boto3
from botocore.config import Config
from botocore.exceptions import ClientError
# ---------------------------
# Utilities
# ---------------------------
ISO8601 = "%Y-%m-%dT%H:%M:%S%z"
def utcnow() -> str:
return datetime.now(timezone.utc).strftime(ISO8601)
def ensure_dir(path: str):
d = os.path.dirname(os.path.abspath(path))
if d:
os.makedirs(d, exist_ok=True)
def safe_int(v, default=0):
try:
return int(v)
except Exception:
return default
def log(msg: str):
print(f"[{utcnow()}] {msg}", flush=True)
def backoff_sleep(attempt: int) -> float:
# Exponential backoff w/ full jitter
base = min(2 ** attempt, 64) # cap growth
return random.uniform(0.5, base)
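# Illustrative bounds (direct from the formula above): attempt=1 samples
# uniform(0.5, 2) seconds, attempt=3 samples uniform(0.5, 8), and attempt>=6
# caps at uniform(0.5, 64), so waits grow exponentially but stay bounded.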
# ---------------------------
# Simple rate limiter (token bucket)
# ---------------------------
class RateLimiter:
def __init__(self, rate_per_sec: float):
self.rate = max(0.0, float(rate_per_sec or 0.0))
self.tokens = self.rate
self.last = time.monotonic()
self.lock = threading.Lock()
def acquire(self):
if self.rate <= 0.0:
return
with self.lock:
now = time.monotonic()
elapsed = now - self.last
self.tokens = min(self.rate, self.tokens + elapsed * self.rate)
self.last = now
if self.tokens < 1.0:
need = 1.0 - self.tokens
time.sleep(need / self.rate)
self.tokens = 0.0
else:
self.tokens -= 1.0
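# Illustrative use (assumption: one limiter instance shared by all worker threads):
#   limiter = RateLimiter(100.0)  # cap aggregate throughput at ~100 ops/sec
#   limiter.acquire()             # blocks just long enough to honor the rate
#   ...issue one S3 request per acquire()...
# A rate of 0 disables limiting (acquire() returns immediately).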
# ---------------------------
# S3 helpers
# ---------------------------
def s3_client(region: Optional[str], max_pool_connections: int = 200) -> Any:
session = boto3.session.Session(region_name=region)
cfg = Config(
retries={"mode": "adaptive", "max_attempts": 10},
connect_timeout=10,
read_timeout=120,
max_pool_connections=max_pool_connections,
tcp_keepalive=True,
)
return session.client("s3", config=cfg)
def bucket_exists(s3, bucket: str) -> bool:
try:
s3.head_bucket(Bucket=bucket)
return True
except ClientError as e:
code = int(e.response.get("ResponseMetadata", {}).get("HTTPStatusCode", 0))
if code == 403:
# Exists but no head access
return True
return False
def create_bucket_if_needed(s3, bucket: str, region: Optional[str]):
if bucket_exists(s3, bucket):
log(f"Bucket exists: s3://{bucket}")
return
if region in (None, "", "us-east-1"):
s3.create_bucket(Bucket=bucket)
else:
s3.create_bucket(
Bucket=bucket,
CreateBucketConfiguration={"LocationConstraint": region},
)
log(f"Created bucket: s3://{bucket} (region={region or 'us-east-1'})")
def set_default_encryption_sse_s3(s3, bucket: str):
s3.put_bucket_encryption(
Bucket=bucket,
ServerSideEncryptionConfiguration={
"Rules": [
{
"ApplyServerSideEncryptionByDefault": {
"SSEAlgorithm": "AES256"
},
"BucketKeyEnabled": False,
}
]
},
)
def set_default_encryption_kms(s3, bucket: str, kms_arn: str, bucket_key_enabled: bool = True):
s3.put_bucket_encryption(
Bucket=bucket,
ServerSideEncryptionConfiguration={
"Rules": [
{
"ApplyServerSideEncryptionByDefault": {
"SSEAlgorithm": "aws:kms",
"KMSMasterKeyID": kms_arn,
},
"BucketKeyEnabled": bucket_key_enabled,
}
]
},
)
def get_bucket_default_encryption(s3, bucket: str) -> Dict:
try:
resp = s3.get_bucket_encryption(Bucket=bucket)
cfg = resp.get("ServerSideEncryptionConfiguration", {})
rules = cfg.get("Rules", [])
if rules:
return rules[0].get("ApplyServerSideEncryptionByDefault", {})
return {}
except ClientError as e:
if e.response.get("Error", {}).get("Code") == "ServerSideEncryptionConfigurationNotFoundError":
return {}
raise
def list_objects_stream(s3, bucket: str, prefix: str = "") -> Iterable[Dict]:
paginator = s3.get_paginator("list_objects_v2")
kwargs = {"Bucket": bucket}
if prefix:
kwargs["Prefix"] = prefix
# Use larger page size for efficiency on big buckets
for page in paginator.paginate(**kwargs, PaginationConfig={"PageSize": 1000}):
for obj in page.get("Contents", []) or []:
yield obj
def head_object_encryption(s3, bucket: str, key: str) -> Dict:
"""Single-attempt HEAD wrapper (kept for compatibility). Prefer head_object_encryption_retry in hot paths."""
try:
resp = s3.head_object(Bucket=bucket, Key=key)
return {
"ServerSideEncryption": resp.get("ServerSideEncryption", ""), # 'AES256' or 'aws:kms'
"SSEKMSKeyId": resp.get("SSEKMSKeyId", ""),
"BucketKeyEnabled": resp.get("BucketKeyEnabled", ""),
"ETag": resp.get("ETag"),
"ContentLength": resp.get("ContentLength"),
"LastModified": resp.get("LastModified").strftime(ISO8601) if resp.get("LastModified") else "",
}
except ClientError as e:
return {
"ServerSideEncryption": "",
"SSEKMSKeyId": "",
"BucketKeyEnabled": "",
"ETag": "",
"ContentLength": "",
"LastModified": "",
"Error": f"{e.response.get('Error', {}).get('Code')}: {e.response.get('Error', {}).get('Message')}",
}
# ---- Retriable HEAD helpers ----
RETRIABLE_S3_ERRORS = {
"SlowDown","Throttling","ThrottlingException","RequestTimeout",
"InternalError","ServiceUnavailable","ExpiredToken","ExpiredTokenException",
}
def head_object_encryption_retry(s3, bucket: str, key: str, max_retries: int = 5) -> Dict:
"""HEAD with jittered retries on transient errors; returns same dict shape as head_object_encryption."""
attempt = 0
while True:
try:
resp = s3.head_object(Bucket=bucket, Key=key)
return {
"ServerSideEncryption": resp.get("ServerSideEncryption", ""),
"SSEKMSKeyId": resp.get("SSEKMSKeyId", ""),
"BucketKeyEnabled": resp.get("BucketKeyEnabled", ""),
"ETag": resp.get("ETag"),
"ContentLength": resp.get("ContentLength"),
"LastModified": resp.get("LastModified").strftime(ISO8601) if resp.get("LastModified") else "",
}
except ClientError as e:
code = e.response.get("Error", {}).get("Code", "")
attempt += 1
if code not in RETRIABLE_S3_ERRORS or attempt >= max_retries:
return {
"ServerSideEncryption": "",
"SSEKMSKeyId": "",
"BucketKeyEnabled": "",
"ETag": "",
"ContentLength": "",
"LastModified": "",
"Error": f"{code}: {e.response.get('Error', {}).get('Message')}",
}
time.sleep(backoff_sleep(attempt))
except Exception as e:
# Non-ClientError, return error payload
return {
"ServerSideEncryption": "",
"SSEKMSKeyId": "",
"BucketKeyEnabled": "",
"ETag": "",
"ContentLength": "",
"LastModified": "",
"Error": f"Exception: {e}",
}
def _kms_key_id(s: str) -> str:
"""Normalize KMS identifier to key-id so ARN vs key-id compares equal."""
if not s:
return ""
m = re.search(r"(?:^|[:/])key/([0-9a-fA-F-]{8,})$", s)  # ARNs use ":key/<id>", not "/key/<id>"
return m.group(1) if m else s
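# Illustrative normalization (hypothetical key id):
#   _kms_key_id("arn:aws:kms:us-east-1:123456789012:key/1234abcd-12ab-34cd-56ef-1234567890ab")
#     -> "1234abcd-12ab-34cd-56ef-1234567890ab"
#   _kms_key_id("1234abcd-12ab-34cd-56ef-1234567890ab")  # bare key id passes through unchanged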
def read_bucket_target_policy(s3, bucket: str) -> Dict[str, str]:
"""
Return desired per-object encryption policy derived from bucket default:
{"algo": "AES256"} or {"algo":"aws:kms","kms_key_id":"<key-id>"}
If no bucket default is set, fall back to AES256.
"""
cfg = get_bucket_default_encryption(s3, bucket) or {}
algo = cfg.get("SSEAlgorithm", "")
if algo == "aws:kms":
return {"algo": "aws:kms", "kms_key_id": _kms_key_id(cfg.get("KMSMasterKeyID", ""))}
# treat missing/unknown as AES256 target
return {"algo": "AES256"}
def _target_from_args_or_bucket(s3, bucket: str, kms_key_arn: Optional[str]) -> Dict[str, str]:
"""Pick a stable target policy snapshot for the manifest."""
if kms_key_arn:
return {"algo": "aws:kms", "kms_key_id": _kms_key_id(kms_key_arn)}
return read_bucket_target_policy(s3, bucket)
def needs_migration(row: Dict, target_policy: Dict[str, str]) -> bool:
algo = target_policy.get("algo")
if algo == "AES256":
# object must be AES256 (explicit); anything else gets migrated
return row.get("ServerSideEncryption") != "AES256"
if algo == "aws:kms":
return not (
row.get("ServerSideEncryption") == "aws:kms"
and _kms_key_id(row.get("SSEKMSKeyId", "")) == target_policy.get("kms_key_id", "")
)
# default: migrate
return True
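# Illustrative drift decision (hypothetical key id): an AES256 object drifts
# against a KMS target, and a KMS object drifts when its key differs from the
# target's key:
#   needs_migration({"ServerSideEncryption": "AES256", "SSEKMSKeyId": ""},
#                   {"algo": "aws:kms", "kms_key_id": "1234abcd"})  # -> True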
def copy_in_place_to_policy(
s3,
bucket: str,
key: str,
target_policy: Dict[str, str],
max_retries: int = 3,
limiter: Optional[RateLimiter] = None,
) -> None:
"""Re-encrypts an existing S3 object in place according to target_policy.
If the object is archived or otherwise invalid, abort without retrying.
"""
attempt = 0
while True:
try:
if limiter is not None:
limiter.acquire()
kwargs = {
"Bucket": bucket,
"Key": key,
"CopySource": {"Bucket": bucket, "Key": key},
"MetadataDirective": "COPY",
"TaggingDirective": "COPY",
}
if target_policy.get("algo") == "aws:kms":
kwargs.update({
"ServerSideEncryption": "aws:kms",
"SSEKMSKeyId": target_policy["kms_key_id"],
"BucketKeyEnabled": True, # enable S3 Bucket Keys on the copy for cost savings
})
else:
kwargs.update({"ServerSideEncryption": "AES256"})
s3.copy_object(**kwargs)
return
except ClientError as e:
code = e.response.get("Error", {}).get("Code", "")
attempt += 1
# Skip immediately if key is invalid or inaccessible for KMS
if code == "KMS.ValidationException":
log(f"KMS key validation failed for s3://{bucket}/{key} — skipping object.")
e.operation = "skip_object"
raise e
# Abort immediately if object is archived or not retrievable
if code in ("InvalidObjectState", "ObjectNotInActiveTierError"):
log(f"File s3://{bucket}/{key} is archived or not accessible ({code}), skipping.")
e.operation = "invalid_file_status"
raise e
if attempt >= max_retries:
raise
sleep_s = backoff_sleep(attempt)
log(
f"Retry {attempt}/{max_retries} for s3://{bucket}/{key}: "
f"{code or 'UnknownError'} — sleeping {sleep_s:.1f}s"
)
time.sleep(sleep_s)
# ---- Planner helpers ----
def aes256_target() -> Dict[str, str]:
"""Return a manifest target that forces S3-managed encryption (AES256)."""
return {"algo": "AES256"}
def kms_target(kms_key_arn_or_id: str) -> Dict[str, str]:
"""Return a manifest target for SSE-KMS with normalized key id."""
return {"algo": "aws:kms", "kms_key_id": _kms_key_id(kms_key_arn_or_id)}
def needs_migration_from_head(head: Dict, target_policy: Dict[str, str]) -> bool:
"""Adapter: compute drift using a head_object dict."""
return needs_migration(
{"ServerSideEncryption": head.get("ServerSideEncryption", ""),
"SSEKMSKeyId": head.get("SSEKMSKeyId", "")},
target_policy
)
def build_manifest_row(bucket: str, obj: Dict, head: Dict, target: Dict[str, str],
preserve_from: Optional[Dict] = None) -> Dict:
"""
Build a single manifest row using head() + target.
If preserve_from has final states (MIGRATED/SKIPPED), keep them.
"""
if head.get("Error"):
drift = "UNKNOWN"
else:
drift = "DRIFTED" if needs_migration_from_head(head, target) else "IN_SYNC"
row = {
"Bucket": bucket,
"Key": obj["Key"],
"Size": obj.get("Size"),
"ETag": obj.get("ETag", ""),
"LastModified": obj.get("LastModified").strftime(ISO8601) if obj.get("LastModified") else "",
"StorageClass": obj.get("StorageClass", ""),
"Pre_ServerSideEncryption": head.get("ServerSideEncryption", ""),
"Pre_SSEKMSKeyId": head.get("SSEKMSKeyId", ""),
"Pre_BucketKeyEnabled": head.get("BucketKeyEnabled", ""),
"Target_Algo": target.get("algo", ""),
"Target_KMSKeyId": target.get("kms_key_id", ""),
"DriftStatus": drift,
"Status": "PENDING",
"Attempts": 0,
"LastError": "",
"UpdatedAt": utcnow(),
}
if preserve_from and preserve_from.get("Status") in ("MIGRATED", "SKIPPED"):
row["Status"] = preserve_from["Status"]
row["Attempts"] = safe_int(preserve_from.get("Attempts", 0))
row["LastError"] = preserve_from.get("LastError", "")
return row
def list_buckets_in_region(s3, region: str) -> List[str]:
"""List bucket names in the given region (us-east-1 represented as None by S3)."""
resp = s3.list_buckets()
names = [b["Name"] for b in resp.get("Buckets", [])]
out: List[str] = []
for b in names:
try:
loc = s3.get_bucket_location(Bucket=b).get("LocationConstraint")
if (loc or "us-east-1") == region:
out.append(b)
except Exception:
continue
return out
def bucket_default_is_sse_s3(s3, bucket: str) -> Optional[bool]:
"""
True -> default is explicitly SSE-S3 (AES256)
False -> default is something else (none/KMS/unknown)
None -> couldn't determine (permissions/other error)
"""
try:
resp = s3.get_bucket_encryption(Bucket=bucket)
cfg = resp.get("ServerSideEncryptionConfiguration", {}) or {}
for rule in cfg.get("Rules", []):
by_default = rule.get("ApplyServerSideEncryptionByDefault", {}) or {}
algo = by_default.get("SSEAlgorithm")
kms = by_default.get("KMSMasterKeyID") or by_default.get("KMSMasterKeyId")
return (algo == "AES256") and (not kms)
return False
except ClientError as e:
code = (e.response or {}).get("Error", {}).get("Code", "Unknown")
if code in ("ServerSideEncryptionConfigurationNotFoundError", "NoSuchEncryptionConfiguration"):
return False
return None
except Exception:
return None
def kms_is_usable(region: str, kms_key_arn_or_id: str) -> bool:
"""
Preflight: key exists, is enabled, and caller can Encrypt with it.
Returns False on any KMS/permission error.
"""
if not kms_key_arn_or_id:
return False
kms = boto3.client("kms", region_name=region)
try:
desc = kms.describe_key(KeyId=kms_key_arn_or_id)
meta = desc.get("KeyMetadata", {}) or {}
if meta.get("KeyState") != "Enabled":
return False
kms.encrypt(KeyId=kms_key_arn_or_id, Plaintext=b"\x00", EncryptionAlgorithm="SYMMETRIC_DEFAULT")
return True
except ClientError:
return False
except Exception:
return False
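# Note: the Encrypt probe above is a real one-byte KMS call (it counts as a KMS
# request and is logged to CloudTrail). It is also a proxy check: SSE-KMS object
# writes actually rely on kms:GenerateDataKey (plus kms:Decrypt when reading a
# copy source), so passing this probe does not strictly guarantee copies succeed.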
# ---- End helpers ----
# ---------------------------
# Manifest management
# ---------------------------
MANIFEST_FIELDS = [
"Bucket", "Key", "Size", "ETag", "LastModified", "StorageClass",
"Pre_ServerSideEncryption", "Pre_SSEKMSKeyId", "Pre_BucketKeyEnabled",
"Target_Algo", "Target_KMSKeyId", "DriftStatus",
"Status", "Attempts", "LastError", "UpdatedAt"
]
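# Row lifecycle, as implemented below: Status moves PENDING -> MIGRATED | SKIPPED
# | ERROR, and DriftStatus is DRIFTED, IN_SYNC, or UNKNOWN (HEAD failed); both
# are recomputed whenever a row is built or refreshed.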
def write_manifest_header(path: str):
ensure_dir(path)
if not os.path.exists(path):
with open(path, "w", newline="", encoding="utf-8") as f:
csv.DictWriter(f, fieldnames=MANIFEST_FIELDS).writeheader()
def append_manifest_rows(path: str, rows: List[Dict]):
ensure_dir(path)
with open(path, "a", newline="", encoding="utf-8") as f:
w = csv.DictWriter(f, fieldnames=MANIFEST_FIELDS)
for r in rows:
w.writerow(r)
def load_manifest(path: str) -> List[Dict]:
with open(path, newline="", encoding="utf-8") as f:
rdr = csv.DictReader(f)
return list(rdr)
def atomically_update_manifest(path: str, rows: List[Dict]):
tmp = f"{path}.tmp"
with open(tmp, "w", newline="", encoding="utf-8") as f:
w = csv.DictWriter(f, fieldnames=MANIFEST_FIELDS)
w.writeheader()
for r in rows:
w.writerow(r)
os.replace(tmp, path)
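# os.replace is atomic when the temp file and target share a filesystem (here
# they share a directory), so a crash mid-save cannot leave a truncated manifest.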
def iter_manifest_pending(path: str) -> Iterable[Dict]:
with open(path, newline="", encoding="utf-8") as f:
rdr = csv.DictReader(f)
for row in rdr:
if row.get("Status") in ("", "PENDING", "ERROR"):
yield row
# ---------------------------
# Commands
# ---------------------------
def cmd_manifest(args):
"""
Build/refresh a drift-aware manifest for a bucket.
Improvements:
- parallel HEADs (non-resume and resume/upsert) with optional RPS limiter
- retrying HEADs on transient errors
- resume/upsert (--resume) to avoid duplicates and let you pick up after cancel
- optional refresh of existing rows' Pre_* and DriftStatus (--refresh-existing)
- per-page checkpoint (continuation token) for resumable listings
- unified heartbeat logging
- graceful Ctrl+C with checkpoint + early stop
"""
import json
# ---- helpers ----
def _load_manifest_index(path: str) -> Dict[str, Dict]:
if not os.path.exists(path):
return {}
rows = load_manifest(path)
return {r["Key"]: r for r in rows if r.get("Key")}
def _save_ckpt(ckpt_path: str, page_no: int, token: Optional[str]):
with open(ckpt_path, "w") as f:
json.dump({"page_no": page_no, "next_token": token}, f)
# ---- setup ----
resume = getattr(args, "resume", False)
refresh_existing = getattr(args, "refresh_existing", False)
log_every = safe_int(getattr(args, "log_every", 500), 500)
progress_every = safe_int(getattr(args, "progress_every_rows", 20000), 20000)
list_max_seconds = safe_int(getattr(args, "list_max_seconds", 0), 0)
# Graceful Ctrl+C for manifest (uses _handle_sigint defined below)
signal.signal(signal.SIGINT, _handle_sigint)
# pool sizing for S3 HTTP
desired_pool = 200
if getattr(args, "head_workers", 0):
desired_pool = max(200, int(args.head_workers) * 4)
s3 = s3_client(args.region, max_pool_connections=desired_pool)
if not bucket_exists(s3, args.bucket):
if getattr(args, "create_bucket", False):
create_bucket_if_needed(s3, args.bucket, args.region)
set_default_encryption_sse_s3(s3, args.bucket)
log("Enabled default bucket encryption: SSE-S3 (AES256)")
else:
log(f"Bucket does not exist: s3://{args.bucket}")
sys.exit(2)
write_manifest_header(args.out)
target = _target_from_args_or_bucket(s3, args.bucket, getattr(args, "kms_key_arn", None))
log(
f"Manifest target policy: {target['algo']}"
+ (f" key={target.get('kms_key_id')}" if target.get('kms_key_id') else "")
)
# ---- non-resume (fresh) ----
if not resume:
objs = list(list_objects_stream(s3, args.bucket, getattr(args, "prefix", "") or ""))
pool_size = int(getattr(args, "head_workers", 0)) or min(64, max(8, int(desired_pool * 0.8)))
head_max_rps = float(getattr(args, "head_max_rps", 0.0) or 0.0)
limiter = RateLimiter(head_max_rps) if head_max_rps > 0.0 else None
log(
f"Listing yielded {len(objs)} objects; HEAD concurrency={pool_size}"
+ (f"; head-max-rps={int(head_max_rps)}" if head_max_rps > 0 else "")
)
objs_by_key = {o["Key"]: o for o in objs}
def _head(k: str) -> Tuple[str, Dict]:
if limiter:
limiter.acquire()
return k, head_object_encryption_retry(s3, args.bucket, k)
count, batch = 0, []
with ThreadPoolExecutor(max_workers=pool_size) as ex:
futures = [ex.submit(_head, k) for k in objs_by_key.keys()]
for fut in as_completed(futures):
k, head = fut.result()
obj = objs_by_key[k]
row = build_manifest_row(args.bucket, obj, head, target)
batch.append(row)
count += 1
if len(batch) >= progress_every:
append_manifest_rows(args.out, batch)
batch.clear()
log(f"Manifested {count} objects...")
if batch:
append_manifest_rows(args.out, batch)
log(f"Manifest written: {args.out} (total {count} keys)")
return
# ---- resume/upsert (parallel per page) ----
index = _load_manifest_index(args.out)
processed, added, refreshed, dirty_since_save = 0, 0, 0, 0
start_ts = time.time()
ckpt_path = f"{args.out}.ckpt"
ckpt = None
if os.path.exists(ckpt_path):
try:
with open(ckpt_path) as f:
ckpt = json.load(f)
log(f"Resuming listing from checkpoint: page={ckpt.get('page_no')} next={ckpt.get('next_token')}")
except Exception:
ckpt = None
paginator = s3.get_paginator("list_objects_v2")
kwargs = {"Bucket": args.bucket}
prefix = getattr(args, "prefix", "") or ""
if prefix:
kwargs["Prefix"] = prefix
if ckpt and ckpt.get("next_token"):
kwargs["ContinuationToken"] = ckpt["next_token"]
# Concurrency knobs for HEADs
pool_size = int(getattr(args, "head_workers", 0)) or 64
head_max_rps = float(getattr(args, "head_max_rps", 0.0) or 0.0)
limiter = RateLimiter(head_max_rps) if head_max_rps > 0.0 else None
def _head(k: str) -> Tuple[str, Dict]:
if limiter:
limiter.acquire()
return k, head_object_encryption_retry(s3, args.bucket, k)
page_no = ckpt.get("page_no", 0) if ckpt else 0
for page in paginator.paginate(**kwargs, PaginationConfig={"PageSize": 1000}):
page_no += 1
objs = page.get("Contents", []) or []
dur = time.time() - start_ts
log(f"Listing page {page_no} (n={len(objs)}) in {dur:.2f}s")
start_ts = time.time()
if not objs:
_save_ckpt(ckpt_path, page_no, page.get("NextContinuationToken"))
if not page.get("IsTruncated"):
break
continue
objs_by_key = {o["Key"]: o for o in objs}
page_added = 0
page_refreshed = 0
# Early stop before scheduling any work
if _stop_requested:
log("Stop flag observed before scheduling page work; saving checkpoint and exiting.")
_save_ckpt(ckpt_path, page_no, page.get("NextContinuationToken"))
break
with ThreadPoolExecutor(max_workers=pool_size) as ex:
futures = [ex.submit(_head, k) for k in objs_by_key.keys()]
try:
for fut in as_completed(futures):
if _stop_requested:
for f in futures:
f.cancel()
# Python 3.9+: fast cancel of queued tasks
ex.shutdown(wait=False, cancel_futures=True)
atomically_update_manifest(args.out, list(index.values()))
log("Progress saved after stop. Exiting page early.")
break
k, head = fut.result()
obj = objs_by_key[k]
prev = index.get(k)
if prev and not refresh_existing:
processed += 1
if processed % log_every == 0:
log(f"Upsert progress: processed={processed}, total_rows={len(index)}")
continue
drift = "UNKNOWN" if head.get("Error") else (
"DRIFTED" if needs_migration_from_head(head, target) else "IN_SYNC"
)
row = prev.copy() if prev else {
"Bucket": args.bucket,
"Key": k,
"Status": "PENDING",
"Attempts": 0,
"LastError": ""
}
row.update({
"Size": obj.get("Size"),
"ETag": obj.get("ETag", ""),
"LastModified": obj.get("LastModified").strftime(ISO8601) if obj.get("LastModified") else "",
"StorageClass": obj.get("StorageClass", ""),
"Pre_ServerSideEncryption": head.get("ServerSideEncryption", ""),
"Pre_SSEKMSKeyId": head.get("SSEKMSKeyId", ""),
"Pre_BucketKeyEnabled": head.get("BucketKeyEnabled", ""),
"Target_Algo": target.get("algo", ""),
"Target_KMSKeyId": target.get("kms_key_id", ""),
"DriftStatus": drift,
"UpdatedAt": utcnow(),
})
index[k] = row
processed += 1
if prev:
page_refreshed += 1
else:
page_added += 1
dirty_since_save += 1
if processed % log_every == 0:
log(
f"Upsert progress: processed={processed}, total_rows={len(index)} "
f"(added={added + page_added}, refreshed={refreshed + page_refreshed})"
)
if dirty_since_save >= progress_every:
atomically_update_manifest(args.out, list(index.values()))
log(
f"Checkpoint save: processed={processed}, added={added + page_added}, "
f"refreshed={refreshed + page_refreshed}, total_rows={len(index)}"
)
dirty_since_save = 0
# Optional per-page time budget
if list_max_seconds and (time.time() - start_ts) > list_max_seconds:
log(f"Reached time budget (--list-max-seconds={list_max_seconds}). Stopping early.")
for f in futures:
f.cancel()
ex.shutdown(wait=False, cancel_futures=True)
break
finally:
if _stop_requested:
_save_ckpt(ckpt_path, page_no, page.get("NextContinuationToken"))
break
added += page_added
refreshed += page_refreshed
# Save page checkpoint
_save_ckpt(ckpt_path, page_no, page.get("NextContinuationToken"))
# Stop before fetching next page if requested
if _stop_requested:
log("Stop flag observed after page completion; exiting before next page.")
break
if not page.get("IsTruncated"):
break
# Final write
atomically_update_manifest(args.out, list(index.values()))
log(
f"Upsert complete: processed={processed}, added={added}, refreshed={refreshed}, "
f"total_rows={len(index)} (file: {args.out})"
)
def cmd_ensure_bucket_encryption(args):
s3 = s3_client(args.region)
if not bucket_exists(s3, args.bucket):
log(f"Bucket does not exist: s3://{args.bucket}")
sys.exit(2)
current = get_bucket_default_encryption(s3, args.bucket)
algo = current.get("SSEAlgorithm", "None")
kms = _kms_key_id(current.get("KMSMasterKeyID", "")) # normalize
if args.kms_key_arn:
if algo == "aws:kms" and kms == _kms_key_id(args.kms_key_arn):
log(f"Bucket already uses SSE-KMS with provided key:\n {args.kms_key_arn}")
else:
set_default_encryption_kms(s3, args.bucket, args.kms_key_arn, bucket_key_enabled=not args.no_bucket_key)
log(
f"Updated bucket default encryption to SSE-KMS:\n {args.kms_key_arn} "
f"(BucketKeyEnabled={not args.no_bucket_key})"
)
else:
if algo == "AES256":
log("Bucket already uses SSE-S3 (AES256)")
else:
set_default_encryption_sse_s3(s3, args.bucket)
log("Updated bucket default encryption to SSE-S3 (AES256)")
def _migrate_one(s3, row: Dict, target_policy: Dict[str, str], max_retries: int,
verify_mode: Dict[str, float], limiter: Optional[RateLimiter]) -> Tuple[str, Dict]:
bucket = row["Bucket"]; key = row["Key"]
def _ok(v: Dict) -> bool:
if target_policy.get("algo") == "AES256":
return v.get("ServerSideEncryption") == "AES256"
return (
v.get("ServerSideEncryption") == "aws:kms"
and _kms_key_id(v.get("SSEKMSKeyId", "")) == target_policy.get("kms_key_id", "")
)
# Re-head (object might have changed since manifest) — with retries
head = head_object_encryption_retry(s3, bucket, key)
if not needs_migration(
{"ServerSideEncryption": head.get("ServerSideEncryption", ""),
"SSEKMSKeyId": head.get("SSEKMSKeyId", "")},
target_policy
):
row["Status"] = "SKIPPED"
row["LastError"] = ""
row["UpdatedAt"] = utcnow()
row["DriftStatus"] = "IN_SYNC"
return "SKIPPED", row
# Copy + (optional) verify; refresh client once on ExpiredToken
try:
copy_in_place_to_policy(s3, bucket, key, target_policy, max_retries=max_retries, limiter=limiter)
do_verify = False
if verify_mode.get("mode") == "all":
do_verify = True
elif verify_mode.get("mode") == "sample":
do_verify = (random.random() < float(verify_mode.get("sample", 0.0)))
if do_verify:
v = head_object_encryption_retry(s3, bucket, key)
row["Status"] = "MIGRATED" if _ok(v) else "ERROR"
row["LastError"] = "" if row["Status"] == "MIGRATED" else "Post-copy verification failed"
row["DriftStatus"] = "IN_SYNC" if row["Status"] == "MIGRATED" else "DRIFTED"
else:
row["Status"] = "MIGRATED"
row["DriftStatus"] = "IN_SYNC"
row["LastError"] = ""
except ClientError as e:
code = e.response.get("Error", {}).get("Code")
if getattr(e, "operation", "") == "skip_object" or code == "KMS.ValidationException":
row["Status"] = "SKIPPED"
row["LastError"] = "KMS.ValidationException — skipping object."
row["DriftStatus"] = "DRIFTED"
return "SKIPPED", row
if code in ("ExpiredToken", "ExpiredTokenException"):
try:
fresh = s3_client(getattr(s3.meta, "region_name", None), max_pool_connections=200)
copy_in_place_to_policy(fresh, bucket, key, target_policy, max_retries=max_retries, limiter=limiter)
if verify_mode.get("mode") == "all" or (
verify_mode.get("mode") == "sample" and random.random() < float(verify_mode.get("sample", 0.0))
):
v = head_object_encryption_retry(fresh, bucket, key)
row["Status"] = "MIGRATED" if _ok(v) else "ERROR"
row["LastError"] = "" if row["Status"] == "MIGRATED" else "Post-copy verification failed (after refresh)"
row["DriftStatus"] = "IN_SYNC" if row["Status"] == "MIGRATED" else "DRIFTED"
else:
row["Status"] = "MIGRATED"
row["DriftStatus"] = "IN_SYNC"
row["LastError"] = ""
except Exception as e2:
row["Status"] = "ERROR"; row["LastError"] = str(e2)
row["DriftStatus"] = "DRIFTED"
else:
row["Status"] = "ERROR"; row["LastError"] = str(e)
row["DriftStatus"] = "DRIFTED"
except Exception as e:
row["Status"] = "ERROR"; row["LastError"] = str(e)
row["DriftStatus"] = "DRIFTED"
row["Attempts"] = safe_int(row.get("Attempts", 0)) + 1
row["UpdatedAt"] = utcnow()
return row["Status"], row
# Escalating SIGINT (triple Ctrl+C)
_stop_requested = False
_sigint_count = 0
def _handle_sigint(sig, frame):
global _stop_requested, _sigint_count
_sigint_count += 1
if _sigint_count == 1:
_stop_requested = True
log("Stop requested (Ctrl+C). Finishing in-flight tasks before writing progress... (press Ctrl+C again to force abort)")
elif _sigint_count == 2:
log("Force abort requested. Raising KeyboardInterrupt...")
raise KeyboardInterrupt
else:
log("Hard abort. Exiting immediately.")
os._exit(130)
def cmd_migrate(args):
signal.signal(signal.SIGINT, _handle_sigint)
# Scale the S3 HTTP connection pool with the worker count
s3 = s3_client(args.region, max_pool_connections=max(args.workers * 4, 200))
rows = load_manifest(args.manifest)
if not rows:
log("Manifest is empty; nothing to do.")
return
# Determine if manifest has target/drift columns
manifest_has_target = "Target_Algo" in rows[0]
manifest_has_drift = "DriftStatus" in rows[0]
total = len(rows)
log(f"Loaded manifest with {total} rows")
# Worklist selection
if args.only_pending:
# truly ignore DriftStatus when requested
work = [r for r in rows if r.get("Status") in ("", "PENDING")]
else:
if manifest_has_drift:
# Process explicit drift + previous errors by default
work = [r for r in rows if (r.get("DriftStatus") == "DRIFTED") or (r.get("Status") == "ERROR")]
else:
work = [r for r in rows if r.get("Status") in ("", "PENDING", "ERROR", "WOULD_MIGRATE")]
if not work:
log("No rows to process.")
return
log(f"Processing {len(work)} rows (workers={args.workers})")
# In-memory index for quick updates
by_key = {(r["Bucket"], r["Key"]): r for r in rows}
completed = 0
migrated = 0
skipped = 0
errored = 0
def _save_progress(prefix: str = "Progress saved"):
atomically_update_manifest(args.manifest, list(by_key.values()))
log(f"{prefix}: {completed}/{len(work)} (migrated={migrated}, skipped={skipped}, errors={errored})")
def _target_for_row(r: Dict) -> Dict[str, str]:
if manifest_has_target:
if r.get("Target_Algo") == "aws:kms":
return {"algo": "aws:kms", "kms_key_id": _kms_key_id(r.get("Target_KMSKeyId", ""))}
return {"algo": "AES256"}
# Fallback to CLI arg or bucket default (legacy behavior)
if getattr(args, "kms_key_arn", None):
return {"algo": "aws:kms", "kms_key_id": _kms_key_id(args.kms_key_arn)}
return read_bucket_target_policy(s3, r["Bucket"])
# Verify mode
if getattr(args, "verify", False):
verify_mode = {"mode": "all", "sample": 0.0}
elif float(getattr(args, "verify_sample", 0.0) or 0.0) > 0.0:
verify_mode = {"mode": "sample", "sample": float(args.verify_sample)}
else:
verify_mode = {"mode": "none", "sample": 0.0}
limiter = RateLimiter(float(getattr(args, "max_rps", 0.0) or 0.0))
try:
with ThreadPoolExecutor(max_workers=args.workers) as ex:
futures = []
for r in work:
if _stop_requested:
break
futures.append(ex.submit(_migrate_one, s3, dict(r), _target_for_row(r),
args.max_retries, verify_mode, limiter))
for fut in as_completed(futures):
try:
status, updated_row = fut.result()
except Exception as e:
status, updated_row = "ERROR", {
"Bucket": "?", "Key": "?", "Status": "ERROR",
"LastError": str(e), "UpdatedAt": utcnow(), "DriftStatus": "DRIFTED"
}
# merge result
bk = (updated_row["Bucket"], updated_row["Key"])
by_key[bk] = updated_row
completed += 1
if status == "MIGRATED":
migrated += 1
elif status == "SKIPPED":
skipped += 1
else:
errored += 1
if (completed % args.progress_every == 0) or _stop_requested:
_save_progress("Progress saved")
if _stop_requested:
log("Stop flag observed; exiting after saving progress.")
break
except KeyboardInterrupt:
_save_progress("Progress saved (interrupt)")
raise
finally:
_save_progress("Final save")
log(
f"Done. Processed {completed}/{len(work)} "
f"(migrated={migrated}, skipped={skipped}, errors={errored})."
)
# ---------------------------
# Main / CLI
# ---------------------------
def main():
p = argparse.ArgumentParser(
description="S3 encryption migration (SSE-S3 -> SSE-KMS) with drift-aware manifest, resume & verification."
)
sub = p.add_subparsers(dest="cmd", required=True)
# manifest
sp = sub.add_parser("manifest", help="Create a manifest CSV of current objects + SSE/KMS info, including drift.")
sp.add_argument("--bucket", required=True)
sp.add_argument("--region", default=None)
sp.add_argument("--prefix", default="", help="Optional prefix to restrict scanning.")
sp.add_argument("--out", required=True)
sp.add_argument("--create-bucket", action="store_true", help="Create bucket if missing and set SSE-S3.")
sp.add_argument("--kms-key-arn", default=None, help="If provided, compute drift against this KMS key; else use bucket default (or AES256 if none).")
sp.add_argument("--resume", action="store_true", help="Merge into existing manifest instead of re-appending duplicates.")
sp.add_argument("--refresh-existing", action="store_true", help="When used with --resume, refresh Pre_* and DriftStatus for keys already in the manifest.")
# HEAD concurrency / throttling knobs
sp.add_argument("--head-max-rps", type=float, default=0.0,
help="Max HEAD requests per second during manifest build (0 = unlimited).")
sp.add_argument("--head-workers", type=int, default=0,
help="Override parallel HEAD worker count during manifest (0 = auto).")
sp.set_defaults(func=cmd_manifest)
# ensure-bucket-encryption
sp = sub.add_parser("ensure-bucket-encryption", help="Ensure bucket default encryption (SSE-S3 or SSE-KMS).")
sp.add_argument("--bucket", required=True)
sp.add_argument("--region", default=None)
sp.add_argument("--kms-key-arn", default=None, help="If provided, set default to SSE-KMS with this key; otherwise use SSE-S3.")
sp.add_argument("--no-bucket-key", action="store_true", help="Disable S3 Bucket Keys when using SSE-KMS.")
sp.set_defaults(func=cmd_ensure_bucket_encryption)
# migrate
sp = sub.add_parser("migrate", help="Re-encrypt objects in place using the manifest’s target and drift snapshot (resumable).")
sp.add_argument("--region", default=None)
sp.add_argument("--manifest", required=True)
sp.add_argument("--kms-key-arn", default=None)
sp.add_argument("--workers", type=int, default=32)
sp.add_argument("--max-retries", type=int, default=7)
sp.add_argument("--progress-every", type=int, default=1000, help="Write manifest progress every N completions.")
sp.add_argument("--only-pending", action="store_true", help="Process only rows with Status in '', PENDING (ignores DriftStatus entirely).")
sp.add_argument("--verify", action="store_true", help="HEAD-verify after copy for every object (slower, safer).")
sp.add_argument("--verify-sample", type=float, default=0.0, help="Fraction (0..1) to verify randomly after copy (e.g., 0.02).")
sp.add_argument("--max-rps", type=float, default=0.0, help="Max copy() operations per second (token bucket). 0 disables limiting.")
sp.set_defaults(func=cmd_migrate)
args = p.parse_args()
args.func(args)
if __name__ == "__main__":
try:
main()
except KeyboardInterrupt:
sys.exit(130)

s3_reencrypt_plan.py
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Multi-bucket S3 encryption planner
- Lists region buckets (via s3r.list_buckets_in_region)
- Builds/refreshes per-bucket *drift-aware* manifests using s3r.cmd_manifest (delegated)
- Records explicit Target_*:
* --kms-key-arn -> SSE-KMS with specific key (forced)
* (omitted) -> force reset to SSE-S3 (AES256) by post-adjusting the manifest
- Ctrl+C responsive loops
- KMS preflight (s3r.kms_is_usable) to skip buckets before migrate
- Invokes s3r.cmd_migrate per chosen bucket
"""
from __future__ import annotations
import argparse
import os
import sys
import textwrap
import signal
from typing import Dict, List, Optional, Tuple
from botocore.exceptions import ClientError
try:
import s3_reencrypt as s3r
except Exception as e:
print("ERROR: Could not import s3_reencrypt.py. Ensure it's alongside this file or on PYTHONPATH.")
print(f"Import error: {e}")
sys.exit(2)
# ---------------------------
# Ctrl+C handling
# ---------------------------
_STOP = False
_SIGINT_COUNT = 0
def _on_sigint(sig, frame):
global _STOP, _SIGINT_COUNT
_SIGINT_COUNT += 1
if _SIGINT_COUNT == 1:
_STOP = True
print(f"[{s3r.utcnow()}] Stop requested (Ctrl+C). Finishing in-flight tasks and saving progress... (press Ctrl+C again to force abort)", flush=True)
elif _SIGINT_COUNT == 2:
print(f"[{s3r.utcnow()}] Force abort requested. Raising KeyboardInterrupt...", flush=True)
raise KeyboardInterrupt
else:
print(f"[{s3r.utcnow()}] Hard abort. Exiting immediately.", flush=True)
os._exit(130)
signal.signal(signal.SIGINT, _on_sigint)
signal.siginterrupt(signal.SIGINT, True) # let KeyboardInterrupt break blocking calls
# ---------------------------
# Manifest building (delegates to s3r.cmd_manifest)
# ---------------------------
def build_manifest_for_bucket(
s3,
bucket: str,
manifest_path: str,
target: Dict[str, str],
prefix: str = "",
workers: int = 32, # unused in this delegated implementation (kept for CLI compatibility)
progress_every: int = 2000, # unused in this delegated implementation (kept for CLI compatibility)
resume: bool = False,
) -> Tuple[int, int]:
"""
Build/refresh a manifest for `bucket` by delegating to s3r.cmd_manifest (which already supports resume/upsert).
Behavior:
- If target["algo"] == "aws:kms": pass --kms-key-arn to cmd_manifest so that Target_* is set to that key.
- If target["algo"] == "AES256": call cmd_manifest WITHOUT --kms-key-arn, then FORCE Target_* to AES256
in a post-pass to ensure an explicit target snapshot regardless of the bucket default.
Returns: (total_objects_in_manifest, drifted_count)
"""
# Ensure header exists for first run
s3r.write_manifest_header(manifest_path)
class _Args:
pass
args = _Args()
args.bucket = bucket
args.region = getattr(s3.meta, "region_name", None) or "us-east-1"
args.prefix = prefix or ""
args.out = manifest_path
args.create_bucket = False
args.resume = resume
# When resuming, don't refresh
args.refresh_existing = not resume
# If desired target is KMS, pass the key id/arn through to cmd_manifest
if target.get("algo") == "aws:kms":
# cmd_manifest will normalize ARN or key-id via _kms_key_id
args.kms_key_arn = target.get("kms_key_id")
else:
# We want to *force* AES256 as explicit Target_*, regardless of bucket default.
# cmd_manifest has no flag for that, so we:
# 1) run cmd_manifest without kms-key (uses bucket default or AES256 if none)
# 2) post-adjust all rows' Target_* to AES256 and recompute DriftStatus from Pre_*.
args.kms_key_arn = None
# Run the delegated manifest command (handles true upsert, dedupe, atomic saves)
try:
s3r.cmd_manifest(args)
except KeyboardInterrupt:
print(f"[{s3r.utcnow()}] Stop observed during manifest build for {bucket}. Partial progress preserved.")
raise # bubble up to stop the whole run cleanly
# Post-adjust: if target is AES256, ensure explicit Target_* = AES256 and recompute DriftStatus
if target.get("algo") == "AES256":
try:
rows = s3r.load_manifest(manifest_path)
except Exception:
rows = []
forced_target = {"algo": "AES256"}
for r in rows:
r["Target_Algo"] = "AES256"
r["Target_KMSKeyId"] = ""
# recompute drift from Pre_* snapshot against AES256
pre = {
"ServerSideEncryption": r.get("Pre_ServerSideEncryption", ""),
"SSEKMSKeyId": r.get("Pre_SSEKMSKeyId", ""),
}
r["DriftStatus"] = "DRIFTED" if s3r.needs_migration(pre, forced_target) else "IN_SYNC"
r["UpdatedAt"] = s3r.utcnow()
s3r.atomically_update_manifest(manifest_path, rows)
# Summarize for return/logging
rows = s3r.load_manifest(manifest_path)
total = len(rows)
drifted = sum(1 for r in rows if r.get("DriftStatus") == "DRIFTED")
print(f"[{s3r.utcnow()}] {bucket}: Manifest written: {manifest_path} (total {total}, drifted {drifted})")
return total, drifted
# ---------------------------
# Summaries & UI
# ---------------------------
def summarize_manifests(manifest_paths: Dict[str, str]) -> Dict[str, Dict[str, int]]:
"""
Returns per-bucket summary:
{ bucket: {"total": N, "drifted": M, "errors": E, "in_sync": K} }
"""
summary: Dict[str, Dict[str, int]] = {}
for bucket, path in manifest_paths.items():
rows = s3r.load_manifest(path)
total = len(rows)
drifted = sum(1 for r in rows if r.get("DriftStatus") == "DRIFTED")
errors = sum(1 for r in rows if r.get("Status") == "ERROR" or r.get("DriftStatus") == "UNKNOWN")
in_sync = total - drifted
summary[bucket] = {"total": total, "drifted": drifted, "errors": errors, "in_sync": in_sync}
return summary
def human_after(target: Dict[str, str]) -> str:
return "SSE-S3 (AES256)" if target.get("algo") == "AES256" else f"SSE-KMS (key-id={target.get('kms_key_id')})"
def print_plan_table(summary: Dict[str, Dict[str, int]], after_desc_by_bucket: Dict[str, str]):
headers = ["#", "Bucket", "Total", "Drifted", "Errors/Unknown", "After"]
rows: List[List[str]] = []
for i, (bucket, stats) in enumerate(sorted(summary.items(), key=lambda x: x[0]), start=1):
rows.append([
str(i),
bucket,
str(stats["total"]),
str(stats["drifted"]),
str(stats["errors"]),
after_desc_by_bucket.get(bucket, "?"),
])
colw = [max(len(h), *(len(r[idx]) for r in rows)) for idx, h in enumerate(headers)]
fmt = " ".join("{:<" + str(w) + "}" for w in colw)
print()
print(fmt.format(*headers))
print(fmt.format(*["-" * w for w in colw]))
for r in rows:
print(fmt.format(*r))
print()
def ask_selection(buckets: List[str]) -> List[str]:
print("Select buckets to migrate:")
for idx, b in enumerate(buckets, start=1):
print(f" {idx}) {b}")
print("Enter numbers (comma-separated), ranges (e.g., 1-3), or '*' for all. Example: 1,3,5-7")
try:
raw = input("> ").strip()
except KeyboardInterrupt:
print("\nAborted by user.")
return []
if raw == "*":
return buckets[:]
chosen: List[str] = []
parts = [p.strip() for p in raw.split(",") if p.strip()]
idx_map = {str(i): b for i, b in enumerate(buckets, start=1)}
for p in parts:
if "-" in p:
a, b = p.split("-", 1)
try:
start = int(a); end = int(b)
for i in range(start, end + 1):
key = str(i)
if key in idx_map and idx_map[key] not in chosen:
chosen.append(idx_map[key])
except ValueError:
pass
else:
if p in idx_map and idx_map[p] not in chosen:
chosen.append(idx_map[p])
return chosen
def confirm(prompt: str) -> bool:
try:
ans = input(f"{prompt} [y/N]: ").strip().lower()
except KeyboardInterrupt:
print("\nAborted by user.")
return False
return ans in ("y", "yes")
# ---------------------------
# Migration launcher
# ---------------------------
def run_migrations(region: Optional[str],
manifest_paths: Dict[str, str],
target_by_bucket: Dict[str, Dict[str, str]]):
"""
For each selected bucket's manifest, run migrate only if the KMS target is usable (when applicable).
"""
for bucket, path in manifest_paths.items():
target = target_by_bucket.get(bucket, {}) or {}
if target.get("algo") == "aws:kms":
key_id = target.get("kms_key_id")
if not s3r.kms_is_usable(region or "us-east-1", key_id):
print(f"[{s3r.utcnow()}] Skipping {bucket}: KMS key not usable in {region} (or insufficient permissions).")
continue
print(f"\n=== Migrating bucket: {bucket} ===")
class _Args: # simple namespace mirroring s3r CLI
pass
args = _Args()
args.region = region
args.manifest = path
args.kms_key_arn = None # manifest Target_* drives the per-row policy
args.workers = 16
args.max_retries = 3
args.progress_every = 500
args.only_pending = False
try:
s3r.cmd_migrate(args)
except ClientError as e:
code = (e.response or {}).get("Error", {}).get("Code", "Unknown")
if code.startswith("KMS.") or code in ("AccessDeniedException", "ValidationException"):
print(f"[{s3r.utcnow()}] Skipping {bucket} due to KMS error: {code}")
continue
raise
print(f"=== Completed: {bucket} ===\n")
# ---------------------------
# Main
# ---------------------------
def main():
p = argparse.ArgumentParser(
description="Plan and migrate SSE across multiple S3 buckets using drift-aware manifests."
)
p.add_argument("--region", required=True, help="Region to scope buckets to (e.g., us-east-1).")
p.add_argument("--exclude-substr", action="append", default=[],
help="Exclude buckets whose name contains this substring. Can be given multiple times.")
p.add_argument("--kms-key-arn", default=None,
help="If provided, plan/migrate to this KMS key (SSE-KMS). If omitted, plan to reset to SSE-S3 (AES256).")
p.add_argument("--manifests-dir", default="data/manifests",
help="Directory to store per-bucket manifest CSVs.")
p.add_argument("--prefix", default="", help="Optional prefix filter for object listing.")
p.add_argument("--ignore-default-sse-s3", action="store_true",
help="Exclude buckets whose bucket default encryption is SSE-S3 (AES256).")
p.add_argument("--resume", action="store_true",
help="Upsert into existing per-bucket manifests; safe to cancel & rerun (final rows retained).")
p.add_argument("--manifest-workers", type=int, default=32,
help="(Unused in delegated mode) kept for CLI compatibility.")
args = p.parse_args()
# S3 client via companion (bigger conn pool helps for HEAD fanout if needed elsewhere)
s3 = s3r.s3_client(args.region, max_pool_connections=256)
# Bucket discovery + filtering (via companion helpers)
all_buckets = s3r.list_buckets_in_region(s3, args.region)
print(f"[{s3r.utcnow()}] Found {len(all_buckets)} buckets in region {args.region}")
excludes = [x.lower() for x in args.exclude_substr]
buckets: List[str] = []
for b in all_buckets:
if _STOP:
print(f"[{s3r.utcnow()}] Stop observed; exiting before scanning manifests.")
return
low = b.lower()
if any(ex in low for ex in excludes):
continue
if not s3r.bucket_exists(s3, b):
continue
if args.ignore_default_sse_s3:
is_aes256 = s3r.bucket_default_is_sse_s3(s3, b)
if is_aes256 is True:
print(f"[{s3r.utcnow()}] Skipping {b}: default encryption is already SSE-S3 (AES256).")
continue
buckets.append(b)
if not buckets:
print("No candidate buckets after filtering.")
return
# Choose explicit per-bucket target (force policy snapshot in manifest)
if args.kms_key_arn:
target_template = s3r.kms_target(args.kms_key_arn)
else:
target_template = s3r.aes256_target()
manifests: Dict[str, str] = {}
after_desc_by_bucket: Dict[str, str] = {}
target_by_bucket: Dict[str, Dict[str, str]] = {}
for b in buckets:
if _STOP:
print(f"[{s3r.utcnow()}] Stop observed; exiting before processing bucket {b}.")
return
target = dict(target_template) # same policy for all; copy for clarity
target_by_bucket[b] = target
after_desc_by_bucket[b] = human_after(target)
manifest_path = os.path.join(args.manifests_dir, f"{b}.csv")
if not args.resume and os.path.exists(manifest_path):
os.remove(manifest_path)
total, drifted = build_manifest_for_bucket(
s3, b, manifest_path, target,
prefix=args.prefix, workers=args.manifest_workers, progress_every=2000, resume=args.resume
)
manifests[b] = manifest_path
# Summarize & present plan
summary = summarize_manifests(manifests)
print_plan_table(summary, after_desc_by_bucket)
drift_buckets = [b for b, s in summary.items() if s["drifted"] > 0]
if not drift_buckets:
print("No drift detected. Nothing to migrate.")
return
print(textwrap.dedent("""
The buckets above show how many objects differ from the target:
Target after migration: (varies per bucket; see 'After' column)
Tip: You usually only want to migrate buckets with DRIFTED > 0.
""").strip())
chosen = ask_selection([b for b in buckets if summary[b]["drifted"] > 0])
if not chosen:
print("No buckets selected. Exiting.")
return
print("\nYou chose to migrate these buckets:")
for b in chosen:
stats = summary[b]
print(f" - {b}: {stats['drifted']} / {stats['total']} objects will be updated → {after_desc_by_bucket[b]}")
if not confirm("Proceed with migration?"):
print("Aborted by user.")
return
# KMS preflight + run migrations
run_migrations(args.region, {b: manifests[b] for b in chosen}, target_by_bucket)
print("\nAll selected migrations completed.")
print("Reminder: If you were moving to SSE-KMS, consider setting the bucket default encryption accordingly:")
print(f" python s3_reencrypt.py ensure-bucket-encryption --bucket <bucket> --region {args.region} --kms-key-arn <key-arn>")
print("Or to reset to S3-managed (AES256):")
print(f" python s3_reencrypt.py ensure-bucket-encryption --bucket <bucket> --region {args.region}")
if __name__ == "__main__":
try:
main()
except KeyboardInterrupt:
# exit code 130 is conventional for SIGINT
sys.exit(130)
filipeandre commented Oct 13, 2025

REPO_FOLDER=s3_reencrypt && \
REPO_URL=https://gist.github.com/filipeandre/8c08e4bc6faff62cedf1a0217705a584.git && \
[ -d .git ] && git pull --rebase || { [ -d $REPO_FOLDER/.git ] && git -C $REPO_FOLDER pull --rebase || git clone $REPO_URL $REPO_FOLDER; cd $REPO_FOLDER 2>/dev/null || true; }  && \
python s3_reencrypt_plan.py \
  --region "$AWS_REGION" \
  --exclude-substr logs \
  --exclude-substr cloudtrail \
  --ignore-default-sse-s3 \
  --manifests-dir data/manifests \
  --resume
