Archive a GitHub repository's issue history to a Markdown file.
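To use it, fill in the USER and PASSWORD fields at the top of the script, then run it as (the file name archive_issues.py here is just an example):

    python archive_issues.py <GitHub user> <GitHub repo> <output file>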
#! /usr/bin/env python
# -*- coding: utf-8 -*-
# author = Adriano Di Luzio - Unbit
"""
Given a user and a repository, this script downloads and archives
(as Markdown) the entire issue history of the repository.

It is designed to work with both Python 3 and Python 2.
It requires python-requests.
"""
from __future__ import print_function
import json
import re
import sys

try:
    import requests
except ImportError:
    print("Please install python-requests (pip) and restart me.")
    sys.exit(1)

ISSUES_API = 'https://api.github.com/repos/'

# Request attributes
USER = ""
PASSWORD = ""
TIMEOUT = 120
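
# Note: GitHub has since deprecated password-based Basic Auth for its API;
# a personal access token can be used in place of PASSWORD.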


class Issue(object):
    """The Issue class."""

    def __init__(self, issue, owner, repository):
        self.number = int(issue['number'])
        print("Processing issue #%d" % self.number)

        r = requests.get(
            ISSUES_API +
            '%s/%s/issues/%d' % (owner, repository, self.number),
            auth=(USER, PASSWORD),
            timeout=TIMEOUT
        )
        issue_details = json.loads(r.text)

        self.title = issue['title']
        self.body = issue['body']
        self.state = issue['state']
        self.labels = [l['name'] for l in issue_details['labels']]
        self.created_at = issue['created_at']
        self.updated_at = issue['updated_at']
        self.created_by = (issue["user"]["login"], int(issue["user"]["id"]))
        self.closed_at = issue['closed_at']
        if self.closed_at:
            self.closed_by = (
                issue_details["closed_by"]["login"],
                int(issue_details["closed_by"]["id"])
            )

        try:
            self.pull_request_url = issue_details["pull_request"]["diff_url"]
            r = requests.get(
                self.pull_request_url,
                auth=(USER, PASSWORD),
                timeout=TIMEOUT
            )
            self.pull_request = r.text
        except KeyError:
            pass  # all PRs are issues, not all issues are PRs

        self.n_comments = issue['comments']
        self.comments_url = issue['comments_url']
        # let's save a request
        self.comments = get_issue_comments(
            self.comments_url) if self.n_comments else list()
        self.events_url = issue['events_url']
        self.events = get_issue_events(self.events_url)

    def to_markdown(self):
        """Markdown representation of an issue."""
        if hasattr(self, "pull_request_url"):
            mds = "## Pull request #%d\n" % self.number
        else:
            mds = "## Issue #%d\n" % self.number
        mds += "### Title: %s\n" % self.title
        mds += "#### Author: *%s* (id %d) @%s\n" % (
            self.created_by[0], self.created_by[1], self.updated_at)
        if self.labels:
            mds += "##### Labels: (%s)\n" % ', '.join(self.labels)
        if self.body:
            mds += self.body + "\n\n"

        if self.comments or self.events:
            # Sort comments and events together, by date.
            comment_strings = [
                ("###### %s @ %s:\n%s" % comment, comment[1])
                for comment in self.comments]
            event_strings = [
                ("###### (Event) *%s*: %s @ %s" % (event[2], event[0], event[1]),
                 event[1])
                for event in self.events]
            comments_and_events = sorted(
                comment_strings + event_strings, key=lambda x: x[1])
            mds += "#### Comments and events:\n"
            mds += '\n'.join(
                list(zip(*comments_and_events))[0]
            )
            mds += "\n\n"

        if hasattr(self, "pull_request_url"):
            mds += "#### Pull request diff file\n"
            mds += "```\n%s```\n" % self.pull_request
        mds += "---\n"
        return mds


def header_link_to_useful(s):
    """Parse the "Link" field of the response headers and extract the page numbers and the URL."""
    # TODO: make me fabulous!
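    # A "Link" header looks roughly like this (hypothetical URLs):
    #   <https://api.github.com/repos/owner/repo/issues?page=2>; rel="next",
    #   <https://api.github.com/repos/owner/repo/issues?page=5>; rel="last"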
    links = s.split(",")
    next = [l for l in links if "next" in l][
        0].split(";")[0].strip(" <>")  # ugly!
    last = [l for l in links if "last" in l][
        0].split(";")[0].strip(" <>")  # ugly!
    next_page = re.search(r"\d+", re.search(r'page=\d+$', next).group()).group()
    last_page = re.search(r"\d+", re.search(r'page=\d+$', last).group()).group()
    url = next[:-len(next_page)]
    return int(next_page), int(last_page), url


def get_issue_comments(url):
    """Return the comments of an issue, as (login, date, body) tuples."""
    r = requests.get(
        url,
        params={'per_page': '100'},
        auth=(USER, PASSWORD),
        timeout=TIMEOUT
    )
    comments_data = json.loads(r.text)
    comments = [(comment["user"]["login"], comment["updated_at"],
                 comment["body"]) for comment in comments_data]

    try:
        link = r.headers['Link']
        next, last, url = header_link_to_useful(link)
        for p in range(next, last + 1):
            r = requests.get(
                url + "%d" % p,
                auth=(USER, PASSWORD),
                timeout=TIMEOUT
            )
            comments_data = json.loads(r.text)
            comments += [(comment["user"]["login"], comment["updated_at"],
                          comment["body"]) for comment in comments_data]
    except KeyError:
        pass  # Comments are contained in a single page, we're done here!
    return comments


def get_issue_events(url):
    """Return the events of an issue, as (login, date, event) tuples."""
    r = requests.get(
        url,
        params={'per_page': '100'},
        auth=(USER, PASSWORD),
        timeout=TIMEOUT
    )
    events = [(event['actor']['login'], event['created_at'], event['event'])
              for event in json.loads(r.text)]

    try:
        link = r.headers['Link']
        next, last, url = header_link_to_useful(link)
        for p in range(next, last + 1):
            r = requests.get(
                url + "%d" % p,
                auth=(USER, PASSWORD),
                timeout=TIMEOUT
            )
            events += [(event['actor']['login'], event['created_at'], event['event'])
                       for event in json.loads(r.text)]
    except KeyError:
        pass  # Events are contained in a single page, we're done here!
    return events


def get_issues(owner, repository):
    """Return the list of Issues of the given repository."""
    r = requests.get(
        ISSUES_API + '%s/%s/issues' % (owner, repository),
        params={'state': 'all', 'per_page': '100'},
        auth=(USER, PASSWORD),
        timeout=TIMEOUT
    )
    if int(r.headers['X-RateLimit-Remaining']) <= 150 or int(r.headers['X-RateLimit-Limit']) <= 100:
        print("Warning: your current API request pool is limited.\n"
              "This script may not work as expected.",
              file=sys.stderr)
    issues = [Issue(i, owner, repository) for i in json.loads(r.text)]

    try:
        link = r.headers['Link']
        next, last, url = header_link_to_useful(link)
        for p in range(next, last + 1):
            r = requests.get(
                url + "%d" % p,
                auth=(USER, PASSWORD),
                timeout=TIMEOUT
            )
            issues += [Issue(i, owner, repository) for i in json.loads(r.text)]
    except KeyError:
        pass  # Issues are contained in a single page, we're done here!
    return issues


def issues_to_markdown(owner, repository, issues):
    """Return a string containing the Markdown representation of the issues."""
    mds = "# Archive of issues related to GitHub repository *%s/%s*.\n" % (
        owner, repository)
    for issue in issues:
        mds += issue.to_markdown()
    return mds


if __name__ == '__main__':
    if not (USER and PASSWORD):
        print("Please edit %s and fill the USER and PASSWORD fields." % sys.argv[0])
        sys.exit(1)

    if len(sys.argv) == 4:
        user, repository = sys.argv[1], sys.argv[2]
        issues = get_issues(user, repository)
        try:
            # Python 3: the built-in open() accepts an encoding.
            with open(sys.argv[3], "w", encoding="utf-8") as f:
                print(issues_to_markdown(user, repository, issues), file=f)
        except TypeError:
            # Python 2: fall back to codecs.open().
            from codecs import open
            with open(sys.argv[3], "w", encoding="utf-8") as f:
                print(issues_to_markdown(user, repository, issues), file=f)
        sys.exit(0)
    else:
        print("Usage:\n\tpython %s <GitHub user> <GitHub repo> <output file>" % sys.argv[0])
        sys.exit(1)