Skip to content

Instantly share code, notes, and snippets.

@grahama1970
Last active July 23, 2025 18:18
Show Gist options
  • Select an option

  • Save grahama1970/95f65a04812f5db9bd5cfb84a4271d4b to your computer and use it in GitHub Desktop.

Select an option

Save grahama1970/95f65a04812f5db9bd5cfb84a4271d4b to your computer and use it in GitHub Desktop.
code review to kimi using litellm
#!/usr/bin/env python3
"""
Generate a concatenated code review bundle from a configuration file.
This script creates a single markdown document containing all files that need review.
It is designed to be a robust command-line utility for both human and agent use.
Additionally, it can perform AI-powered code reviews using LiteLLM with various models.
AGENT VERIFICATION INSTRUCTIONS:
- Run this script with the `working` subcommand to execute working_usage()
- The working_usage() function MUST pass all assertions
- This verifies the script produces expected results
- DO NOT assume the script works without running it
Third-party Documentation:
- [LiteLLM]: https://docs.litellm.ai/
- [Moonshot AI]: https://platform.moonshot.ai/docs
Example Input:
{
    "code_review_prompt_file": "prompts/code_review_prompt.md",
    "context_files": [
        {"path": "README.md", "rationale": "Project overview and setup."}
    ],
    "files_to_review": [
        {"path": "src/module.py", "rationale": "Core business logic"},
        {"path": "tests/test_module.py", "rationale": "Test coverage"}
    ]
}
Expected Output:
{
    "status": "success",
    "bundle_path": "/tmp/responses/code_review_bundle_20250123_120000.md",
    "files_processed": 3,
    "ai_review": {
        "model": "moonshot/kimi-k2-0711-preview",
        "status": "completed",
        "review_path": "/tmp/responses/ai_code_review_20250123_120000.md"
    }
}
"""
import asyncio
import json
import sys
import subprocess
import os
import tempfile
from pathlib import Path
from datetime import datetime
from typing import List, Dict, Any, Optional, Tuple
from enum import Enum
import pyperclip
# Third-party imports
from loguru import logger
import litellm
from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type
import typer
from rich.console import Console
from rich.table import Table
# Configure logging and console
# Drop loguru's default handler so the stderr sink below is the only console sink.
logger.remove()
logger.add(
    sys.stderr,
    level="INFO",
    format=(
        "<green>{time:YYYY-MM-DD HH:mm:ss}</green> | "
        "<level>{level: <8}</level> | "
        "<cyan>{name}</cyan>:<cyan>{function}</cyan>:<cyan>{line}</cyan> - "
        "<level>{message}</level>"
    ),
)

# Rich console bound to stderr, so status messages never mix with a bundle
# that is printed on stdout.
console = Console(stderr=True)

# Typer application that the command functions in this module register on.
app = typer.Typer(
    name="code-review-bundle",
    help="Generate code review bundles with optional AI-powered reviews",
    add_completion=False,
)
# NEW: More robust project root discovery
def _find_project_root(start_path: Path) -> Path:
"""Find the project root by searching for a .git directory or a .project_root file."""
current_path = start_path.resolve()
while current_path != current_path.parent:
if (current_path / ".git").is_dir() or (current_path / ".project_root").is_file():
return current_path
current_path = current_path.parent
return start_path.resolve() # Fallback to starting directory if no marker found
# CRITICAL: Environment and Path Setup
from dotenv import load_dotenv

# Resolve the project root from this file's directory and load the .env file
# that sits directly inside it.
project_root = _find_project_root(Path(__file__).parent)
dotenv_path = project_root / ".env"
load_dotenv(dotenv_path=dotenv_path)

# DEBUG-level file sink under <project_root>/logs.
log_dir = project_root / "logs"
log_dir.mkdir(exist_ok=True)
# The process id is part of the file name so that parallel runs do not share
# a log file; "{time}" is left as a literal placeholder in the sink path.
log_filename = "_".join([Path(__file__).stem, str(os.getpid()), "{time}.log"])
logger.add(
    log_dir / log_filename,
    rotation="10 MB",
    retention=5,
    level="DEBUG",
)
# Logger Agent Integration (HIGHLY RECOMMENDED)
# Optional integration: expose LOGGER_AGENT_AVAILABLE so the rest of the module
# can tell whether the logger agent package could be imported.
try:
    # Make <project_root>/src/logger_agent/src importable before trying the import.
    sys.path.insert(0, str(project_root / "src" / "logger_agent" / "src"))  # Use project_root
    from agent_log_manager import get_log_manager
    LOGGER_AGENT_AVAILABLE = True
    logger.info("✓ Logger agent available for knowledge building")
except ImportError:
    LOGGER_AGENT_AVAILABLE = False
    logger.debug("Logger agent not available - running in standalone mode")
# ============================================
# ENUMS AND TYPES
# ============================================
class ExecutionMode(str, Enum):
    """String-valued names of the script's execution modes.

    Members also subclass ``str``, so each one compares equal to its plain
    string value.
    """

    NORMAL = "normal"
    DEBUG = "debug"
    STRESS = "stress"
# ============================================
# CORE FUNCTIONS (Outside __main__ block)
# ============================================
def _read_file_safely(path: Path) -> str:
"""Reads a file, trying several common encodings to prevent UnicodeDecodeError."""
encodings_to_try = ('utf-8', 'utf-8-sig', 'iso-8859-1')
for encoding in encodings_to_try:
try:
return path.read_text(encoding=encoding)
except UnicodeDecodeError:
continue
# MODIFIED: More informative error message
raise UnicodeDecodeError(f"Unable to decode '{path}' with any of the tried encodings: {', '.join(encodings_to_try)}.")
def _get_git_info(project_root_dir: Path) -> str:
"""Retrieves current Git commit and status for provenance."""
# MODIFIED: Handle cases where git is not installed or it's not a repo
try:
head = subprocess.check_output(
['git', 'rev-parse', 'HEAD'],
cwd=project_root_dir,
stderr=subprocess.DEVNULL
).decode('utf-8').strip()
status = subprocess.check_output(
['git', 'status', '--porcelain'],
cwd=project_root_dir,
stderr=subprocess.DEVNULL
).decode('utf-8').strip()
info = f"**Git Commit:** `{head}`\n\n"
if status:
info += f"**Working Directory Status:**\n```\n{status}\n```\n"
else:
info += "**Working Directory Status:** `clean`\n"
return info
except (FileNotFoundError, subprocess.CalledProcessError):
return "**Git Info:** Not a git repository or `git` command not found.\n"
def _get_language_from_path(path: Path) -> str:
"""Suggests a language for markdown code blocks based on file extension."""
ext_map = {
".py": "python", ".js": "javascript", ".ts": "typescript", ".json": "json",
".md": "markdown", ".yaml": "yaml", ".yml": "yaml", ".sh": "bash",
".html": "html", ".css": "css", ".sql": "sql", ".xml": "xml"
}
return ext_map.get(path.suffix.lower(), "text")
def validate_config(config: Dict[str, Any]) -> Tuple[bool, Optional[str]]:
    """Check that a loaded configuration has the expected structure.

    The config must name a prompt ('code_review_prompt_file' or
    'code_review_prompt'). The optional 'files_to_review' and 'context_files'
    sections must each be a list of dictionaries that carry both a 'path' and
    a 'rationale' key.

    Args:
        config: Parsed configuration mapping.

    Returns:
        ``(True, None)`` for a valid config, otherwise ``(False, message)``
        describing the first problem found.
    """
    has_prompt = "code_review_prompt_file" in config or "code_review_prompt" in config
    if not has_prompt:
        return False, "Config must have either 'code_review_prompt_file' or 'code_review_prompt'"

    for section in ("files_to_review", "context_files"):
        if section not in config:
            continue
        entries = config[section]
        if not isinstance(entries, list):
            return False, f"'{section}' must be a list"
        for index, entry in enumerate(entries):
            if not isinstance(entry, dict):
                return False, f"Item {index} in '{section}' must be a dictionary"
            # 'path' is checked before 'rationale', so it is reported first.
            for required in ("path", "rationale"):
                if required not in entry:
                    return False, f"Item {index} in '{section}' must have a '{required}' field"
    return True, None
# NEW: Deduplicated helper for processing file entries
def _process_file_entries(
file_list: List[Dict[str, str]],
section_title: str,
rationale_prefix: str,
project_root_dir: Path,
output_stream
) -> Tuple[int, int]:
"""Processes a list of file entries and writes them to the output stream."""
processed, failed = 0, 0
output_stream.write(f"## {section_title}\n\n")
for item in file_list:
file_path = Path(item["path"])
rationale = item["rationale"]
abs_path = project_root_dir / file_path
output_stream.write(f"### File: `{file_path}`\n\n")
output_stream.write(f"**{rationale_prefix}:** {rationale}\n\n")
try:
# MODIFIED: More specific exception handling
content = _read_file_safely(abs_path)
language = _get_language_from_path(file_path)
output_stream.write(f"```{language}\n")
output_stream.write(content)
output_stream.write("\n```\n\n")
processed += 1
except FileNotFoundError:
output_stream.write(f"⚠️ **File not found**: `{abs_path}`\n\n")
failed += 1
except (PermissionError, UnicodeDecodeError) as e:
output_stream.write(f"❌ **Error reading file**: {type(e).__name__}: {e}\n\n")
failed += 1
except Exception as e:
logger.exception(f"Unexpected error reading file '{abs_path}'")
output_stream.write(f"❌ **Unexpected error**: {e}\n\n")
failed += 1
output_stream.write("---\n\n")
return processed, failed
def generate_review_bundle(
    files_to_review: List[Dict[str, str]],
    context_files: List[Dict[str, str]],
    code_review_prompt: str,
    project_root_dir: Path,
    output_stream,
    include_git_info: bool
) -> Dict[str, Any]:
    """Generate a concatenated markdown document with all code for review.

    The document is written in this order: title and generation time, optional
    git provenance, the review instructions, the context files, and finally
    the files under review.

    Args:
        files_to_review: Entries ("path", "rationale") to be reviewed.
        context_files: Entries ("path", "rationale") supplied as background.
        code_review_prompt: Instruction text placed under "Review Instructions".
        project_root_dir: Base directory for entry paths and for the git calls.
        output_stream: Object with a ``write(str)`` method receiving the markdown.
        include_git_info: Whether to add the "Provenance" section.

    Returns:
        A dict with "files_processed", "files_failed" and "total_files".
    """
    from datetime import timezone  # function-scope import: the module only imports ``datetime``

    total_processed, total_failed = 0, 0
    output_stream.write("# Code Review Bundle\n\n")
    # The stamp is labelled "UTC", so it must be taken from an aware UTC clock
    # rather than from the machine's local time.
    generated_at = datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M:%S UTC')
    output_stream.write(f"Generated on: {generated_at}\n\n")
    if include_git_info:
        output_stream.write("## Provenance\n\n")
        output_stream.write(_get_git_info(project_root_dir))
        output_stream.write("\n---\n\n")
    output_stream.write("## Review Instructions\n\n")
    output_stream.write(code_review_prompt)
    output_stream.write("\n\n---\n\n")
    # Empty sections are skipped entirely, so no heading is written for them.
    if context_files:
        processed, failed = _process_file_entries(
            context_files, "Additional Context", "Rationale", project_root_dir, output_stream
        )
        total_processed += processed
        total_failed += failed
    if files_to_review:
        processed, failed = _process_file_entries(
            files_to_review, "Files for Comprehensive Code Review", "Rationale for Review", project_root_dir, output_stream
        )
        total_processed += processed
        total_failed += failed
    return {
        "files_processed": total_processed,
        "files_failed": total_failed,
        "total_files": len(files_to_review) + len(context_files),
    }
# Transient failures for which repeating the request can help.
_RETRYABLE_AI_ERRORS = (
    ConnectionError,
    TimeoutError,
    litellm.Timeout,
    litellm.APIConnectionError,
    litellm.ServiceUnavailableError,
    litellm.InternalServerError,
    litellm.RateLimitError,
)


@retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=2, min=30, max=600),
    retry=retry_if_exception_type(_RETRYABLE_AI_ERRORS),
    before_sleep=lambda retry_state: logger.warning(
        f"Retrying AI code review (attempt {retry_state.attempt_number}/3) after error: "
        f"{retry_state.outcome.exception() if retry_state.outcome else 'Unknown error'}"
    ),
    # After the last attempt, raise the provider's own exception instead of
    # tenacity's RetryError so the caller reports the real cause.
    reraise=True,
)
async def _request_ai_completion(
    model: str,
    messages: List[Dict[str, str]],
    temperature: float,
    max_tokens: int
) -> Any:
    """Send one chat completion request; transient errors are retried by the decorator."""
    return await litellm.acompletion(model=model, messages=messages, temperature=temperature, max_tokens=max_tokens)


async def perform_ai_code_review(
    bundle_content: str,
    model: str = "moonshot/kimi-k2-0711-preview",
    temperature: float = 0.7,
    max_tokens: int = 8192
) -> Dict[str, Any]:
    """Perform AI-powered code review using LiteLLM with automatic retry logic.

    The retry policy lives on the request helper rather than on this function:
    this function converts every exception into a "failed" result, so a retry
    decorator placed here would never see an exception to react to.

    Args:
        bundle_content: Markdown bundle sent to the model as the user message.
        model: LiteLLM model identifier.
        temperature: Sampling temperature forwarded to the completion call.
        max_tokens: Completion token limit forwarded to the completion call.

    Returns:
        On success: {"status": "success", "model", "review", "usage", "cost"}.
        On failure (after retries for transient errors):
        {"status": "failed", "model", "error"}. This function does not raise.
    """
    logger.info(f"Performing AI code review with model: {model}")
    # Budget guardrail. A rough estimate: 1 token ~= 4 chars in English;
    # dividing by 3 errs on the high side.
    estimated_tokens = len(bundle_content) / 3
    if estimated_tokens > 100_000:  # Warn for very large prompts
        logger.warning(f"Large prompt detected: estimated ~{estimated_tokens:,.0f} tokens. This may incur significant cost.")
    try:
        messages = [
            {"role": "system", "content": "You are an expert code reviewer. Analyze the provided code bundle and provide comprehensive feedback on code quality, potential bugs, security issues, performance concerns, and architectural improvements."},
            {"role": "user", "content": f"Please review the following code bundle:\n\n{bundle_content}"}
        ]
        response = await _request_ai_completion(model, messages, temperature, max_tokens)
        review_content = response.choices[0].message.content
        cost_info = {}
        try:
            # The cost calculation is synchronous; run it in a thread so the
            # event loop is not blocked.
            import functools
            cost_func = functools.partial(litellm.completion_cost, completion_response=response)
            calculated_cost = await asyncio.to_thread(cost_func)
            cost_info["calculated_cost"] = calculated_cost
        except Exception as cost_error:
            # Cost is best-effort: a failure here must not fail the review.
            logger.debug(f"Could not extract cost information: {cost_error}")
            cost_info["note"] = "Cost information not available for this model/provider"
        result = {
            "status": "success", "model": model, "review": review_content,
            "usage": {"prompt_tokens": response.usage.prompt_tokens, "completion_tokens": response.usage.completion_tokens, "total_tokens": response.usage.total_tokens},
            "cost": cost_info
        }
        if "calculated_cost" in cost_info:
            logger.info(f"AI review cost: ${cost_info['calculated_cost']:.4f}")
        return result
    except Exception as e:
        logger.error(f"AI code review failed: {e}")
        return {"status": "failed", "model": model, "error": str(e)}
def save_results(content: str, output_dir: Optional[Path] = None, filename_prefix: str = "result") -> Path:
    """Write *content* to a timestamped markdown file and return its path.

    Args:
        content: Text to store.
        output_dir: Target directory; defaults to ``<project_root>/tmp/responses``.
            It is created, parents included, when missing.
        filename_prefix: Leading part of the generated file name.

    Returns:
        Path of the written ``<prefix>_<YYYYmmdd_HHMMSS>.md`` file.
    """
    target_dir = output_dir if output_dir is not None else project_root / "tmp" / "responses"
    target_dir.mkdir(parents=True, exist_ok=True)

    stamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    destination = target_dir / f"{filename_prefix}_{stamp}.md"
    # newline='\n' keeps line endings identical on every platform.
    with destination.open('w', encoding='utf-8', newline='\n') as handle:
        handle.write(content)

    logger.info(f"Content saved to: {destination}")
    return destination
# ============================================
# USAGE FUNCTIONS AND CLI COMMANDS (module level, dispatched by the Typer app)
# ============================================
async def working_usage():
    """
    Known working examples that demonstrate script functionality.

    This function MUST verify that the script produces expected results.
    It bundles a temporary README together with this script, saves the bundle
    through save_results(), and asserts on the outcome. When MOONSHOT_API_KEY
    or LITELLM_API_KEY is set, it also sends a tiny bundle through
    perform_ai_code_review() and requires a successful result.

    Returns:
        True when every check passes; False when any step raises (the failure
        is logged together with its traceback).
    """
    logger.info("=== Running Working Usage Examples ===")
    # A fresh temporary directory keeps parallel/CI runs from colliding.
    with tempfile.TemporaryDirectory() as temp_dir_str:
        temp_dir = Path(temp_dir_str)
        # Create test files and configs inside the temporary directory
        script_to_review = Path(__file__)
        readme_path = temp_dir / "README.md"
        readme_path.write_text("# Test Project\n\nThis is the main README file.", encoding='utf-8')
        prompt_path = temp_dir / "code_review_prompt.md"
        prompt_path.write_text("Please review this code for quality.", encoding='utf-8')
        test_config = {
            "code_review_prompt_file": str(prompt_path),
            "context_files": [{"path": str(readme_path), "rationale": "Project overview"}],
            "files_to_review": [{"path": str(script_to_review), "rationale": "The script itself"}]
        }
        config_path = temp_dir / "test_config.json"
        with open(config_path, 'w', encoding='utf-8') as f:
            json.dump(test_config, f)
        try:
            code_review_prompt = _read_file_safely(prompt_path)
            from io import StringIO
            bundle_stream = StringIO()
            # Every entry path above is absolute, and joining an absolute path
            # onto a base yields that absolute path, so '/' merely serves as a
            # neutral base directory here.
            result = generate_review_bundle(
                test_config["files_to_review"],
                test_config["context_files"],
                code_review_prompt,
                Path('/'),
                bundle_stream,
                include_git_info=True
            )
            bundle_content = bundle_stream.getvalue()
            bundle_path = save_results(bundle_content, filename_prefix="code_review_bundle")
            # Verify results
            assert result["files_failed"] == 0, f"Expected 0 failed files, but got {result['files_failed']}"
            assert result["files_processed"] == 2, f"Expected 2 processed files, got {result['files_processed']}"
            assert bundle_path.exists(), "Bundle file should exist"
            assert "## Additional Context" in bundle_content, "Context section should be in the bundle"
            assert f"### File: `{readme_path}`" in bundle_content, "README context file should be listed"
            logger.success(f"✓ Bundle generated successfully: {bundle_path}")
            # The live API check only runs when a key is configured.
            if os.getenv("MOONSHOT_API_KEY") or os.getenv("LITELLM_API_KEY"):
                # Use a smaller chunk for AI test to save cost/time
                small_bundle = "# Test\n```python\ndef x(): return 1```"
                ai_result = await perform_ai_code_review(small_bundle)
                assert ai_result["status"] == "success", f"AI review failed: {ai_result.get('error')}"
                logger.success("✓ AI review API call successful.")
            logger.success("✓ All working_usage tests passed!")
            return True
        except Exception as e:
            # AssertionError is included, so a failed check returns False
            # instead of raising.
            logger.error(f"Working usage failed: {e}")
            logger.exception("Full traceback:")
            return False
@app.command(name="generate", help="Generate a code review bundle from a configuration file")
def generate(
    config_file: Path = typer.Argument(..., help="Path to JSON configuration file", exists=True, file_okay=True, dir_okay=False, readable=True),
    output_file: Optional[Path] = typer.Option(None, "--output-file", "-o", help="Output file path. Defaults to stdout."),
    cli_project_root: Optional[Path] = typer.Option(None, "--project-root", help="Override auto-detected project root", exists=True, file_okay=False),
    include_git_info: bool = typer.Option(True, help="Include git commit and status information in the bundle"),
    quiet: bool = typer.Option(False, "--quiet", "-q", help="Suppress all informational messages"),
    ai_review: bool = typer.Option(False, "--ai-review", help="Perform AI-powered code review using LiteLLM"),
    model: str = typer.Option("moonshot/kimi-k2-0711-preview", "--model", help="LiteLLM model to use for AI review"),
    clipboard: bool = typer.Option(False, "--clipboard", help="Copy bundle to clipboard instead of performing AI review")
) -> None:
    """Generate code review bundle from configuration file.

    Steps: load and validate the JSON config, resolve the review prompt, build
    the bundle in memory, abort if any file failed, emit the bundle (stdout or
    ``output_file``), then optionally copy it to the clipboard or run the AI
    review and save its result.

    Raises:
        typer.Exit: With code 1 when the config or prompt cannot be loaded,
            the config is invalid, any listed file fails to process, or the
            AI review fails.
    """
    # Use override if provided, otherwise use auto-detected root
    current_project_root = cli_project_root.resolve() if cli_project_root else project_root
    if not quiet:
        console.print(f"[dim]Using project root: {current_project_root}[/dim]")
    try:
        with open(config_file, 'r', encoding='utf-8') as f:
            config = json.load(f)
        # Valid JSON may still be a list, number or null at the top level;
        # reject it here rather than letting later key lookups misbehave.
        if not isinstance(config, dict):
            console.print(f"[red]Error: Invalid configuration in '{config_file}' - top-level JSON value must be an object[/red]")
            raise typer.Exit(1)
        is_valid, error = validate_config(config)
        if not is_valid:
            console.print(f"[red]Error: Invalid configuration in '{config_file}' - {error}[/red]")
            raise typer.Exit(1)
        if 'code_review_prompt' in config:
            code_review_prompt = config['code_review_prompt']
        else:
            prompt_file = current_project_root / config['code_review_prompt_file']
            code_review_prompt = _read_file_safely(prompt_file)
        files_to_review = config.get('files_to_review', [])
        context_files = config.get('context_files', [])
    except (json.JSONDecodeError, UnicodeDecodeError, KeyError, OSError) as e:
        # OSError covers a missing, unreadable or directory-valued config or
        # prompt path; UnicodeDecodeError covers files that are not valid text.
        console.print(f"[red]Error loading or parsing config file '{config_file}': {e}[/red]")
        raise typer.Exit(1)
    if not quiet:
        console.print(f"[green]🚀 Generating bundle from '{config_file}'...[/green]")
    from io import StringIO
    # Generate to a stream first to check for errors
    bundle_stream = StringIO()
    result = generate_review_bundle(
        files_to_review, context_files, code_review_prompt,
        current_project_root, bundle_stream, include_git_info
    )
    # Fail fast if any file was not processed successfully
    if result["files_failed"] > 0:
        console.print(f"[red]Error: Failed to process {result['files_failed']} file(s). Bundle generation aborted.[/red]")
        console.print("[dim]--- Bundle Generation Log ---[/dim]")
        print(bundle_stream.getvalue(), file=sys.stderr)  # Print partial bundle to stderr for debugging
        console.print("[dim]-----------------------------[/dim]")
        raise typer.Exit(1)
    bundle_content = bundle_stream.getvalue()
    # Output the successful bundle
    if output_file is None:
        print(bundle_content, end='')
    else:
        # open() accepts newline= on every supported Python version, whereas
        # Path.write_text() only gained that parameter in 3.10.
        with open(output_file, 'w', encoding='utf-8', newline='\n') as out:
            out.write(bundle_content)
    if not quiet:
        output_name = str(output_file) if output_file else "stdout"
        console.print(f"[green]✅ Bundle successfully generated to '{output_name}'.[/green]")
        console.print(f" Files processed: {result['files_processed']}/{result['total_files']}")
    # The clipboard option is an alternative to the AI review, so it only
    # takes effect when no review was requested.
    if clipboard and not ai_review:
        pyperclip.copy(bundle_content)
        if not quiet:
            console.print(f"[green]📋 Bundle copied to clipboard ({len(bundle_content)} characters)[/green]")
        return
    if ai_review:
        if not quiet:
            console.print(f"[blue]🤖 Performing AI code review with {model}...[/blue]")
        ai_result = asyncio.run(perform_ai_code_review(bundle_content, model=model))
        if ai_result["status"] == "success":
            review_path = save_results(ai_result["review"], filename_prefix="ai_code_review")
            if not quiet:
                console.print(f"[green]✅ AI review saved to: {review_path}[/green]")
                if "usage" in ai_result:
                    console.print(f" Tokens used: {ai_result['usage']['total_tokens']}")
                if "cost" in ai_result and "calculated_cost" in ai_result["cost"]:
                    console.print(f" Cost: ${ai_result['cost']['calculated_cost']:.4f}")
        else:
            console.print(f"[red]❌ AI review failed: {ai_result.get('error', 'Unknown error')}[/red]")
            raise typer.Exit(1)
# Keep other commands as stubs for now
@app.command(name="debug", help="Run debug function for testing")
def debug_command():
    """Run working_usage() and exit with status 1 when it reports failure.

    The result is propagated, as in the `working` command, so a failed
    self-check is visible to shells and CI through the exit code.
    """
    success = asyncio.run(working_usage())
    if not success:
        raise typer.Exit(1)
@app.command(name="stress", help="Run stress tests from JSON files")
def stress_command():
    """Placeholder command: only prints a notice that stress tests are not implemented."""
    console.print("Stress test command not implemented in this version.")
@app.command(name="working", help="Run working usage examples")
def working_command():
    """Run working_usage() and exit with status 1 unless it returns a truthy result."""
    if asyncio.run(working_usage()):
        return
    raise typer.Exit(1)
# Script entry point: hand control to the Typer app, which dispatches to the
# command named on the command line.
if __name__ == "__main__":
    app()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment