Skip to content

Instantly share code, notes, and snippets.

@kevinkreiser
Last active February 13, 2017 16:14
Show Gist options
  • Save kevinkreiser/b838d22d165dc02e41a4b1905787bb91 to your computer and use it in GitHub Desktop.
Save kevinkreiser/b838d22d165dc02e41a4b1905787bb91 to your computer and use it in GitHub Desktop.
Copy Github Issues Between Repositories
#!/usr/bin/env python
import sys
import json
import requests
from functools import partial
import pprint
import calendar
import time
issue_parameters = set(['title', 'body', 'milestone', 'labels', 'assignees', 'state'])
pp = pprint.PrettyPrinter(indent=4)
#wait until rate limit is restored
def wait_limit(session, initial_wait):
print 'Waiting %d seconds before continuing' % initial_wait
time.sleep(initial_wait)
response = session.get('https://api.github.com/rate_limit').json()
diff = response['resources']['core']['reset'] - calendar.timegm(time.gmtime())
if diff > 0 and response['resources']['core']['remaining'] == 0:
print 'Waiting %d seconds before continuing' % diff
time.sleep(diff)
#remove none values
def remove_nones(obj):
if isinstance(obj, list):
return [ remove_nones(o) for o in obj ]
elif isinstance(obj, dict):
return { k:remove_nones(v) for k, v in obj.iteritems() if v is not None }
else:
return obj
#make a request and check it or wait
def fail_or_retry(request, success_code, wait):
initial_wait = 10
while True:
response = request()
result = remove_nones(response.json())
if response.status_code == success_code:
return result, response.headers
pp.pprint(result)
if response.status_code == 403:
wait(initial_wait)
initial_wait *= 2
else:
sys.exit(1)
#get all the issues in the repos
def get_github_issues(user, token, org, repos):
#NOTE: change this and how its used depending on your use case
#default is only copy issues you created, you could change this
#to get all non team member issues:
user_filter = set([user])
issues = []
with requests.Session() as session:
session.auth = (user, token)
wait = partial(wait_limit, session)
for repo in repos:
url = 'https://api.github.com/repos/%s/%s/issues?sort=created&state=all&per_page=100' % (org, repo)
while url:
batch, headers = fail_or_retry(partial(session.get, url), 200, wait)
for i in batch:
i['labels'].append({'name': repo})
issues += [ i for i in batch if i.get('pull_request') is None and i['user']['login'] in user_filter ]
url = { 'link': l.split('>')[0].lstrip('<') for l in headers.get('Link', '').split(',') if l.find('rel="next"') != -1 }.get('link')
return sorted(issues, key=lambda k: k['created_at'])
#fixes the milestones to both exist and have the right number
def align_milestones(user, token, org, repo, issues):
milestones = []
with requests.Session() as session:
session.auth = (user, token)
wait = partial(wait_limit, session)
url = 'https://api.github.com/repos/%s/%s/milestones?state=all&per_page=100' % (org, repo)
while url:
batch, headers = fail_or_retry(partial(session.get, url), 200, wait)
milestones += batch
url = { 'link': l.split('>')[0].lstrip('<') for l in headers.get('Link', '').split(',') if l.find('rel="next"') != -1 }.get('link')
milestones = { m['title']:m['number'] for m in milestones }
url = 'https://api.github.com/repos/%s/%s/milestones' % (org, repo)
for i in issues:
if 'milestone' in i:
if i['milestone']['title'] not in milestones:
posted, headers = fail_or_retry(partial(session.post, url, json.dumps(i['milestone'])), 201, wait)
milestones[i['milestone']['title']] = posted['number']
if i['milestone']['state'] == 'closed':
close_url = 'https://api.github.com/repos/%s/%s/milestones/%d' % (org, repo, posted['number'])
fail_or_retry(partial(session.patch, close_url, '{"state":"closed"}'), 200, wait)
i['milestone'] = milestones[i['milestone']['title']]
#get all the comments for the issue
def clean_issues(user, token, issues):
with requests.Session() as session:
session.auth = (user, token)
wait = partial(wait_limit, session)
for issue in issues:
comment_block = ''
if issue['comments'] > 0:
comments, headers = fail_or_retry(partial(session.get, issue['comments_url']), 200, wait)
comment_block = '\r\n\r\n'.join([ '(%s) @%s : %s' % (c['created_at'], c['user']['login'], c['body']) for c in comments ])
org, repo, i, number = issue['url'].split('/')[4:]
intro = '(%s) @%s : originally created %s/%s#%s' % (issue['created_at'], issue['user']['login'], org, repo, number)
if 'body' not in issue or issue['body'] is None:
issue['body'] = ''
issue['body'] = intro + '\r\n\r\n' + issue['body'] + '\r\n\r\n' + comment_block
for k,v in issue.items():
if k not in issue_parameters:
del issue[k]
issue['assignees'] = [ assignee['login'] for assignee in issue['assignees'] ]
issue['labels'] = [ label['name'] for label in issue['labels'] ]
#move some issues into the repo
def make_github_issues(user, token, org, repo, issues):
url = 'https://api.github.com/repos/%s/%s/issues' % (org, repo)
with requests.Session() as session:
session.auth = (user, token)
wait = partial(wait_limit, session)
for issue in issues:
posted, headers = fail_or_retry(partial(session.post, url, json.dumps(issue)), 201, wait)
if issue['state'] == 'closed':
close_url = 'https://api.github.com/repos/%s/%s/issues/%d' % (org, repo, posted['number'])
fail_or_retry(partial(session.patch, close_url, '{"state":"closed"}'), 200, wait)
time.sleep(0.2)
if __name__ == "__main__":
#show some help about args
if len(sys.argv) != 7:
print '%s user password_or_token src_user_or_org src_repos dst_user_or_org dst_repo' % sys.argv[0]
print 'if you use 2F auth generate a new personal access token and use that as the password'
sys.exit(1)
user, token, src_org, src_repos, dst_org, dst_repo = sys.argv[1:]
src_repos = src_repos.split(',')
#get the issues from the srcs
print 'fetching issues for %s' % ' '.join(src_repos)
issues = get_github_issues(user, token, src_org, src_repos)
#create milestones if we dont have them
print 'aligning milestones for %s issues' % len(issues)
align_milestones(user, token, dst_org, dst_repo, issues)
#fix up the issues since they are only copies
print 'sanitizing %d issues' % len(issues)
clean_issues(user, token, issues)
#copy them to the dst
print 'creating %d issues' % len(issues)
make_github_issues(user, token, dst_org, dst_repo, issues)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment