Analyzes Python project files to generate a structured report of directory trees, dependencies, and imports. Helps LLMs understand project architecture and relationships between files.
from pathlib import Path

from dotenv import load_dotenv


def get_project_root(marker_file=".git"):
    """
    Find the project root directory by looking for a marker file.

    Args:
        marker_file (str): File/directory to look for (default: ".git")

    Returns:
        Path: Project root directory path

    Raises:
        RuntimeError: If marker file not found in parent directories
    """
    current_dir = Path(__file__).resolve().parent
    while current_dir != current_dir.parent:  # stop at the filesystem root (Path.root is a str, so comparing against it never matches)
        if (current_dir / marker_file).exists():
            return current_dir
        current_dir = current_dir.parent
    raise RuntimeError(f"Could not find project root. Ensure {marker_file} exists.")

def load_env_file(env_type="backend"):
    """
    Load environment variables from a .env file.

    Args:
        env_type (str): Type of environment to load (default: "backend")

    Raises:
        FileNotFoundError: If .env file not found in expected locations
    """
    project_dir = get_project_root()
    env_dirs = [project_dir, project_dir / "app/backend"]
    for env_dir in env_dirs:
        env_file = env_dir / f".env.{env_type}"
        if env_file.exists():
            load_dotenv(env_file)
            print(f"Loaded environment file: {env_file}")
            return
    raise FileNotFoundError(f"Environment file .env.{env_type} not found in any known locations.")
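A minimal usage sketch of these helpers. The module path app/backend/utils/get_project_root.py is inferred from the import in the main script below, and the example assumes a .env.backend file exists somewhere the loader looks:

    from app.backend.utils.get_project_root import get_project_root, load_env_file

    load_env_file(env_type="backend")   # loads .env.backend from the project root or app/backend
    project_root = get_project_root()   # Path of the first parent directory containing .git
    print(project_root / "app" / "backend")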
import os
import ast
import json
from importlib.util import find_spec
from pathlib import Path
from typing import List, Optional, Dict, Any

import networkx as nx
import nltk
import pyperclip
from loguru import logger
from nltk.tokenize import word_tokenize
from pathspec import PathSpec

# Tokenizer data for the rough token count computed at the end of the report
nltk.download('punkt_tab')

# Helper: VSCode Workspace Generation
def generate_vscode_workspace(project_dir: str, matched_files: List[str], config_name: str = "project") -> str:
    """
    Generate a VSCode workspace file for the matched files.
    """
    workspace_config = {
        "folders": [{"path": os.path.relpath(project_dir)}],
        "settings": {},
        "launch": {},
        "tasks": {
            "version": "2.0.0",
            "tasks": [
                {
                    "label": "Run Script",
                    "type": "shell",
                    "command": "python",
                    "args": ["main.py"],
                    "group": "build",
                }
            ],
        },
    }
    workspace_file = os.path.join(project_dir, f"{config_name}.code-workspace")
    try:
        # Check if file exists, and ask for confirmation before overwriting
        if os.path.exists(workspace_file):
            confirm = input(f"Workspace file {workspace_file} exists. Overwrite? (y/n): ").lower()
            if confirm != "y":
                logger.info("Workspace generation canceled.")
                return ""
        with open(workspace_file, 'w', encoding='utf-8') as f:
            json.dump(workspace_config, f, indent=4)
        logger.info(f"VSCode workspace generated at: {workspace_file}")
    except Exception as e:
        logger.error(f"Failed to generate VSCode workspace: {e}")
    return workspace_file
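
# For reference, a sketch of the JSON written to <config_name>.code-workspace
# (illustrative; "folders" holds the project path relative to the CWD):
# {
#     "folders": [{"path": "."}],
#     "settings": {},
#     "launch": {},
#     "tasks": {"version": "2.0.0", "tasks": [{"label": "Run Script", "command": "python", "args": ["main.py"], ...}]}
# }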

def _module_installed(module_name: str) -> bool:
    """
    Return True if an importable spec exists for the module. find_spec raises
    (rather than returning None) for some missing dotted names, so lookup
    errors are treated as "not installed".
    """
    try:
        return find_spec(module_name) is not None
    except (ImportError, ValueError):
        return False


def check_project_imports(file_path: str, project_dir: str) -> Dict[str, List[str]]:
    """
    Check project-specific and external imports in a file.
    - Detect missing project imports.
    - Check for missing Python packages (third-party or standard library).
    """
    project_dir_path = Path(project_dir).resolve()
    found_imports = []
    missing_imports = []
    try:
        with open(file_path, 'r', encoding='utf-8') as f:
            tree = ast.parse(f.read())
        for node in ast.walk(tree):
            if isinstance(node, ast.Import):
                for alias in node.names:
                    import_path = alias.name.replace('.', '/') + '.py'
                    if (project_dir_path / import_path).exists():
                        found_imports.append(alias.name)
                    elif _module_installed(alias.name):  # Check if package is installed
                        found_imports.append(alias.name)
                    else:
                        missing_imports.append(alias.name)
            elif isinstance(node, ast.ImportFrom):
                if node.module:
                    import_path = node.module.replace('.', '/') + '.py'
                    if (project_dir_path / import_path).exists():
                        found_imports.append(node.module)
                    elif _module_installed(node.module):  # Check if package is installed
                        found_imports.append(node.module)
                    else:
                        missing_imports.append(node.module)
    except Exception as e:
        logger.error(f"Error checking imports in {file_path}: {e}")
    return {
        "found": found_imports,
        "missing": missing_imports,
    }
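
# Illustrative call (hypothetical paths and packages), showing the returned shape:
#   check_project_imports("app/backend/foo.py", "/repo")
#   -> {"found": ["os", "app.backend.utils.get_project_root"],
#       "missing": ["some_uninstalled_pkg"]}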

def analyze_dependencies(files: List[str], project_dir: str) -> Dict[str, Any]:
    """
    Analyze dependencies between files in the project directory.
    Uses networkx to build and analyze the dependency graph.

    Args:
        files: List of files to analyze
        project_dir: Root directory of the project

    Returns:
        Dict containing dependency graph with file relationships
    """
    project_dir_path = Path(project_dir).resolve()
    G = nx.DiGraph()  # Directed graph for dependencies

    # Add all files as nodes
    for file in files:
        G.add_node(file)

    # Analyze imports and dependencies
    for file in files:
        file_path = project_dir_path / file
        try:
            with open(file_path, 'r', encoding='utf-8') as f:
                tree = ast.parse(f.read(), filename=str(file_path))
            # Track imports. The match below is a substring heuristic: it links
            # this file to any matched file whose path contains the imported
            # module's path or its final name component.
            for node in ast.walk(tree):
                if isinstance(node, ast.Import):
                    modules = [alias.name for alias in node.names]
                elif isinstance(node, ast.ImportFrom) and node.module:
                    modules = [node.module]
                else:
                    continue
                for module in modules:
                    module_path = module.replace('.', '/')
                    for target_file in files:
                        if target_file == file:
                            continue  # skip self-loops the loose match would create
                        if module_path in target_file or module.split('.')[-1] in target_file:
                            G.add_edge(file, target_file)
        except Exception as e:
            logger.error(f"Error processing file {file_path}: {e}")

    # Generate dependency information
    dependency_info = {
        "nodes": list(G.nodes()),
        "edges": list(G.edges()),
        "circular_dependencies": list(nx.simple_cycles(G)),
        "dependency_order": list(nx.topological_sort(G)) if nx.is_directed_acyclic_graph(G) else [],
        "central_files": sorted(nx.degree_centrality(G).items(), key=lambda x: x[1], reverse=True),
    }
    return dependency_info
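
# A small illustration of the result, assuming hypothetical files where a.py
# imports b and nothing imports a:
#   analyze_dependencies(["a.py", "b.py"], "/repo")
#   -> {"nodes": ["a.py", "b.py"],
#       "edges": [("a.py", "b.py")],
#       "circular_dependencies": [],
#       "dependency_order": ["a.py", "b.py"],
#       "central_files": [("a.py", 1.0), ("b.py", 1.0)]}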

def resolve_absolute_import(imported_name: str, project_dir_path: Path) -> Optional[str]:
    """
    Resolve an absolute import to a corresponding file or module within the project.
    """
    module_path = imported_name.replace('.', '/')
    possible_paths = [
        project_dir_path / f"{module_path}.py",  # Regular module
        project_dir_path / module_path / "__init__.py",  # Package
    ]
    for path in possible_paths:
        if path.exists():
            return str(path.relative_to(project_dir_path))
    return None
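
# e.g. resolve_absolute_import("app.backend.utils", project_root) returns
# "app/backend/utils/__init__.py" if that package exists, else None.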

# Helper: Load Ignore Spec (Gitignore)
def load_ignore_spec(project_dir: str, additional_ignores: List[str]) -> PathSpec:
    """
    Load patterns from .gitignore (if it exists) and merge with additional ignore patterns.
    Returns a PathSpec object that can be used to match ignored files.
    """
    gitignore_path = os.path.join(project_dir, '.gitignore')
    patterns = []
    try:
        if os.path.isfile(gitignore_path):
            with open(gitignore_path, 'r', encoding='utf-8') as f:
                patterns.extend(f.read().splitlines())
        patterns.extend(additional_ignores)
        return PathSpec.from_lines('gitwildmatch', patterns)
    except Exception as e:
        logger.error(f"Failed to load .gitignore or additional patterns: {e}")
        raise
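
# pathspec mirrors gitignore semantics; a quick sketch of how matching behaves
# (patterns are illustrative):
#   spec = PathSpec.from_lines('gitwildmatch', ["*.md", "__pycache__/"])
#   spec.match_file("README.md")            -> True  (ignored)
#   spec.match_file("app/backend/main.py")  -> False (kept)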

# Helper: Gather Files (Recursive File Search)
def gather_files(project_dir: str, paths: List[str], spec: PathSpec) -> List[str]:
    """
    Efficiently gather matched files based on provided paths and ignore specifications.
    """
    matched_files = set()
    project_dir_path = Path(project_dir)
    try:
        for p in paths:
            full_path = project_dir_path / p
            if full_path.is_dir():
                for root, _, files in os.walk(full_path):
                    for f in files:
                        rel_file = os.path.relpath(os.path.join(root, f), project_dir)
                        if not spec.match_file(rel_file):
                            matched_files.add(os.path.normpath(rel_file))
            elif full_path.is_file():
                rel_file = os.path.relpath(full_path, project_dir)
                matched_files.add(os.path.normpath(rel_file))
            else:
                logger.warning(f"Path does not exist or is inaccessible: {full_path}")
    except Exception as e:
        logger.error(f"Error gathering files: {e}")
        raise
    logger.info(f"Gathered files: {sorted(matched_files)}")  # Log gathered files
    return sorted(matched_files)
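
# Typical call (hypothetical layout): directories in `paths` are walked
# recursively, ignored paths are skipped, and project-relative paths come back
# sorted, e.g. ["app/backend/main.py", "app/backend/utils/io.py"].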

# Helper: Get Directory Tree
def get_directory_tree(project_dir: str, selected_files: List[str]) -> str:
    """
    Generate a directory tree representation for selected files.
    """
    tree_str = f"{project_dir}\n"
    normalized_selected_files = [os.path.normpath(p) for p in selected_files]
    try:
        parent_dirs = set()
        for file_path in normalized_selected_files:
            parts = Path(file_path).parts
            for i in range(len(parts)):
                parent_dirs.add(os.path.join(*parts[:i + 1]))
        for root, dirs, files in os.walk(project_dir):
            rel_path = os.path.relpath(root, project_dir)
            if rel_path == '.':
                continue
            norm_rel_path = os.path.normpath(rel_path)
            if norm_rel_path not in parent_dirs:
                dirs.clear()
                continue
            level = norm_rel_path.count(os.sep)
            indent = '│ ' * level
            tree_str += f"{indent}├── {os.path.basename(root)}/\n"
            selected_files_in_dir = [
                f for f in files
                if os.path.normpath(os.path.join(rel_path, f)) in normalized_selected_files
            ]
            subindent = '│ ' * (level + 1)
            for f in selected_files_in_dir:
                tree_str += f"{subindent}├── {f}\n"
    except Exception as e:
        logger.error(f"Error generating directory tree: {e}")
        raise
    return tree_str
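
# For two selected files app/backend/main.py and app/backend/utils/io.py
# (hypothetical paths), the rendered tree looks roughly like:
#   /repo
#   ├── app/
#   │ ├── backend/
#   │ │ ├── main.py
#   │ │ ├── utils/
#   │ │ │ ├── io.py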

# Helper: Output Result
def output_result(result: str, config: Dict[str, Any]) -> None:
    """
    Output the result based on the configuration.
    """
    format_type = config.get('output_format', 'text')
    destination = config.get('output_destination', 'console')
    output_file = config.get('output_file', 'output.txt')
    if format_type == 'markdown':
        result = f"```markdown\n{result}\n```"
    if destination == 'console':
        print(result)
    elif destination == 'file':
        # Confirm overwriting
        if os.path.exists(output_file):
            confirm = input(f"Output file {output_file} exists. Overwrite? (y/n): ").lower()
            if confirm != "y":
                logger.info("File write operation canceled.")
                return
        with open(output_file, 'w', encoding='utf-8') as f:
            f.write(result)
        logger.info(f"Output written to file: {output_file}")
    elif destination == 'clipboard':
        pyperclip.copy(result)
        logger.info("Output copied to clipboard.")

def extract_function_definitions(file_path: str) -> List[Dict[str, str]]:
    """
    Extract function definitions (name, path, contents) from a Python file.
    Handles both regular and async functions.
    """
    try:
        with open(file_path, 'r', encoding='utf-8') as f:
            content = f.read()
        tree = ast.parse(content)
        functions = []
        for node in ast.walk(tree):
            if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)):  # Handle async and regular functions
                func_start_line = node.lineno - 1  # Line numbers in AST are 1-based
                func_end_line = max(getattr(node, "end_lineno", func_start_line + 1) - 1, func_start_line)
                func_content = "\n".join(content.splitlines()[func_start_line:func_end_line + 1])
                functions.append({
                    "name": node.name,
                    "path": file_path,
                    "contents": func_content,
                })
        if not functions:  # Log if no functions are found
            logger.warning(f"No functions found in {file_path}.")
        return functions
    except Exception as e:
        logger.error(f"Error extracting functions from {file_path}: {e}")
        return []
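
# Example of the returned records (hypothetical file, contents abridged):
#   extract_function_definitions("app/backend/main.py")
#   -> [{"name": "main",
#        "path": "app/backend/main.py",
#        "contents": "def main():\n    ..."}]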

def copy_files_to_text(config: Dict[str, Any]) -> str:
    """
    Process files and generate project understanding context for LLM.
    """
    try:
        spec = load_ignore_spec(config["project_directory"], config["additional_ignores"])
        matched_files = gather_files(config["project_directory"], config["files_to_find"], spec)
        if not matched_files:
            logger.warning("No matched files found.")
            return "No matched files found."
        if config["dry_run"]:
            logger.info("Dry Run: Matched Files")
            for i, file in enumerate(matched_files, start=1):
                logger.info(f"{i}. {file}")
            return "Dry run complete. No files processed."
        # Optionally generate a VSCode workspace for the matched files
        if config.get("generate_workspace"):
            generate_vscode_workspace(config["project_directory"], matched_files)
        dependency_results = analyze_dependencies(matched_files, config["project_directory"])
        directory_tree = get_directory_tree(config["project_directory"], matched_files)
output = "==== TOKEN COUNT WARNING ====\nNOTE TO LLM: Please check the token count below. If it exceeds your context window, stop processing immediately.\n\n" | |
# Start building the output | |
output = f"==== DIRECTORY TREE ====\n{directory_tree}\n" | |
        entry_file = config.get("entry_file")
        if entry_file:
            output += f"\n==== ENTRY FILE ====\n{entry_file}\n"

        # Add dependency graph information
        output += "\n==== DEPENDENCY GRAPH ====\n"
        output += "Files and their dependencies:\n"
        for node in dependency_results["nodes"]:
            dependencies = [edge[1] for edge in dependency_results["edges"] if edge[0] == node]
            if dependencies:
                output += f"{node} depends on:\n"
                for dep in dependencies:
                    output += f"  - {dep}\n"
            else:
                output += f"{node} (no dependencies)\n"

        # Add circular dependencies
        output += "\n==== CIRCULAR DEPENDENCIES ====\n"
        if dependency_results["circular_dependencies"]:
            for cycle in dependency_results["circular_dependencies"]:
                output += f"  - {' -> '.join(cycle)}\n"
        else:
            output += "No circular dependencies detected.\n"

        # Process each file
        for file in matched_files:
            file_path = Path(config["project_directory"]) / file
            output += f"\n==== FILE: {file} ====\n"
            # Add function index for the current file
            functions = extract_function_definitions(str(file_path))
            if functions:
                output += "#### FUNCTION INDEX ####\n"
                entry_points = {"main", "handle_user_query"}  # Define entry points explicitly
                for func in functions:
                    highlight = " (Entry Point)" if func['name'] in entry_points else ""
                    output += f"  - {func['name']}{highlight}\n"
            # Add script content
            output += "\n#### SCRIPT CONTENTS ####\n"
            try:
                with open(file_path, 'r', encoding='utf-8') as f:
                    script_content = f.read()
                if config.get("allow_dynamic_fetch") and file in config.get("dynamic_fetch_files", []):
                    # Provide only stub definitions, with a note for dynamic fetching;
                    # reuse the function definitions extracted above
                    output += f"# File: {file} (Full content available upon request)\n"
                    output += "```python\n"
                    for func in functions:
                        output += f"def {func['name']}(...):\n    ...\n"  # Stub definitions
                    output += "```\n"
                else:
                    # Provide full content by default
                    output += f"```python\n{script_content}\n```\n"
            except Exception as e:
                logger.error(f"Error reading file {file_path}: {e}")
                output += f"Error reading file: {e}\n"

        # Calculate token count (NLTK word tokens, a rough proxy for LLM tokens)
        tokens = word_tokenize(output)
        token_count = len(tokens)
        output = f"==== TOKEN COUNT ====\nTotal tokens in this context: {token_count}\n\n" + output
        output_result(output, config)
        return "Processing complete."
    except Exception as e:
        logger.exception("An error occurred during file processing.")
        return f"An error occurred: {e}"

if __name__ == "__main__":
    from app.backend.utils.get_project_root import load_env_file, get_project_root

    load_env_file(env_type="backend")

    config = {
        "dry_run": False,  # If True, only display matched files without processing them.
        "generate_workspace": False,  # If True, generate a VSCode workspace file for the project.
        "analyze_dependencies": True,  # Analyze and display dependencies between matched files.
        "output_format": "text",  # Output format: 'text' or 'markdown'.
        "output_destination": "clipboard",  # Where to send the output: 'console', 'file', or 'clipboard'.
        "output_file": "output.txt",  # File to save output if 'output_destination' is set to 'file'.
        "project_directory": get_project_root(),  # Root directory of the project.
        "files_to_find": [
            # List of files or directories to include in the output.
            "app/backend/utils/check_memory_usage.py",
            "app/backend/llm_client/handle_user_query.py",
        ],
        "additional_ignores": [
            # Patterns of files or directories to exclude.
            "*.md",
            "*.test.py",
        ],
        "entry_file": "app/backend/llm_client/handle_user_query.py",  # Main entry file for the project.
        "allow_dynamic_fetch": False,  # Enable or disable dynamic fetching of file content.
        "dynamic_fetch_files": [],  # List of files for which dynamic fetching is allowed.
    }

    try:
        result = copy_files_to_text(config)
        if config["output_destination"] == "console":
            print(result)
    except Exception as e:
        logger.error(f"An error occurred in the main script execution: {e}")