Last active
December 12, 2015 09:49
-
-
Save DanielOaks/4754185 to your computer and use it in GitHub Desktop.
Bitbucket issue conversion, fix for new everything
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!usr/bin/env python2 | |
# -*- coding: utf-8 -*- | |
## And this is the modified Bitbucket library I use. I have to bundle this, because the actual one is broken | |
# -*- coding: utf-8 -*- | |
# git+git://github.com/Sheeprider/Py-BitBucket.git | |
__all__ = ['Bitbucket', ] | |
from tempfile import NamedTemporaryFile | |
from zipfile import ZipFile | |
import json | |
import requests | |
class Bitbucket(object): | |
""" This class lets you interact with the bitbucket public API. | |
It depends on Requests. | |
Curently, only repositories, services (hooks) and ssh keys | |
related functionalities are available. | |
""" | |
def __init__(self, username='', password='', repo_name_or_slug=''): | |
self.username = username | |
self.password = password | |
self.repo_slug = repo_name_or_slug.lower().replace(r'[^a-z0-9_-]+', '-') | |
self.repo_tree = {} | |
# ============= | |
# = Utilities = | |
# ============= | |
def dispatch(self, method, url, auth=None, params=None, **kwargs): | |
""" Send HTTP request, with given method, | |
credentials and data to the given URL, | |
and return the status code and the result on success. | |
""" | |
r = requests.request( | |
method=method, | |
url=url, | |
auth=auth, | |
params=params, | |
data=kwargs) | |
status = r.status_code | |
text = r.text | |
#error = r.error | |
if r: | |
if status >= 200 and status < 300: | |
if text: | |
try: | |
return (True, json.loads(text)) | |
except TypeError: | |
pass | |
except ValueError: | |
pass | |
return (True, text) | |
elif status >= 300 and status < 400: | |
return (False, 'Unauthorized access, ' | |
'please check your credentials.') | |
elif status >= 400 and status < 500: | |
return (False, 'Service not found.') | |
elif status >= 500 and status < 600: | |
return (False, 'Server error.') | |
else: | |
return (False, 'Unknown error.') | |
def url(self, action, **kwargs): | |
""" Construct and return the URL for a specific API service. """ | |
# TODO : should be static method ? | |
return self.URLS['BASE'] % self.URLS[action] % kwargs | |
def get_files_in_dir(self, repo_slug=None, dir='/'): | |
repo_slug = repo_slug or self.repo_slug or '' | |
dir = dir.lstrip('/') | |
url = self.url( | |
'GET_ARCHIVE', | |
username=self.username, | |
repo_slug=repo_slug, | |
format='src') | |
dir_url = url + dir | |
response = self.dispatch('GET', dir_url, auth=self.auth) | |
if response[0] and isinstance(response[1], dict): | |
repo_tree = response[1] | |
url = self.url( | |
'GET_ARCHIVE', | |
username=self.username, | |
repo_slug=repo_slug, | |
format='raw') | |
# Download all files in dir | |
for file in repo_tree['files']: | |
file_url = url + '/'.join((file['path'],)) | |
response = self.dispatch('GET', file_url, auth=self.auth) | |
self.repo_tree[file['path']] = response[1] | |
# recursively download in dirs | |
for directory in repo_tree['directories']: | |
dir_path = '/'.join((dir, directory)) | |
self.get_files_in_dir(repo_slug=repo_slug, dir=dir_path) | |
@property | |
def auth(self): | |
""" Return credentials for current Bitbucket user. """ | |
return (self.username, self.password) | |
def username(): #@NoSelf | |
"""Your username.""" | |
def fget(self): | |
return self._username | |
def fset(self, value): | |
self._username = '%s' % value | |
def fdel(self): | |
del self._username | |
return locals() | |
username = property(**username()) | |
def password(): #@NoSelf | |
"""Your password.""" | |
def fget(self): | |
return self._password | |
def fset(self, value): | |
self._password = '%s' % value | |
def fdel(self): | |
del self._password | |
return locals() | |
password = property(**password()) | |
def repo_slug(): #@NoSelf | |
"""Your repository slug name.""" | |
def fget(self): | |
return self._repo_slug | |
def fset(self, value): | |
value = '%s' % value | |
self._repo_slug = value.lower().replace(r'[^a-z0-9_-]+', '-') | |
def fdel(self): | |
del self._repo_slug | |
return locals() | |
repo_slug = property(**repo_slug()) | |
# ======= | |
# = API = | |
# ======= | |
def get_user(self, username=None): | |
""" Returns user informations. | |
If username is not defined, tries to return own informations. | |
""" | |
username = username or self.username or '' | |
url = self.url('GET_USER', username=username) | |
response = self.dispatch('GET', url) | |
try: | |
return (response[0], response[1]['user']) | |
except TypeError: | |
pass | |
return response | |
def public_repos(self, username=None): | |
""" Returns all public repositories from an user. | |
If username is not defined, tries to return own public repos. | |
""" | |
username = username or self.username or '' | |
url = self.url('GET_USER', username=username) | |
response = self.dispatch('GET', url) | |
try: | |
return (response[0], response[1]['repositories']) | |
except TypeError: | |
pass | |
return response | |
def get_repositories(self): | |
""" Return own repositories.""" | |
url = self.url('GET_USER', username=self.username) | |
response = self.dispatch('GET', url, auth=self.auth) | |
try: | |
return (response[0], response[1]['repositories']) | |
except TypeError: | |
pass | |
return response | |
def get_repository(self, repo_slug=None): | |
""" Get a single repository on Bitbucket and return it.""" | |
repo_slug = repo_slug or self.repo_slug or '' | |
url = self.url('GET_REPO', username=self.username, repo_slug=repo_slug) | |
return self.dispatch('GET', url, auth=self.auth) | |
def get_tags(self, repo_slug=None): | |
""" Get a single repository on Bitbucket and return its tags.""" | |
repo_slug = repo_slug or self.repo_slug or '' | |
url = self.url('GET_TAGS', username=self.username, repo_slug=repo_slug) | |
return self.dispatch('GET', url, auth=self.auth) | |
def get_branches(self, repo_slug=None): | |
""" Get a single repository on Bitbucket and return its branches.""" | |
repo_slug = repo_slug or self.repo_slug or '' | |
url = self.url('GET_BRANCHES', username=self.username, repo_slug=repo_slug) | |
return self.dispatch('GET', url, auth=self.auth) | |
def create_repository(self, repo_name, scm='git', private=True): | |
""" Creates a new repository on own Bitbucket account and return it.""" | |
url = self.url('CREATE_REPO') | |
return self.dispatch('POST', url, auth=self.auth, name=repo_name, scm=scm, is_private=private) | |
def update_repository(self, repo_slug=None, **kwargs): | |
""" Updates repository on own Bitbucket account and return it.""" | |
url = self.url('UPDATE_REPO', username=self.username, repo_slug=repo_slug) | |
return self.dispatch('PUT', url, auth=self.auth, **kwargs) | |
def delete_repository(self, repo_slug=None): | |
""" Delete a repository on own Bitbucket account. | |
Please use with caution as there is NO confimation and NO undo. | |
""" | |
repo_slug = repo_slug or self.repo_slug or '' | |
url = self.url('DELETE_REPO', username=self.username, repo_slug=repo_slug) | |
return self.dispatch('DELETE', url, auth=self.auth) | |
def add_service(self, service, repo_slug=None, **kwargs): | |
""" Add a service (hook) to one of your repositories. | |
Each type of service require a different set of additionnal fields, | |
you can pass them as keyword arguments (fieldname='fieldvalue'). | |
""" | |
repo_slug = repo_slug or self.repo_slug or '' | |
url = self.url('SET_SERVICE', username=self.username, repo_slug=repo_slug) | |
return self.dispatch('POST', url, auth=self.auth, type=service, **kwargs) | |
def get_service(self, service_id, repo_slug=None): | |
""" Get a service (hook) from one of your repositories. | |
""" | |
repo_slug = repo_slug or self.repo_slug or '' | |
url = self.url('GET_SERVICE', username=self.username, repo_slug=repo_slug, service_id=service_id) | |
return self.dispatch('GET', url, auth=self.auth) | |
def update_service(self, service_id, repo_slug=None, **kwargs): | |
""" Update a service (hook) from one of your repositories. | |
""" | |
repo_slug = repo_slug or self.repo_slug or '' | |
url = self.url('UPDATE_SERVICE', username=self.username, repo_slug=repo_slug, service_id=service_id) | |
return self.dispatch('PUT', url, auth=self.auth, **kwargs) | |
def delete_service(self, service_id, repo_slug=None): | |
""" Delete a service (hook) from one of your repositories. | |
Please use with caution as there is NO confimation and NO undo. | |
""" | |
repo_slug = repo_slug or self.repo_slug or '' | |
url = self.url('DELETE_SERVICE', username=self.username, repo_slug=repo_slug, service_id=service_id) | |
return self.dispatch('DELETE', url, auth=self.auth) | |
def get_services(self, repo_slug=None): | |
""" Get all services (hook) from one of your repositories. | |
""" | |
repo_slug = repo_slug or self.repo_slug or '' | |
url = self.url('GET_SERVICES', username=self.username, repo_slug=repo_slug) | |
return self.dispatch('GET', url, auth=self.auth) | |
def get_archive(self, repo_slug=None, format='zip', prefix=''): | |
""" Get one of your repositories and compress it as an archive. | |
Return the path of the archive. | |
format parameter is curently not supported. | |
""" | |
prefix = '%s'.lstrip('/') % prefix | |
self.get_files_in_dir(repo_slug=repo_slug, dir='/') | |
if self.repo_tree: | |
with NamedTemporaryFile(delete=False) as archive: | |
with ZipFile(archive, 'w') as zip_archive: | |
for name, file in self.repo_tree.items(): | |
with NamedTemporaryFile(delete=False) as temp_file: | |
temp_file.write(file) | |
zip_archive.write(temp_file.name, prefix + name) | |
return (True, archive.name) | |
return (False, 'Could not archive your project.') | |
def get_ssh_keys(self): | |
""" Get all ssh keys associated with your account. | |
""" | |
url = self.url('GET_SSH_KEYS') | |
return self.dispatch('GET', url, auth=self.auth) | |
def get_ssh_key(self, key_id=1): | |
""" Get one of the ssh keys associated with your account. | |
""" | |
url = self.url('GET_SSH_KEY', key_id=key_id) | |
return self.dispatch('GET', url, auth=self.auth) | |
def set_ssh_key(self, key=None, label=None): | |
""" Associate an ssh key with your account and return it. | |
""" | |
key = '%s' % key | |
url = self.url('SET_SSH_KEY') | |
return self.dispatch('POST', url, auth=self.auth, key=key, label=label) | |
def delete_ssh_key(self, key_id=None): | |
""" Delete one of the ssh keys associated with your account. | |
Please use with caution as there is NO confimation and NO undo. | |
""" | |
url = self.url('DELETE_SSH_KEY', key_id=key_id) | |
return self.dispatch('DELETE', url, auth=self.auth) | |
def get_issues(self, repo_slug=None, params=None): | |
""" Get issues from one of your repositories. | |
""" | |
repo_slug = repo_slug or self.repo_slug or '' | |
url = self.url('GET_ISSUES', username=self.username, repo_slug=repo_slug) | |
return self.dispatch('GET', url, auth=self.auth, params=params) | |
def get_issue(self, issue_id, repo_slug=None): | |
""" Get an issue from one of your repositories. | |
""" | |
repo_slug = repo_slug or self.repo_slug or '' | |
url = self.url('GET_ISSUE', username=self.username, repo_slug=repo_slug, issue_id=issue_id) | |
return self.dispatch('GET', url, auth=self.auth) | |
def add_issue(self, repo_slug=None, **kwargs): | |
""" Add an issue to one of your repositories. | |
Each issue require a different set of attributes, | |
you can pass them as keyword arguments (attributename='attributevalue'). | |
Attributes are: | |
title: The title of the new issue. | |
content: The content of the new issue. | |
component: The component associated with the issue. | |
milestone: The milestone associated with the issue. | |
version: The version associated with the issue. | |
responsible: The username of the person responsible for the issue. | |
status: The status of the issue (new, open, resolved, on hold, invalid, duplicate, or wontfix). | |
kind: The kind of issue (bug, enhancement, or proposal). | |
""" | |
repo_slug = repo_slug or self.repo_slug or '' | |
url = self.url('CREATE_ISSUE', username=self.username, repo_slug=repo_slug) | |
return self.dispatch('POST', url, auth=self.auth, **kwargs) | |
def update_issue(self, issue_id, repo_slug=None, **kwargs): | |
""" Update an issue to one of your repositories. | |
Each issue require a different set of attributes, | |
you can pass them as keyword arguments (attributename='attributevalue'). | |
Attributes are: | |
title: The title of the new issue. | |
content: The content of the new issue. | |
component: The component associated with the issue. | |
milestone: The milestone associated with the issue. | |
version: The version associated with the issue. | |
responsible: The username of the person responsible for the issue. | |
status: The status of the issue (new, open, resolved, on hold, invalid, duplicate, or wontfix). | |
kind: The kind of issue (bug, enhancement, or proposal). | |
""" | |
repo_slug = repo_slug or self.repo_slug or '' | |
url = self.url('UPDATE_ISSUE', username=self.username, repo_slug=repo_slug, issue_id=issue_id) | |
return self.dispatch('PUT', url, auth=self.auth, **kwargs) | |
def delete_issue(self, issue_id, repo_slug=None): | |
""" Delete an issue from one of your repositories. | |
""" | |
repo_slug = repo_slug or self.repo_slug or '' | |
url = self.url('DELETE_ISSUE', username=self.username, repo_slug=repo_slug, issue_id=issue_id) | |
return self.dispatch('DELETE', url, auth=self.auth) | |
def get_issue_comments(self, issue_id, repo_slug=None): | |
""" Get issue comments from one of your repositories. | |
""" | |
repo_slug = repo_slug or self.repo_slug or '' | |
url = self.url('GET_COMMENTS', username=self.username, | |
repo_slug=repo_slug, issue_id=issue_id) | |
return self.dispatch('GET', url, auth=self.auth) | |
def get_issue_comment(self, issue_id, comment_id, repo_slug=None): | |
""" Get an issue from one of your repositories. | |
""" | |
repo_slug = repo_slug or self.repo_slug or '' | |
url = self.url('GET_COMMENT', username=self.username, | |
repo_slug=repo_slug, issue_id=issue_id, | |
comment_id=comment_id) | |
return self.dispatch('GET', url, auth=self.auth) | |
def add_issue_comment(self, issue_id, repo_slug=None, **kwargs): | |
""" Add an issue comment to one of your repositories. | |
Each issue comment require only the content data field | |
the system autopopulate the rest. | |
""" | |
repo_slug = repo_slug or self.repo_slug or '' | |
url = self.url('CREATE_COMMENT', username=self.username, | |
repo_slug=repo_slug, issue_id=issue_id) | |
return self.dispatch('POST', url, auth=self.auth, **kwargs) | |
def update_issue_comment(self, issue_id, comment_id, repo_slug=None, **kwargs): | |
""" Update an issue comment in one of your repositories. | |
Each issue comment require only the content data field | |
the system autopopulate the rest. | |
""" | |
repo_slug = repo_slug or self.repo_slug or '' | |
url = self.url('UPDATE_COMMENT', username=self.username, | |
repo_slug=repo_slug, issue_id=issue_id, | |
comment_id=comment_id) | |
return self.dispatch('PUT', url, auth=self.auth, **kwargs) | |
def delete_issue_comment(self, issue_id, comment_id, repo_slug=None): | |
""" Delete an issue from one of your repositories. | |
""" | |
repo_slug = repo_slug or self.repo_slug or '' | |
url = self.url('DELETE_COMMENT', username=self.username, | |
repo_slug=repo_slug, issue_id=issue_id, | |
comment_id=comment_id) | |
return self.dispatch('DELETE', url, auth=self.auth) | |
# ======== | |
# = URLs = | |
# ======== | |
URLS = { | |
'BASE': 'https://api.bitbucket.org/1.0/%s', | |
# Get user profile and repos | |
'GET_USER': 'users/%(username)s/', | |
# Get archive | |
'GET_ARCHIVE': 'repositories/%(username)s/%(repo_slug)s/%(format)s/master/', | |
# Search repo | |
# 'SEARCH_REPO': 'repositories/?name=%(search)s', | |
# Set repo | |
'CREATE_REPO': 'repositories/', | |
'GET_REPO': 'repositories/%(username)s/%(repo_slug)s/', | |
'GET_TAGS': 'repositories/%(username)s/%(repo_slug)s/tags/', | |
'GET_BRANCHES': 'repositories/%(username)s/%(repo_slug)s/branches/', | |
'UPDATE_REPO': 'repositories/%(username)s/%(repo_slug)s/', | |
'DELETE_REPO': 'repositories/%(username)s/%(repo_slug)s/', | |
# Get services (hooks) | |
'GET_SERVICE': 'repositories/%(username)s/%(repo_slug)s/services/%(service_id)s/', | |
'GET_SERVICES': 'repositories/%(username)s/%(repo_slug)s/services/', | |
# Set services (hooks) | |
'SET_SERVICE': 'repositories/%(username)s/%(repo_slug)s/services/', | |
'UPDATE_SERVICE': 'repositories/%(username)s/%(repo_slug)s/services/%(service_id)s/', | |
'DELETE_SERVICE': 'repositories/%(username)s/%(repo_slug)s/services/%(service_id)s/', | |
# SSH keys | |
'GET_SSH_KEYS': 'ssh-keys/', | |
'GET_SSH_KEY': 'ssh-keys/%(key_id)s', | |
'SET_SSH_KEY': 'ssh-keys/', | |
'DELETE_SSH_KEY': 'ssh-keys/%(key_id)s', | |
# Issues | |
'GET_ISSUES': 'repositories/%(username)s/%(repo_slug)s/issues/', | |
'GET_ISSUE': 'repositories/%(username)s/%(repo_slug)s/issues/%(issue_id)s/', | |
'CREATE_ISSUE': 'repositories/%(username)s/%(repo_slug)s/issues/', | |
'UPDATE_ISSUE': 'repositories/%(username)s/%(repo_slug)s/issues/%(issue_id)s/', | |
'DELETE_ISSUE': 'repositories/%(username)s/%(repo_slug)s/issues/%(issue_id)s/', | |
# Issue comments | |
'GET_COMMENTS': 'repositories/%(username)s/%(repo_slug)s/issues/%(issue_id)s/comments/', | |
'GET_COMMENT': 'repositories/%(username)s/%(repo_slug)s/issues/%(issue_id)s/comments/%(comment_id)s/', | |
'CREATE_COMMENT': 'repositories/%(username)s/%(repo_slug)s/issues/%(issue_id)s/comments/', | |
'UPDATE_COMMENT': 'repositories/%(username)s/%(repo_slug)s/issues/%(issue_id)s/comments/%(comment_id)s/', | |
'DELETE_COMMENT': 'repositories/%(username)s/%(repo_slug)s/issues/%(issue_id)s/comments/%(comment_id)s/', | |
} | |
## And this is our code | |
#from bitbucket import bitbucket | |
from getpass import getpass | |
print 'Old Repo:' | |
old_repo = Bitbucket(raw_input(' Username: '), getpass(' Password: '), raw_input(' Repo name: ')) | |
print '\nNew Repo:' | |
new_repo = Bitbucket(raw_input(' Username: '), getpass(' Password: '), raw_input(' Repo name: ')) | |
# request original ISSUES | |
_, result = old_repo.get_issues() | |
for issue in result['issues'][:]: | |
original_id = issue['local_id'] | |
if issue['status'] != 'new': | |
continue | |
# and post to the new repo | |
ok, new_issue = new_repo.add_issue(**issue) | |
if not ok: | |
print 'Fail migrating #%d' % original_id | |
continue | |
new_id = new_issue['local_id'] | |
print 'Migrated #%d as #%d in the new project' % (original_id, new_id) | |
# add a comment to mark the migration | |
who = issue.get('reported_by', None) | |
who = who['username'] if who else 'anonymous' | |
new_repo.add_issue_comment(new_id, content="Issue migrated from the original repo. " | |
"Was #%d reported by %s" % (original_id, who)) | |
# migrate comments | |
result, comments = old_repo.get_issue_comments(original_id) | |
for c in comments: | |
if not c['content']: | |
continue | |
new_repo.add_issue_comment(new_id, **c) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment