Last active
March 2, 2026 20:09
-
-
Save YuriyGuts/c3808d1e64fa95931726 to your computer and use it in GitHub Desktop.
Pull latest changes for all Git repositories in a directory (fast-forward only).
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3
"""
Pull latest changes for all Git repositories in a directory (fast-forward only).

Uses only the Python standard library (no pip dependencies required).

Usage: git-update-all [-h] [--dry-run] [--parallel N] [--quiet] [--verbose] [--version] [directory]

positional arguments:
  directory         Directory containing repositories (default: current directory)

options:
  -h, --help        show this help message and exit
  --dry-run, -n     Show what would be pulled without pulling
  --parallel, -j N  Number of parallel pull operations (default: 1)
  --quiet, -q       Suppress non-error output
  --verbose, -v     Show detailed output
  --version         show program's version number and exit

Examples:
  git-update-all               Pull all repos in current directory
  git-update-all ~/projects    Pull all repos in ~/projects
  git-update-all -j4           Pull in parallel with 4 workers
  git-update-all --dry-run     Show what would be pulled
"""
import argparse
import os
import subprocess
import sys
import time
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import as_completed
from dataclasses import dataclass
from typing import List
from typing import Optional
# Version string shown by --version and in the header line of each run.
VERSION = "1.0.0"
# Timeout in seconds passed to every git subprocess call that touches a repo.
PULL_TIMEOUT = 300  # 5 minutes per git command
class Colors:
    """Terminal color palette that degrades to empty strings when color is off.

    Every color attribute is a read-only property that yields either an ANSI
    escape sequence or ``""``, so callers can embed them in f-strings
    unconditionally.
    """

    def __init__(self):
        enabled = self._should_use_colors()
        # On Windows the console has to be switched into ANSI mode first; the
        # helper's return value decides whether color stays enabled.
        if enabled and sys.platform == "win32":
            enabled = _init_windows_ansi()
        self._enabled = enabled

    def _should_use_colors(self) -> bool:
        """Return False when NO_COLOR is set (non-empty) or stdout is not a TTY."""
        no_color_requested = bool(os.environ.get("NO_COLOR"))
        return not no_color_requested and sys.stdout.isatty()

    def _code(self, code: str) -> str:
        """Pass *code* through when colors are enabled, otherwise return ""."""
        if self._enabled:
            return code
        return ""

    @property
    def reset(self) -> str:
        """Sequence that clears all active attributes."""
        return self._code("\033[0m")

    @property
    def bold(self) -> str:
        """Bold text attribute."""
        return self._code("\033[1m")

    @property
    def red(self) -> str:
        """Bright red foreground."""
        return self._code("\033[91m")

    @property
    def green(self) -> str:
        """Bright green foreground."""
        return self._code("\033[92m")

    @property
    def yellow(self) -> str:
        """Bright yellow foreground."""
        return self._code("\033[93m")

    @property
    def blue(self) -> str:
        """Bright blue foreground."""
        return self._code("\033[94m")

    @property
    def cyan(self) -> str:
        """Bright cyan foreground."""
        return self._code("\033[96m")

    @property
    def dim(self) -> str:
        """Dim (faint) text attribute."""
        return self._code("\033[2m")
def _init_windows_ansi() -> bool:
    """Try to enable ANSI escape processing on the Windows console (Windows 10+).

    Returns:
        True only when the console mode of stdout could be read and virtual
        terminal processing is (or was successfully switched) on. False on any
        failure, including running on a platform without ``ctypes.windll``.
    """
    try:
        import ctypes

        STD_OUTPUT_HANDLE = -11
        ENABLE_VIRTUAL_TERMINAL_PROCESSING = 0x0004

        kernel32 = ctypes.windll.kernel32
        handle = kernel32.GetStdHandle(STD_OUTPUT_HANDLE)
        mode = ctypes.c_ulong()

        # Both console-mode calls return 0 on failure (e.g. the handle is not
        # a console, or the console predates ANSI support). Reporting success
        # in that case would make callers print raw escape sequences.
        if not kernel32.GetConsoleMode(handle, ctypes.byref(mode)):
            return False
        if mode.value & ENABLE_VIRTUAL_TERMINAL_PROCESSING:
            return True
        return bool(
            kernel32.SetConsoleMode(handle, mode.value | ENABLE_VIRTUAL_TERMINAL_PROCESSING)
        )
    except Exception:
        # Best effort: any problem simply means "no colors".
        return False
# Shared module-level palette used by ProgressReporter for all colored output.
# Built once at import time, so the NO_COLOR / TTY detection runs a single time.
colors = Colors()
@dataclass
class PullResult:
    """Outcome of processing a single repository directory."""

    # Base name of the directory that was processed.
    repo_name: str
    # False when a git step failed or timed out; True otherwise (including skips).
    success: bool
    # Short human-readable status or error text.
    message: str
    # Wall-clock seconds spent on this directory.
    duration: float
    # True when nothing was pulled on purpose (not a git repo, or dry run).
    skipped: bool = False
    # True when the pull brought in new changes.
    updated: bool = False
class RepoPuller:
    """Fetch tags and fast-forward pull Git repositories.

    Attributes:
        base_dir: Directory whose subdirectories are being processed (stored
            for reference; not read by the methods of this class).
        dry_run: When True, report what would be pulled without running git.
        quiet: Stored from the CLI flag; not read by this class.
        verbose: Stored from the CLI flag; not read by this class.
        parallel: When True, git runs with interactive prompts disabled so a
            credential or host-key prompt cannot block a worker thread.
    """

    def __init__(
        self,
        base_dir: str,
        dry_run: bool = False,
        quiet: bool = False,
        verbose: bool = False,
        parallel: bool = False,
    ):
        self.base_dir = base_dir
        self.dry_run = dry_run
        self.quiet = quiet
        self.verbose = verbose
        self.parallel = parallel

    def _git_env(self) -> dict:
        """Build the environment used for every git subprocess."""
        env = os.environ.copy()
        # The result checks below match git's English messages ("Already up to
        # date", "Not possible to fast-forward", ...). Force the untranslated
        # locale so a localized git is not misreported as "updated".
        env["LC_ALL"] = "C"
        if self.parallel:
            # Disable interactive prompts when running in parallel.
            env["GIT_TERMINAL_PROMPT"] = "0"
            env["GIT_SSH_COMMAND"] = "ssh -o BatchMode=yes"
        return env

    @staticmethod
    def _run_git(args: List[str], repo_path: str, env: dict) -> subprocess.CompletedProcess:
        """Run ``git <args>`` inside *repo_path*, capturing text output."""
        return subprocess.run(
            ["git", *args],
            cwd=repo_path,
            capture_output=True,
            text=True,
            timeout=PULL_TIMEOUT,
            env=env,
        )

    @staticmethod
    def _pull_error_message(result: subprocess.CompletedProcess) -> str:
        """Condense the output of a failed ``git pull`` into a short message."""
        error_msg = result.stderr.strip() or result.stdout.strip() or "pull failed"
        # Clean up common error messages
        if "Not possible to fast-forward" in error_msg:
            return "cannot fast-forward (local changes or diverged)"
        if "no tracking information" in error_msg.lower():
            return "no upstream branch configured"
        return error_msg

    def pull_repo(self, repo_path: str) -> "PullResult":
        """Pull latest changes for a single repository.

        Steps: skip directories without ``.git``; skip everything in dry-run
        mode; ``git fetch --tags``; ``git pull --ff-only``; and, when the pull
        brought changes, ``git submodule update --init --recursive``.

        Args:
            repo_path: Path of the directory to process.

        Returns:
            A PullResult describing the outcome. This method does not raise:
            non-zero exit codes, timeouts and unexpected exceptions from any
            git step are all converted into a failed result.
        """
        start_time = time.time()
        repo_name = os.path.basename(repo_path)

        def outcome(success: bool, message: str, **flags) -> "PullResult":
            return PullResult(
                repo_name=repo_name,
                success=success,
                message=message,
                duration=time.time() - start_time,
                **flags,
            )

        # Check if it's a git repo
        if not os.path.exists(os.path.join(repo_path, ".git")):
            return outcome(True, "not a git repo", skipped=True)

        # Dry run
        if self.dry_run:
            return outcome(True, "would pull", skipped=True)

        env = self._git_env()
        # Names the step in progress so timeout/exception messages say which
        # command failed; the pull step itself keeps an unprefixed message.
        prefix = "fetch tags: "
        try:
            # Fetch tags first
            fetch = self._run_git(["fetch", "--tags"], repo_path, env)
            if fetch.returncode != 0:
                error_msg = fetch.stderr.strip() or "fetch tags failed"
                return outcome(False, f"fetch tags: {error_msg}")

            # Pull with fast-forward only
            prefix = ""
            pull = self._run_git(["pull", "--ff-only"], repo_path, env)
            if pull.returncode != 0:
                return outcome(False, self._pull_error_message(pull))

            output = pull.stdout.strip()
            if "Already up to date" in output or "Already up-to-date" in output:
                return outcome(True, "up to date", updated=False)

            # New commits arrived: bring submodules in line with them. A
            # non-zero exit is reported instead of being silently dropped.
            prefix = "submodule update: "
            submodules = self._run_git(
                ["submodule", "update", "--init", "--recursive"], repo_path, env
            )
            if submodules.returncode != 0:
                error_msg = submodules.stderr.strip() or "submodule update failed"
                return outcome(False, f"submodule update: {error_msg}")

            return outcome(True, "updated", updated=True)
        except subprocess.TimeoutExpired:
            return outcome(False, f"{prefix}timeout")
        except Exception as e:
            # Per-repository boundary: one broken repository (e.g. an OSError
            # while spawning git) must not abort the remaining ones.
            return outcome(False, f"{prefix}{e}")

    def pull_repos(
        self,
        repo_paths: List[str],
        workers: int = 1,
        reporter: Optional["ProgressReporter"] = None,
    ) -> List["PullResult"]:
        """Pull multiple repositories, optionally in parallel.

        Args:
            repo_paths: Directories to process.
            workers: Number of worker threads. Values of 1 or less run
                sequentially in the calling thread.
            reporter: Optional progress sink; its ``start_repo`` and
                ``finish_repo`` methods are called from the calling thread.

        Returns:
            One PullResult per path: in input order when sequential, in
            completion order when parallel.
        """
        results = []
        total = len(repo_paths)

        if workers <= 1:
            # Sequential (also covers 0 / negative counts, which the thread
            # pool would reject with ValueError).
            for position, repo_path in enumerate(repo_paths, start=1):
                if reporter is not None:
                    reporter.start_repo(position, total, os.path.basename(repo_path))
                result = self.pull_repo(repo_path)
                results.append(result)
                if reporter is not None:
                    reporter.finish_repo(result)
            return results

        # Parallel
        with ThreadPoolExecutor(max_workers=workers) as executor:
            future_to_repo = {
                executor.submit(self.pull_repo, repo_path): repo_path
                for repo_path in repo_paths
            }
            for completed, future in enumerate(as_completed(future_to_repo), start=1):
                repo_path = future_to_repo[future]
                result = future.result()
                results.append(result)
                if reporter is not None:
                    # Start and finish are reported together so that output
                    # lines from different repositories never interleave.
                    reporter.start_repo(completed, total, os.path.basename(repo_path))
                    reporter.finish_repo(result)
        return results
class ProgressReporter:
    """Handles progress output.

    Regular progress goes to stdout and is suppressed entirely in quiet mode.
    Errors, including failed repositories in quiet mode, go to stderr.
    """

    def __init__(self, quiet: bool = False, verbose: bool = False):
        self.quiet = quiet
        self.verbose = verbose

    def header(self, directory: str, count: int):
        """Print the banner naming the directory and the number of candidates."""
        if self.quiet:
            return
        print(
            f"{colors.bold}git-update-all v{VERSION}{colors.reset} - "
            f"Updating {colors.bold}{count}{colors.reset} repositories in "
            f"'{colors.cyan}{directory}{colors.reset}'"
        )
        print()

    def start_repo(self, index: int, total: int, repo_name: str):
        """Print the ``[index/total] name... `` prefix without a newline."""
        if self.quiet:
            return
        # Right-align the index to the width of the total so prefixes line up.
        width = len(str(total))
        prefix = f"[{index:>{width}}/{total}]"
        print(f"{prefix} {repo_name}... ", end="", flush=True)

    def finish_repo(self, result: "PullResult"):
        """Print the result of a repo pull, completing the line from start_repo."""
        if self.quiet:
            # --quiet suppresses non-error output only. A failed pull is an
            # error, and start_repo printed no name, so name the repo here.
            if not result.success:
                self.error(f"{result.repo_name}: {result.message}")
            return
        duration_str = f"({result.duration:.1f}s)" if self.verbose else ""
        if result.skipped:
            print(
                f"{colors.yellow}skipped{colors.reset} "
                f"{colors.dim}{result.message}{colors.reset} {duration_str}"
            )
        elif result.success:
            if result.updated:
                print(f"{colors.green}updated{colors.reset} {duration_str}")
            else:
                print(f"{colors.dim}up to date{colors.reset} {duration_str}")
        else:
            print(f"{colors.red}failed{colors.reset} {colors.dim}{result.message}{colors.reset}")

    def summary(self, results: List["PullResult"]):
        """Print totals per outcome; categories with a zero count are omitted."""
        if self.quiet:
            return
        total = len(results)
        updated = sum(1 for r in results if r.success and r.updated)
        up_to_date = sum(1 for r in results if r.success and not r.updated and not r.skipped)
        skipped = sum(1 for r in results if r.skipped)
        failed = sum(1 for r in results if not r.success)
        print()
        print(f"{colors.bold}Summary:{colors.reset}")
        print(f" Total: {total}")
        if updated > 0:
            print(f" Updated: {colors.green}{updated}{colors.reset}")
        if up_to_date > 0:
            print(f" Up to date: {up_to_date}")
        if skipped > 0:
            print(f" Skipped: {colors.yellow}{skipped}{colors.reset}")
        if failed > 0:
            print(f" Failed: {colors.red}{failed}{colors.reset}")

    def error(self, message: str):
        """Print an error message to stderr (shown even in quiet mode)."""
        print(f"{colors.red}Error:{colors.reset} {message}", file=sys.stderr)
def parse_args() -> argparse.Namespace:
    """Parse command-line arguments from ``sys.argv``.

    Returns:
        Namespace with ``directory`` (str), ``dry_run`` (bool), ``parallel``
        (int, always >= 1), ``quiet`` (bool) and ``verbose`` (bool).

    Raises:
        SystemExit: Raised by argparse for ``--help``, ``--version`` and
            invalid arguments, including a ``--parallel`` value below 1.
    """

    def positive_int(text: str) -> int:
        # A worker count of 0 or less cannot start a thread pool, so reject
        # it here with a normal usage error rather than a later traceback.
        try:
            value = int(text)
        except ValueError:
            raise argparse.ArgumentTypeError(f"invalid int value: {text!r}") from None
        if value < 1:
            raise argparse.ArgumentTypeError(f"must be at least 1, got {value}")
        return value

    parser = argparse.ArgumentParser(
        prog="git-update-all",
        description="Pull latest changes for all Git repositories in a directory (fast-forward only).",
        # Raw formatter keeps the line breaks of the examples epilog.
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
%(prog)s Pull all repos in current directory
%(prog)s ~/projects Pull all repos in ~/projects
%(prog)s -j4 Pull in parallel with 4 workers
%(prog)s --dry-run Show what would be pulled
""",
    )
    parser.add_argument(
        "directory",
        nargs="?",
        default=".",
        help="Directory containing repositories (default: current directory)",
    )
    parser.add_argument(
        "--dry-run", "-n", action="store_true", help="Show what would be pulled without pulling"
    )
    parser.add_argument(
        "--parallel",
        "-j",
        type=positive_int,
        default=1,
        metavar="N",
        help="Number of parallel pull operations (default: 1)",
    )
    parser.add_argument("--quiet", "-q", action="store_true", help="Suppress non-error output")
    parser.add_argument("--verbose", "-v", action="store_true", help="Show detailed output")
    parser.add_argument("--version", action="version", version=f"%(prog)s {VERSION}")
    return parser.parse_args()
def find_repos(base_dir: str) -> List[str]:
    """Find all non-hidden directories directly inside base_dir (non-recursive).

    Whether each directory actually is a Git repository is not checked here.

    Args:
        base_dir: Directory to scan.

    Returns:
        Sorted list of entry paths. If a PermissionError interrupts the scan,
        whatever was collected up to that point is returned.
    """
    repos = []
    try:
        # The context manager closes the directory handle even when the scan
        # is interrupted by an exception.
        with os.scandir(base_dir) as entries:
            for entry in entries:
                if entry.is_dir() and not entry.name.startswith("."):
                    repos.append(entry.path)
    except PermissionError:
        # Deliberate best effort: an unreadable directory yields no candidates.
        pass
    return sorted(repos)
def check_git_installed() -> bool:
    """Check if git is installed and accessible.

    Returns:
        True when ``git --version`` runs and exits with status 0; False when
        it exits non-zero or cannot be started at all.
    """
    try:
        subprocess.run(
            ["git", "--version"],
            capture_output=True,
            check=True,
        )
    except (subprocess.CalledProcessError, OSError):
        # OSError covers a missing binary (FileNotFoundError) as well as one
        # that exists but cannot be executed (e.g. PermissionError).
        return False
    return True
def main() -> int:
    """Run the command-line tool and return the process exit status.

    Exit codes:
        0: no repository failed (also used when there are no subdirectories).
        1: some repositories failed, but at least one was pulled successfully.
        2: there were failures and no repository was pulled successfully.
        3: conflicting options, git unavailable, or the target is not a directory.
    """
    args = parse_args()
    reporter = ProgressReporter(quiet=args.quiet, verbose=args.verbose)

    # Pre-flight checks: each one aborts with status 3.
    if args.verbose and args.quiet:
        reporter.error("Cannot use both --quiet and --verbose")
        return 3

    if not check_git_installed():
        reporter.error("git is not installed or not in PATH")
        return 3

    target = os.path.abspath(args.directory)
    if not os.path.isdir(target):
        reporter.error(f"Not a directory: {args.directory}")
        return 3

    # Collect candidate directories; having none is reported but not fatal.
    candidates = find_repos(target)
    if not candidates:
        reporter.error(f"No subdirectories found in {args.directory}")
        return 0

    reporter.header(args.directory, len(candidates))

    use_threads = args.parallel > 1
    puller = RepoPuller(
        base_dir=target,
        dry_run=args.dry_run,
        quiet=args.quiet,
        verbose=args.verbose,
        parallel=use_threads,
    )
    results = puller.pull_repos(candidates, workers=args.parallel, reporter=reporter)

    reporter.summary(results)

    # Map the outcomes onto the exit status.
    if all(r.success for r in results):
        return 0
    pulled_any = any(r.success and not r.skipped for r in results)
    return 1 if pulled_any else 2
# Script entry point: main()'s return value becomes the process exit status.
if __name__ == "__main__":
    sys.exit(main())
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment