Last active
July 18, 2017 23:05
-
-
Save nkmathew/a989aa313a45fc69cab2d50ab89ea607 to your computer and use it in GitHub Desktop.
Script for easily cloning all the repos in a Bitbucket account
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
# -*- coding: utf-8 -*- | |
r""" | |
Date: Jul 17, 2017 | |
Author: nkmathew | |
A script for cloning all repositories by a certain user from Bitbucket. Can also | |
clone your private repos if you want(requires authentication). It's supposed to give | |
you a bit more control than you would have with an equivalent curl command, e.g | |
curl --user nkmathew https://api.bitbucket.org/2.0/repositories/nkmathew | | |
grep -o '"ssh:[^ ,]\+' | xargs -L1 echo | |
Run ./bitbucketclone.py --h to see the options | |
$ ./bitbucketclone.py nkmathew -ssh -private | |
Password for [email protected]: ************* | |
### CHANGELOG | |
+ Jul 19, 2017 | |
- Fix failing ssh clone in Linux | |
- Don't prompt for username again if private options is specified (redundant) | |
""" | |
import json | |
import base64 | |
import os | |
import subprocess | |
import sys | |
import time | |
import urllib.request | |
import colorama as color | |
import argparse | |
import collections | |
from pprint import pprint | |
ENDPOINT = 'https://api.bitbucket.org/2.0/repositories/{}?pagelen=100' | |
__version__ = '0.1.0' | |
def brighten(msg): | |
""" | |
Make text brighter in terminal | |
""" | |
return color.Style.BRIGHT + msg + color.Style.NORMAL | |
def print_y(msg): | |
""" | |
Print information message in yellow | |
""" | |
print(brighten(color.Style.BRIGHT + color.Fore.YELLOW + msg + color.Fore.WHITE)) | |
def print_r(msg): | |
""" | |
Print error message in red | |
""" | |
print(brighten(color.Style.BRIGHT + color.Fore.RED + msg + color.Fore.WHITE)) | |
def download_json(url, username='', password=''): | |
""" | |
Fetches json from the passed url | |
""" | |
request = urllib.request.Request(url) | |
if username and password: | |
header = '{}:{}'.format(username, password).encode() | |
base64string = base64.b64encode(header) | |
header = 'Basic {}'.format(base64string.decode()) | |
request.add_header('Authorization', header) | |
response = urllib.request.urlopen(request).read() | |
response = json.loads(response.decode()) | |
return response | |
def desc_elapsed_time(start_time): | |
""" | |
Describes elapsed time from upto now in English | |
""" | |
elapsed = int(time.time() - start_time) | |
hours = elapsed // 3600 | |
elapsed = elapsed - hours * 3600 | |
hours, mins, secs = (hours, elapsed // 60, elapsed % 60) | |
msg = 'Elapsed time: ' | |
if hours: | |
msg += '{0} Hours {1} Minute(s) {2} seconds'.format(hours, mins, secs) | |
elif mins: | |
msg += '{0} Minute(s) {1} seconds'.format(mins, secs) | |
else: | |
msg += '{0} seconds'.format(secs) | |
return msg | |
def create_args_parser(): | |
""" Returns command line parser """ | |
parser = argparse.ArgumentParser( | |
description="A script for cloning all repos from Bitbucket accounts", | |
prog='bitbucketclone.py') | |
parser.add_argument('username', help='Bitbucket username', type=str, nargs='?', | |
default='nkmathew') | |
parser.add_argument( | |
'-v', '--version', action='version', | |
help='Prints script version', version='[[PROGRAM_NAME]] v%s' % __version__) | |
parser.add_argument('-private', '--private', | |
help='Whether to clone private repos too(requires ' | |
'authentication)', action='store_true') | |
parser.add_argument('-ssh', '--ssh', help='Clone through ssh', dest='use_ssh', | |
action='store_true') | |
parser.add_argument('-dest', '--dest', dest='dest_folder', type=str, | |
default='bitbucket-repos', help='Destination folder') | |
parser.add_argument('-depth', '--depth', help='Clone depth', dest='clone_depth', | |
type=int, default=-1) | |
return parser | |
def parse_options(arguments=None): | |
""" Reads command-line arguments | |
""" | |
if arguments is None: | |
arguments = sys.argv[1:] | |
if isinstance(arguments, str): | |
arguments = arguments.split() | |
if isinstance(arguments, argparse.Namespace): | |
return arguments | |
parser = create_args_parser() | |
args = parser.parse_args(arguments) | |
return args | |
def get_repos(username, private): | |
""" | |
Fetches the repo list and trims down the information | |
""" | |
endpoint = ENDPOINT.format(username) | |
repositories = [] | |
if private: | |
import getpass | |
prompt = '\nPassword for {}@bitbucket.org: '.format(username) | |
password = getpass.getpass(prompt) | |
repositories = download_json(endpoint, username, password)['values'] | |
else: | |
repositories = download_json(endpoint)['values'] | |
repo_list = [] | |
for repo in repositories: | |
info = { | |
'created_on': repo['created_on'], | |
'full_name': repo['full_name'], | |
'https': repo['links']['clone'][0]['href'], | |
'is_private': repo['is_private'], | |
'name': repo['name'], | |
'scm': repo['scm'], | |
'size': repo['size'], | |
'ssh': repo['links']['clone'][1]['href'], | |
'updated_on': repo['updated_on'], | |
} | |
repo_list += [info] | |
repo_list = sorted(repo_list, key=lambda k: k['size']) | |
return repo_list | |
def clone(dest_folder, username, use_ssh, private, depth): | |
""" | |
Does actual cloning of the repos | |
""" | |
repo_list = get_repos(username, private) | |
if not os.path.exists(dest_folder): | |
os.mkdir(dest_folder) | |
cwd = os.getcwd() | |
dest_folder = os.path.relpath(dest_folder) | |
private_count = 0 | |
public_count = 0 | |
private_total = len([x for x in repo_list if x['is_private']]) | |
public_total = len([x for x in repo_list if not x['is_private']]) | |
for repo in repo_list: | |
sys.stderr.write('\n') | |
clone_url = repo['https'] | |
repo_name = repo['name'] | |
repo_name = os.path.join(dest_folder, repo_name).replace('\\', '/') | |
clone_tool = repo['scm'] | |
if use_ssh: | |
clone_url = repo['ssh'] | |
clone_args = [clone_tool, 'clone', clone_url, repo_name] | |
if clone_tool == 'git': | |
clone_args += ['--recursive'] | |
if depth and depth != -1: | |
clone_args += ['--depth={}'.format(depth)] | |
elif clone_tool == 'hg': | |
clone_args += ['--verbose'] | |
if not os.path.exists(repo_name): | |
if repo['is_private']: | |
private_count += 1 | |
info = 'Private: {} of {}'.format(private_count, private_total) | |
else: | |
public_count += 1 | |
info = 'Public: {} of {}'.format(public_count, public_total) | |
if repo['size'] < 2**20: | |
repo_size = ' | {:.1f} KB'.format(repo['size'] / 2**10) | |
else: | |
repo_size = ' | {:.1f} MB'.format(repo['size'] / 2**20) | |
print_y(info + repo_size) | |
subprocess.call(clone_args) | |
return public_count, private_count | |
def start_cloning(options=None): | |
""" | |
Driver for cloning process | |
""" | |
opts = parse_options(options) | |
username = opts.username | |
use_ssh = opts.use_ssh | |
dest_folder = opts.dest_folder | |
clone_depth = opts.clone_depth | |
private = opts.private | |
counts = clone(dest_folder, username, use_ssh, private, clone_depth) | |
return counts | |
def main(): | |
""" | |
Entry point | |
""" | |
opts = parse_options() | |
color.init() | |
start = time.time() | |
counts = start_cloning(opts) | |
print_y('\nCloned {0} public and {1} private repos...'.format(*counts)) | |
print_y(desc_elapsed_time(start)) | |
if __name__ == '__main__': | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment