Created
June 6, 2025 16:25
-
-
Save lukasgabriel/d5eab5340a4a2d77931fee042a3d54a3 to your computer and use it in GitHub Desktop.
GitLab Bulk MR Merge Script
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3
"""
GitLab Bulk MR Merger

Searches for MRs with an exact title across all projects and merges them with [skip ci].
Supports dry-run mode for safe testing.

Safety features:
- Exact string matching for MR titles (no fuzzy matching)
- Guaranteed skip-ci suffix application to prevent pipeline triggers
- Comprehensive dry-run mode with zero side effects
- Multiple confirmation prompts before destructive actions

Thanks Claude for writing this abomination
"""
import argparse
import os
import re
import sys
from dataclasses import dataclass
from typing import List, Tuple, Optional
from urllib.parse import urlparse

import gitlab
@dataclass
class MRInfo:
    """Snapshot of one merge request captured while searching.

    The values are recorded once at discovery time and are not refreshed
    afterwards, so they may be stale by the time a merge is attempted.
    """
    project_name: str  # Project name including its namespace
    project_id: int  # Numeric project id, used to re-fetch the project
    mr_iid: int  # Project-scoped MR number (the "!N" shown to users)
    mr_id: int  # Id of the MR as reported by the API
    source_branch: str
    target_branch: str
    url: str  # Web URL of the MR
    mergeable: bool  # True when the merge status was 'can_be_merged' at discovery
    pipeline_status: str  # Pipeline status, or 'no_pipeline' / 'unknown'
    title: str  # Store original title for verification
def validate_gitlab_url(url: str) -> str:
    """Validate and normalize a GitLab instance URL.

    Surrounding whitespace is removed and ``https://`` is prepended when the
    value has no ``http://`` or ``https://`` scheme (matched
    case-insensitively).

    Args:
        url: URL or bare host name of the GitLab instance.

    Returns:
        The normalized URL, including its scheme.

    Raises:
        ValueError: If the URL is empty or blank, cannot be parsed, or has no
            network location.
    """
    # Strip first so a whitespace-only value is rejected instead of being
    # turned into a bogus "https://   " URL that passes the netloc check.
    url = url.strip() if url else ""
    if not url:
        raise ValueError("GitLab URL cannot be empty")

    # Add https:// if no scheme provided. Compare case-insensitively so that
    # "HTTPS://host" does not get a second scheme prepended.
    if not url.lower().startswith(('http://', 'https://')):
        url = f"https://{url}"

    try:
        parsed = urlparse(url)
    except ValueError as e:
        # urlparse rejects some malformed inputs (e.g. unbalanced IPv6 brackets).
        raise ValueError(f"Invalid GitLab URL: {e}") from e

    if not parsed.netloc:
        raise ValueError("Invalid GitLab URL: Invalid URL format")
    return url
def validate_token(token: str) -> str:
    """Validate a GitLab access token and return it without surrounding whitespace.

    A token that does not start with ``glpat-`` or ``gitlab-ci-token`` only
    triggers a printed warning; it is still accepted.

    Args:
        token: The access token as supplied by the user.

    Returns:
        The token with leading and trailing whitespace removed.

    Raises:
        ValueError: If the token is empty or blank, or shorter than 10
            characters after stripping.
    """
    # Strip before any check: otherwise padding counts towards the length, a
    # leading space defeats the prefix test, and a blank token returns "".
    token = token.strip() if token else ""
    if not token:
        raise ValueError("GitLab token cannot be empty")

    # GitLab personal access tokens should start with specific prefixes
    if not token.startswith(('glpat-', 'gitlab-ci-token')):
        print("β οΈ Warning: Token doesn't match expected GitLab format (should start with 'glpat-')")

    if len(token) < 10:
        raise ValueError("Token appears too short to be valid")
    return token
def validate_skip_ci_suffix(suffix: str) -> str:
    """Check that the commit-message suffix will make GitLab skip the pipeline.

    GitLab documents ``[skip ci]`` and ``[ci skip]`` (any capitalization) as
    the commit-message markers that suppress a pipeline. A suffix without one
    of them is still returned, but a warning is printed.

    Args:
        suffix: Text that will be appended to the merge commit message.

    Returns:
        The suffix, unchanged.

    Raises:
        ValueError: If the suffix is empty or consists only of whitespace.
    """
    if not suffix or not suffix.strip():
        raise ValueError("Skip CI suffix cannot be empty")

    # Only markers GitLab documents are listed here; look-alikes such as
    # "[no ci]" or "[skip-ci]" would let pipelines run, so they must warn.
    recognized_markers = ('[skip ci]', '[ci skip]')
    suffix_lower = suffix.lower()
    if not any(marker in suffix_lower for marker in recognized_markers):
        print(f"β οΈ Warning: Skip CI suffix '{suffix}' may not be recognized by GitLab CI")
        print(" Recognized patterns: [skip ci], [ci skip]")
    return suffix
def setup_gitlab_connection(url: str, token: str) -> gitlab.Gitlab:
    """Create an authenticated GitLab client or terminate the process.

    The URL and token are validated, a client is built from them, and
    ``auth()`` is called to verify the credentials. Any failure is reported
    on stdout and ends the program with exit status 1.

    Args:
        url: GitLab instance URL.
        token: Access token used as the private token.

    Returns:
        The authenticated client.
    """
    try:
        # Validation errors raised here are reported by the generic handler
        # below, exactly like any other connection failure.
        client = gitlab.Gitlab(
            validate_gitlab_url(url),
            private_token=validate_token(token),
        )
        client.auth()
        print(f"β Connected to GitLab as: {client.user.username}")
        return client
    except gitlab.exceptions.GitlabAuthenticationError:
        for line in (
            "β Authentication failed: Invalid token or insufficient permissions",
            " Ensure your token has 'api' scope and is not expired",
        ):
            print(line)
        sys.exit(1)
    except Exception as exc:
        print(f"β Failed to connect to GitLab: {exc}")
        sys.exit(1)
def exact_title_match(mr_title: str, search_title: str) -> bool:
    """Return True only when both titles are identical and safe to act on.

    Both titles are compared after removing surrounding whitespace; the
    comparison is case-sensitive and never partial. Even an identical pair is
    rejected (with a printed message) when the search title is shorter than
    three characters or is one of a few generic words.

    Args:
        mr_title: Title of the merge request being examined.
        search_title: Title the user asked for.

    Returns:
        True for an exact, non-generic match; False otherwise.
    """
    wanted = search_title.strip()

    # Anything other than strict equality is simply "no match" - no output.
    if mr_title.strip() != wanted:
        return False

    # Titles this short are too easy to hit by accident.
    if len(wanted) < 3:
        print(f"β οΈ Warning: Search title is very short: '{wanted}'")
        return False

    # Single generic words could match unrelated MRs across many projects.
    generic_titles = ('merge', 'update', 'fix', 'chore', 'feat', 'test')
    if wanted.lower() in generic_titles:
        print(f"β Refusing to match generic title: '{wanted}'")
        print(" Please use a more specific title to avoid accidental merges")
        return False

    return True
def _collect_group_projects(gl: gitlab.Gitlab, target_groups: List[str]) -> list:
    """Return full project objects for the given groups, each project once."""
    projects = []
    # Groups may overlap (e.g. "grp" and "grp/sub" with include_subgroups);
    # without this a project - and every MR in it - would be handled twice.
    seen_project_ids = set()
    print(f"π― Filtering by groups: {', '.join(target_groups)}")
    for group_path in target_groups:
        try:
            print(f"π‘ Fetching projects from group: {group_path}")
            group = gl.groups.get(group_path)
            # Server-side filtering keeps the list to projects we can act on.
            group_projects = group.projects.list(
                all=True,
                include_subgroups=True,
                min_access_level=30,  # Developer level or higher
            )
            print(f"π Found {len(group_projects)} accessible projects in group {group_path}")
            for project_data in group_projects:
                if project_data.id in seen_project_ids:
                    continue
                try:
                    # Fetch the full project object so that its merge
                    # requests and attributes can be accessed later.
                    projects.append(gl.projects.get(project_data.id))
                    seen_project_ids.add(project_data.id)
                except gitlab.exceptions.GitlabAuthenticationError:
                    # Skip projects we don't have access to
                    continue
                except Exception as e:
                    print(f"β οΈ Error accessing project {project_data.name}: {e}")
                    continue
        except gitlab.exceptions.GitlabGetError as e:
            print(f"β Group '{group_path}' not found or not accessible: {e}")
            continue
        except Exception as e:
            print(f"β Error accessing group '{group_path}': {e}")
            continue
    print(f"π Total accessible projects in specified groups: {len(projects)}")
    return projects


def _build_mr_info(project, mr, mr_detail) -> MRInfo:
    """Build an MRInfo from a listed MR plus its individually fetched detail."""
    pipeline = getattr(mr_detail, 'pipeline', None)
    # A missing or empty pipeline attribute means the MR has no pipeline.
    pipeline_status = pipeline.get('status', 'unknown') if pipeline else 'no_pipeline'
    return MRInfo(
        project_name=project.name_with_namespace,
        project_id=project.id,
        mr_iid=mr.iid,
        mr_id=mr.id,
        source_branch=mr.source_branch,
        target_branch=mr.target_branch,
        url=mr.web_url,
        mergeable=getattr(mr_detail, 'merge_status', 'unknown') == 'can_be_merged',
        pipeline_status=pipeline_status,
        title=mr.title,  # Store for verification
    )


def find_matching_mrs(gl: gitlab.Gitlab, search_title: str, dry_run: bool = False, target_groups: Optional[List[str]] = None) -> List[MRInfo]:
    """Find all open MRs whose title matches ``search_title`` exactly.

    Projects come either from the given groups (including subgroups, each
    project considered once even when groups overlap) or, when no groups are
    given, from all projects the user is a member of. Only projects with at
    least Developer access (level 30) are requested. Errors on individual
    projects are reported and skipped; a failure while fetching the project
    list terminates the process with exit status 1.

    Args:
        gl: Authenticated GitLab client.
        search_title: Title to look for (exact match).
        dry_run: Only affects the informational banner; searching never
            modifies anything.
        target_groups: Optional group paths restricting the search.

    Returns:
        One MRInfo per matching open merge request.
    """
    matching_mrs = []
    processed_projects = 0
    print(f"π Searching for MRs with EXACT title: '{search_title}'")
    print("π‘οΈ Using strict exact matching - no fuzzy or partial matches")
    if dry_run:
        print("π§ͺ DRY RUN MODE - No changes will be made")
    try:
        if target_groups:
            # Search within specified groups only
            projects = _collect_group_projects(gl, target_groups)
        else:
            # Get all projects the user has access to
            print("π‘ Fetching all accessible projects...")
            projects = gl.projects.list(all=True, membership=True, min_access_level=30)  # Developer level+
            print(f"π Found {len(projects)} accessible projects")

        for processed_projects, project in enumerate(projects, 1):
            # Show progress for large numbers of projects
            if processed_projects % 20 == 0:
                print(f"β³ Processed {processed_projects}/{len(projects)} projects...")
            try:
                # Get open MRs for this project
                for mr in project.mergerequests.list(state='opened', all=True):
                    # Use exact matching function for safety
                    if not exact_title_match(mr.title, search_title):
                        continue
                    # The individually fetched MR supplies merge status and pipeline.
                    mr_detail = project.mergerequests.get(mr.iid)
                    mr_info = _build_mr_info(project, mr, mr_detail)
                    matching_mrs.append(mr_info)
                    status_icon = "β " if mr_info.mergeable else "β οΈ"
                    print(f"{status_icon} Found: {project.name_with_namespace} - !{mr.iid} (mergeable: {mr_info.mergeable})")
            except gitlab.exceptions.GitlabAuthenticationError:
                print(f"π Access denied to project: {project.name_with_namespace}")
                continue
            except Exception as e:
                print(f"β οΈ Error processing project {project.name_with_namespace}: {e}")
                continue
    except Exception as e:
        print(f"β Error fetching projects: {e}")
        sys.exit(1)
    print(f"β Completed search across {processed_projects} projects")
    return matching_mrs
def display_mrs_summary(mrs: List[MRInfo], search_title: str) -> None:
    """Print a table of the found MRs after re-verifying every title.

    If any stored title no longer matches ``search_title`` exactly, the
    offending MRs are listed and the process exits with status 1 before the
    table is printed.

    Args:
        mrs: Merge requests found by the search.
        search_title: Title every entry is expected to carry.
    """
    if not mrs:
        print("π No matching MRs found.")
        return

    divider = "=" * 120
    print(f"\nπ Found {len(mrs)} matching MRs:")
    print(divider)

    mergeable_count = len([entry for entry in mrs if entry.mergeable])
    non_mergeable_count = len(mrs) - mergeable_count

    # Safety check: every collected MR must still match the requested title.
    mismatches = 0
    for entry in mrs:
        if exact_title_match(entry.title, search_title):
            continue
        mismatches += 1
        print(f"β TITLE MISMATCH: {entry.project_name} - !{entry.mr_iid} has title: '{entry.title}'")
    if mismatches:
        print(f"π¨ CRITICAL: {mismatches} MRs have title mismatches - ABORTING")
        sys.exit(1)

    for entry in mrs:
        if entry.mergeable:
            status = "β MERGEABLE"
        else:
            status = "β οΈ NOT MERGEABLE"
        if entry.pipeline_status == 'no_pipeline':
            pipeline = "(No pipeline)"
        else:
            pipeline = f"(Pipeline: {entry.pipeline_status})"
        print(f"{status:<15} | {entry.project_name:<50} | !{entry.mr_iid:<6} | {pipeline}")

    print(divider)
    print(f"π Summary: {mergeable_count} mergeable, {non_mergeable_count} not mergeable")
    print(f"π‘οΈ All {len(mrs)} MRs have verified exact title match: '{search_title}'")
    if non_mergeable_count > 0:
        print("β οΈ Warning: Some MRs cannot be merged (conflicts, failed pipelines, etc.)")
def create_merge_commit_message(mr_info: MRInfo, skip_ci_suffix: str) -> str:
    """Build the merge commit message and ensure it carries a skip-CI marker.

    The message is ``Merge branch '<source>' into '<target>'`` followed by
    the suffix.

    Args:
        mr_info: Merge request whose branch names go into the message.
        skip_ci_suffix: Text appended to the default message.

    Returns:
        The complete merge commit message.

    Raises:
        ValueError: If the suffix is rejected by ``validate_skip_ci_suffix``
            or the final message lacks a ``[skip ci]`` / ``[ci skip]`` marker.
    """
    # Validate skip CI suffix before using it
    validated_suffix = validate_skip_ci_suffix(skip_ci_suffix)
    default_message = f"Merge branch '{mr_info.source_branch}' into '{mr_info.target_branch}'"
    final_message = default_message + validated_suffix

    # Require one of the bracketed markers GitLab documents (any
    # capitalization). Bare substrings such as "no ci" or an unbracketed
    # "skip ci" would not suppress the pipeline, so they must not pass here.
    message_lower = final_message.lower()
    if not any(marker in message_lower for marker in ('[skip ci]', '[ci skip]')):
        raise ValueError(f"Skip CI suffix validation failed. Final message: '{final_message}'")
    return final_message
def merge_single_mr(gl: gitlab.Gitlab, mr_info: MRInfo, skip_ci_suffix: str, search_title: str, dry_run: bool = False) -> bool:
    """Merge one MR, or report what would happen when ``dry_run`` is set.

    Before anything is done the stored title, the freshly fetched state and
    the freshly fetched title are all verified. In live mode the MR is merged
    with the skip-CI commit message, without squashing, and its source branch
    is removed.

    Args:
        gl: Authenticated GitLab client.
        mr_info: The MR as recorded during the search.
        skip_ci_suffix: Suffix appended to the merge commit message.
        search_title: Title the MR must still carry.
        dry_run: When True, nothing is merged.

    Returns:
        True when the MR was merged (or would be, in dry-run mode); False
        when a check failed or the merge raised an error.
    """
    label = f"{mr_info.project_name} - !{mr_info.mr_iid}"
    try:
        # Check 1: the title recorded during the search.
        if not exact_title_match(mr_info.title, search_title):
            print("β SAFETY CHECK FAILED: MR title has changed!")
            print(f" Expected: '{search_title}'")
            print(f" Found: '{mr_info.title}'")
            print(f" Skipping {label}")
            return False

        # Re-fetch so the remaining checks run against current data.
        fresh_mr = gl.projects.get(mr_info.project_id).mergerequests.get(mr_info.mr_iid)

        # Check 2: the MR must still be open.
        if fresh_mr.state != 'opened':
            print(f"β SAFETY CHECK: MR is not open (state: {fresh_mr.state})")
            return False

        # Check 3: the current title must still match.
        if not exact_title_match(fresh_mr.title, search_title):
            print("β FINAL SAFETY CHECK FAILED: Fresh MR title doesn't match!")
            print(f" Expected: '{search_title}'")
            print(f" Fresh title: '{fresh_mr.title}'")
            return False

        commit_message = create_merge_commit_message(mr_info, skip_ci_suffix)

        if dry_run:
            print(f"π§ͺ DRY RUN - Would merge: {label}")
            print(f" β Title verified: '{fresh_mr.title}'")
            print(f" β Commit message: {commit_message}")
            print(f" β Source branch: {mr_info.source_branch} -> {mr_info.target_branch}")
            print(" π‘οΈ Skip CI suffix confirmed in message")
            return True

        # Live mode from here on.
        print(f"π LIVE MERGE: {label}")
        print(f" Title: '{fresh_mr.title}'")
        print(f" Commit message: {commit_message}")
        fresh_mr.merge(
            merge_commit_message=commit_message,
            should_remove_source_branch=True,  # Clean up source branch
            merge_when_pipeline_succeeds=False,
            squash=False,  # Preserve commit history
        )
        print(f"β MERGED: {label}")
        print(f" π‘οΈ Skip CI applied: {skip_ci_suffix}")
        return True
    except gitlab.exceptions.GitlabMRForbiddenError:
        print(f"β FORBIDDEN: Cannot merge {label} (insufficient permissions)")
        return False
    except gitlab.exceptions.GitlabMRConflictError:
        print(f"β CONFLICT: Cannot merge {label} (merge conflicts or not mergeable)")
        return False
    except Exception as exc:
        print(f"β ERROR: Failed to merge {label}: {exc}")
        return False
def bulk_merge_mrs(gl: gitlab.Gitlab, mrs: List[MRInfo], skip_ci_suffix: str, search_title: str, dry_run: bool = False) -> Tuple[int, int]:
    """Merge every mergeable MR in ``mrs`` and count the outcomes.

    Non-mergeable MRs are listed and skipped. In live mode the user must type
    ``MERGE`` and then the exact number of MRs before anything is merged; in
    dry-run mode no confirmation is requested.

    Args:
        gl: Authenticated GitLab client.
        mrs: Candidate merge requests.
        skip_ci_suffix: Suffix appended to each merge commit message.
        search_title: Title every MR must carry.
        dry_run: When True, merges are only simulated.

    Returns:
        ``(successful, failed)``; ``(0, 0)`` when there is nothing to merge,
        the suffix is invalid, or the user cancels.
    """
    if not mrs:
        return 0, 0

    # Reject an unusable suffix before touching anything.
    try:
        validated_suffix = validate_skip_ci_suffix(skip_ci_suffix)
    except ValueError as exc:
        print(f"β Skip CI suffix validation failed: {exc}")
        return 0, 0

    # Partition the candidates in one pass.
    mergeable_mrs, non_mergeable_mrs = [], []
    for candidate in mrs:
        bucket = mergeable_mrs if candidate.mergeable else non_mergeable_mrs
        bucket.append(candidate)

    if non_mergeable_mrs:
        print(f"\nβ οΈ Skipping {len(non_mergeable_mrs)} non-mergeable MRs:")
        for skipped in non_mergeable_mrs:
            print(f" - {skipped.project_name} - !{skipped.mr_iid}")

    if not mergeable_mrs:
        print("β No mergeable MRs found.")
        return 0, 0

    total = len(mergeable_mrs)
    mode_text = 'DRY RUN: Would merge' if dry_run else 'LIVE MERGE MODE'
    print(f"\nπ {mode_text} {total} MRs...")
    print(f"π‘οΈ Skip CI suffix: '{validated_suffix}'")
    print(f"π Target title: '{search_title}'")

    if dry_run:
        print("π§ͺ DRY RUN - No confirmations needed")
    else:
        # Two separate confirmations guard the destructive path.
        banner = "=" * 60
        print("\n" + banner)
        print("β οΈ β οΈ β οΈ LIVE MERGE CONFIRMATION β οΈ β οΈ β οΈ")
        print(banner)
        print(f"You are about to merge {total} MRs")
        print(f"All MRs have title: '{search_title}'")
        print(f"Skip CI suffix: '{validated_suffix}'")
        print(banner)

        if input("Type 'MERGE' to confirm you want to proceed: ") != 'MERGE':
            print("β Merge cancelled by user.")
            return 0, 0

        typed_count = input(f"Type '{total}' to confirm the number of MRs to merge: ")
        if typed_count != str(total):
            print("β Merge cancelled - number mismatch.")
            return 0, 0

        print("β Confirmations received - proceeding with merge...")

    successful = failed = 0
    for position, mr_info in enumerate(mergeable_mrs, start=1):
        print(f"\n[{position}/{total}] Processing {mr_info.project_name} - !{mr_info.mr_iid}")
        merged = merge_single_mr(gl, mr_info, validated_suffix, search_title, dry_run)
        if merged:
            successful += 1
        else:
            failed += 1
    return successful, failed
def main():
    """Command-line entry point: parse arguments, search, then merge or dry-run.

    The access token is taken from ``--token`` or, when that option is
    omitted, from the ``GITLAB_TOKEN`` environment variable, so it does not
    have to appear on the command line.

    Exits with status 1 on invalid input, on a title mismatch, or when at
    least one merge failed; returns normally otherwise.
    """
    # An empty environment value is treated as "not set".
    env_token = os.environ.get('GITLAB_TOKEN')

    parser = argparse.ArgumentParser(
        description="Bulk merge GitLab MRs with specific title across multiple projects",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
SAFETY FEATURES:
- Exact title matching only (no fuzzy/partial matching)
- Multiple confirmation prompts before live merges
- Guaranteed skip-ci suffix application
- Comprehensive dry-run mode with zero side effects
- Input validation for all parameters
Examples:
# DRY RUN (recommended first step)
python gitlab_bulk_merge.py --url https://gitlab.example.com --token glpat-xxx --title "chore(deps): update dependency" --dry-run
# Live merge with group filtering
python gitlab_bulk_merge.py --url https://gitlab.example.com --token glpat-xxx --title "chore(deps): update dependency" --groups mygroup
# Custom skip CI suffix
python gitlab_bulk_merge.py --url https://gitlab.example.com --token glpat-xxx --title "chore(deps): update dependency" --skip-ci-suffix " [ci skip]"
"""
    )
    parser.add_argument(
        '--url',
        required=True,
        help='GitLab instance URL (e.g., https://gitlab.example.com)'
    )
    parser.add_argument(
        '--token',
        # Still mandatory unless the environment already provides a token.
        required=not env_token,
        default=env_token,
        help='GitLab personal access token (with api scope); falls back to the GITLAB_TOKEN environment variable'
    )
    parser.add_argument(
        '--dry-run',
        action='store_true',
        help='Show what would be merged without actually merging (RECOMMENDED for first run)'
    )
    parser.add_argument(
        '--title',
        required=True,
        help='MR title to search for (exact match only - be specific to avoid accidents)'
    )
    parser.add_argument(
        '--groups',
        nargs='+',
        help='Limit search to specific GitLab groups (e.g. mygroup mygroup/subgroup). If not specified, searches all accessible projects.'
    )
    parser.add_argument(
        '--skip-ci-suffix',
        default=' [skip ci]',
        help='Suffix to add to merge commit message (default: " [skip ci]")'
    )
    args = parser.parse_args()

    # Comprehensive input validation
    try:
        # Validate URL
        validated_url = validate_gitlab_url(args.url)
        # Validate token
        validated_token = validate_token(args.token)
        # Validate title
        if not args.title or len(args.title.strip()) < 3:
            print("β Error: Title must be at least 3 characters long")
            sys.exit(1)
        validated_title = args.title.strip()
        # Check for dangerous generic titles
        if validated_title.lower() in ['merge', 'update', 'fix', 'chore', 'feat', 'test']:
            print(f"β Error: Title '{validated_title}' is too generic and could match unintended MRs")
            print(" Please use a more specific title")
            sys.exit(1)
        # Validate skip CI suffix
        validated_skip_ci = validate_skip_ci_suffix(args.skip_ci_suffix)
        # Validate groups if provided
        if args.groups:
            for group in args.groups:
                if not group.strip():
                    print("β Error: Empty group name provided")
                    sys.exit(1)
                if len(group.strip()) < 2:
                    print(f"β Error: Group name '{group}' is too short")
                    sys.exit(1)
    except ValueError as e:
        print(f"β Validation error: {e}")
        sys.exit(1)

    print("π§ GitLab Bulk MR Merger")
    print("=" * 60)
    print(f"GitLab URL: {validated_url}")
    print(f"Search title: '{validated_title}'")
    if args.groups:
        print(f"Target groups: {', '.join(args.groups)}")
    else:
        print("Target groups: All accessible projects")
    print(f"Mode: {'DRY RUN' if args.dry_run else 'LIVE MERGE'}")
    print(f"Skip CI suffix: '{validated_skip_ci}'")
    print("=" * 60)

    # Warn about live mode
    if not args.dry_run:
        print("β οΈ β οΈ β οΈ WARNING: LIVE MERGE MODE β οΈ β οΈ β οΈ")
        print("This will actually merge MRs. Consider using --dry-run first!")
        print("=" * 60)

    # Initialize GitLab connection
    gl = setup_gitlab_connection(validated_url, validated_token)

    # Find matching MRs
    matching_mrs = find_matching_mrs(gl, validated_title, args.dry_run, args.groups)

    # Display summary with validation
    display_mrs_summary(matching_mrs, validated_title)
    if not matching_mrs:
        print("β No MRs found - nothing to do")
        return

    # Final safety check: ensure we have exact matches
    for mr in matching_mrs:
        if not exact_title_match(mr.title, validated_title):
            print(f"π¨ CRITICAL ERROR: MR {mr.project_name} - !{mr.mr_iid} has mismatched title")
            print(f" Expected: '{validated_title}'")
            print(f" Found: '{mr.title}'")
            sys.exit(1)

    # Perform merge (or dry-run)
    successful, failed = bulk_merge_mrs(gl, matching_mrs, validated_skip_ci, validated_title, args.dry_run)

    # Final summary
    print("\nπ Final Summary:")
    print(f"{'DRY RUN - Would have merged' if args.dry_run else 'Successfully merged'}: {successful}")
    if failed > 0:
        print(f"β Failed: {failed}")
    print(f"π Commit message suffix: '{validated_skip_ci}'")
    print(f"π‘οΈ All operations used exact title matching: '{validated_title}'")
    if args.dry_run and successful > 0:
        print("\nπ‘ To perform the actual merge, run the same command without --dry-run")
        print(" You will be prompted for confirmation before any changes are made.")

    # Report failures through the exit status so that shells, CI jobs and
    # wrapper scripts can detect an incomplete run.
    if failed > 0:
        sys.exit(1)
# Run the command-line interface only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment