Skip to content

Instantly share code, notes, and snippets.

@sandeepkunkunuru
Last active October 25, 2025 18:14
Show Gist options
  • Save sandeepkunkunuru/b879f8a5c00a66f29d63b47fda0b0c3e to your computer and use it in GitHub Desktop.
Save sandeepkunkunuru/b879f8a5c00a66f29d63b47fda0b0c3e to your computer and use it in GitHub Desktop.
Automated PR Review System using GitHub CLI + Claude AI - Smart, fast PR reviews with state tracking and auto-merge
#!/usr/bin/env python3
"""
Optimized PR Review Automation
Fast, cheap, and smart PR review automation using state tracking
This script automatically reviews PRs where you're requested as a reviewer using:
- GitHub CLI (gh) for PR management
- Claude CLI for AI-powered code reviews
- SQLite for state tracking and avoiding duplicate reviews
- Smart staleness detection to skip old/inactive PRs
"""
import json
import subprocess
import sqlite3
import sys
import tempfile
from datetime import datetime, timedelta
from pathlib import Path
from typing import Dict, List, Optional, Tuple
import time
import os
# Configuration - UPDATE THESE FOR YOUR SETUP
BASE_DIR = Path.home() / "pr-automation" # Where to store repos and database
DB_PATH = BASE_DIR / "pr-reviews.db"
LOG_PATH = BASE_DIR / "pr-review.log"
ERROR_LOG_PATH = BASE_DIR / "pr-review-error.log"
# Configure your organizations and where to clone their repos
# Example: "YourOrgName": BASE_DIR / "YourOrgName"
ORGS = {
"your-org": BASE_DIR / "your-org",
"another-org": BASE_DIR / "another-org"
}
# Staleness thresholds
MAX_PR_AGE_DAYS = 60
MAX_INACTIVITY_DAYS = 21
# Colors for output
class Colors:
RED = '\033[0;31m'
GREEN = '\033[0;32m'
YELLOW = '\033[1;33m'
BLUE = '\033[0;34m'
CYAN = '\033[0;36m'
NC = '\033[0m'
def log(level: str, message: str, color: str = Colors.NC):
"""Log message to console and file"""
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
formatted = f"{color}[{level}]{Colors.NC} {message}"
plain = f"[{level}] {message}"
print(formatted)
# Append to log file
with open(LOG_PATH, 'a') as f:
f.write(f"{timestamp} {plain}\n")
def log_error(message: str):
"""Log error to both logs"""
log("ERROR", message, Colors.RED)
with open(ERROR_LOG_PATH, 'a') as f:
f.write(f"{datetime.now().isoformat()} [ERROR] {message}\n")
def run_command(cmd: List[str], check: bool = True) -> Optional[str]:
"""Run shell command and return output"""
try:
result = subprocess.run(
cmd,
capture_output=True,
text=True,
check=check
)
return result.stdout.strip()
except subprocess.CalledProcessError as e:
if check:
log_error(f"Command failed: {' '.join(cmd)}\n{e.stderr}")
return None
except Exception as e:
log_error(f"Command error: {' '.join(cmd)}\n{str(e)}")
return None
def init_database():
"""Initialize or update database schema"""
conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()
# Create repo_state table for tracking
cursor.execute("""
CREATE TABLE IF NOT EXISTS repo_state (
org TEXT NOT NULL,
repo TEXT NOT NULL,
last_checked_at TEXT,
last_pr_found_at TEXT,
total_prs_seen INTEGER DEFAULT 0,
total_prs_reviewed INTEGER DEFAULT 0,
active_score REAL DEFAULT 1.0,
PRIMARY KEY (org, repo)
)
""")
# Create reviewed_commits table for fast lookup
cursor.execute("""
CREATE TABLE IF NOT EXISTS reviewed_commits (
org TEXT NOT NULL,
repo TEXT NOT NULL,
pr_number INTEGER NOT NULL,
commit_sha TEXT NOT NULL,
reviewed_at TEXT NOT NULL,
decision TEXT,
PRIMARY KEY (org, repo, pr_number, commit_sha)
)
""")
cursor.execute("""
CREATE INDEX IF NOT EXISTS idx_reviewed_commits_sha
ON reviewed_commits(org, repo, commit_sha)
""")
# Create pr_reviews table for full review history
cursor.execute("""
CREATE TABLE IF NOT EXISTS pr_reviews (
id INTEGER PRIMARY KEY AUTOINCREMENT,
review_timestamp TEXT NOT NULL,
org TEXT NOT NULL,
repo TEXT NOT NULL,
pr_number INTEGER NOT NULL,
pr_title TEXT,
pr_author TEXT,
pr_url TEXT,
pr_created_at TEXT,
pr_updated_at TEXT,
commit_sha TEXT NOT NULL,
decision TEXT NOT NULL,
feedback TEXT,
merged INTEGER DEFAULT 0,
merge_timestamp TEXT,
run_duration_seconds INTEGER
)
""")
# Create automation_runs table for tracking script runs
cursor.execute("""
CREATE TABLE IF NOT EXISTS automation_runs (
id INTEGER PRIMARY KEY AUTOINCREMENT,
run_timestamp TEXT NOT NULL,
total_prs_found INTEGER,
prs_reviewed INTEGER,
prs_approved INTEGER,
prs_changes_requested INTEGER,
prs_merged INTEGER,
prs_skipped INTEGER,
errors INTEGER,
run_duration_seconds INTEGER
)
""")
conn.commit()
conn.close()
log("INFO", "Database initialized", Colors.GREEN)
def check_prerequisites() -> bool:
"""Check if all required tools are installed"""
log("INFO", "Checking prerequisites...", Colors.BLUE)
tools = {
"gh": "GitHub CLI (brew install gh)",
"git": "Git",
"claude": "Claude CLI (https://claude.com/download)"
}
for tool, install_msg in tools.items():
if not run_command(["command", "-v", tool], check=False):
log_error(f"{tool} not found. Install: {install_msg}")
return False
# Check GH auth
if run_command(["gh", "auth", "status"], check=False) is None:
log_error("Not authenticated with GitHub. Run: gh auth login")
return False
log("INFO", "All prerequisites met", Colors.GREEN)
return True
def get_current_user() -> str:
"""Get current GitHub username"""
output = run_command(["gh", "api", "user", "-q", ".login"])
return output if output else "unknown"
def is_pr_stale(created_at: str, updated_at: str) -> Tuple[bool, Optional[str]]:
"""Check if PR is stale"""
try:
created = datetime.fromisoformat(created_at.replace('Z', '+00:00'))
updated = datetime.fromisoformat(updated_at.replace('Z', '+00:00'))
now = datetime.now(created.tzinfo)
age_days = (now - created).days
inactive_days = (now - updated).days
if age_days > MAX_PR_AGE_DAYS:
return True, f"PR is {age_days} days old (max: {MAX_PR_AGE_DAYS})"
if inactive_days > MAX_INACTIVITY_DAYS:
return True, f"Inactive for {inactive_days} days (max: {MAX_INACTIVITY_DAYS})"
return False, None
except Exception as e:
log_error(f"Error checking staleness: {e}")
return False, None
def already_reviewed(org: str, repo: str, pr_number: int, commit_sha: str) -> bool:
"""Check if we've already reviewed this specific commit"""
conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()
cursor.execute("""
SELECT 1 FROM reviewed_commits
WHERE org = ? AND repo = ? AND pr_number = ? AND commit_sha = ?
""", (org, repo, pr_number, commit_sha))
exists = cursor.fetchone() is not None
conn.close()
return exists
def record_review(org: str, repo: str, pr_number: int, commit_sha: str,
decision: str, pr_data: Dict, feedback: str = None,
merged: bool = False, duration: int = 0):
"""Record a review in the database"""
conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()
now = datetime.now().isoformat()
# Record in reviewed_commits for fast lookup
cursor.execute("""
INSERT OR REPLACE INTO reviewed_commits
(org, repo, pr_number, commit_sha, reviewed_at, decision)
VALUES (?, ?, ?, ?, ?, ?)
""", (org, repo, pr_number, commit_sha, now, decision))
# Record full details in pr_reviews
author = pr_data.get('author', {}).get('login') if isinstance(pr_data.get('author'), dict) else pr_data.get('author')
cursor.execute("""
INSERT INTO pr_reviews
(review_timestamp, org, repo, pr_number, pr_title, pr_author, pr_url,
pr_created_at, pr_updated_at, commit_sha, decision, feedback, merged,
merge_timestamp, run_duration_seconds)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""", (
now, org, repo, pr_number, pr_data.get('title'), author,
pr_data.get('url'), pr_data.get('createdAt'), pr_data.get('updatedAt'),
commit_sha, decision, feedback, 1 if merged else 0,
now if merged else None, duration
))
conn.commit()
conn.close()
def update_repo_state(org: str, repo: str, found_prs: int, reviewed_prs: int):
"""Update repository state tracking"""
conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()
now = datetime.now().isoformat()
cursor.execute("""
INSERT INTO repo_state (org, repo, last_checked_at, last_pr_found_at,
total_prs_seen, total_prs_reviewed, active_score)
VALUES (?, ?, ?, ?, ?, ?, 1.0)
ON CONFLICT(org, repo) DO UPDATE SET
last_checked_at = ?,
last_pr_found_at = CASE WHEN ? > 0 THEN ? ELSE last_pr_found_at END,
total_prs_seen = total_prs_seen + ?,
total_prs_reviewed = total_prs_reviewed + ?,
active_score = CASE
WHEN ? > 0 THEN MIN(active_score * 1.1, 2.0)
ELSE MAX(active_score * 0.9, 0.1)
END
""", (org, repo, now, now if found_prs > 0 else None, found_prs, reviewed_prs,
now, found_prs, now, found_prs, reviewed_prs, found_prs))
conn.commit()
conn.close()
def fetch_all_review_prs() -> List[Dict]:
"""Fetch all PRs where current user is requested as reviewer using global search"""
log("INFO", "Fetching PRs using global search (fast method)...", Colors.CYAN)
# Use GitHub's search API via gh CLI - ONE API CALL instead of looping repos!
cmd = [
"gh", "search", "prs",
"--review-requested", "@me",
"--state", "open",
"--json", "number,title,repository,author,createdAt,updatedAt,url",
"--limit", "100"
]
output = run_command(cmd)
if not output:
log("WARNING", "No PRs found or search failed", Colors.YELLOW)
return []
try:
prs = json.loads(output)
log("INFO", f"Found {len(prs)} PRs across all orgs", Colors.GREEN)
return prs
except json.JSONDecodeError as e:
log_error(f"Failed to parse PR search results: {e}")
return []
def sync_repo(org: str, repo: str, repo_dir: Path) -> bool:
"""Clone or update repository"""
repo_path = repo_dir / repo
try:
if repo_path.exists():
log("INFO", f"Updating repo: {org}/{repo}", Colors.BLUE)
os.chdir(repo_path)
run_command(["git", "fetch", "--all", "--prune"])
else:
log("INFO", f"Cloning repo: {org}/{repo}", Colors.BLUE)
repo_dir.mkdir(parents=True, exist_ok=True)
os.chdir(repo_dir)
run_command(["gh", "repo", "clone", f"{org}/{repo}"])
os.chdir(repo)
return True
except Exception as e:
log_error(f"Failed to sync repo {org}/{repo}: {e}")
return False
def get_pr_diff(pr_number: int, base_branch: str) -> Optional[str]:
"""Get PR diff"""
try:
# Fetch base branch
run_command(["git", "fetch", "origin", base_branch], check=False)
# Get diff
diff = run_command(["git", "diff", f"origin/{base_branch}...HEAD"])
return diff
except Exception as e:
log_error(f"Failed to get diff for PR #{pr_number}: {e}")
return None
def review_with_claude(pr_data: Dict, diff: str, pr_info: Dict) -> Tuple[Optional[str], Optional[str]]:
"""Use Claude to review the PR"""
# Parse org/repo from nameWithOwner
repo_full = pr_data['repository']['nameWithOwner']
# Filter reviews to only show ones for the current commit being reviewed
# This prevents Claude from re-reporting issues that were fixed in newer commits
current_commit_sha = pr_info.get('headRefOid')
current_reviews = [r for r in pr_info.get('reviews', [])
if r.get('commit', {}).get('oid') == current_commit_sha]
# Create review context
context = f"""# PR Review Context
## Repository: {repo_full}
## PR #{pr_data['number']}: {pr_data['title']}
## Author: @{pr_data['author']['login']}
## URL: {pr_data['url']}
## Description
{pr_info.get('body', 'No description provided')}
## Reviews (for current commit {current_commit_sha[:8] if current_commit_sha else 'unknown'} only)
{json.dumps(current_reviews, indent=2)}
## Comments
{json.dumps(pr_info.get('comments', []), indent=2)}
## Diff Summary
{run_command(['git', 'diff', '--stat', f"origin/{pr_info.get('baseRefName', 'main')}...HEAD"]) or 'N/A'}
## Full Diff
```diff
{diff}
```
---
Please review this PR thoroughly:
1. Check code quality, style, and best practices
2. Look for potential bugs or security issues
3. Verify if tests are included and appropriate
4. Review the conversation history for any concerns
5. Provide a decision: APPROVE or REQUEST_CHANGES
6. If requesting changes, provide specific, actionable feedback
Format your response as:
DECISION: [APPROVE|REQUEST_CHANGES]
FEEDBACK: [Your detailed feedback here]
"""
# Write to temp file
with tempfile.NamedTemporaryFile(mode='w', suffix='.md', delete=False) as f:
f.write(context)
temp_path = f.name
try:
log("INFO", "Running Claude review...", Colors.CYAN)
result = run_command(["claude", "-p", context], check=False)
if not result:
return None, None
# Parse decision
decision_line = [line for line in result.split('\n') if 'DECISION:' in line.upper()]
decision = None
if decision_line:
decision = decision_line[0].split(':', 1)[1].strip().replace('*', '').upper()
# Parse feedback
feedback_parts = result.split('FEEDBACK:', 1)
feedback = feedback_parts[1].strip() if len(feedback_parts) > 1 else result
return decision, feedback
finally:
Path(temp_path).unlink(missing_ok=True)
def submit_review(org: str, repo: str, pr_number: int, decision: str, feedback: str) -> bool:
"""Submit review to GitHub"""
try:
if decision == "APPROVE":
run_command(["gh", "pr", "review", str(pr_number),
"--repo", f"{org}/{repo}",
"--approve", "--body", feedback])
return True
elif "REQUEST" in decision:
run_command(["gh", "pr", "review", str(pr_number),
"--repo", f"{org}/{repo}",
"--request-changes", "--body", feedback])
return True
return False
except Exception as e:
log_error(f"Failed to submit review: {e}")
return False
def merge_pr(org: str, repo: str, pr_number: int) -> bool:
"""Merge PR with squash"""
try:
log("INFO", f"Attempting to merge PR #{pr_number}...", Colors.CYAN)
result = run_command([
"gh", "pr", "merge", str(pr_number),
"--repo", f"{org}/{repo}",
"--squash", "--delete-branch"
], check=False)
if result is not None:
log("SUCCESS", f"PR #{pr_number} merged successfully!", Colors.GREEN)
return True
else:
log("WARNING", f"Failed to merge PR #{pr_number}", Colors.YELLOW)
return False
except Exception as e:
log_error(f"Error merging PR #{pr_number}: {e}")
return False
def review_pr(pr_data: Dict, org_dir: Path) -> Dict:
"""Review a single PR"""
# Use parsed org/repo from main()
org = pr_data.get('_parsed_org')
repo = pr_data.get('_parsed_repo')
# Fallback if not parsed yet
if not org or not repo:
repo_full = pr_data['repository']['nameWithOwner']
org, repo = repo_full.split('/', 1)
pr_number = pr_data['number']
start_time = time.time()
result = {
'org': org,
'repo': repo,
'pr_number': pr_number,
'reviewed': False,
'decision': None,
'merged': False
}
log("INFO", "=" * 60, Colors.BLUE)
log("INFO", f"Processing PR #{pr_number} in {org}/{repo}", Colors.BLUE)
log("INFO", f"Title: {pr_data['title']}", Colors.BLUE)
log("INFO", "=" * 60, Colors.BLUE)
# Check staleness first (before fetching more data)
is_stale, stale_reason = is_pr_stale(pr_data['createdAt'], pr_data['updatedAt'])
if is_stale:
log("WARNING", f"Skipping stale PR: {stale_reason}", Colors.YELLOW)
# Use a placeholder commit_sha for skipped stale PRs
record_review(org, repo, pr_number, "stale", "SKIP_STALE",
pr_data, stale_reason, duration=int(time.time() - start_time))
return result
# Sync repo
if not sync_repo(org, repo, org_dir):
log_error(f"Failed to sync repo {org}/{repo}")
record_review(org, repo, pr_number, "unknown", "ERROR",
pr_data, "Failed to sync repo", duration=int(time.time() - start_time))
return result
repo_path = org_dir / repo
os.chdir(repo_path)
# Stash and checkout PR
run_command(["git", "stash", "push", "-u", "-m", "Auto-stash"], check=False)
try:
run_command(["gh", "pr", "checkout", str(pr_number)])
except Exception as e:
log_error(f"Failed to checkout PR #{pr_number}: {e}")
# Note: commit_sha is not available yet at this point
record_review(org, repo, pr_number, "unknown", "ERROR",
pr_data, f"Failed to checkout: {e}", duration=int(time.time() - start_time))
return result
# Get PR details including commit SHA
pr_info_json = run_command([
"gh", "pr", "view", str(pr_number),
"--json", "title,body,author,baseRefName,reviews,comments,headRefOid"
])
if not pr_info_json:
log_error("Failed to get PR details")
return result
pr_info = json.loads(pr_info_json)
base_branch = pr_info.get('baseRefName', 'main')
commit_sha = pr_info.get('headRefOid', 'unknown')
# CRITICAL: Ensure we're on the latest commit of the PR
log("INFO", f"Ensuring we're on the latest commit: {commit_sha[:8]}...", Colors.CYAN)
try:
run_command(["git", "fetch", "origin", f"pull/{pr_number}/head"])
run_command(["git", "reset", "--hard", commit_sha])
# Verify we're on the correct commit
current_sha = run_command(["git", "rev-parse", "HEAD"])
if current_sha != commit_sha:
log_error(f"Failed to checkout correct commit. Expected {commit_sha[:8]}, got {current_sha[:8]}")
record_review(org, repo, pr_number, commit_sha, "ERROR",
pr_data, "Failed to checkout correct commit", duration=int(time.time() - start_time))
return result
log("SUCCESS", f"Verified on correct commit: {commit_sha[:8]}", Colors.GREEN)
except Exception as e:
log_error(f"Failed to ensure latest commit: {e}")
record_review(org, repo, pr_number, commit_sha, "ERROR",
pr_data, f"Failed to ensure latest commit: {e}", duration=int(time.time() - start_time))
return result
# Now check if we've already reviewed this specific commit
if already_reviewed(org, repo, pr_number, commit_sha):
log("INFO", f"Already reviewed commit {commit_sha[:8]} - skipping", Colors.YELLOW)
record_review(org, repo, pr_number, commit_sha, "SKIP_ALREADY_REVIEWED",
pr_data, "Already reviewed this commit", duration=int(time.time() - start_time))
return result
# Get diff
diff = get_pr_diff(pr_number, base_branch)
if not diff:
log_error("Failed to get PR diff")
record_review(org, repo, pr_number, commit_sha, "ERROR",
pr_data, "Failed to get diff", duration=int(time.time() - start_time))
return result
# Review with Claude
decision, feedback = review_with_claude(pr_data, diff, pr_info)
if not decision:
log_error("Failed to get decision from Claude")
record_review(org, repo, pr_number, commit_sha, "ERROR",
pr_data, "Claude review failed", duration=int(time.time() - start_time))
return result
log("INFO", f"Claude's decision: {decision}", Colors.CYAN)
result['decision'] = decision
result['reviewed'] = True
# Submit review
if not submit_review(org, repo, pr_number, decision, feedback):
log_error("Failed to submit review")
return result
# Merge if approved
merged = False
if decision == "APPROVE":
merged = merge_pr(org, repo, pr_number)
result['merged'] = merged
# Record in database
duration = int(time.time() - start_time)
record_review(org, repo, pr_number, commit_sha, decision, pr_data,
feedback, merged, duration)
log("SUCCESS", f"Completed review of PR #{pr_number} in {duration}s", Colors.GREEN)
# Cleanup
run_command(["git", "checkout", base_branch], check=False)
run_command(["git", "stash", "pop"], check=False)
return result
def main():
"""Main execution"""
log("INFO", "Starting Optimized PR Review Automation", Colors.CYAN)
log("INFO", "=" * 60, Colors.CYAN)
run_start = time.time()
# Initialize
if not check_prerequisites():
sys.exit(1)
# Ensure BASE_DIR exists
BASE_DIR.mkdir(parents=True, exist_ok=True)
init_database()
# Fetch all PRs using global search (FAST!)
all_prs = fetch_all_review_prs()
if not all_prs:
log("INFO", "No PRs to review", Colors.GREEN)
return
# Group by org for processing
prs_by_org = {}
for pr in all_prs:
# Parse org/repo from nameWithOwner field
repo_full = pr['repository']['nameWithOwner']
org, repo_name = repo_full.split('/', 1)
if org in ORGS:
if org not in prs_by_org:
prs_by_org[org] = []
# Add parsed org and repo to PR data
pr['_parsed_org'] = org
pr['_parsed_repo'] = repo_name
prs_by_org[org].append(pr)
# Stats
stats = {
'total_found': len(all_prs),
'reviewed': 0,
'approved': 0,
'changes_requested': 0,
'merged': 0,
'skipped': 0,
'errors': 0
}
# Process each org
for org, prs in prs_by_org.items():
log("INFO", f"\nProcessing {len(prs)} PRs for {org}", Colors.BLUE)
org_dir = ORGS[org]
org_dir.mkdir(parents=True, exist_ok=True)
# Track stats per repo (not per org)
repo_stats = {} # {repo_name: {'found': count, 'reviewed': count}}
for pr in prs:
repo_name = pr['_parsed_repo']
# Initialize repo stats if first time seeing this repo
if repo_name not in repo_stats:
repo_stats[repo_name] = {'found': 0, 'reviewed': 0}
repo_stats[repo_name]['found'] += 1
result = review_pr(pr, org_dir)
if result['reviewed']:
stats['reviewed'] += 1
repo_stats[repo_name]['reviewed'] += 1
if result['decision'] == 'APPROVE':
stats['approved'] += 1
elif 'REQUEST' in result['decision']:
stats['changes_requested'] += 1
if result['merged']:
stats['merged'] += 1
else:
if result.get('decision') and 'SKIP' in result['decision']:
stats['skipped'] += 1
else:
stats['errors'] += 1
# Update repo state for each unique repo
for repo_name, counts in repo_stats.items():
update_repo_state(org, repo_name, counts['found'], counts['reviewed'])
# Record automation run
duration = int(time.time() - run_start)
conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()
cursor.execute("""
INSERT INTO automation_runs
(run_timestamp, total_prs_found, prs_reviewed, prs_approved,
prs_changes_requested, prs_merged, prs_skipped, errors, run_duration_seconds)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
""", (datetime.now().isoformat(), stats['total_found'], stats['reviewed'],
stats['approved'], stats['changes_requested'], stats['merged'],
stats['skipped'], stats['errors'], duration))
conn.commit()
conn.close()
# Print summary
log("INFO", "\n" + "=" * 60, Colors.CYAN)
log("SUCCESS", "PR Review Automation Completed!", Colors.GREEN)
log("INFO", "=" * 60, Colors.CYAN)
log("INFO", f"Total PRs found: {stats['total_found']}", Colors.BLUE)
log("INFO", f"Reviewed: {stats['reviewed']}", Colors.GREEN)
log("INFO", f" - Approved: {stats['approved']}", Colors.GREEN)
log("INFO", f" - Changes requested: {stats['changes_requested']}", Colors.YELLOW)
log("INFO", f" - Merged: {stats['merged']}", Colors.GREEN)
log("INFO", f"Skipped: {stats['skipped']}", Colors.YELLOW)
log("INFO", f"Errors: {stats['errors']}", Colors.RED)
log("INFO", f"Duration: {duration}s", Colors.BLUE)
log("INFO", "=" * 60, Colors.CYAN)
if __name__ == "__main__":
try:
main()
except KeyboardInterrupt:
log("WARNING", "\nInterrupted by user", Colors.YELLOW)
sys.exit(130)
except Exception as e:
log_error(f"Fatal error: {e}")
import traceback
traceback.print_exc()
sys.exit(1)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment