Last active
July 23, 2025 18:18
-
-
Save grahama1970/95f65a04812f5db9bd5cfb84a4271d4b to your computer and use it in GitHub Desktop.
code review to kimi using litellm
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| """ | |
| Generate a concatenated code review bundle from a configuration file. | |
| This script creates a single markdown document containing all files that need review. | |
| It is designed to be a robust command-line utility for both human and agent use. | |
| Additionally, it can perform AI-powered code reviews using LiteLLM with various models. | |
| AGENT VERIFICATION INSTRUCTIONS: | |
| - Run this script directly to execute working_usage() | |
| - The working_usage() function MUST pass all assertions | |
| - This verifies the script produces expected results | |
| - DO NOT assume the script works without running it | |
| Third-party Documentation: | |
| - [LiteLLM]: https://docs.litellm.ai/ | |
| - [Moonshot AI]: https://platform.moonshot.ai/docs | |
| Example Input: | |
| { | |
| "code_review_prompt_file": "prompts/code_review_prompt.md", | |
| "context_files": [ | |
| {"path": "README.md", "rationale": "Project overview and setup."} | |
| ], | |
| "files_to_review": [ | |
| {"path": "src/module.py", "rationale": "Core business logic"}, | |
| {"path": "tests/test_module.py", "rationale": "Test coverage"} | |
| ] | |
| } | |
| Expected Output: | |
| { | |
| "status": "success", | |
| "bundle_path": "/tmp/responses/code_review_bundle_20250123_120000.md", | |
| "files_processed": 3, | |
| "ai_review": { | |
| "model": "moonshot/kimi-k2-0711-preview", | |
| "status": "completed", | |
| "review_path": "/tmp/responses/ai_code_review_20250123_120000.md" | |
| } | |
| } | |
| """ | |
| import asyncio | |
| import json | |
| import sys | |
| import subprocess | |
| import os | |
| import tempfile | |
| from pathlib import Path | |
| from datetime import datetime | |
| from typing import List, Dict, Any, Optional, Tuple | |
| from enum import Enum | |
| import pyperclip | |
| # Third-party imports | |
| from loguru import logger | |
| import litellm | |
| from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type | |
| import typer | |
| from rich.console import Console | |
| from rich.table import Table | |
# Configure logging and console.
# Drop loguru's default handler so the stderr sink below is the only console sink.
logger.remove()  # Remove default handler
# Console sink: INFO and above, with timestamp, level and call site in each line.
logger.add(
    sys.stderr,
    level="INFO",
    format="<green>{time:YYYY-MM-DD HH:mm:ss}</green> | <level>{level: <8}</level> | <cyan>{name}</cyan>:<cyan>{function}</cyan>:<cyan>{line}</cyan> - <level>{message}</level>"
)
# Initialize Rich console.
# Status messages go to stderr so that stdout carries only the bundle itself.
console = Console(stderr=True)  # Use stderr for messages to not interfere with stdout bundle output
# Create Typer app; the commands defined further down register themselves on it.
app = typer.Typer(
    name="code-review-bundle",
    help="Generate code review bundles with optional AI-powered reviews",
    add_completion=False,
)
| # NEW: More robust project root discovery | |
def _find_project_root(start_path: Path) -> Path:
    """Locate the project root by walking upward from ``start_path``.

    A directory counts as the root when it contains a ``.git`` entry or a
    ``.project_root`` file. ``.git`` is accepted both as a directory (regular
    clone) and as a file (linked worktree or submodule checkout).

    Args:
        start_path: Directory to begin the search from.

    Returns:
        The nearest matching directory, ``start_path`` itself included, or the
        resolved ``start_path`` when no marker is found anywhere above it.
    """
    resolved_start = start_path.resolve()
    # ``parents`` runs all the way up to the filesystem root, so the root
    # directory itself is examined as well.
    for candidate in (resolved_start, *resolved_start.parents):
        if (candidate / ".git").exists() or (candidate / ".project_root").is_file():
            return candidate
    return resolved_start  # Fallback to starting directory if no marker found
# CRITICAL: Environment and Path Setup
# Everything in this section runs at import time, in this order.
from dotenv import load_dotenv
# Root is searched for starting at the directory that holds this script.
project_root = _find_project_root(Path(__file__).parent)
dotenv_path = project_root / ".env"
load_dotenv(dotenv_path=dotenv_path)
# Optional: Add file logging with rotation
log_dir = project_root / "logs"
log_dir.mkdir(exist_ok=True)
# MODIFIED: Add PID to log filename to prevent race conditions
# The doubled braces leave a literal "{time}" in the name handed to logger.add().
log_filename = f"{Path(__file__).stem}_{os.getpid()}_{{time}}.log"
# File sink: DEBUG and above, i.e. more verbose than the INFO console sink.
logger.add(
    log_dir / log_filename,
    rotation="10 MB",
    retention=5,
    level="DEBUG"
)
# Logger Agent Integration (HIGHLY RECOMMENDED)
# The agent is optional: a failed import only flips LOGGER_AGENT_AVAILABLE to False.
try:
    sys.path.insert(0, str(project_root / "src" / "logger_agent" / "src"))  # Use project_root
    from agent_log_manager import get_log_manager
    LOGGER_AGENT_AVAILABLE = True
    logger.info("β Logger agent available for knowledge building")
except ImportError:
    LOGGER_AGENT_AVAILABLE = False
    logger.debug("Logger agent not available - running in standalone mode")
| # ============================================ | |
| # ENUMS AND TYPES | |
| # ============================================ | |
class ExecutionMode(str, Enum):
    """Execution modes for the script.

    Members subclass ``str``, so each one compares equal to its plain string
    value (e.g. ``ExecutionMode.DEBUG == "debug"``).
    """
    # NOTE(review): no code in the visible part of this file references this enum.
    NORMAL = "normal"
    DEBUG = "debug"
    STRESS = "stress"
| # ============================================ | |
| # CORE FUNCTIONS (Outside __main__ block) | |
| # ============================================ | |
def _read_file_safely(path: Path) -> str:
    """Read a text file, trying several encodings before giving up.

    ``utf-8-sig`` is tried first: it decodes plain UTF-8 unchanged and strips a
    leading byte-order mark when one is present (trying plain ``utf-8`` first
    would succeed on BOM files and leave a stray ``\\ufeff`` in the text).
    ``iso-8859-1`` is the last resort for non-UTF-8 files.

    Args:
        path: File to read.

    Returns:
        The decoded file content.

    Raises:
        UnicodeDecodeError: If no encoding could decode the file.
        OSError: Propagated unchanged (missing file, permissions, ...).
    """
    encodings_to_try = ('utf-8-sig', 'iso-8859-1')
    last_error = None
    for encoding in encodings_to_try:
        try:
            return path.read_text(encoding=encoding)
        except UnicodeDecodeError as exc:
            last_error = exc
    # UnicodeDecodeError takes five positional arguments; a single message
    # string would raise TypeError instead, so rebuild it from the last failure.
    reason = f"Unable to decode '{path}' with any of the tried encodings: {', '.join(encodings_to_try)}."
    raise UnicodeDecodeError(
        last_error.encoding, last_error.object, last_error.start, last_error.end, reason
    ) from last_error
def _get_git_info(project_root_dir: Path) -> str:
    """Build a markdown provenance snippet from the current Git state.

    Runs ``git rev-parse HEAD`` and ``git status --porcelain`` inside
    ``project_root_dir``. If either command cannot be started or exits with an
    error, a fixed fallback line is returned instead.
    """
    def _git_output(*git_args: str) -> str:
        # Run one git sub-command quietly and return its trimmed stdout.
        raw = subprocess.check_output(
            ['git', *git_args],
            cwd=project_root_dir,
            stderr=subprocess.DEVNULL,
        )
        return raw.decode('utf-8').strip()

    try:
        commit_hash = _git_output('rev-parse', 'HEAD')
        porcelain = _git_output('status', '--porcelain')
    except (FileNotFoundError, subprocess.CalledProcessError):
        return "**Git Info:** Not a git repository or `git` command not found.\n"

    if porcelain:
        status_section = f"**Working Directory Status:**\n```\n{porcelain}\n```\n"
    else:
        status_section = "**Working Directory Status:** `clean`\n"
    return f"**Git Commit:** `{commit_hash}`\n\n" + status_section
def _get_language_from_path(path: Path) -> str:
    """Pick the markdown code-fence language for a file from its extension.

    The lookup is case-insensitive; unrecognised or missing extensions map to
    ``"text"``.
    """
    fence_languages = {
        ".py": "python",
        ".js": "javascript",
        ".ts": "typescript",
        ".json": "json",
        ".md": "markdown",
        ".yaml": "yaml",
        ".yml": "yaml",
        ".sh": "bash",
        ".html": "html",
        ".css": "css",
        ".sql": "sql",
        ".xml": "xml",
    }
    extension = path.suffix.lower()
    if extension in fence_languages:
        return fence_languages[extension]
    return "text"
def validate_config(config: Dict[str, Any]) -> Tuple[bool, Optional[str]]:
    """Validate configuration structure and content.

    Checks performed, in order:
      * the config is a dictionary;
      * at least one of ``code_review_prompt_file`` / ``code_review_prompt``
        is present, and whichever is present is a string;
      * ``files_to_review`` and ``context_files``, when present, are lists of
        dictionaries that each carry ``path`` (a string) and ``rationale``.

    Args:
        config: Parsed configuration, normally the result of ``json.load``.

    Returns:
        ``(True, None)`` when valid, otherwise ``(False, message)`` describing
        the first problem found.
    """
    # json.load can legitimately return None, a list, a number, ...
    if not isinstance(config, dict):
        return False, "Config must be a JSON object (dictionary)"
    if "code_review_prompt_file" not in config and "code_review_prompt" not in config:
        return False, "Config must have either 'code_review_prompt_file' or 'code_review_prompt'"
    for prompt_key in ("code_review_prompt", "code_review_prompt_file"):
        if prompt_key in config and not isinstance(config[prompt_key], str):
            return False, f"'{prompt_key}' must be a string"
    for key in ("files_to_review", "context_files"):
        if key not in config:
            continue
        entries = config[key]
        if not isinstance(entries, list):
            return False, f"'{key}' must be a list"
        for i, item in enumerate(entries):
            if not isinstance(item, dict):
                return False, f"Item {i} in '{key}' must be a dictionary"
            for required_field in ("path", "rationale"):
                if required_field not in item:
                    return False, f"Item {i} in '{key}' must have a '{required_field}' field"
            # A non-string path would only fail later, inside Path(...), with
            # an uncaught TypeError; reject it here with a readable message.
            if not isinstance(item["path"], str):
                return False, f"'path' of item {i} in '{key}' must be a string"
    return True, None
| # NEW: Deduplicated helper for processing file entries | |
def _process_file_entries(
    file_list: List[Dict[str, str]],
    section_title: str,
    rationale_prefix: str,
    project_root_dir: Path,
    output_stream
) -> Tuple[int, int]:
    """Write one bundle section containing every file in ``file_list``.

    Each entry gets a heading, its rationale, and the file content inside a
    fenced code block. The fence is made one backtick longer than the longest
    backtick run in the content (minimum three), so files that themselves
    contain code fences cannot terminate the block early.

    Args:
        file_list: Entries with ``path`` and ``rationale`` keys.
        section_title: Text of the ``##`` heading for this section.
        rationale_prefix: Bold label written in front of each rationale.
        project_root_dir: Base directory that entry paths are joined onto.
        output_stream: Object with a ``write(str)`` method receiving the markdown.

    Returns:
        ``(processed, failed)`` counts. Read failures are reported inline in
        the output and counted rather than raised.
    """
    import re  # local import: the module's import block does not provide ``re``

    processed, failed = 0, 0
    output_stream.write(f"## {section_title}\n\n")
    for item in file_list:
        file_path = Path(item["path"])
        rationale = item["rationale"]
        abs_path = project_root_dir / file_path
        output_stream.write(f"### File: `{file_path}`\n\n")
        output_stream.write(f"**{rationale_prefix}:** {rationale}\n\n")
        try:
            content = _read_file_safely(abs_path)
            language = _get_language_from_path(file_path)
            # Size the fence so that no backtick run in the content can close it.
            longest_run = max((len(run) for run in re.findall(r"`+", content)), default=0)
            fence = "`" * max(3, longest_run + 1)
            output_stream.write(f"{fence}{language}\n")
            output_stream.write(content)
            output_stream.write(f"\n{fence}\n\n")
            processed += 1
        except FileNotFoundError:
            output_stream.write(f"β οΈ **File not found**: `{abs_path}`\n\n")
            failed += 1
        except (PermissionError, UnicodeDecodeError) as e:
            output_stream.write(f"β **Error reading file**: {type(e).__name__}: {e}\n\n")
            failed += 1
        except Exception as e:
            # Anything else is unexpected: keep the traceback in the log and
            # carry on with the remaining files.
            logger.exception(f"Unexpected error reading file '{abs_path}'")
            output_stream.write(f"β **Unexpected error**: {e}\n\n")
            failed += 1
        output_stream.write("---\n\n")
    return processed, failed
def generate_review_bundle(
    files_to_review: List[Dict[str, str]],
    context_files: List[Dict[str, str]],
    code_review_prompt: str,
    project_root_dir: Path,
    output_stream,
    include_git_info: bool
) -> Dict[str, Any]:
    """Generate a concatenated markdown document with all code for review.

    The document consists of a title and generation timestamp, an optional Git
    provenance section, the review instructions, then the context files and
    the files under review (each section only when its list is non-empty).

    Args:
        files_to_review: Entries (``path``/``rationale``) to be reviewed.
        context_files: Entries (``path``/``rationale``) given as background.
        code_review_prompt: Instructions written under "Review Instructions".
        project_root_dir: Base directory that entry paths are joined onto.
        output_stream: Object with a ``write(str)`` method receiving the markdown.
        include_git_info: Whether to emit the "Provenance" section.

    Returns:
        Dict with ``files_processed``, ``files_failed`` and ``total_files``.
    """
    from datetime import timezone  # local import: module top only imports ``datetime``

    total_processed, total_failed = 0, 0
    output_stream.write("# Code Review Bundle\n\n")
    # The stamp is labelled "UTC", so take the time in UTC rather than local time.
    generated_at = datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M:%S UTC')
    output_stream.write(f"Generated on: {generated_at}\n\n")
    if include_git_info:
        output_stream.write("## Provenance\n\n")
        output_stream.write(_get_git_info(project_root_dir))
        output_stream.write("\n---\n\n")
    output_stream.write("## Review Instructions\n\n")
    output_stream.write(code_review_prompt)
    output_stream.write("\n\n---\n\n")
    if context_files:
        processed, failed = _process_file_entries(
            context_files, "Additional Context", "Rationale", project_root_dir, output_stream
        )
        total_processed += processed
        total_failed += failed
    if files_to_review:
        processed, failed = _process_file_entries(
            files_to_review, "Files for Comprehensive Code Review", "Rationale for Review", project_root_dir, output_stream
        )
        total_processed += processed
        total_failed += failed
    return {
        "files_processed": total_processed,
        "files_failed": total_failed,
        "total_files": len(files_to_review) + len(context_files),
    }
# Transient failures worth retrying; anything else is reported immediately.
_RETRYABLE_AI_ERRORS = (
    ConnectionError,
    TimeoutError,
    litellm.Timeout,
    litellm.APIConnectionError,
    litellm.ServiceUnavailableError,
    litellm.InternalServerError,
    litellm.RateLimitError,
)


@retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=2, min=30, max=600),
    retry=retry_if_exception_type(_RETRYABLE_AI_ERRORS),
    before_sleep=lambda retry_state: logger.warning(
        f"Retrying AI code review (attempt {retry_state.attempt_number}/3) after error: "
        f"{retry_state.outcome.exception() if retry_state.outcome else 'Unknown error'}"
    ),
    # Surface the last real exception (not tenacity.RetryError) once attempts run out.
    reraise=True,
)
async def _request_ai_completion(
    model: str,
    messages: List[Dict[str, str]],
    temperature: float,
    max_tokens: int
) -> Any:
    """Send one completion request, retrying only on transient errors."""
    return await litellm.acompletion(
        model=model, messages=messages, temperature=temperature, max_tokens=max_tokens
    )


async def perform_ai_code_review(
    bundle_content: str,
    model: str = "moonshot/kimi-k2-0711-preview",
    temperature: float = 0.7,
    max_tokens: int = 8192
) -> Dict[str, Any]:
    """Perform AI-powered code review using LiteLLM with automatic retry logic.

    The retry policy is attached to the request itself (``_request_ai_completion``)
    rather than to this function: this function converts every exception into a
    ``"failed"`` result, so a retry decorator placed here would never observe
    an exception and would never retry.

    Args:
        bundle_content: Markdown bundle sent to the model as the user message.
        model: LiteLLM model identifier.
        temperature: Sampling temperature forwarded to the model.
        max_tokens: Completion token limit forwarded to the model.

    Returns:
        On success: ``{"status": "success", "model", "review", "usage", "cost"}``.
        On failure (after retries are exhausted, or on a non-retryable error):
        ``{"status": "failed", "model", "error"}``. This function never raises.
    """
    logger.info(f"Performing AI code review with model: {model}")
    # Budget guardrail. A rough estimate: 1 token ~= 4 chars in English; 3 is
    # used here to err on the high side.
    estimated_tokens = len(bundle_content) / 3
    if estimated_tokens > 100_000:  # Warn for very large prompts
        logger.warning(f"Large prompt detected: estimated ~{estimated_tokens:,.0f} tokens. This may incur significant cost.")
    try:
        messages = [
            {"role": "system", "content": "You are an expert code reviewer. Analyze the provided code bundle and provide comprehensive feedback on code quality, potential bugs, security issues, performance concerns, and architectural improvements."},
            {"role": "user", "content": f"Please review the following code bundle:\n\n{bundle_content}"}
        ]
        response = await _request_ai_completion(model, messages, temperature, max_tokens)
        review_content = response.choices[0].message.content
        cost_info = {}
        try:
            # completion_cost is synchronous; run it in a worker thread so the
            # event loop is not blocked.
            cost_info["calculated_cost"] = await asyncio.to_thread(
                litellm.completion_cost, completion_response=response
            )
        except Exception as cost_error:
            # Cost reporting is best-effort; the review itself is still valid.
            logger.debug(f"Could not extract cost information: {cost_error}")
            cost_info["note"] = "Cost information not available for this model/provider"
        result = {
            "status": "success", "model": model, "review": review_content,
            "usage": {"prompt_tokens": response.usage.prompt_tokens, "completion_tokens": response.usage.completion_tokens, "total_tokens": response.usage.total_tokens},
            "cost": cost_info
        }
        if "calculated_cost" in cost_info:
            logger.info(f"AI review cost: ${cost_info['calculated_cost']:.4f}")
        return result
    except Exception as e:
        # Boundary handler: callers branch on the "status" field instead of
        # catching exceptions.
        logger.error(f"AI code review failed: {e}")
        return {"status": "failed", "model": model, "error": str(e)}
def save_results(content: str, output_dir: Optional[Path] = None, filename_prefix: str = "result") -> Path:
    """Write ``content`` to a timestamped ``.md`` file and return its path.

    When ``output_dir`` is omitted, ``<project_root>/tmp/responses`` is used.
    The directory is created if needed. The file is named
    ``<filename_prefix>_<YYYYmmdd_HHMMSS>.md`` and written as UTF-8 with
    ``\\n`` line endings.
    """
    target_dir = project_root / "tmp" / "responses" if output_dir is None else output_dir
    target_dir.mkdir(parents=True, exist_ok=True)
    stamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    output_path = target_dir / f"{filename_prefix}_{stamp}.md"
    # newline='\n' keeps line endings identical across platforms.
    with output_path.open('w', encoding='utf-8', newline='\n') as handle:
        handle.write(content)
    logger.info(f"Content saved to: {output_path}")
    return output_path
| # ============================================ | |
| # USAGE FUNCTIONS (Inside __main__ block) | |
| # ============================================ | |
async def working_usage():
    """
    Self-check that exercises bundle generation end to end.

    Creates a throwaway README, prompt and JSON config in a temporary
    directory, bundles the README together with this script, saves the bundle
    via save_results() and asserts on the outcome. When MOONSHOT_API_KEY or
    LITELLM_API_KEY is set, a tiny AI review request is issued as well.

    Returns True when every check passes and False when any step raises.
    """
    logger.info("=== Running Working Usage Examples ===")
    # A temporary directory keeps parallel/CI runs from clobbering each other.
    with tempfile.TemporaryDirectory() as scratch_name:
        scratch = Path(scratch_name)
        this_script = Path(__file__)

        # Fixture files.
        readme_path = scratch / "README.md"
        readme_path.write_text("# Test Project\n\nThis is the main README file.", encoding='utf-8')
        prompt_path = scratch / "code_review_prompt.md"
        prompt_path.write_text("Please review this code for quality.", encoding='utf-8')

        test_config = {
            "code_review_prompt_file": str(prompt_path),
            "context_files": [{"path": str(readme_path), "rationale": "Project overview"}],
            "files_to_review": [{"path": str(this_script), "rationale": "The script itself"}],
        }
        config_path = scratch / "test_config.json"
        with open(config_path, 'w') as config_handle:
            json.dump(test_config, config_handle)

        try:
            from io import StringIO
            code_review_prompt = _read_file_safely(prompt_path)
            bundle_stream = StringIO()
            # Every path in the config is absolute, so '/' is passed as the base
            # directory: joining an absolute path onto it leaves it unchanged.
            result = generate_review_bundle(
                test_config["files_to_review"],
                test_config["context_files"],
                code_review_prompt,
                Path('/'),
                bundle_stream,
                include_git_info=True,
            )
            bundle_content = bundle_stream.getvalue()
            bundle_path = save_results(bundle_content, filename_prefix="code_review_bundle")

            # Verify results
            assert result["files_failed"] == 0, f"Expected 0 failed files, but got {result['files_failed']}"
            assert result["files_processed"] == 2, f"Expected 2 processed files, got {result['files_processed']}"
            assert bundle_path.exists(), "Bundle file should exist"
            assert "## Additional Context" in bundle_content, "Context section should be in the bundle"
            assert f"### File: `{readme_path}`" in bundle_content, "README context file should be listed"
            logger.success(f"β Bundle generated successfully: {bundle_path}")

            # The AI check only runs when credentials are configured.
            has_api_key = os.getenv("MOONSHOT_API_KEY") or os.getenv("LITELLM_API_KEY")
            if has_api_key:
                # A tiny bundle keeps the call cheap and fast.
                small_bundle = "# Test\n```python\ndef x(): return 1```"
                ai_result = await perform_ai_code_review(small_bundle)
                assert ai_result["status"] == "success", f"AI review failed: {ai_result.get('error')}"
                logger.success("β AI review API call successful.")

            logger.success("β All working_usage tests passed!")
            return True
        except Exception as e:
            logger.error(f"Working usage failed: {e}")
            logger.exception("Full traceback:")
            return False
@app.command(name="generate", help="Generate a code review bundle from a configuration file")
def generate(
    config_file: Path = typer.Argument(..., help="Path to JSON configuration file", exists=True, file_okay=True, dir_okay=False, readable=True),
    output_file: Optional[Path] = typer.Option(None, "--output-file", "-o", help="Output file path. Defaults to stdout."),
    cli_project_root: Optional[Path] = typer.Option(None, "--project-root", help="Override auto-detected project root", exists=True, file_okay=False),
    include_git_info: bool = typer.Option(True, help="Include git commit and status information in the bundle"),
    quiet: bool = typer.Option(False, "--quiet", "-q", help="Suppress all informational messages"),
    ai_review: bool = typer.Option(False, "--ai-review", help="Perform AI-powered code review using LiteLLM"),
    model: str = typer.Option("moonshot/kimi-k2-0711-preview", "--model", help="LiteLLM model to use for AI review"),
    clipboard: bool = typer.Option(False, "--clipboard", help="Copy bundle to clipboard instead of performing AI review")
) -> None:
    """Generate code review bundle from configuration file.

    Steps: load and validate the JSON config, resolve the review prompt, build
    the bundle in memory, abort if any listed file could not be read, write the
    bundle to ``output_file`` or stdout, then optionally copy it to the
    clipboard or send it for AI review (``--ai-review`` wins when both flags
    are given).

    Exits with status 1 on an invalid or unreadable config or prompt file, on
    any file that failed to bundle, and on a failed AI review.
    """
    # Use override if provided, otherwise use auto-detected root
    current_project_root = cli_project_root.resolve() if cli_project_root else project_root
    if not quiet:
        console.print(f"[dim]Using project root: {current_project_root}[/dim]")
    try:
        with open(config_file, 'r', encoding='utf-8') as f:
            config = json.load(f)
        is_valid, error = validate_config(config)
        if not is_valid:
            console.print(f"[red]Error: Invalid configuration in '{config_file}' - {error}[/red]")
            raise typer.Exit(1)
        if 'code_review_prompt' in config:
            code_review_prompt = config['code_review_prompt']
        else:
            prompt_file = current_project_root / config['code_review_prompt_file']
            code_review_prompt = _read_file_safely(prompt_file)
        files_to_review = config.get('files_to_review', [])
        context_files = config.get('context_files', [])
    except (json.JSONDecodeError, KeyError, OSError, UnicodeDecodeError) as e:
        # OSError covers a missing or unreadable config/prompt file
        # (FileNotFoundError, PermissionError, IsADirectoryError, ...), so these
        # end in a clean error message rather than a traceback.
        console.print(f"[red]Error loading or parsing config file '{config_file}': {e}[/red]")
        raise typer.Exit(1)
    if not quiet:
        console.print(f"[green]π Generating bundle from '{config_file}'...[/green]")
    from io import StringIO
    # Generate to a stream first to check for errors
    bundle_stream = StringIO()
    result = generate_review_bundle(
        files_to_review, context_files, code_review_prompt,
        current_project_root, bundle_stream, include_git_info
    )
    # Fail fast if any file was not processed successfully
    if result["files_failed"] > 0:
        console.print(f"[red]Error: Failed to process {result['files_failed']} file(s). Bundle generation aborted.[/red]")
        console.print("[dim]--- Bundle Generation Log ---[/dim]")
        print(bundle_stream.getvalue(), file=sys.stderr)  # Print partial bundle to stderr for debugging
        console.print("[dim]-----------------------------[/dim]")
        raise typer.Exit(1)
    bundle_content = bundle_stream.getvalue()
    # Output the successful bundle
    if output_file is None:
        print(bundle_content, end='')
    else:
        # Path.write_text() only accepts ``newline`` from Python 3.10 on; open()
        # gives the same '\n' line endings on every supported version and
        # matches how save_results() writes its files.
        with open(output_file, 'w', encoding='utf-8', newline='\n') as out:
            out.write(bundle_content)
    if not quiet:
        output_name = str(output_file) if output_file else "stdout"
        console.print(f"[green]β Bundle successfully generated to '{output_name}'.[/green]")
        console.print(f" Files processed: {result['files_processed']}/{result['total_files']}")
    if clipboard and not ai_review:
        pyperclip.copy(bundle_content)
        if not quiet:
            console.print(f"[green]π Bundle copied to clipboard ({len(bundle_content)} characters)[/green]")
        return
    if ai_review:
        if not quiet:
            console.print(f"[blue]π€ Performing AI code review with {model}...[/blue]")
        ai_result = asyncio.run(perform_ai_code_review(bundle_content, model=model))
        if ai_result["status"] == "success":
            review_path = save_results(ai_result["review"], filename_prefix="ai_code_review")
            if not quiet:
                console.print(f"[green]β AI review saved to: {review_path}[/green]")
                if "usage" in ai_result:
                    console.print(f" Tokens used: {ai_result['usage']['total_tokens']}")
                if "cost" in ai_result and "calculated_cost" in ai_result["cost"]:
                    console.print(f" Cost: ${ai_result['cost']['calculated_cost']:.4f}")
        else:
            # Failures are reported even in quiet mode.
            console.print(f"[red]β AI review failed: {ai_result.get('error', 'Unknown error')}[/red]")
            raise typer.Exit(1)
| # Keep other commands as stubs for now | |
@app.command(name="debug", help="Run debug function for testing")
def debug_command():
    """Run working_usage() to completion; its boolean result is not inspected."""
    asyncio.run(working_usage())
@app.command(name="stress", help="Run stress tests from JSON files")
def stress_command():
    """Placeholder command: only prints a notice that stress tests are not implemented."""
    console.print("Stress test command not implemented in this version.")
@app.command(name="working", help="Run working usage examples")
def working_command():
    """Run working_usage() and exit with status 1 when it reports failure."""
    if asyncio.run(working_usage()):
        return
    raise typer.Exit(1)
# Script entry point: hand control to the Typer app, which dispatches to the
# generate / debug / stress / working commands registered on it.
if __name__ == "__main__":
    app()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment