Script for syncing static websites to S3.

#! /usr/bin/env python3
"""
Script for syncing static websites to S3. Requires Python 3.6+
- Uses ETags to determine if a file has changed.
- Cleans up remote files.
- Sets public ACLs.
- Supplies the correct content-type.
- Uses GZip encoding for appropriate file types.
"""
import argparse
import gzip
import hashlib
import mimetypes
import shutil
from io import BytesIO
from pathlib import Path

from botocore.session import Session
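
# File types worth gzip-compressing, and per-extension Cache-Control max-age
# values in seconds (86400 = 1 day, 604800 = 7 days).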
GZIP_EXTENSIONS = {".html", ".css", ".js", ".ico"}
CACHE_CONTROL = {".html": 86400, ".css": 604800, ".js": 86400, ".ico": 86400}
DEFAULT_CACHE_CONTROL = 604800


def main():
    opts = build_args()
    session = Session(profile=opts.profile)
    client = session.create_client("s3")

    # Determine what work is required
    remote_list = list_remote_files(client, opts.BUCKET)
    upload_files, delete_files = files_to_upload(opts.SOURCE, remote_list, force=opts.force)

    print("\nSyncing...")

    # Apply to S3
    upload_to_s3(client, upload_files, opts.BUCKET, GZIP_EXTENSIONS,
                 CACHE_CONTROL, opts.acl, dry_run=opts.dry_run)
    delete_from_s3(client, opts.BUCKET, delete_files, dry_run=opts.dry_run)


def build_args():
    parser = argparse.ArgumentParser()
    parser.add_argument("SOURCE", type=Path, help="Location of files")
    parser.add_argument("BUCKET", help="Name of S3 bucket")
    parser.add_argument("--acl", default="public-read", choices=("private", "public-read"),
                        help="ACL applied to each file")
    parser.add_argument("--profile", help="AWS profile name")
    parser.add_argument("--force", action="store_true", help="Force upload of all files")
    parser.add_argument("--dry-run", action="store_true", help="Report changes but don't apply them")
    return parser.parse_args()
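

# S3's ETag is the hex MD5 digest of the object for plain (non-multipart,
# non-KMS-encrypted) uploads, which is what put_object below produces, so a
# local MD5 can be compared against it directly.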
def create_etag(file, bs=64 * 1024):
    h = hashlib.md5()
    with file.open("rb") as f:
        while True:
            buf = f.read(bs)
            if buf:
                h.update(buf)
            else:
                break
    return h.hexdigest()


def list_remote_files(client, bucket):
    """
    Return a dict keyed off the object path, with (etag, size) values.
    """
    remote_files = {}
    # Paginate so buckets with more than 1000 objects (or none at all) are handled.
    paginator = client.get_paginator("list_objects_v2")
    for page in paginator.paginate(Bucket=bucket):
        for entry in page.get("Contents", []):
            path = Path(entry["Key"])
            remote_files[path] = (entry["ETag"].strip('"'), entry["Size"])
    return remote_files


def files_to_upload(base_path, remote_files, *, ignore_dot_files=True, force=False):
    """
    Identify local files that need to be uploaded.

    Every local file found in the remote listing is popped from the
    remote_files dict; whatever remains afterwards only exists remotely and is
    returned as the list of files to delete.

    Status markers printed per file:
        +  new file
        !  forced upload
        ~  changed (size or ETag differs)
        =  unchanged
    """
    base_path = Path(base_path)
    upload_files = []
    # Note: only paths containing a "." (i.e. files with an extension) are matched.
    for path in base_path.glob("**/*.*"):
        if ignore_dot_files and path.name.startswith("."):
            continue
        if not path.is_file():
            continue
        remote_path = path.relative_to(base_path)
        try:
            etag, size = remote_files.pop(remote_path)
        except KeyError:
            print(f"+ {path}")
            upload_files.append((path, remote_path))
        else:
            if force:
                print(f"! {path}")
                upload_files.append((path, remote_path))
                continue

            # File size differs
            local_size = path.stat().st_size
            if size != local_size:
                print(f"~ {path}; size {size} != {local_size}")
                upload_files.append((path, remote_path))
                continue

            # ETag differs
            local_etag = create_etag(path)
            if etag != local_etag:
                print(f"~ {path}; etag {etag} != {local_etag}")
                upload_files.append((path, remote_path))
                continue

            print(f"= {path}")

    return upload_files, list(remote_files.keys())
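

# Objects uploaded with ContentEncoding="gzip" are stored compressed and served
# back with a "Content-Encoding: gzip" header; S3 does not negotiate encodings,
# so clients that cannot handle gzip will receive the compressed bytes as-is.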
def upload_to_s3(client, upload_files, bucket, gzip_extensions, cache_control,
                 acl, default_cache_control=DEFAULT_CACHE_CONTROL, dry_run=False):
    """
    Upload changed files to S3
    """
    common_put_args = {"ACL": acl, "Bucket": bucket}

    for source, dest in upload_files:
        put_args = common_put_args.copy()

        # Get caching time
        max_age = cache_control.get(source.suffix, default_cache_control)
        put_args["CacheControl"] = f"max-age={max_age}"

        # Determine content type
        content_type, _ = mimetypes.guess_type(source.as_posix())
        if content_type:
            put_args["ContentType"] = content_type

        with source.open("rb") as f:
            if source.suffix in gzip_extensions:
                # Compress into an in-memory buffer and upload that instead of
                # the raw file.
                rf = f
                put_args["ContentEncoding"] = "gzip"
                f = BytesIO()
                with gzip.GzipFile(fileobj=f, mode="wb") as gz:
                    shutil.copyfileobj(rf, gz)
                f.seek(0)

            print("Copy", source, "to", f"s3://{bucket}/{dest.as_posix()}")
            if dry_run:
                print(f" > Put Object: Key={dest.as_posix()!r};", "; ".join(f"{k}={v!r}" for k, v in put_args.items()))
            else:
                response = client.put_object(Body=f, Key=dest.as_posix(), **put_args)
                print(f" > Response: ETag={response['ETag']}")


def delete_from_s3(client, bucket, delete_files, dry_run=False):
    """
    Remove files from S3 that no longer exist locally
    """
    for file in delete_files:
        print(f"Delete s3://{bucket}/{file.as_posix()}")
        if dry_run:
            print(f" > Delete Object: Key={file.as_posix()!r}; Bucket={bucket!r}")
        else:
            client.delete_object(Bucket=bucket, Key=file.as_posix())


if __name__ == "__main__":
    main()