@timsavage
Last active August 25, 2020 23:55
Script for syncing static websites to S3.
#! /usr/bin/env python3
"""
Script for syncing static websites to S3. Requires Python 3.6+
- Uses ETags to determine if a file has changed.
- Cleans up remote files.
- Sets an ACL on each object (public-read by default).
- Supplies the correct content-type.
- Uses GZip encoding for appropriate file types.
"""
import argparse
import gzip
import hashlib
import mimetypes
import shutil
from io import BytesIO
from pathlib import Path
from botocore.session import Session
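
# Extensions worth gzip-compressing, and per-extension Cache-Control max-age
# values in seconds (86400 = 1 day, 604800 = 7 days).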
GZIP_EXTENSIONS = {".html", ".css", ".js", ".ico"}
CACHE_CONTROL = {".html": 86400, ".css": 604800, ".js": 86400, ".ico": 86400}
DEFAULT_CACHE_CONTROL = 604800


def main():
    opts = build_args()
    session = Session(profile=opts.profile)
    client = session.create_client("s3")

    # Determine what work is required
    remote_list = list_remote_files(client, opts.BUCKET)
    upload_files, delete_files = files_to_upload(opts.SOURCE, remote_list, force=opts.force)

    print("\nSyncing...")

    # Apply to S3
    upload_to_s3(client, upload_files, opts.BUCKET, GZIP_EXTENSIONS,
                 CACHE_CONTROL, opts.acl, dry_run=opts.dry_run)
    delete_from_s3(client, opts.BUCKET, delete_files, dry_run=opts.dry_run)


def build_args():
    parser = argparse.ArgumentParser()
    parser.add_argument("SOURCE", type=Path, help="Location of files")
    parser.add_argument("BUCKET", help="Name of S3 bucket")
    parser.add_argument("--acl", default="public-read", choices=("private", "public-read"),
                        help="ACL applied to each file")
    parser.add_argument("--profile", help="AWS profile name")
    parser.add_argument("--force", action="store_true", help="Force upload of all files")
    parser.add_argument("--dry-run", action="store_true", help="Report changes but don't apply them")
    return parser.parse_args()


def create_etag(file, bs=64 * 1024):
    """
    MD5 hex digest of a file, read in chunks.

    For non-multipart uploads the S3 ETag is normally the MD5 of the object
    body, so this digest can be compared against the remote ETag.
    """
    h = hashlib.md5()
    with file.open("rb") as f:
        while True:
            buf = f.read(bs)
            if not buf:
                break
            h.update(buf)
    return h.hexdigest()


def list_remote_files(client, bucket):
    """
    Return a dict keyed by object path, with (etag, size) tuples as values.
    """
    remote_files = {}
    paginator = client.get_paginator("list_objects_v2")
    for page in paginator.paginate(Bucket=bucket):
        for entry in page.get("Contents", []):
            path = Path(entry["Key"])
            remote_files[path] = (entry["ETag"].strip('"'), entry["Size"])
    return remote_files


def files_to_upload(base_path, remote_files, *, ignore_dot_files=True, force=False):
    """
    Identify local files that need to be uploaded.

    Every local file found remotely is popped from the remote_files dict, so
    whatever remains afterwards only exists in the bucket.
    """
    base_path = Path(base_path)
    upload_files = []
    for path in base_path.glob("**/*.*"):
        if ignore_dot_files and path.name.startswith("."):
            continue

        remote_path = path.relative_to(base_path)
        try:
            etag, size = remote_files.pop(remote_path)
        except KeyError:
            print(f"+ {path}")
            upload_files.append((path, remote_path))
        else:
            if force:
                print(f"! {path}")
                upload_files.append((path, remote_path))
                continue

            # File size differs
            local_size = path.stat().st_size
            if size != local_size:
                print(f"~ {path}; size {size} != {local_size}")
                upload_files.append((path, remote_path))
                continue

            # Etag differs
            local_etag = create_etag(path)
            if etag != local_etag:
                print(f"~ {path}; etag {etag} != {local_etag}")
                upload_files.append((path, remote_path))
                continue

            print(f"= {path}")

    return upload_files, list(remote_files.keys())


def upload_to_s3(client, upload_files, bucket, gzip_extensions, cache_control,
                 acl, default_cache_control=DEFAULT_CACHE_CONTROL, dry_run=False):
    """
    Upload changed files to S3.
    """
    common_put_args = {"ACL": acl, "Bucket": bucket}

    for source, dest in upload_files:
        put_args = common_put_args.copy()

        # Get caching time
        max_age = cache_control.get(source.suffix, default_cache_control)
        put_args["CacheControl"] = f"max-age={max_age}"

        # Determine content type
        content_type, _ = mimetypes.guess_type(source.as_posix())
        if content_type:
            put_args["ContentType"] = content_type

        with source.open("rb") as f:
            if source.suffix in gzip_extensions:
                rf = f
                put_args["ContentEncoding"] = "gzip"
                f = BytesIO()
                with gzip.GzipFile(fileobj=f, mode="wb") as gz:
                    shutil.copyfileobj(rf, gz)
                f.seek(0)

            print("Copy", source, "to", f"s3://{bucket}/{dest.as_posix()}")
            if dry_run:
                print(f" > Put Object: Key={dest.as_posix()!r};", "; ".join(f"{k}={v!r}" for k, v in put_args.items()))
            else:
                response = client.put_object(Body=f, Key=dest.as_posix(), **put_args)
                print(f" > Response: ETag={response['ETag']}")


def delete_from_s3(client, bucket, delete_files, dry_run=False):
    """
    Remove files from S3 that no longer exist locally.
    """
    for file in delete_files:
        print(f"Delete s3://{bucket}/{file.as_posix()}")
        if dry_run:
            print(f" > Delete Object: Key={file.as_posix()!r}; Bucket={bucket!r}")
        else:
            client.delete_object(Bucket=bucket, Key=file.as_posix())


if __name__ == "__main__":
    main()