Created
August 10, 2019 09:50
-
-
Save Asherslab/1f37627854e913b86f088aa8ca30e8d0 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#! /usr/local/bin/python3 | |
import argparse | |
import json | |
import sys | |
import threading | |
import requests | |
parser = argparse.ArgumentParser() | |
subparsers = parser.add_subparsers(help="help for subcommand") | |
subparsers.required = True | |
subparsers.dest = 'subparser_name' | |
# // Input \\ # | |
input_parser = subparsers.add_parser('input', help="Stats from helpful JSON") | |
input_parser.add_argument("--verbose", help="Say which value is which", action="store_true") | |
input_parser.add_argument("-s", "--state", help="Which state to use", type=str, choices=["all", "open", "closed"]) | |
input_parser.add_argument("-v", "--values", help="What values to output, in order. e.g. \"[name, pull_requests.open, issues.labels.bug.open]\"", type=str, required=True, nargs='*') | |
group = input_parser.add_mutually_exclusive_group() | |
group.required = True | |
group.add_argument("-si", "--stdininput", help="STDIN mode", action="store_true") | |
group.add_argument("-i", "--input", type=str, help="JSON Input") | |
# // Output \\ # | |
output_parser = subparsers.add_parser('output', help="Output helpful JSON to STDOUT") | |
output_parser.add_argument("-r", "--repo", help="Repository path org/name", type=str, required=True) | |
output_parser.add_argument("-k", "--key", help="GitHub API key", type=str) | |
output_parser.add_argument("-f", "--file", help="Output to file instead", type=str) | |
output_parser.add_argument("-l", "--labels", help="Parse labels", action="store_true") | |
THREADS = [] | |
def add_issue_stats(issue, issues): | |
issues[issue["state"]] += 1 | |
issues["total"] += 1 | |
def add_label_stats(issue, issue_labels): | |
for label in issue["labels"]: | |
name = label["name"].replace(" ", "_").lower() | |
issue_labels[name][issue["state"]] += 1 | |
issue_labels[name]["total"] += 1 | |
def get_issue_stats(arguments, issue_page_number, issues, pull_requests, issue_labels, pull_request_labels): | |
issue_page = requests.get(url="http://api.github.com/repos/%s/issues?state=all&per_page=100&page=%s" % (arguments.repo, issue_page_number + 1), headers={"Authorization": "token %s" % arguments.key}).json() | |
for issue in issue_page: | |
if "pull_request" in issue: | |
add_issue_stats(issue, pull_requests) | |
if arguments.labels: | |
add_label_stats(issue, pull_request_labels) | |
else: | |
add_issue_stats(issue, issues) | |
if arguments.labels: | |
add_label_stats(issue, issue_labels) | |
THREADS.remove(threading.current_thread()) | |
def output(arguments): | |
pull_requests = {"open": 0, "closed": 0, "total": 0} | |
issues = {"open": 0, "closed": 0, "total": 0} | |
pull_request_labels = {} | |
issue_labels = {} | |
headers = {"Authorization": "token %s" % arguments.key} | |
if arguments.labels: | |
labels = requests.get(url="https://api.github.com/repos/%s/labels" % arguments.repo, headers=headers).json() | |
for label in labels: | |
name = label["name"].replace(" ", "_").lower() | |
pull_request_labels[name] = {"open": 0, "closed": 0, "total": 0} | |
issue_labels[name] = {"open": 0, "closed": 0, "total": 0} | |
try: | |
request = requests.get(url="http://api.github.com/repos/%s/issues?state=all&per_page=1" % arguments.repo, headers=headers) | |
except requests.exceptions.RequestException as e: | |
output(arguments) | |
return | |
issues_header = request.headers | |
total_issues = 0 | |
for link in issues_header["link"].split(","): | |
(url, rel) = link.split("; ") | |
(url, page) = url[1:-1].split("&page=") | |
if "last" in rel: | |
total_issues = page | |
total_pages = (int(total_issues) // 100) + 1 | |
print("Total Pages: %s" % total_pages) | |
print("Total Issues: %s" % total_issues) | |
for i in range(total_pages): | |
thread = threading.Thread(target=get_issue_stats, args=(arguments, i, issues, pull_requests, issue_labels, pull_request_labels)) | |
THREADS.append(thread) | |
thread.start() | |
while len(THREADS) > 10: | |
THREADS[0].join() | |
for thread in THREADS: | |
thread.join() | |
repo = requests.get(url="https://api.github.com/repos/%s" % arguments.repo, | |
headers=headers).json() | |
if arguments.labels: | |
pull_requests["labels"] = pull_request_labels | |
issues["labels"] = issue_labels | |
output_dictionary = {"pull_requests": pull_requests, "issues": issues, "forks": repo["forks_count"], | |
"stars": repo["stargazers_count"], "subscribers": repo["subscribers_count"], | |
"watchers": repo["watchers_count"], "name": repo["name"], "full_name": repo["full_name"], | |
"size": repo["size"]} | |
if arguments.file: | |
with open(arguments.file, 'w') as f: | |
f.write(json.dumps(output_dictionary)) | |
else: | |
sys.stdout.write(json.dumps(output_dictionary) + "\n") | |
def input_parse(arguments, input_json): | |
for value in arguments.values: | |
current_value = input_json | |
invalid = False | |
for subvalue in value.split("."): | |
if subvalue in current_value: | |
current_value = current_value[subvalue] | |
else: | |
invalid = True | |
break | |
if invalid: | |
continue | |
if arguments.verbose: | |
print(str(value) + ": " + str(current_value)) | |
else: | |
print(current_value) | |
args = parser.parse_args() | |
if args.subparser_name == 'input': | |
if args.input: | |
try: | |
json_input = json.loads(args.input) | |
input_parse(args, json_input) | |
except ValueError as e: | |
print("Invalid Input") | |
elif args.stdininput: | |
try: | |
json_input = json.loads(sys.stdin.read()) | |
input_parse(args, json_input) | |
except ValueError as e: | |
print("Invalid Input") | |
elif args.subparser_name == 'output': | |
output(args) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment