Last active
January 31, 2021 18:24
-
-
Save LewisGaul/65aa7d4ff5a20abb1c8ea02cd0948486 to your computer and use it in GitHub Desktop.
Fetch files from GitHub using the REST API, emulating 'git archive', which is not supported natively by GitHub.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
""" | |
Fetch files from GitHub using the REST API, emulating 'git archive', which is | |
not supported natively by GitHub. | |
Supports Python 3.6+, Linux. Only external dependency is the 'curl' executable. | |
API docs are at https://docs.github.com/en/rest, we use the following: | |
- Get repo info | |
GET /repos/{owner}/{repo} | |
https://docs.github.com/en/rest/reference/repos#get-a-repository | |
- Get path info | |
GET /repos/{owner}/{repo}/contents/{path} | |
https://docs.github.com/en/rest/reference/repos#get-repository-content | |
- Get a 'tree' (list directory contents) | |
GET /repos/{owner}/{repo}/git/trees/{tree_sha} | |
GET /repos/{owner}/{repo}/git/trees/{ref}:{path} | |
https://docs.github.com/en/rest/reference/git#get-a-tree | |
- Get a 'blob' (file contents) | |
GET /repos/{owner}/{repo}/git/blobs/{file_sha} | |
GET /repos/{owner}/{repo}/git/blobs/{ref}:{path} | |
https://docs.github.com/en/rest/reference/git#get-a-blob | |
Limitations: | |
- Does not set mode of directories, only files. | |
- Does not set mtime of files/dirs. | |
""" | |
import argparse | |
import asyncio | |
import base64 | |
import collections | |
import json | |
import logging | |
import os | |
import posixpath | |
import shlex | |
import subprocess | |
import sys | |
import tarfile | |
import tempfile | |
import urllib.parse | |
import zipfile | |
from typing import Coroutine, Dict, List, Mapping, Optional, Tuple, Union | |
# Module-level logger for this script.
logger = logging.getLogger("gh-archive")

# Type alias for any decoded JSON value. JSON numbers may decode to either
# int or float, so both are included.
Json = Union[str, int, float, bool, None, Dict[str, "Json"], List["Json"]]
# A decoded JSON document root: always an object or an array.
JsonContainer = Union[Dict[str, Json], List[Json]]

# Globals populated in main() (from CLI args / env vars) before any API call.
BASE_URL: str  # API base URL, e.g. "https://api.github.com"
REPO_IDENT: str  # the "{owner}/{repo}" URL segment
USER: str  # username used for basic auth when a token is set
TOKEN: Optional[str]  # API token, or None for unauthenticated access

# A file fetched from the repo. 'mtime' is currently always None (not
# provided by the API responses used here).
File = collections.namedtuple("File", "path, sha, mode, mtime, contents")
# ------------------------------------------------------------------------------ | |
# Helpers | |
# ------------------------------------------------------------------------------ | |
async def _subproc(cmd, **kwargs) -> str:
    """
    Run a subprocess command, exiting the script if it fails (similar to
    bash 'set -e' behaviour).

    :param cmd:
        Command to run, in list form.
    :return:
        Stdout from the command.
    """
    quoted = " ".join(shlex.quote(part) for part in cmd)
    logger.debug("Running command: %s", quoted)
    proc = await asyncio.create_subprocess_exec(
        *cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, **kwargs
    )
    out_bytes, err_bytes = await proc.communicate()
    if proc.returncode != 0:
        # Mirror 'set -e': report the failure and exit with the same code.
        logger.critical("Command failed: %s\n%s", quoted, err_bytes.decode())
        sys.exit(proc.returncode)
    return out_bytes.decode()
async def _run_curl(url: str) -> str:
    """
    Fetch the given URL with curl, adding basic auth when a token is set.

    :param url:
        The URL to fetch.
    :return:
        The response body.
    """
    cmd = ["curl", "-L", "--fail", url]
    if TOKEN:
        cmd += ["--user", f"{USER}:{TOKEN}"]
    return await _subproc(cmd)
def _parse_file_mode(mode: str) -> int:
    """Extract the permission bits from a git octal mode string (e.g. "100644")."""
    # Git modes such as "100644" prefix the permission bits with a file-type
    # code - mask down to the low 9 (rwx) bits.
    return int(mode, base=8) & 0o777
def _decode_file_contents(content: str) -> str:
    """Decode base64-encoded blob content (as returned by the GitHub API) to text."""
    raw = base64.b64decode(content)
    return raw.decode()
async def _fetch_repo_info() -> JsonContainer:
    """
    GET /repos/{owner}/{repo}
    https://docs.github.com/en/rest/reference/repos#get-a-repository

    :return:
        Decoded JSON from the Github API.
    """
    url = posixpath.join(BASE_URL, "repos", REPO_IDENT).rstrip("/")
    response = await _run_curl(url)
    return json.loads(response)
async def _fetch_path_info(path: str, ref: Optional[str] = None) -> JsonContainer:
    """
    GET /repos/{owner}/{repo}/contents/{path}
    https://docs.github.com/en/rest/reference/repos#get-repository-content

    :param path:
        The repo path to get info for.
    :param ref:
        Optionally specify a branch/tag/commit.
    :return:
        Decoded JSON from the Github API.
    """
    # Percent-encode the path (keeping '/' separators) and the ref, for
    # consistency with the tree/blob fetchers - otherwise spaces or other
    # special characters would produce an invalid URL.
    url = posixpath.join(
        BASE_URL,
        "repos",
        REPO_IDENT,
        "contents",
        urllib.parse.quote(path, safe="/"),
    ).rstrip("/")
    if ref:
        url += "?ref=" + urllib.parse.quote(ref, safe="")
    output = await _run_curl(url)
    return json.loads(output)
async def _fetch_tree_info(
    path: Optional[str] = None, ref: Optional[str] = None, *, sha: Optional[str] = None
) -> JsonContainer:
    """
    GET /repos/{owner}/{repo}/git/trees/{tree_sha}
    GET /repos/{owner}/{repo}/git/trees/{ref}:{path}
    https://docs.github.com/en/rest/reference/git#get-a-tree

    :param path:
        The repo path to get info for.
    :param ref:
        Specify the branch/tag/commit.
    :param sha:
        If given, path and ref should not be given.
    :return:
        Decoded JSON from the Github API.
    """
    # Exactly one addressing style is accepted: a bare sha, or path+ref.
    sha_only = bool(sha) and path is None and not ref
    path_and_ref = not sha and path is not None and bool(ref)
    if not (sha_only or path_and_ref):
        raise ValueError("Expected either sha OR both path and ref")
    treeish = sha if sha else f"{ref}:{urllib.parse.quote(path, safe='')}"
    url = posixpath.join(
        BASE_URL, "repos", REPO_IDENT, "git", "trees", treeish
    ).rstrip("/")
    return json.loads(await _run_curl(url))
async def _fetch_blob_info(
    path: Optional[str] = None, ref: Optional[str] = None, *, sha: Optional[str] = None
) -> JsonContainer:
    """
    GET /repos/{owner}/{repo}/git/blobs/{blob_sha}
    GET /repos/{owner}/{repo}/git/blobs/{ref}:{path}
    https://docs.github.com/en/rest/reference/git#get-a-blob

    :param path:
        The repo path to get info for.
    :param ref:
        Specify the branch/tag/commit.
    :param sha:
        If given, path and ref should not be given.
    :return:
        Decoded JSON from the Github API.
    """
    # Exactly one addressing style is accepted: a bare sha, or path+ref.
    sha_only = bool(sha) and path is None and not ref
    path_and_ref = not sha and path is not None and bool(ref)
    if not (sha_only or path_and_ref):
        raise ValueError("Expected either sha OR both path and ref")
    blobish = sha if sha else f"{ref}:{urllib.parse.quote(path, safe='')}"
    url = posixpath.join(
        BASE_URL, "repos", REPO_IDENT, "git", "blobs", blobish
    ).rstrip("/")
    return json.loads(await _run_curl(url))
# ------------------------------------------------------------------------------ | |
# Main logic | |
# ------------------------------------------------------------------------------ | |
def convert_repo_web_url(repo_url: str) -> Tuple[str, str]:
    """
    Convert a web browser repo URL to the base of an API URL and repo name.
    This includes handling for public (github.com) and enterprise URLs.

    :param repo_url:
        The repo URL to convert.
        Examples:
         - "https://github.com/{owner}/{repo}"
         - "https://private-enterprise-domain.com/{org}/{repo}"
    :return:
        The API base URL and org/owner + repo name segment.
        Examples:
         - ("https://api.github.com", "{owner}/{repo}")
         - ("https://private-enterprise-domain.com/api/v3", "{org}/{repo}")
    """
    parts = urllib.parse.urlsplit(repo_url)
    repo_segment = parts.path.strip("/")
    if parts.netloc == "github.com":
        # Public GitHub serves the API from a dedicated subdomain.
        api_host = f"api.{parts.netloc}"
        api_path = ""
    else:
        # Enterprise instances serve the API under a path prefix.
        api_host = parts.netloc
        api_path = "api/v3"
    base_url = urllib.parse.urlunsplit((parts.scheme, api_host, api_path, "", ""))
    return base_url, repo_segment
async def fetch_path_contents(path: str, ref: str) -> List[File]:
    """
    Fetch all files and their content under the given path (file or dir).

    :param path:
        The path to fetch from.
    :param ref:
        The branch/tag/commit to fetch from.
    :return:
        A list of file tuples containing path, mode and contents.
    """
    # First check what the given path is (dir/file) - the contents endpoint
    # returns a list for a directory and a dict for a single file.
    info = await _fetch_path_info(path, ref)
    if isinstance(info, list):  # dir
        logger.info("Recursing into directory %r", path)
        # Breadth-first walk: each round fetches all pending subtrees in
        # parallel via asyncio.gather().
        fetch_tree_tasks = {path: _fetch_tree_info(path, ref)}
        files = []
        while fetch_tree_tasks:
            base_paths = list(fetch_tree_tasks.keys())
            tree_results = await asyncio.gather(*fetch_tree_tasks.values())
            fetch_tree_tasks = dict()
            for base_path, tree in zip(base_paths, tree_results):
                if tree["truncated"]:
                    # The API truncates very large trees and there is no
                    # paging support here, so fail rather than drop files.
                    raise RuntimeError("Github API returned a truncated result")
                for info in tree["tree"]:
                    full_path = os.path.join(base_path, info["path"])
                    if info["type"] == "tree":
                        # Subdirectory - queue it for the next round.
                        fetch_tree_tasks[full_path] = _fetch_tree_info(sha=info["sha"])
                    else:
                        # Record file metadata now; contents filled in below.
                        files.append(
                            File(
                                full_path,
                                info["sha"],
                                _parse_file_mode(info["mode"]),
                                None,  # mtime: not provided by the API
                                None,  # contents: fetched after the walk
                            )
                        )
        # Fetch all blob contents in parallel, then rebuild each File tuple
        # with its contents attached.
        blob_results = await asyncio.gather(
            *(_fetch_blob_info(sha=f.sha) for f in files)
        )
        for i, (file, blob) in enumerate(zip(files, blob_results)):
            files[i] = File(*file[:-1], _decode_file_contents(blob["content"]))
    else:  # file
        logger.info("Fetching file %r", path)
        # Get the file's mode by fetching the tree info.
        parent_tree = (await _fetch_tree_info(posixpath.dirname(path), ref))["tree"]
        tree_file_info = {x["path"]: x for x in parent_tree}[posixpath.basename(path)]
        files = [
            File(
                path,
                info["sha"],
                _parse_file_mode(tree_file_info["mode"]),
                None,  # mtime: not provided by the API
                _decode_file_contents(info["content"]),
            )
        ]
    return files
def write_files(files: List[File], *, dest: str = "./", fmt: str = "tgz") -> None:
    """
    Write the given files to disk, either directly or inside an archive.

    :param files:
        File tuples (path, sha, mode, mtime, contents) to write.
    :param dest:
        Destination path - either a directory (the archive is named
        'archive.<fmt>' inside it) or an archive file path.
    :param fmt:
        One of "plain", "tar", "tgz", "zip".
    :raise ValueError:
        If an unsupported archive format is given.
    """
    if fmt not in ("plain", "tar", "tgz", "zip"):
        # Fail fast, before doing any filesystem work.
        raise ValueError(f"Unsupported archive format {fmt!r}")

    def create_files(base_path: str) -> None:
        # Materialise each file under base_path, creating parent dirs and
        # applying the stored permission bits.
        for file in files:
            logger.debug("Creating file: %s", file[:-1])
            fullpath = os.path.join(base_path, file.path)
            os.makedirs(os.path.dirname(fullpath), exist_ok=True)
            with open(fullpath, "w") as f:
                f.write(file.contents)
            os.chmod(fullpath, file.mode)

    if fmt == "plain":
        logger.info("Writing files under %s", dest)
        create_files(dest)
        return
    if os.path.isdir(dest):
        dest = os.path.join(dest, f"archive.{fmt}")
    logger.info("Writing files and creating archive at %s", dest)
    with tempfile.TemporaryDirectory() as tmpdir:
        create_files(tmpdir)
        if fmt in ("tar", "tgz"):
            fmt_code = "gz" if fmt == "tgz" else ""
            with tarfile.open(dest, f"w:{fmt_code}") as tf:
                # tarfile.add() recurses into directories automatically.
                tf.add(tmpdir, arcname=".")
        else:  # zip
            # BUG FIX: zipfile.ZipFile.write() does NOT recurse into
            # directories (unlike tarfile.add), so writing tmpdir directly
            # produced an archive containing only a directory entry. Walk
            # the tree and add each file with its repo-relative name.
            with zipfile.ZipFile(dest, "w") as zf:
                for root, _dirs, names in os.walk(tmpdir):
                    for name in names:
                        abs_path = os.path.join(root, name)
                        zf.write(abs_path, arcname=os.path.relpath(abs_path, tmpdir))
def parse_args(argv):
    """
    Parse command line arguments.

    :param argv:
        Argument list, excluding the program name.
    :return:
        The parsed argparse namespace.
    """
    parser = argparse.ArgumentParser()
    add = parser.add_argument
    add("repo_url", help="URL to the repo (http or https)")
    add("path", help="Path in the repo to archive")
    add("--ref", help="Ref-point to archive from (e.g. branch/commit)")
    add(
        "--user",
        help="GitHub username if auth is required - can also use GH_USER env var, "
        "defaults to USER env var",
    )
    add(
        "--token",
        help="GitHub API token if auth is required - can also use GH_TOKEN env var",
    )
    add(
        "--output",
        "-o",
        default="./",
        help="Output path for the archive, defaults to cwd",
    )
    add(
        "--format",
        "-f",
        choices=["tar", "tgz", "zip", "plain"],
        help="The format to save the archive in, inferred from the output filename "
        "if possible, otherwise defaults to tgz",
    )
    add("--verbose", "-v", action="store_true", help="Include debug logs")
    add("--quiet", "-q", action="store_true", help="Hide info-level logs")
    args = parser.parse_args(argv)
    # 'plain' writes files directly, so the output must be an existing dir.
    if args.format == "plain" and not os.path.isdir(args.output):
        parser.error("When using 'plain' format the output path must be a directory")
    return args
def main(argv):
    """
    Script entry point: parse args, configure logging and auth globals,
    then fetch the requested path and write it out.

    :param argv:
        Command line arguments, excluding the program name.
    """
    global BASE_URL, REPO_IDENT, USER, TOKEN
    # NOTE(review): asyncio.get_event_loop() outside a running loop is
    # deprecated since Python 3.10 - presumably kept for the advertised
    # Python 3.6+ support; confirm before modernising.
    loop = asyncio.get_event_loop()
    args = parse_args(argv)
    # Logging setup.
    if args.verbose:
        log_level = logging.DEBUG
    elif args.quiet:
        log_level = logging.WARNING
    else:
        log_level = logging.INFO
    logging.basicConfig(format="%(levelname)5s: %(message)s")
    logger.setLevel(log_level)
    if args.format is None:
        # Infer the format from the output filename's last 3 characters,
        # falling back to tgz.
        if not os.path.isdir(args.output) and args.output[-3:] in ["tar", "tgz", "zip"]:
            args.format = args.output[-3:]
            logger.debug("Determined desired format to be %r", args.format)
        else:
            args.format = "tgz"
    # Set global variables.
    BASE_URL, REPO_IDENT = convert_repo_web_url(args.repo_url)
    logger.debug("Base API URL: %s, repo segment: %s", BASE_URL, REPO_IDENT)
    if args.user:
        USER = args.user
    elif os.environ.get("GH_USER"):
        USER = os.environ.get("GH_USER")
    else:
        USER = os.environ.get("USER", "")
    if args.token:
        TOKEN = args.token
    else:
        TOKEN = os.environ.get("GH_TOKEN")
    if args.ref is None:
        # No ref given - fall back to the repo's default branch.
        args.ref = loop.run_until_complete(_fetch_repo_info())["default_branch"]
        logger.debug("Default branch: %s", args.ref)
    # Do the work.
    files = loop.run_until_complete(fetch_path_contents(args.path, args.ref))
    write_files(files, dest=args.output, fmt=args.format)
    logger.info("Success!")


if __name__ == "__main__":
    main(sys.argv[1:])
Now sets file modes correctly (but not mtimes or dir modes). It may also do a better job of triggering async operations in parallel, but it requires extra API calls in some cases, so it is not necessarily faster.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment.
This fails to set the correct file/dir mode.
`/repos/{owner}/{repo}/contents/{path}` can be replaced with `/repos/{owner}/{repo}/git/trees/{branch}:{path}` to get back the mode, which will then need to be set when the file is fetched.
It would also be nice to set the mtime as stored in the repo...