-
-
Save shakir915/d45feb2378be9c0584372979a0d0eaf6 to your computer and use it in GitHub Desktop.
bitbucket to github batch migrartion
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| import requests | |
| import subprocess | |
| import os | |
| import json | |
| import time | |
| from urllib.parse import urlparse, quote | |
| # Configuration | |
| BITBUCKET_USERNAME = "shakir_m" | |
| BITBUCKET_APP_PASSWORD = "ATBB2AK9ayjP4cUtR2BYHwQY3JfD859A4E4E" | |
| GITHUB_USERNAME = "shakir915" | |
| GITHUB_TOKEN = "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" | |
| WORKSPACE = "emstell" | |
| # File to track failed repositories | |
| FAILED_REPOS_FILE = "failed_repos.json" | |
| def get_bitbucket_repos(): | |
| """Fetch all repositories from Bitbucket workspace""" | |
| url = f"https://api.bitbucket.org/2.0/repositories/{WORKSPACE}" | |
| repos = [] | |
| while url: | |
| response = requests.get(url, auth=(BITBUCKET_USERNAME, BITBUCKET_APP_PASSWORD)) | |
| if response.status_code != 200: | |
| print(f"Error fetching repos: {response.status_code}") | |
| print(f"Response: {response.text}") | |
| break | |
| data = response.json() | |
| for repo in data.get('values', []): | |
| # Get the actual clone URL from the API response | |
| clone_url = None | |
| for link in repo.get('links', {}).get('clone', []): | |
| if link.get('name') == 'https': | |
| clone_url = link.get('href') | |
| break | |
| # Fallback: build URL manually if not found in API response | |
| if not clone_url: | |
| repo_name = repo['name'] | |
| encoded_repo_name = quote(repo_name, safe='') | |
| clone_url = f"https://bitbucket.org/{WORKSPACE}/{encoded_repo_name}.git" | |
| repos.append({ | |
| 'name': repo['name'], | |
| 'clone_url': clone_url, | |
| 'description': repo.get('description', ''), | |
| 'private': repo.get('is_private', True) | |
| }) | |
| url = data.get('next') | |
| return repos | |
| def save_failed_repos(failed_repos): | |
| """Save failed repositories to a file""" | |
| with open(FAILED_REPOS_FILE, 'w') as f: | |
| json.dump(failed_repos, f, indent=2) | |
| def load_failed_repos(): | |
| """Load failed repositories from file""" | |
| if os.path.exists(FAILED_REPOS_FILE): | |
| with open(FAILED_REPOS_FILE, 'r') as f: | |
| return json.load(f) | |
| return [] | |
| def create_github_repo(repo_name, description="", private=True): | |
| """Create a new private repository on GitHub with user confirmation for naming""" | |
| url = "https://api.github.com/user/repos" | |
| headers = { | |
| "Authorization": f"token {GITHUB_TOKEN}", | |
| "Content-Type": "application/json" | |
| } | |
| # Replace spaces and special chars in GitHub repo name | |
| github_safe_name = repo_name.replace(' ', '-').replace('(', '').replace(')', '').replace('[', '').replace(']', '') | |
| # Try original safe name first | |
| data = { | |
| "name": github_safe_name, | |
| "description": description, | |
| "private": private | |
| } | |
| response = requests.post(url, headers=headers, json=data) | |
| if response.status_code == 201: | |
| return github_safe_name # Return the successful name | |
| elif response.status_code == 422: # Repository already exists | |
| print(f" β οΈ Repository '{github_safe_name}' already exists on GitHub") | |
| # Ask user what to do | |
| while True: | |
| choice = input(f" Choose action:\n" | |
| f" 1. Use new name with suffix\n" | |
| f" 2. Skip this repository\n" | |
| f" 3. Enter custom name\n" | |
| f" Choice (1/2/3): ").strip() | |
| if choice == '1': | |
| # Try with suffixes | |
| for suffix in ["-migrated", "-bitbucket", "-backup", f"-{int(time.time())}"]: | |
| new_name = f"{github_safe_name}{suffix}" | |
| data["name"] = new_name | |
| response = requests.post(url, headers=headers, json=data) | |
| if response.status_code == 201: | |
| print(f" β Repository created with name: {new_name}") | |
| return new_name | |
| elif response.status_code == 422: | |
| continue # Try next suffix | |
| else: | |
| print(f" β GitHub API error: {response.status_code}") | |
| return None | |
| print(f" β Could not create repository with any suffix") | |
| return None | |
| elif choice == '2': | |
| print(f" βοΈ Skipping repository: {repo_name}") | |
| return "SKIP" | |
| elif choice == '3': | |
| custom_name = input(f" Enter custom name for '{repo_name}': ").strip() | |
| if not custom_name: | |
| print(" β Invalid name. Please try again.") | |
| continue | |
| # Clean custom name | |
| custom_name = custom_name.replace(' ', '-').replace('(', '').replace(')', '').replace('[', '').replace(']', '') | |
| data["name"] = custom_name | |
| response = requests.post(url, headers=headers, json=data) | |
| if response.status_code == 201: | |
| print(f" β Repository created with custom name: {custom_name}") | |
| return custom_name | |
| elif response.status_code == 422: | |
| print(f" β Custom name '{custom_name}' also exists. Please try again.") | |
| continue | |
| else: | |
| print(f" β GitHub API error: {response.status_code}") | |
| return None | |
| else: | |
| print(" β Invalid choice. Please enter 1, 2, or 3.") | |
| else: | |
| print(f" β GitHub API error: {response.status_code} - {response.text}") | |
| return None | |
| def migrate_repository(repo): | |
| """Migrate a single repository""" | |
| repo_name = repo['name'] | |
| clone_url = repo['clone_url'] | |
| print(f"π¦ Processing: {repo_name}") | |
| # Clean the clone URL and add proper authentication | |
| if '@bitbucket.org' in clone_url: | |
| # Remove any existing authentication from the URL | |
| clean_url = clone_url.split('@bitbucket.org')[1] # Get everything after @bitbucket.org | |
| auth_url = f"https://{BITBUCKET_USERNAME}:{BITBUCKET_APP_PASSWORD}@bitbucket.org{clean_url}" | |
| elif 'bitbucket.org' in clone_url: | |
| # URL without existing auth | |
| auth_url = clone_url.replace('https://', f'https://{BITBUCKET_USERNAME}:{BITBUCKET_APP_PASSWORD}@') | |
| else: | |
| # Fallback: build authenticated URL manually | |
| encoded_repo_name = quote(repo_name, safe='') | |
| auth_url = f"https://{BITBUCKET_USERNAME}:{BITBUCKET_APP_PASSWORD}@bitbucket.org/{WORKSPACE}/{encoded_repo_name}.git" | |
| # Create a safe directory name (replace spaces and special chars) | |
| safe_dir_name = repo_name.replace(' ', '_').replace('(', '').replace(')', '').replace('[', '').replace(']', '').replace('-', '_') | |
| print(f" π Original URL: {clone_url}") | |
| print(f" π Auth URL: {auth_url}") | |
| print(f" π Local dir: {safe_dir_name}.git") | |
| try: | |
| # Step 1: Create GitHub repository (with user interaction for conflicts) | |
| print(f" ποΈ Creating private GitHub repo...") | |
| github_repo_name = create_github_repo(repo_name, repo['description'], repo['private']) | |
| if not github_repo_name: | |
| print(f" β Failed to create GitHub repository") | |
| return False | |
| elif github_repo_name == "SKIP": | |
| print(f" βοΈ Repository skipped by user") | |
| return "SKIP" # Special return value for skipped repos | |
| # Step 2: Clone repository (mirror = all branches/tags) | |
| print(f" π₯ Cloning from Bitbucket...") | |
| result = subprocess.run(['git', 'clone', '--mirror', auth_url, f'{safe_dir_name}.git'], | |
| capture_output=True, text=True) | |
| if result.returncode != 0: | |
| print(f" β Failed to clone: {result.stderr}") | |
| return False | |
| # Change to repo directory | |
| os.chdir(f'{safe_dir_name}.git') | |
| # Set GitHub remote (use the actual created repo name) | |
| github_url = f"https://{GITHUB_USERNAME}:{GITHUB_TOKEN}@github.com/{GITHUB_USERNAME}/{github_repo_name}.git" | |
| subprocess.run(['git', 'remote', 'set-url', '--push', 'origin', github_url]) | |
| # Step 3: Push all branches and tags to GitHub | |
| print(f" π€ Pushing all branches and tags to GitHub...") | |
| result = subprocess.run(['git', 'push', '--mirror'], capture_output=True, text=True) | |
| push_success = result.returncode == 0 | |
| # Step 4: Always delete local copy after push attempt | |
| os.chdir('..') | |
| print(f" ποΈ Deleting local copy...") | |
| subprocess.run(['rm', '-rf', f'{safe_dir_name}.git']) | |
| if push_success: | |
| if github_repo_name != repo_name.replace(' ', '-').replace('(', '').replace(')', '').replace('[', '').replace(']', ''): | |
| print(f" β Successfully migrated {repo_name} β {github_repo_name}") | |
| else: | |
| print(f" β Successfully migrated {repo_name}") | |
| return True | |
| else: | |
| print(f" β Failed to push: {result.stderr}") | |
| return False | |
| except Exception as e: | |
| print(f" β Error migrating {repo_name}: {str(e)}") | |
| # Cleanup on error | |
| try: | |
| os.chdir('..') | |
| subprocess.run(['rm', '-rf', f'{safe_dir_name}.git']) | |
| except: | |
| pass | |
| return False | |
| def main(): | |
| print("π Starting bulk migration from Bitbucket to GitHub") | |
| print("=" * 50) | |
| # Check if GitHub credentials are set | |
| if GITHUB_USERNAME == "your_github_username" or GITHUB_TOKEN == "your_github_token": | |
| print("β Please update GITHUB_USERNAME and GITHUB_TOKEN in the script!") | |
| return | |
| # Check if user wants to retry failed repos only | |
| retry_failed = False | |
| if os.path.exists(FAILED_REPOS_FILE): | |
| response = input("Found previous failed repositories. Retry failed only? (y/N): ") | |
| if response.lower() == 'y': | |
| retry_failed = True | |
| # Create working directory | |
| os.makedirs('migration_temp', exist_ok=True) | |
| os.chdir('migration_temp') | |
| if retry_failed: | |
| # Load only failed repositories | |
| print("π Loading previously failed repositories...") | |
| repos = load_failed_repos() | |
| print(f"Found {len(repos)} failed repositories to retry") | |
| else: | |
| # Get all repositories | |
| print("π Fetching Bitbucket repositories...") | |
| repos = get_bitbucket_repos() | |
| print(f"Found {len(repos)} repositories") | |
| if not repos: | |
| print("No repositories found. Check your credentials and workspace name.") | |
| return | |
| # Show repos that will be migrated | |
| print("\nπ Repositories to migrate:") | |
| for i, repo in enumerate(repos, 1): | |
| print(f" {i}. {repo['name']}") | |
| # Ask for confirmation | |
| action = "retry" if retry_failed else "migrate" | |
| response = input(f"\nProceed with {action} of {len(repos)} repositories? (y/N): ") | |
| if response.lower() != 'y': | |
| print(f"{action.capitalize()} cancelled.") | |
| return | |
| # Migrate each repository | |
| successful = 0 | |
| skipped = 0 | |
| failed_repos = [] | |
| print(f"\nπ Starting {action}...") | |
| print("=" * 50) | |
| for repo in repos: | |
| result = migrate_repository(repo) | |
| if result == True: | |
| successful += 1 | |
| elif result == "SKIP": | |
| skipped += 1 | |
| else: | |
| failed_repos.append(repo) | |
| print("-" * 40) | |
| time.sleep(1) # Rate limiting | |
| # Save failed repositories for potential retry | |
| if failed_repos: | |
| save_failed_repos(failed_repos) | |
| print(f"\nπΎ Saved {len(failed_repos)} failed repositories to {FAILED_REPOS_FILE}") | |
| print("Run the script again to retry failed repositories only.") | |
| else: | |
| # Remove failed repos file if all succeeded | |
| if os.path.exists(FAILED_REPOS_FILE): | |
| os.remove(FAILED_REPOS_FILE) | |
| print(f"\nπ Migration Summary:") | |
| print(f"β Successful: {successful}") | |
| print(f"βοΈ Skipped: {skipped}") | |
| print(f"β Failed: {len(failed_repos)}") | |
| print(f"π Total: {len(repos)}") | |
| # List failed repositories with their URLs | |
| if failed_repos: | |
| print(f"\nβ Failed Repositories:") | |
| print("=" * 30) | |
| for i, repo in enumerate(failed_repos, 1): | |
| print(f"{i}. {repo['name']}") | |
| print(f" URL: {repo['clone_url']}") | |
| if repo.get('description'): | |
| print(f" Description: {repo['description']}") | |
| print() | |
| print(f"πΎ Failed repositories saved to: {FAILED_REPOS_FILE}") | |
| print("π‘ Run the script again to retry failed repositories only.") | |
| # Cleanup | |
| os.chdir('..') | |
| subprocess.run(['rm', '-rf', 'migration_temp']) | |
| print("\nπ Migration completed!") | |
| print("Check your GitHub account for the migrated repositories.") | |
| if __name__ == "__main__": | |
| main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment