Skip to content

Instantly share code, notes, and snippets.

@heykarimoff
Created October 24, 2019 04:52

Revisions

  1. heykarimoff created this gist Oct 24, 2019.
    56 changes: 56 additions & 0 deletions recover_bucket.py
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,56 @@
    #!/usr/bin/env python
    from datetime import datetime, timezone

    import boto3

    # ######################################
    #
    # Empty Bucket of all delete markers from all objects.
    #
    # ######################################

    # -----------------------------------
    # Enter these values here:
    thebucket = "<bucket_name>"
    access_key = "<access_key>"
    secret_key = "<secret_key>"
    region_name = "<region_name>" # us-east-1

    # ------------------------------------

    s3 = boto3.resource("s3", region_name=region_name, aws_access_key_id=access_key, aws_secret_access_key=secret_key)
    s3client = boto3.client("s3", aws_access_key_id=access_key, aws_secret_access_key=secret_key)

    # paginate 100000 at a time
    page_size = 100000
    folder_in_thebucket = "files/invoices"
    paginator = s3client.get_paginator("list_object_versions")
    pageresponse = paginator.paginate(
    Bucket=thebucket, Prefix=folder_in_thebucket, PaginationConfig={"MaxItems": page_size}
    )
    deleted_at = datetime(2019, 10, 22, 20, 0, 0, tzinfo=timezone.utc)


    def restore_all(pages):
    # iter over the pages from the paginator
    for page in pages:
    # Find if there are any delmarkers
    if "DeleteMarkers" in page.keys():
    for each_delmarker in page["DeleteMarkers"]:
    if each_delmarker["IsLatest"] is True and each_delmarker["LastModified"] > deleted_at:
    restore(each_delmarker)


    def restore(delete_marker):
    # Create a resource for the version-object
    # and use .delete() to remove it.
    file_object_version = s3.ObjectVersion(thebucket, delete_marker["Key"], delete_marker["VersionId"])
    # I added this output just so I could watch the script run.
    print(f"Restoring {delete_marker}")
    # Lastly, lets remove the del marker and recover one of many files.
    file_object_version.delete()


    if __name__ == "__main__":
    print(f"Restoring files deleted after {deleted_at} in {thebucket}/{folder_in_thebucket}.")
    restore_all(pageresponse)