Last active
April 2, 2025 16:42
-
-
Save jpic/67f5e7be3bc4a55552ba3600744243c6 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import anyio | |
from anyio import Path | |
from tree_sitter_language_pack import get_language, get_parser | |
import os | |
from typing import Dict, Optional | |
async def scan_file(file_path: str, language_extensions: dict) -> tuple[str, Optional[dict]]:
    """Read and parse one file with the Tree-sitter parser matching its extension.

    Args:
        file_path: Path of the file to scan.
        language_extensions: Mapping of language name to the file
            extensions (including the leading dot) handled by that language.

    Returns:
        A ``(file_path, data)`` tuple. ``data`` is a dict with the keys
        ``'content'`` (decoded text), ``'tree'`` (the parse result) and
        ``'language'`` (the matched language name), or ``None`` when no
        language matches the extension or reading/parsing failed.
    """
    ext = os.path.splitext(file_path)[1]
    for lang, exts in language_extensions.items():
        if ext not in exts:
            continue
        try:
            parser = get_parser(lang)
            # Read file content asynchronously.
            async with await anyio.open_file(file_path, mode='r', encoding='utf-8') as f:
                content = await f.read()
            # Parsing is synchronous; the parser is fed the UTF-8 bytes, so
            # node offsets in the resulting tree are byte offsets.
            tree = parser.parse(content.encode('utf-8'))
        except Exception as e:
            # Best effort: report the failure and fall through, so another
            # language claiming the same extension can still be tried.
            print(f"Error parsing {file_path}: {e}")
        else:
            return file_path, {'content': content, 'tree': tree, 'language': lang}
    return file_path, None
async def scan_repo(repo_path: str) -> Dict[str, dict]:
    """Walk ``repo_path`` recursively and parse every supported source file.

    Args:
        repo_path: Root directory of the repository to scan.

    Returns:
        A dict mapping each successfully parsed file path to the data dict
        produced by ``scan_file``. Files that ``scan_file`` could not handle
        are left out.
    """
    # Supported languages and their file extensions
    language_extensions = {
        'python': ['.py'],
        'javascript': ['.js'],
        'java': ['.java'],
        'cpp': ['.cpp', '.h'],
        'ruby': ['.rb'],
    }
    repo_data: Dict[str, dict] = {}
    # Cap how many files are scanned at once so a large repository cannot
    # exhaust the process's file descriptors.
    limiter = anyio.CapacityLimiter(64)

    async def process_file(file_path: str) -> None:
        async with limiter:
            scanned_path, data = await scan_file(file_path, language_extensions)
        if data:
            repo_data[scanned_path] = data

    # Leaving the task group blocks until every spawned task has finished,
    # so no separate counter or completion event is needed.
    async with anyio.create_task_group() as tg:
        async for entry in Path(repo_path).rglob('*'):
            if await entry.is_file():
                tg.start_soon(process_file, str(entry))
    return repo_data
def extract_symbols(tree, language: str, file_content: str) -> list:
    """Extract function and class names from a parsed syntax tree.

    Args:
        tree: Parse result whose ``root_node`` is queried.
        language: Language name used to pick the query and load the grammar.
        file_content: Text of the file the tree was parsed from.

    Returns:
        A list of dicts with the keys ``'name'``, ``'type'`` (the capture
        tag, ``'func'`` or ``'class'``) and ``'start_line'`` (1-based).
        Empty when no query is defined for ``language``.
    """
    queries = {
        'python': '''
            (function_definition name: (identifier) @func)
            (class_definition name: (identifier) @class)
        ''',
        'javascript': '''
            (function_declaration name: (identifier) @func)
            (class_declaration name: (identifier) @class)
        ''',
        # Add more as needed
    }
    query = queries.get(language)
    if not query:
        return []

    lang = get_language(language)
    captures = lang.query(query).captures(tree.root_node)
    if isinstance(captures, dict):
        # Newer py-tree-sitter releases return {capture_name: [nodes]}
        # instead of a list of (node, capture_name) pairs; flatten it and
        # restore source order.
        pairs = sorted(
            ((node, tag) for tag, nodes in captures.items() for node in nodes),
            key=lambda pair: pair[0].start_byte,
        )
    else:
        pairs = captures

    # Node offsets are byte offsets into the UTF-8 encoded source, so slice
    # the encoded bytes; slicing the str directly yields wrong names as soon
    # as the file contains a non-ASCII character.
    source_bytes = file_content.encode('utf-8')
    symbols = []
    for node, tag in pairs:
        name = source_bytes[node.start_byte:node.end_byte].decode('utf-8')
        symbols.append({'name': name, 'type': tag, 'start_line': node.start_point[0] + 1})
    return symbols
async def build_symbol_map(repo_data: Dict[str, dict]) -> Dict[str, list]:
    """Map every scanned file path to the symbols extracted from its tree.

    Each value of ``repo_data`` must provide the ``'tree'``, ``'language'``
    and ``'content'`` keys, which are handed to ``extract_symbols``.
    """
    return {
        path: extract_symbols(entry['tree'], entry['language'], entry['content'])
        for path, entry in repo_data.items()
    }
async def optimize_repo_map(symbol_map: Dict[str, list], token_budget: int = 1000) -> str:
    """Render the symbol map as text that fits within a rough token budget.

    Files are visited in descending order of symbol count (a heuristic for
    relevance). Each file contributes one ``File: <path>`` header followed by
    one line per symbol; files without symbols contribute nothing.

    Args:
        symbol_map: Mapping of file path to a list of symbol dicts with the
            keys ``'type'``, ``'name'`` and ``'start_line'``.
        token_budget: Maximum estimated number of tokens to emit.

    Returns:
        The concatenated map text, possibly empty.
    """
    parts = []
    used_tokens = 0
    # Sort files by number of symbols (heuristic for relevance)
    ranked = sorted(symbol_map.items(), key=lambda item: len(item[1]), reverse=True)
    for file_path, symbols in ranked:
        if used_tokens >= token_budget:
            break
        header = f"File: {file_path}\n"
        header_written = False
        for symbol in symbols:
            line = f"{symbol['type']}: {symbol['name']} (line {symbol['start_line']})\n"
            # Emit the header once per file, bundled with the first symbol so
            # a header is never written without at least one symbol under it.
            chunk = line if header_written else header + line
            cost = len(chunk) // 4  # Rough token estimate
            if used_tokens + cost > token_budget:
                # This file no longer fits; a later file may still have a
                # shorter entry that does.
                break
            parts.append(chunk)
            used_tokens += cost
            header_written = True
    return ''.join(parts)
async def main():
    """Scan ``./my_repo``, build its symbol map and publish the rendered map.

    The rendered map is printed to stdout and appended to ``repo_map.txt``.
    """
    repo_path = './my_repo'

    # Stage 1: parse every supported file in the repository.
    print("Scanning repository...")
    scanned = await scan_repo(repo_path)

    # Stage 2: pull the symbols out of each parsed file.
    print("Building symbol map...")
    symbols_by_file = await build_symbol_map(scanned)

    # Stage 3: trim the map down to the token budget.
    print("Optimizing for LLM...")
    rendered_map = await optimize_repo_map(symbols_by_file)

    # Show the result on stdout.
    print("\nRepository Map:", rendered_map, sep="\n")

    # Append (not overwrite) so earlier runs stay in the file.
    async with await anyio.open_file('repo_map.txt', mode='a', encoding='utf-8') as out_file:
        await out_file.write(f"{rendered_map}\n")
# Script entry point: run the async pipeline only when this file is executed
# directly, not when it is imported as a module.
if __name__ == "__main__":
    anyio.run(main)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment