Last active
August 9, 2022 14:21
-
-
Save jnhmcknight/be52e07cec5e12861cd7c70552f5b18b to your computer and use it in GitHub Desktop.
S3 Bucket Point-In-Time Snapshot Creation
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
This requires that you have versioning enabled on the S3 bucket. Without that, this script cannot do anything useful. | |
Install Dependencies: `pip3 install boto3 click dateparser pytz` | |
Run it: `python3 ./s3.py bucket-snapshot [OPTIONS] BUCKET-NAME UTC-DATE-TIME DESTINATION-FOLDER` | |
""" | |
import json | |
import os | |
import boto3 | |
import click | |
import dateparser | |
import pytz | |
# Module-level S3 client, created at import time and shared by the functions below.
S3 = boto3.client('s3')
def _get_bucket_versions(s3_bucket, prefix=None):
    """Return every object version listed for ``s3_bucket``.

    Pages through the ``list_object_versions`` paginator of the module-level
    ``S3`` client and concatenates the ``Versions`` entries of each page.
    Only the ``Versions`` key of a page is read; any other keys are ignored.

    Args:
        s3_bucket: Name of the bucket to list.
        prefix: Optional key prefix; when given, it is passed to the API as
            ``Prefix`` to restrict the listing.

    Returns:
        A list of the version entries, in the order the pages returned them.
    """
    paginator = S3.get_paginator('list_object_versions')

    version_kwargs = {'Bucket': s3_bucket}
    if prefix is not None:
        version_kwargs['Prefix'] = prefix

    versions = []
    for page in paginator.paginate(**version_kwargs):
        # A page may omit ``Versions`` or leave it empty; treat both as "none".
        versions.extend(page.get('Versions') or [])
    return versions
# Root click command group; sub-commands attach to it via ``@cli.command()``.
@click.group()
def cli():
    pass
@cli.command()
@click.argument('s3-bucket')
@click.argument('date')
@click.argument('destination')
@click.option(
    '-s', '--start-date',
    type=str,
    help='Version must have been created after this date',
    default=None,
)
@click.option(
    '-p', '--prefix',
    type=str,
    help='Limit the snapshot to only files with this prefix',
    default=None,
)
@click.option(
    '-e', '--extension',
    multiple=True,
    type=str,
    help='Limit snapshot to only this file extension',
    default=None,
)
@click.option('--overwrite', is_flag=True, default=False, help='Overwrite existing files')
def bucket_snapshot(s3_bucket, date, destination, start_date, prefix, extension, overwrite):
    """Download a bucket as of the given date"""
    # The docstring above is what click shows as the command help, so the
    # detailed notes live in comments instead.
    #
    # Flow:
    #   1. Parse `date` (and optional `start_date`); both are stamped as UTC.
    #   2. Ask for confirmation.
    #   3. For every key, pick the newest version whose LastModified lies in
    #      [start_date, date].
    #   4. Download each selected version under `destination`, mirroring the
    #      key's path.
    #
    # Raises click.BadParameter for unparseable dates, click.Abort when the
    # date span is inverted or the user declines, and click.ClickException
    # when a target file exists without --overwrite or a key would be
    # written outside `destination`.

    # dateparser.parse returns None when it cannot understand the input.
    end_date = dateparser.parse(date)
    if end_date is None:
        raise click.BadParameter(f'Could not parse date: {date!r}', param_hint='date')
    end_date = end_date.replace(tzinfo=pytz.UTC)

    if start_date is not None:
        raw_start_date = start_date
        start_date = dateparser.parse(raw_start_date)
        if start_date is None:
            raise click.BadParameter(
                f'Could not parse date: {raw_start_date!r}',
                param_hint='--start-date',
            )
        start_date = start_date.replace(tzinfo=pytz.UTC)

        if end_date < start_date:
            click.echo('`date` MUST be later than `start_date`')
            # Abort must be raised (not returned) for click to stop with a
            # non-zero exit status.
            raise click.Abort()

        click.echo(f'Provided date span is {start_date.isoformat()} - {end_date.isoformat()}')
    else:
        click.echo(f'Provided date was parsed to: {end_date.isoformat()}')

    # abort=True raises click.Abort when the user answers "no".
    click.confirm('Proceed?', abort=True)

    # NOTE(review): _get_bucket_versions collects only the `Versions` entries,
    # so a key that had been deleted before `date` still contributes its last
    # version here -- confirm whether that is the intended snapshot semantics.
    versions = {}
    for count, version in enumerate(_get_bucket_versions(s3_bucket, prefix), start=1):
        if count % 100 == 0:
            click.echo(f'Processed {count} versions...')

        key = version['Key']
        modified = version['LastModified']

        if modified > end_date:
            continue
        if start_date is not None and modified < start_date:
            continue
        # `extension` is a tuple (multiple=True); str.endswith accepts a tuple
        # and matches when any of the suffixes matches.
        if extension and not key.endswith(extension):
            continue

        current = versions.get(key)
        if current is None or modified > current['date']:
            versions[key] = {
                'date': modified,
                'id': version['VersionId'],
            }

    click.echo('Downloading versions for snapshot...')

    dest_root = os.path.abspath(destination)
    # makedirs creates any missing intermediate directories as well.
    os.makedirs(dest_root, exist_ok=True)

    downloaded = 0
    for key, version in versions.items():
        output_path = os.path.abspath(os.path.join(dest_root, key))

        # Object keys come from the bucket, not from us: an absolute key or
        # one containing `..` would otherwise be written outside the
        # destination folder.
        if os.path.commonpath([dest_root, output_path]) != dest_root:
            raise click.ClickException(
                f'Refusing to write outside of the destination folder: {key}'
            )

        # Keys ending in "/" are folder placeholders; there is no file to
        # write, so just mirror the directory.
        if key.endswith('/'):
            os.makedirs(output_path, exist_ok=True)
            continue

        if os.path.isfile(output_path) and not overwrite:
            raise click.ClickException(
                f'Output path exists and overwrite was not provided: {output_path}'
            )

        obj = S3.get_object(
            Bucket=s3_bucket,
            Key=key,
            VersionId=version['id'],
        )

        os.makedirs(os.path.dirname(output_path), exist_ok=True)
        with open(output_path, 'wb') as destfile:
            destfile.write(obj['Body'].read())

        downloaded += 1
        if downloaded % 50 == 0:
            click.echo(f'Downloaded {downloaded} versions...')

    click.echo(f'Completed. {downloaded} files downloaded as a snapshot of {s3_bucket} at {end_date.isoformat()}')
# Invoke the click command group only when this file is executed as a script.
if __name__ == '__main__':
    cli()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment