Created
January 17, 2024 19:03
-
-
Save clbarnes/a7367ca0ccc877af951c3726e20a5826 to your computer and use it in GitHub Desktop.
Given a manifest file, copy a directory tree from a URL base to a local directory
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
""" | |
Copy a directory tree of files over HTTPS. | |
If HTTP basic auth is required, use an environment variable like | |
`HTTP_BASIC_AUTH="myuser:mypassword"`. | |
""" | |
import os | |
import sys | |
from urllib.request import Request, urlopen | |
from base64 import b64encode | |
import typing as tp | |
from pathlib import Path | |
from concurrent.futures import ThreadPoolExecutor | |
import logging | |
from time import perf_counter | |
from argparse import ArgumentParser | |
logger = logging.getLogger(__name__) | |
def make_headers(basic_auth: tp.Optional[str]) -> dict[str, str]:
    """Build the HTTP headers sent with every download request.

    A browser-like ``User-Agent`` is always present. When ``basic_auth`` is
    given (a ``"user:password"`` string), an ``Authorization`` header holding
    ``Basic <base64 of basic_auth>`` is added as well.
    """
    headers = {
        "User-Agent": "Mozilla/5.0 (X11; Linux i686; rv:109.0) Gecko/20100101 Firefox/121.0",
    }
    if basic_auth is None:
        return headers

    token = b64encode(basic_auth.encode()).decode()
    headers["Authorization"] = f"Basic {token}"
    return headers
def ensure_dirs(root: Path, fpaths: list[str]):
    """Create the parent directory of every manifest path under ``root``.

    Args:
        root: Local directory the tree is copied into.
        fpaths: Manifest paths; leading slashes are dropped so that each
            path is joined below ``root`` rather than replacing it.

    Directories that already exist are left alone, and each distinct parent
    is only created once per call.
    """
    # Use the module logger rather than logging.info(): the module-level
    # helper logs on the root logger and implicitly calls basicConfig() when
    # no handler is configured.
    logger.info("Creating directories")
    visited = set()
    for fpath in fpaths:
        dpath = (root / fpath.lstrip("/")).parent
        if dpath in visited:
            continue
        dpath.mkdir(exist_ok=True, parents=True)
        visited.add(dpath)
def fmt_seconds(s: float):
    """Format a duration in seconds as ``HH:MM:SS``.

    Each field is truncated to an integer and zero-padded to at least two
    digits; the hours field is not capped at 24.
    """
    whole_minutes, secs = divmod(s, 60)
    hours, minutes = divmod(whole_minutes, 60)
    return ":".join(f"{int(part):02}" for part in (hours, minutes, secs))
def main(url_base: str, root: Path, paths: list[str], threads: int, basic_auth: tp.Optional[str]):
    """Download every manifest path from ``url_base`` into ``root``.

    Args:
        url_base: URL prefix; each path is appended to it after a ``/``.
        root: Local directory that receives the files.
        paths: Relative paths to fetch (surrounding slashes are ignored).
        threads: Number of worker threads in the download pool.
        basic_auth: Optional ``"user:password"`` string for HTTP basic auth.

    A failed transfer is logged with its traceback and skipped; it does not
    abort the remaining downloads. Progress and a linear time-remaining
    estimate are logged at INFO level after each successful file.
    """
    url_base = url_base.rstrip("/")
    ensure_dirs(root, paths)
    headers = make_headers(basic_auth)

    def fn(path: str):
        """Fetch one path and return ``(url, outpath, n_bytes)``.

        ``n_bytes`` is ``None`` when the transfer failed.
        """
        p = path.strip("/")
        url = f"{url_base}/{p}"
        req = Request(url, headers=headers)
        outpath = root / p
        try:
            # The context manager closes the HTTP response (and its socket)
            # even if reading fails, so worker threads do not leak connections.
            with urlopen(req) as resp:
                bts = resp.read()
            outpath.write_bytes(bts)
        except Exception:
            # Deliberately best-effort: record the failure and carry on.
            logger.exception("Could not transfer from %s to %s", url, outpath)
            return (url, outpath, None)
        return (url, outpath, len(bts))

    n_paths = len(paths)
    n_digs = len(str(n_paths))  # width used to right-align the running count
    logger.info("Starting pool of %s threads", threads)
    with ThreadPoolExecutor(threads) as exe:
        count = 0
        total = 0
        start = perf_counter()
        # exe.map yields results in manifest order, not completion order.
        for url, outpath, length in exe.map(fn, paths):
            elapsed = perf_counter() - start
            count += 1  # failed transfers still count as processed
            if length is None:
                continue
            logger.debug("Transferred %sB from %s to %s", length, url, outpath)
            total += length
            # Linear extrapolation from the fraction of files processed so far.
            remaining = elapsed / (count / n_paths) - elapsed
            logger.info(
                "Transferred %s of %s files after %s, estimated %s remaining",
                f"{count:{n_digs}}",
                n_paths,
                fmt_seconds(elapsed),
                fmt_seconds(remaining),
            )
    logger.info("Transferred %sB", total)
def read_filelike(f):
    """Lazily yield each non-blank line of ``f`` with surrounding whitespace removed."""
    for raw in f:
        if entry := raw.strip():
            yield entry
def read_manifest(s: tp.Optional[str]):
    """Read the manifest and return its non-blank lines as a list.

    Args:
        s: Path of the manifest file. ``None``, an empty string or ``"-"``
            means the manifest is read from standard input.

    Returns:
        The stripped, non-empty lines in file order.
    """
    # Use the module logger rather than the root-level logging.info(), which
    # implicitly calls basicConfig() when no handler is configured.
    logger.info("Reading manifest")
    # The CLI help promises "empty or `-`" reads stdin; an empty string used
    # to fall through to open(""), which can only fail.
    if not s or s == "-":
        return list(read_filelike(sys.stdin))
    with open(s) as f:
        return list(read_filelike(f))
if __name__ == "__main__":
    # Command-line entry point: parse arguments, configure logging, read the
    # manifest, then run the transfer.
    parser = ArgumentParser(description=__doc__)
    parser.add_argument("urlbase", help="Base of URL to append paths to")
    parser.add_argument("outdir", type=Path, help="Root directory to copy files into")
    parser.add_argument("manifest", nargs="?", help="Path to manifest file; empty or `-` to read from stdin")
    parser.add_argument("--jobs", "-j", type=int, default=5, help="How many threads to use, default 5")
    parser.add_argument("--verbose", "-v", action="count", default=0, help="Increase verbosity")
    args = parser.parse_args()

    # No -v -> WARNING, -v -> INFO, -vv or more -> DEBUG.
    levels_by_verbosity = (logging.WARNING, logging.INFO, logging.DEBUG)
    level_index = min(args.verbose, len(levels_by_verbosity) - 1)
    logging.basicConfig(level=levels_by_verbosity[level_index])

    credentials = os.environ.get("HTTP_BASIC_AUTH")
    manifest_paths = read_manifest(args.manifest)
    main(args.urlbase, args.outdir, manifest_paths, args.jobs, credentials)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment