Created
November 11, 2023 15:50
-
-
Save vadviktor/2cc9fda0883d08472d4a8ce4151561be to your computer and use it in GitHub Desktop.
Restore objects from S3 Glacier Deep Archive
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import argparse | |
import boto3 | |
def _split_s3_path(path):
    """Split ``s3://bucket/prefix`` (scheme optional) into ``(bucket, prefix)``.

    The prefix is an empty string when the path names only a bucket.
    """
    scheme = "s3://"
    # Strip the scheme only when it leads the path; a blanket replace() would
    # also rewrite a prefix that happens to contain "s3://" further in.
    if path.startswith(scheme):
        path = path[len(scheme):]
    bucket, _, prefix = path.partition("/")
    return bucket, prefix


def main():
    """Request restoration of Deep Archive objects under an S3 path.

    Command line arguments:
        --path     S3 path to process, e.g. ``s3://mybucket/myfolder`` (required).
        --days     Number of days the restored copies stay available (default 1).
        --dry-run  Print the Deep Archive objects found without calling the
                   restore API. Restore status is not checked in this mode.

    Every object listed under the path whose storage class is ``DEEP_ARCHIVE``
    is inspected: a Bulk-tier restore request is submitted when none exists,
    otherwise the in-progress/complete state is reported. Objects in any other
    storage class are skipped.
    """
    # Parse command line arguments
    parser = argparse.ArgumentParser(
        description="Restore objects from S3 Glacier Deep Archive"
    )
    parser.add_argument(
        "--path",
        required=True,
        help="The S3 path to restore (e.g. s3://mybucket/myfolder)",
    )
    parser.add_argument(
        "--days",
        type=int,
        default=1,
        help="The number of days to restore the objects for (Default: 1)",
    )
    parser.add_argument(
        "--dry-run",
        action="store_true",
        help="Only list the affected objects without restoring them",
    )
    args = parser.parse_args()

    # Split the path into bucket and prefix
    bucket, prefix = _split_s3_path(args.path)
    if not bucket:
        # Fail with a usage message instead of a client-side validation error.
        parser.error(f"--path does not contain a bucket name: {args.path!r}")

    # Create a new S3 service client
    s3 = boto3.client("s3")

    # List objects in the specified path
    paginator = s3.get_paginator("list_objects_v2")
    pages = paginator.paginate(Bucket=bucket, Prefix=prefix)

    for page in pages:
        # "Contents" is absent from the response when no key matches.
        for obj in page.get("Contents", []):
            key = obj["Key"]

            # The listing already reports the storage class, so objects that
            # are not in Deep Archive are skipped without a HEAD request.
            # (head_object omits StorageClass for S3 Standard objects, so it
            # cannot be indexed unconditionally.)
            if obj.get("StorageClass") != "DEEP_ARCHIVE":
                continue

            if args.dry_run:
                # Only list the affected objects
                print(f"Would restore {key}")
                continue

            # The Restore field tells whether a request was already submitted.
            restore = s3.head_object(Bucket=bucket, Key=key).get("Restore")
            if restore is None:
                print(f"Submitting restoration request: {key}")
                s3.restore_object(
                    Bucket=bucket,
                    Key=key,
                    RestoreRequest={
                        "Days": args.days,
                        "GlacierJobParameters": {"Tier": "Bulk"},
                    },
                )
            elif 'ongoing-request="true"' in restore:
                print(f"Restoration in-progress: {key}")
            elif 'ongoing-request="false"' in restore:
                print(f"Restoration complete: {key}")
# Run the restore only when this file is executed as a script, not when it
# is imported as a module.
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment