Last active
October 25, 2025 18:14
-
-
Save sandeepkunkunuru/b879f8a5c00a66f29d63b47fda0b0c3e to your computer and use it in GitHub Desktop.
Automated PR Review System using GitHub CLI + Claude AI - Smart, fast PR reviews with state tracking and auto-merge
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| """ | |
| Optimized PR Review Automation | |
| Fast, cheap, and smart PR review automation using state tracking | |
| This script automatically reviews PRs where you're requested as a reviewer using: | |
| - GitHub CLI (gh) for PR management | |
| - Claude CLI for AI-powered code reviews | |
| - SQLite for state tracking and avoiding duplicate reviews | |
| - Smart staleness detection to skip old/inactive PRs | |
| """ | |
import json
import os
import shutil
import sqlite3
import subprocess
import sys
import tempfile
import time

from datetime import datetime, timedelta
from pathlib import Path
from typing import Dict, List, Optional, Tuple
# Configuration - UPDATE THESE FOR YOUR SETUP
BASE_DIR = Path.home() / "pr-automation"  # Where to store repos and database
DB_PATH = BASE_DIR / "pr-reviews.db"  # SQLite file holding all state/history tables
LOG_PATH = BASE_DIR / "pr-review.log"  # combined info log (console output is mirrored here)
ERROR_LOG_PATH = BASE_DIR / "pr-review-error.log"  # errors only, for quick triage
# Configure your organizations and where to clone their repos
# Example: "YourOrgName": BASE_DIR / "YourOrgName"
# PRs from orgs not listed here are ignored by main().
ORGS = {
    "your-org": BASE_DIR / "your-org",
    "another-org": BASE_DIR / "another-org"
}
# Staleness thresholds: PRs exceeding either limit are skipped, not reviewed.
MAX_PR_AGE_DAYS = 60  # skip PRs created more than this many days ago
MAX_INACTIVITY_DAYS = 21  # skip PRs with no update for this many days
# Colors for output
class Colors:
    """ANSI escape sequences used to colorize console log lines."""
    RED = '\033[0;31m'
    GREEN = '\033[0;32m'
    YELLOW = '\033[1;33m'
    BLUE = '\033[0;34m'
    CYAN = '\033[0;36m'
    NC = '\033[0m'  # reset ("no color")
def log(level: str, message: str, color: str = Colors.NC):
    """Log a message to the console (colorized) and append it to LOG_PATH.

    Args:
        level: severity tag, e.g. "INFO", "WARNING", "SUCCESS".
        message: text to log.
        color: ANSI color prefix for the console copy only; the file copy
            is written without escape codes.
    """
    timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    formatted = f"{color}[{level}]{Colors.NC} {message}"
    plain = f"[{level}] {message}"
    print(formatted)
    # Fix: main() logs before it creates BASE_DIR, so on a fresh machine the
    # very first log() call crashed with FileNotFoundError. Create the log
    # directory on demand instead of relying on caller ordering.
    LOG_PATH.parent.mkdir(parents=True, exist_ok=True)
    # Append to log file
    with open(LOG_PATH, 'a') as f:
        f.write(f"{timestamp} {plain}\n")
def log_error(message: str):
    """Log an error to the console/main log and mirror it to the error log."""
    log("ERROR", message, Colors.RED)
    # Fix: same first-run hazard as log() -- ensure the directory exists
    # before appending, since this can run before main() creates BASE_DIR.
    ERROR_LOG_PATH.parent.mkdir(parents=True, exist_ok=True)
    with open(ERROR_LOG_PATH, 'a') as f:
        f.write(f"{datetime.now().isoformat()} [ERROR] {message}\n")
def run_command(cmd: List[str], check: bool = True) -> Optional[str]:
    """Execute *cmd* and return its stripped stdout, or None on failure.

    With check=True a non-zero exit is logged and yields None; with
    check=False the (possibly empty) stdout is returned regardless of the
    exit status.  Any other failure to launch the command also yields None.
    """
    try:
        proc = subprocess.run(
            cmd,
            capture_output=True,
            text=True,
            check=check,
        )
    except subprocess.CalledProcessError as err:
        if check:
            log_error(f"Command failed: {' '.join(cmd)}\n{err.stderr}")
        return None
    except Exception as err:
        log_error(f"Command error: {' '.join(cmd)}\n{str(err)}")
        return None
    return proc.stdout.strip()
def init_database():
    """Create (or update) the SQLite schema used by the automation.

    Tables:
        repo_state       -- per-repo activity counters and activity score
        reviewed_commits -- (org, repo, pr, sha) seen-set for fast dedup
        pr_reviews       -- full audit trail of every review outcome
        automation_runs  -- one summary row per script execution
    """
    conn = sqlite3.connect(DB_PATH)
    # Fix: close the connection even if a DDL statement raises, instead of
    # leaking it on the error path.
    try:
        cursor = conn.cursor()
        # Create repo_state table for tracking
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS repo_state (
                org TEXT NOT NULL,
                repo TEXT NOT NULL,
                last_checked_at TEXT,
                last_pr_found_at TEXT,
                total_prs_seen INTEGER DEFAULT 0,
                total_prs_reviewed INTEGER DEFAULT 0,
                active_score REAL DEFAULT 1.0,
                PRIMARY KEY (org, repo)
            )
        """)
        # Create reviewed_commits table for fast lookup
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS reviewed_commits (
                org TEXT NOT NULL,
                repo TEXT NOT NULL,
                pr_number INTEGER NOT NULL,
                commit_sha TEXT NOT NULL,
                reviewed_at TEXT NOT NULL,
                decision TEXT,
                PRIMARY KEY (org, repo, pr_number, commit_sha)
            )
        """)
        cursor.execute("""
            CREATE INDEX IF NOT EXISTS idx_reviewed_commits_sha
            ON reviewed_commits(org, repo, commit_sha)
        """)
        # Create pr_reviews table for full review history
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS pr_reviews (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                review_timestamp TEXT NOT NULL,
                org TEXT NOT NULL,
                repo TEXT NOT NULL,
                pr_number INTEGER NOT NULL,
                pr_title TEXT,
                pr_author TEXT,
                pr_url TEXT,
                pr_created_at TEXT,
                pr_updated_at TEXT,
                commit_sha TEXT NOT NULL,
                decision TEXT NOT NULL,
                feedback TEXT,
                merged INTEGER DEFAULT 0,
                merge_timestamp TEXT,
                run_duration_seconds INTEGER
            )
        """)
        # Create automation_runs table for tracking script runs
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS automation_runs (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                run_timestamp TEXT NOT NULL,
                total_prs_found INTEGER,
                prs_reviewed INTEGER,
                prs_approved INTEGER,
                prs_changes_requested INTEGER,
                prs_merged INTEGER,
                prs_skipped INTEGER,
                errors INTEGER,
                run_duration_seconds INTEGER
            )
        """)
        conn.commit()
    finally:
        conn.close()
    log("INFO", "Database initialized", Colors.GREEN)
def check_prerequisites() -> bool:
    """Verify required CLI tools are installed and gh is authenticated.

    Returns:
        True when gh, git and claude are all on PATH and `gh auth status`
        succeeds; False (after logging the reason) otherwise.
    """
    log("INFO", "Checking prerequisites...", Colors.BLUE)
    tools = {
        "gh": "GitHub CLI (brew install gh)",
        "git": "Git",
        "claude": "Claude CLI (https://claude.com/download)"
    }
    for tool, install_msg in tools.items():
        # Fix: the previous ["command", "-v", tool] subprocess always failed
        # because `command` is a shell builtin, not an executable -- so every
        # tool was reported missing. shutil.which is the correct PATH lookup.
        if shutil.which(tool) is None:
            log_error(f"{tool} not found. Install: {install_msg}")
            return False
    # Check GH auth.
    # Fix: run_command(..., check=False) returns stdout ("") even when the
    # command fails, so `is None` never detected an unauthenticated gh.
    # Inspect the return code directly instead.
    auth = subprocess.run(["gh", "auth", "status"], capture_output=True, text=True)
    if auth.returncode != 0:
        log_error("Not authenticated with GitHub. Run: gh auth login")
        return False
    log("INFO", "All prerequisites met", Colors.GREEN)
    return True
def get_current_user() -> str:
    """Return the authenticated GitHub login, or "unknown" if the lookup fails."""
    login = run_command(["gh", "api", "user", "-q", ".login"])
    return login or "unknown"
def is_pr_stale(created_at: str, updated_at: str,
                max_age_days: Optional[int] = None,
                max_inactivity_days: Optional[int] = None) -> Tuple[bool, Optional[str]]:
    """Decide whether a PR is too old or too inactive to bother reviewing.

    Args:
        created_at: ISO-8601 creation timestamp (GitHub's trailing "Z" ok).
        updated_at: ISO-8601 last-activity timestamp.
        max_age_days: override for the age limit; defaults to the
            module-level MAX_PR_AGE_DAYS (backward compatible).
        max_inactivity_days: override for the inactivity limit; defaults to
            MAX_INACTIVITY_DAYS.

    Returns:
        (is_stale, reason) -- reason is None when the PR is fresh.
        Timestamp parse errors are logged and treated as "not stale".
    """
    if max_age_days is None:
        max_age_days = MAX_PR_AGE_DAYS
    if max_inactivity_days is None:
        max_inactivity_days = MAX_INACTIVITY_DAYS
    try:
        # GitHub emits "Z"-suffixed UTC stamps; older datetime.fromisoformat
        # only accepts the explicit "+00:00" form.
        created = datetime.fromisoformat(created_at.replace('Z', '+00:00'))
        updated = datetime.fromisoformat(updated_at.replace('Z', '+00:00'))
        # Compare in the same tz-awareness as the parsed timestamps.
        now = datetime.now(created.tzinfo)
        age_days = (now - created).days
        inactive_days = (now - updated).days
        if age_days > max_age_days:
            return True, f"PR is {age_days} days old (max: {max_age_days})"
        if inactive_days > max_inactivity_days:
            return True, f"Inactive for {inactive_days} days (max: {max_inactivity_days})"
        return False, None
    except Exception as e:
        log_error(f"Error checking staleness: {e}")
        return False, None
def already_reviewed(org: str, repo: str, pr_number: int, commit_sha: str) -> bool:
    """Return True if this exact commit of the PR already has a recorded review."""
    conn = sqlite3.connect(DB_PATH)
    row = conn.execute("""
        SELECT 1 FROM reviewed_commits
        WHERE org = ? AND repo = ? AND pr_number = ? AND commit_sha = ?
    """, (org, repo, pr_number, commit_sha)).fetchone()
    conn.close()
    return row is not None
def record_review(org: str, repo: str, pr_number: int, commit_sha: str,
                  decision: str, pr_data: Dict, feedback: Optional[str] = None,
                  merged: bool = False, duration: int = 0):
    """Persist a review outcome.

    Writes the (org, repo, pr, sha) key into reviewed_commits for fast
    duplicate detection, and a full row into pr_reviews for history.

    Args:
        decision: APPROVE / REQUEST_CHANGES / ERROR / SKIP_* marker.
        pr_data: PR dict from the gh search results.
        feedback: review body text, if any (annotation fixed: the default is
            None, so the type is Optional[str], not str).
        merged: whether the PR was merged during this run.
        duration: seconds spent processing the PR.
    """
    conn = sqlite3.connect(DB_PATH)
    # Fix: close the connection even when an INSERT raises.
    try:
        cursor = conn.cursor()
        now = datetime.now().isoformat()
        # Record in reviewed_commits for fast lookup
        cursor.execute("""
            INSERT OR REPLACE INTO reviewed_commits
            (org, repo, pr_number, commit_sha, reviewed_at, decision)
            VALUES (?, ?, ?, ?, ?, ?)
        """, (org, repo, pr_number, commit_sha, now, decision))
        # Record full details in pr_reviews; 'author' is a dict from the
        # search API but may already be a plain string -- handle both.
        author = pr_data.get('author', {}).get('login') if isinstance(pr_data.get('author'), dict) else pr_data.get('author')
        cursor.execute("""
            INSERT INTO pr_reviews
            (review_timestamp, org, repo, pr_number, pr_title, pr_author, pr_url,
             pr_created_at, pr_updated_at, commit_sha, decision, feedback, merged,
             merge_timestamp, run_duration_seconds)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
        """, (
            now, org, repo, pr_number, pr_data.get('title'), author,
            pr_data.get('url'), pr_data.get('createdAt'), pr_data.get('updatedAt'),
            commit_sha, decision, feedback, 1 if merged else 0,
            now if merged else None, duration
        ))
        conn.commit()
    finally:
        conn.close()
def update_repo_state(org: str, repo: str, found_prs: int, reviewed_prs: int):
    """Upsert per-repo activity counters after processing a repo's PRs.

    Maintains an ``active_score`` bounded to [0.1, 2.0]: repos that keep
    yielding PRs are scaled up by 10% per run, idle repos decay by 10%.
    ``last_pr_found_at`` only advances when at least one PR was found.
    """
    conn = sqlite3.connect(DB_PATH)
    cursor = conn.cursor()
    now = datetime.now().isoformat()
    # NOTE: the parameter tuple feeds the INSERT placeholders first, then the
    # UPDATE placeholders in strict textual order -- keep it in sync with the
    # SQL if either side changes.
    cursor.execute("""
        INSERT INTO repo_state (org, repo, last_checked_at, last_pr_found_at,
            total_prs_seen, total_prs_reviewed, active_score)
        VALUES (?, ?, ?, ?, ?, ?, 1.0)
        ON CONFLICT(org, repo) DO UPDATE SET
            last_checked_at = ?,
            last_pr_found_at = CASE WHEN ? > 0 THEN ? ELSE last_pr_found_at END,
            total_prs_seen = total_prs_seen + ?,
            total_prs_reviewed = total_prs_reviewed + ?,
            active_score = CASE
                WHEN ? > 0 THEN MIN(active_score * 1.1, 2.0)
                ELSE MAX(active_score * 0.9, 0.1)
            END
    """, (org, repo, now, now if found_prs > 0 else None, found_prs, reviewed_prs,
          now, found_prs, now, found_prs, reviewed_prs, found_prs))
    conn.commit()
    conn.close()
def fetch_all_review_prs() -> List[Dict]:
    """Search GitHub globally for open PRs awaiting the current user's review.

    One `gh search prs` call covers every org/repo at once (capped at 100
    results).  Returns an empty list when nothing is found or the search or
    its JSON output fails.
    """
    log("INFO", "Fetching PRs using global search (fast method)...", Colors.CYAN)
    # Use GitHub's search API via gh CLI - ONE API CALL instead of looping repos!
    search_cmd = [
        "gh", "search", "prs",
        "--review-requested", "@me",
        "--state", "open",
        "--json", "number,title,repository,author,createdAt,updatedAt,url",
        "--limit", "100",
    ]
    raw = run_command(search_cmd)
    if not raw:
        log("WARNING", "No PRs found or search failed", Colors.YELLOW)
        return []
    try:
        found = json.loads(raw)
    except json.JSONDecodeError as err:
        log_error(f"Failed to parse PR search results: {err}")
        return []
    log("INFO", f"Found {len(found)} PRs across all orgs", Colors.GREEN)
    return found
def sync_repo(org: str, repo: str, repo_dir: Path) -> bool:
    """Clone the repository if absent, otherwise fetch updates.

    Side effect: leaves the process working directory inside the checkout --
    review_pr() relies on this for its subsequent git commands.

    Returns:
        True unless an exception was raised; note that run_command failures
        (which return None rather than raise) do NOT flip the return value.
    """
    repo_path = repo_dir / repo
    try:
        if repo_path.exists():
            log("INFO", f"Updating repo: {org}/{repo}", Colors.BLUE)
            os.chdir(repo_path)
            run_command(["git", "fetch", "--all", "--prune"])
        else:
            log("INFO", f"Cloning repo: {org}/{repo}", Colors.BLUE)
            repo_dir.mkdir(parents=True, exist_ok=True)
            os.chdir(repo_dir)
            run_command(["gh", "repo", "clone", f"{org}/{repo}"])
            # Move into the fresh clone (relative path: cwd is repo_dir here).
            os.chdir(repo)
        return True
    except Exception as e:
        log_error(f"Failed to sync repo {org}/{repo}: {e}")
        return False
def get_pr_diff(pr_number: int, base_branch: str) -> Optional[str]:
    """Return the three-dot diff of HEAD against origin/<base_branch>, or None.

    Assumes the current working directory is the repo checkout (set by
    sync_repo / review_pr).
    """
    try:
        # Make sure the base branch ref is up to date before diffing.
        run_command(["git", "fetch", "origin", base_branch], check=False)
        return run_command(["git", "diff", f"origin/{base_branch}...HEAD"])
    except Exception as err:
        log_error(f"Failed to get diff for PR #{pr_number}: {err}")
        return None
def review_with_claude(pr_data: Dict, diff: str, pr_info: Dict) -> Tuple[Optional[str], Optional[str]]:
    """Ask the Claude CLI to review the PR.

    Builds a markdown context (PR metadata, reviews for the current commit
    only, comments, and the full diff) and passes it to `claude -p`.

    Fix: the previous version also wrote the context to a NamedTemporaryFile
    that was never read and then deleted it -- dead code, removed.

    Returns:
        (decision, feedback): decision is the upper-cased text after
        "DECISION:" (expected APPROVE / REQUEST_CHANGES) or None if absent;
        feedback is the text after "FEEDBACK:", or the whole response.
        (None, None) when the CLI produced no output.
    """
    # Parse org/repo from nameWithOwner
    repo_full = pr_data['repository']['nameWithOwner']
    # Filter reviews to only show ones for the current commit being reviewed
    # This prevents Claude from re-reporting issues that were fixed in newer commits
    current_commit_sha = pr_info.get('headRefOid')
    current_reviews = [r for r in pr_info.get('reviews', [])
                       if r.get('commit', {}).get('oid') == current_commit_sha]
    # Create review context
    context = f"""# PR Review Context
## Repository: {repo_full}
## PR #{pr_data['number']}: {pr_data['title']}
## Author: @{pr_data['author']['login']}
## URL: {pr_data['url']}
## Description
{pr_info.get('body', 'No description provided')}
## Reviews (for current commit {current_commit_sha[:8] if current_commit_sha else 'unknown'} only)
{json.dumps(current_reviews, indent=2)}
## Comments
{json.dumps(pr_info.get('comments', []), indent=2)}
## Diff Summary
{run_command(['git', 'diff', '--stat', f"origin/{pr_info.get('baseRefName', 'main')}...HEAD"]) or 'N/A'}
## Full Diff
```diff
{diff}
```
---
Please review this PR thoroughly:
1. Check code quality, style, and best practices
2. Look for potential bugs or security issues
3. Verify if tests are included and appropriate
4. Review the conversation history for any concerns
5. Provide a decision: APPROVE or REQUEST_CHANGES
6. If requesting changes, provide specific, actionable feedback
Format your response as:
DECISION: [APPROVE|REQUEST_CHANGES]
FEEDBACK: [Your detailed feedback here]
"""
    log("INFO", "Running Claude review...", Colors.CYAN)
    # NOTE(review): the entire context (including the full diff) travels as a
    # single argv element; a very large diff could exceed the OS ARG_MAX
    # limit -- consider piping via stdin if that ever happens.
    result = run_command(["claude", "-p", context], check=False)
    if not result:
        return None, None
    # Parse decision
    decision_line = [line for line in result.split('\n') if 'DECISION:' in line.upper()]
    decision = None
    if decision_line:
        decision = decision_line[0].split(':', 1)[1].strip().replace('*', '').upper()
    # Parse feedback
    feedback_parts = result.split('FEEDBACK:', 1)
    feedback = feedback_parts[1].strip() if len(feedback_parts) > 1 else result
    return decision, feedback
def submit_review(org: str, repo: str, pr_number: int, decision: str, feedback: str) -> bool:
    """Submit the review verdict to GitHub via `gh pr review`.

    Fix: the previous version ignored run_command's result and returned
    True unconditionally, so a failed submission could still lead to an
    auto-merge.  run_command returns None on failure (check=True), so we
    now report success only when the command actually succeeded.

    Returns:
        True if the review was submitted; False on failure or an
        unrecognized decision.
    """
    try:
        if decision == "APPROVE":
            result = run_command(["gh", "pr", "review", str(pr_number),
                                  "--repo", f"{org}/{repo}",
                                  "--approve", "--body", feedback])
            return result is not None
        elif "REQUEST" in decision:
            result = run_command(["gh", "pr", "review", str(pr_number),
                                  "--repo", f"{org}/{repo}",
                                  "--request-changes", "--body", feedback])
            return result is not None
        return False
    except Exception as e:
        log_error(f"Failed to submit review: {e}")
        return False
def merge_pr(org: str, repo: str, pr_number: int) -> bool:
    """Squash-merge the PR and delete its branch.

    Fix: with check=False, run_command returned the command's (empty) stdout
    even on a non-zero exit, so `result is not None` reported every failed
    merge as a success.  With check=True a failure is logged by run_command
    and surfaces here as None.

    Returns:
        True if the merge command succeeded, False otherwise.
    """
    try:
        log("INFO", f"Attempting to merge PR #{pr_number}...", Colors.CYAN)
        result = run_command([
            "gh", "pr", "merge", str(pr_number),
            "--repo", f"{org}/{repo}",
            "--squash", "--delete-branch"
        ])
        if result is not None:
            log("SUCCESS", f"PR #{pr_number} merged successfully!", Colors.GREEN)
            return True
        else:
            log("WARNING", f"Failed to merge PR #{pr_number}", Colors.YELLOW)
            return False
    except Exception as e:
        log_error(f"Error merging PR #{pr_number}: {e}")
        return False
def review_pr(pr_data: Dict, org_dir: Path) -> Dict:
    """Review a single PR end to end.

    Pipeline: staleness check -> repo sync -> PR checkout pinned to the head
    SHA -> already-reviewed dedup -> diff -> Claude review -> submit ->
    optional auto-merge -> record.  Every exit path records an outcome row.

    Fix: the skip paths now set result['decision'] to their SKIP_* marker;
    previously it stayed None, so main() miscounted every skip as an error.

    Side effect: changes the process working directory into the repo.

    Returns:
        dict with keys org, repo, pr_number, reviewed, decision, merged.
    """
    # Use parsed org/repo from main()
    org = pr_data.get('_parsed_org')
    repo = pr_data.get('_parsed_repo')
    # Fallback if not parsed yet
    if not org or not repo:
        repo_full = pr_data['repository']['nameWithOwner']
        org, repo = repo_full.split('/', 1)
    pr_number = pr_data['number']
    start_time = time.time()
    result = {
        'org': org,
        'repo': repo,
        'pr_number': pr_number,
        'reviewed': False,
        'decision': None,
        'merged': False
    }
    log("INFO", "=" * 60, Colors.BLUE)
    log("INFO", f"Processing PR #{pr_number} in {org}/{repo}", Colors.BLUE)
    log("INFO", f"Title: {pr_data['title']}", Colors.BLUE)
    log("INFO", "=" * 60, Colors.BLUE)
    # Check staleness first (before fetching more data)
    is_stale, stale_reason = is_pr_stale(pr_data['createdAt'], pr_data['updatedAt'])
    if is_stale:
        log("WARNING", f"Skipping stale PR: {stale_reason}", Colors.YELLOW)
        # Use a placeholder commit_sha for skipped stale PRs
        record_review(org, repo, pr_number, "stale", "SKIP_STALE",
                      pr_data, stale_reason, duration=int(time.time() - start_time))
        result['decision'] = "SKIP_STALE"  # so main() counts this as a skip
        return result
    # Sync repo
    if not sync_repo(org, repo, org_dir):
        log_error(f"Failed to sync repo {org}/{repo}")
        record_review(org, repo, pr_number, "unknown", "ERROR",
                      pr_data, "Failed to sync repo", duration=int(time.time() - start_time))
        return result
    repo_path = org_dir / repo
    os.chdir(repo_path)
    # Stash and checkout PR (stash protects any local modifications)
    run_command(["git", "stash", "push", "-u", "-m", "Auto-stash"], check=False)
    try:
        run_command(["gh", "pr", "checkout", str(pr_number)])
    except Exception as e:
        log_error(f"Failed to checkout PR #{pr_number}: {e}")
        # Note: commit_sha is not available yet at this point
        record_review(org, repo, pr_number, "unknown", "ERROR",
                      pr_data, f"Failed to checkout: {e}", duration=int(time.time() - start_time))
        return result
    # Get PR details including commit SHA
    pr_info_json = run_command([
        "gh", "pr", "view", str(pr_number),
        "--json", "title,body,author,baseRefName,reviews,comments,headRefOid"
    ])
    if not pr_info_json:
        log_error("Failed to get PR details")
        return result
    pr_info = json.loads(pr_info_json)
    base_branch = pr_info.get('baseRefName', 'main')
    commit_sha = pr_info.get('headRefOid', 'unknown')
    # CRITICAL: Ensure we're on the latest commit of the PR
    log("INFO", f"Ensuring we're on the latest commit: {commit_sha[:8]}...", Colors.CYAN)
    try:
        run_command(["git", "fetch", "origin", f"pull/{pr_number}/head"])
        run_command(["git", "reset", "--hard", commit_sha])
        # Verify we're on the correct commit
        current_sha = run_command(["git", "rev-parse", "HEAD"])
        if current_sha != commit_sha:
            log_error(f"Failed to checkout correct commit. Expected {commit_sha[:8]}, got {current_sha[:8]}")
            record_review(org, repo, pr_number, commit_sha, "ERROR",
                          pr_data, "Failed to checkout correct commit", duration=int(time.time() - start_time))
            return result
        log("SUCCESS", f"Verified on correct commit: {commit_sha[:8]}", Colors.GREEN)
    except Exception as e:
        log_error(f"Failed to ensure latest commit: {e}")
        record_review(org, repo, pr_number, commit_sha, "ERROR",
                      pr_data, f"Failed to ensure latest commit: {e}", duration=int(time.time() - start_time))
        return result
    # Now check if we've already reviewed this specific commit
    if already_reviewed(org, repo, pr_number, commit_sha):
        log("INFO", f"Already reviewed commit {commit_sha[:8]} - skipping", Colors.YELLOW)
        record_review(org, repo, pr_number, commit_sha, "SKIP_ALREADY_REVIEWED",
                      pr_data, "Already reviewed this commit", duration=int(time.time() - start_time))
        result['decision'] = "SKIP_ALREADY_REVIEWED"  # skip, not an error
        return result
    # Get diff
    diff = get_pr_diff(pr_number, base_branch)
    if not diff:
        log_error("Failed to get PR diff")
        record_review(org, repo, pr_number, commit_sha, "ERROR",
                      pr_data, "Failed to get diff", duration=int(time.time() - start_time))
        return result
    # Review with Claude
    decision, feedback = review_with_claude(pr_data, diff, pr_info)
    if not decision:
        log_error("Failed to get decision from Claude")
        record_review(org, repo, pr_number, commit_sha, "ERROR",
                      pr_data, "Claude review failed", duration=int(time.time() - start_time))
        return result
    log("INFO", f"Claude's decision: {decision}", Colors.CYAN)
    result['decision'] = decision
    result['reviewed'] = True
    # Submit review
    if not submit_review(org, repo, pr_number, decision, feedback):
        log_error("Failed to submit review")
        return result
    # Merge if approved
    merged = False
    if decision == "APPROVE":
        merged = merge_pr(org, repo, pr_number)
        result['merged'] = merged
    # Record in database
    duration = int(time.time() - start_time)
    record_review(org, repo, pr_number, commit_sha, decision, pr_data,
                  feedback, merged, duration)
    log("SUCCESS", f"Completed review of PR #{pr_number} in {duration}s", Colors.GREEN)
    # Cleanup: return to the base branch and restore any stashed changes
    run_command(["git", "checkout", base_branch], check=False)
    run_command(["git", "stash", "pop"], check=False)
    return result
def main():
    """Main execution: fetch PRs -> filter by ORGS -> review each -> record stats."""
    log("INFO", "Starting Optimized PR Review Automation", Colors.CYAN)
    log("INFO", "=" * 60, Colors.CYAN)
    run_start = time.time()
    # Initialize
    if not check_prerequisites():
        sys.exit(1)
    # Ensure BASE_DIR exists
    BASE_DIR.mkdir(parents=True, exist_ok=True)
    init_database()
    # Fetch all PRs using global search (FAST!)
    all_prs = fetch_all_review_prs()
    if not all_prs:
        log("INFO", "No PRs to review", Colors.GREEN)
        return
    # Group by org for processing.
    # NOTE: PRs from orgs not listed in ORGS are silently dropped here,
    # though they still count toward stats['total_found'] below.
    prs_by_org = {}
    for pr in all_prs:
        # Parse org/repo from nameWithOwner field
        repo_full = pr['repository']['nameWithOwner']
        org, repo_name = repo_full.split('/', 1)
        if org in ORGS:
            if org not in prs_by_org:
                prs_by_org[org] = []
            # Add parsed org and repo to PR data
            pr['_parsed_org'] = org
            pr['_parsed_repo'] = repo_name
            prs_by_org[org].append(pr)
    # Stats
    stats = {
        'total_found': len(all_prs),
        'reviewed': 0,
        'approved': 0,
        'changes_requested': 0,
        'merged': 0,
        'skipped': 0,
        'errors': 0
    }
    # Process each org
    for org, prs in prs_by_org.items():
        log("INFO", f"\nProcessing {len(prs)} PRs for {org}", Colors.BLUE)
        org_dir = ORGS[org]
        org_dir.mkdir(parents=True, exist_ok=True)
        # Track stats per repo (not per org)
        repo_stats = {}  # {repo_name: {'found': count, 'reviewed': count}}
        for pr in prs:
            repo_name = pr['_parsed_repo']
            # Initialize repo stats if first time seeing this repo
            if repo_name not in repo_stats:
                repo_stats[repo_name] = {'found': 0, 'reviewed': 0}
            repo_stats[repo_name]['found'] += 1
            result = review_pr(pr, org_dir)
            if result['reviewed']:
                stats['reviewed'] += 1
                repo_stats[repo_name]['reviewed'] += 1
                if result['decision'] == 'APPROVE':
                    stats['approved'] += 1
                elif 'REQUEST' in result['decision']:
                    stats['changes_requested'] += 1
                if result['merged']:
                    stats['merged'] += 1
            else:
                # Unreviewed PRs are either deliberate skips (decision is a
                # SKIP_* marker) or errors.
                if result.get('decision') and 'SKIP' in result['decision']:
                    stats['skipped'] += 1
                else:
                    stats['errors'] += 1
        # Update repo state for each unique repo
        for repo_name, counts in repo_stats.items():
            update_repo_state(org, repo_name, counts['found'], counts['reviewed'])
    # Record automation run
    duration = int(time.time() - run_start)
    conn = sqlite3.connect(DB_PATH)
    cursor = conn.cursor()
    cursor.execute("""
        INSERT INTO automation_runs
        (run_timestamp, total_prs_found, prs_reviewed, prs_approved,
         prs_changes_requested, prs_merged, prs_skipped, errors, run_duration_seconds)
        VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
    """, (datetime.now().isoformat(), stats['total_found'], stats['reviewed'],
          stats['approved'], stats['changes_requested'], stats['merged'],
          stats['skipped'], stats['errors'], duration))
    conn.commit()
    conn.close()
    # Print summary
    log("INFO", "\n" + "=" * 60, Colors.CYAN)
    log("SUCCESS", "PR Review Automation Completed!", Colors.GREEN)
    log("INFO", "=" * 60, Colors.CYAN)
    log("INFO", f"Total PRs found: {stats['total_found']}", Colors.BLUE)
    log("INFO", f"Reviewed: {stats['reviewed']}", Colors.GREEN)
    log("INFO", f" - Approved: {stats['approved']}", Colors.GREEN)
    log("INFO", f" - Changes requested: {stats['changes_requested']}", Colors.YELLOW)
    log("INFO", f" - Merged: {stats['merged']}", Colors.GREEN)
    log("INFO", f"Skipped: {stats['skipped']}", Colors.YELLOW)
    log("INFO", f"Errors: {stats['errors']}", Colors.RED)
    log("INFO", f"Duration: {duration}s", Colors.BLUE)
    log("INFO", "=" * 60, Colors.CYAN)
if __name__ == "__main__":
    try:
        main()
    except KeyboardInterrupt:
        log("WARNING", "\nInterrupted by user", Colors.YELLOW)
        sys.exit(130)  # conventional exit status for SIGINT
    except Exception as e:
        # Top-level catch-all: log, dump the traceback, exit non-zero.
        log_error(f"Fatal error: {e}")
        import traceback
        traceback.print_exc()
        sys.exit(1)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment