"""
Parse all contributors (PR authors) from release notes, based on the PR numbers referenced there.

Resources:
- https://developer.github.com/v3/pulls/
- http://zetcode.com/python/requests/
- https://developer.github.com/v3/#authentication
"""
|
|
|
import json |
|
import logging |
|
import multiprocessing as mproc |
|
import os |
|
import re |
|
from functools import partial |
|
|
|
import fire |
|
import requests |
|
import tqdm |
|
|
|
# Base URL of the GitHub REST API endpoint for repository resources;
# "/<owner>/<repo>/pulls/<number>" is appended to it when a PR is requested.
URL_GITHUB_API = "https://api.github.com/repos"
|
|
|
|
|
def request_pull(idx: int, gh_owner: str, gh_repo: str, auth_header: dict, timeout: float = 30.0) -> tuple:
    """Download a single pull request from the GitHub REST API.

    Args:
        idx: pull-request number
        gh_owner: GitHub repository owner (organisation/user)
        gh_repo: GitHub repository name under the selected owner
        auth_header: HTTP headers sent with the request (may be empty)
        timeout: seconds to wait for the server before giving up

    Returns:
        ``(idx, response)`` with the raw ``requests`` response, or ``(idx, None)``
        when GitHub rejected the call with a rate-limit status (403 or 429).

    Raises:
        requests.exceptions.RequestException: on connection problems or when
            ``timeout`` elapses (previously the call could block forever).
    """
    url = f"{URL_GITHUB_API}/{gh_owner}/{gh_repo}/pulls/{idx}"
    req = requests.get(url, headers=auth_header, timeout=timeout)
    # GitHub reports an exhausted request quota with either 403 or 429
    if req.status_code in (403, 429):
        return idx, None
    return idx, req
|
|
|
|
|
def fetch_selected_prs(pr_ids: list, gh_owner: str, gh_repo: str, auth_header: dict, nb_workers: int = 25) -> list:
    """Fetch all requested PRs in parallel and return their decoded JSON payloads.

    Args:
        pr_ids: pull-request numbers to download
        gh_owner: GitHub repository owner (organisation/user)
        gh_repo: GitHub repository name under the selected owner
        auth_header: HTTP headers sent with every request (may be empty)
        nb_workers: upper bound on the number of worker processes

    Returns:
        One decoded JSON object per PR that answered with HTTP 200, in the order
        of ``pr_ids``; PRs with any other status are dropped after a warning.

    Raises:
        SystemExit: when ``request_pull`` reports a rejected request (``None``).
    """
    if not pr_ids:
        # nothing to download - do not spawn any worker processes
        return []

    _req = partial(request_pull, gh_owner=gh_owner, gh_repo=gh_repo, auth_header=auth_header)
    # never start more processes than there are requests to make
    pool_size = min(max(nb_workers, 1), len(pr_ids))

    pulls = []
    # the pool context manager tears the workers down even if we bail out early
    with mproc.Pool(pool_size) as pool:
        with tqdm.tqdm(total=len(pr_ids), desc="Fetching pulls") as pbar:
            for idx, pull in pool.imap(_req, pr_ids):
                if pull is None:
                    # `exit()` is only injected by the `site` module; raise directly instead
                    raise SystemExit("Request failed, probably your limit is gone...")
                pulls.append((idx, pull))
                pbar.update()

    correct_prs = [pr for _, pr in pulls if pr.status_code == 200]
    if len(correct_prs) < len(pulls):
        logging.warning("only %i from %i was retrieved correctly", len(correct_prs), len(pulls))
    return [json.loads(pr.content) for pr in correct_prs]
|
|
|
|
|
def main(
    path_changelog: str, gh_owner: str = "PyTorchLightning", gh_repo: str = "pytorch-lightning", gh_token: str = None
) -> None:
    """Parse PR authors from a changelog and log the unique, sorted user names.

    Args:
        path_changelog: define path to text file with changes
        gh_owner: GitHub repository owner (organisation/user)
        gh_repo: GitHub repository name under selected organisation/user
        gh_token: Personal GH token needed for higher API request limit

    Raises:
        FileNotFoundError: if ``path_changelog`` is not an existing file.
    """
    # load the changelog; an explicit check is used because `assert` vanishes under `python -O`
    if not os.path.isfile(path_changelog):
        raise FileNotFoundError(f"missing file: {path_changelog}")
    with open(path_changelog) as fp:
        changelog = fp.read()

    # parse PR ids; the same PR is often referenced several times, so de-duplicate
    # to avoid spending rate-limited API requests on repeated downloads
    pr_ids = sorted({int(num) for num in re.findall(r"#(\d+)", changelog)})

    # get PRs - the authorization header is sent only when a token was provided
    auth_header = {"Authorization": f"token {gh_token}"} if gh_token else {}
    prs_content = fetch_selected_prs(pr_ids, gh_owner, gh_repo, auth_header)

    # process authors - unique logins, ordered case-insensitively
    authors = {pr["user"]["login"] for pr in prs_content}
    uq_authors = sorted(authors, key=str.lower)
    logging.info("Users: \n\t%s", "\n\t".join(uq_authors))
|
|
|
|
|
if __name__ == "__main__":
    # show INFO records so the progress messages and the author list reach the console
    logging.basicConfig(level=logging.INFO)
    logging.info("running...")
    # expose `main` as a command-line interface; its parameters become CLI arguments
    fire.Fire(main)
    logging.info("Done :]")