Created
April 12, 2026 19:49
-
-
Save nikola43/0c3a6737e5d5592a43df4e1fd003783c to your computer and use it in GitHub Desktop.
Claude graphify setup
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| """ | |
| graphify-setup — one-command project indexing for AI-assisted development. | |
| Generates a knowledge graph, Obsidian vault, wiki, and CLAUDE.md with | |
| embedded architecture snapshot. Works with any codebase. | |
| Usage: | |
| python graphify-setup.py init [PATH] [--no-obsidian] [--no-wiki] | |
| python graphify-setup.py update [PATH] | |
| python graphify-setup.py status [PATH] | |
| python graphify-setup.py refresh-claude [PATH] | |
| python graphify-setup.py watch [PATH] [--debounce SEC] | |
| """ | |
| __version__ = "1.0.0" | |
| import os | |
| import sys | |
| import json | |
| import time | |
| import signal | |
| import logging | |
| import argparse | |
| import subprocess | |
| from datetime import datetime, timezone | |
| from pathlib import Path | |
# Configure the root logger once at import time: INFO level, with a
# HH:MM:SS timestamp and the level name in front of every message.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
    datefmt="%H:%M:%S",
)
# Shared logger used by the helpers and commands in this file.
log = logging.getLogger("graphify-setup")
# ── configuration ─────────────────────────────────────────────────────
# Default contents of `.graphifyignore`; write_ignore() writes this text
# verbatim when the project has no ignore file yet. cmd_watch() also reads the
# ignore file and treats every non-comment line ending in "/" as a directory
# to skip. The "#" lines below are part of the string (they end up in the
# generated file), not Python comments.
IGNORE_PATTERNS = """\
# Dependencies
node_modules/
vendor/
.venv/
venv/
env/
__pycache__/
.mypy_cache/
.pytest_cache/
.ruff_cache/
# Build output
dist/
build/
target/
out/
.next/
.nuxt/
.output/
coverage/
*.egg-info/
# Version control & CI
.git/
.github/
.gitlab/
# Graphify output (avoid self-reference)
graphify-out/
# Lock files (no semantic value)
*.lock
package-lock.json
yarn.lock
pnpm-lock.yaml
Gemfile.lock
Pipfile.lock
poetry.lock
composer.lock
# Environment & secrets
.env
.env.*
*.pem
*.key
# Logs & temp
*.log
*.tmp
*.swp
.DS_Store
Thumbs.db
# Graphify tooling (not part of the project)
graphify-setup.py
CLAUDE.md
"""
# ── CLAUDE.md template ────────────────────────────────────────────────
# This is the core token-saving mechanism. Every byte here is loaded into
# every Claude session, so it must be dense and actionable.
#
# write_claude_md() renders this text with str.format(architecture=...), so
# `{architecture}` is the only placeholder; any literal brace added to the
# template would have to be doubled ("{{" / "}}").
CLAUDE_MD_TEMPLATE = """\
# graphify
- **graphify** (`~/.claude/skills/graphify/SKILL.md`) — any input to knowledge graph. Trigger: `/graphify`
When the user types `/graphify`, invoke the Skill tool with `skill: "graphify"` before doing anything else.
This project has a graphify knowledge graph at graphify-out/.
## Architecture
<!-- AUTO-GENERATED by graphify-setup.py — do not edit below this line -->
{architecture}
<!-- END AUTO-GENERATED -->
## Rules
Before answering architecture or codebase questions, check graphify-out/ first.
If graphify-out/wiki/index.md exists, navigate it instead of reading raw files.
### Lookup order (stop at first sufficient answer)
1. This file — architecture snapshot above
2. `/graphify query "<question>"` — graph traversal for relationships
3. `/graphify explain "<node>"` or `/graphify path "A" "B"` — targeted lookups
4. `graphify-out/wiki/<community>.md` — summarized per-module knowledge
5. Raw file read — LAST RESORT, only the minimum files needed
### Raw file reads: FORBIDDEN unless
- User explicitly asks: "show the code", "read the file", "open X"
- User wants to EDIT a file (must read target before editing)
- Graph query returned no matching nodes for the question
- Debugging a runtime error that requires actual source
### Token discipline
- Max 3 raw file reads per response — if you need more, ask the user
- Never read a file just to summarize it — the wiki already has summaries
- Never glob/grep the codebase before checking if the graph has the answer
- Use `graphify query --budget 1500` to cap traversal output
### After modifying code in this session
Run to keep graph current (AST-only, no LLM cost):
```
python3 -c "from graphify.watch import _rebuild_code; from pathlib import Path; _rebuild_code(Path('.'))"
```
### Full rebuild commands
- Code changes only: `/graphify . --update`
- Added docs/images: `/graphify . --mode deep --update`
- Full re-index: `python graphify-setup.py init`
"""
| # ── helpers ─────────────────────────────────────────────────────────── | |
def _timer():
    """Start a stopwatch and return a zero-argument reader for it.

    The clock starts when this function is called. Every call to the returned
    function reports the seconds elapsed since then, rounded to one decimal.
    """
    began = time.monotonic()

    def elapsed():
        return round(time.monotonic() - began, 1)

    return elapsed
def ensure_graphify():
    """Make sure the ``graphify`` package is importable, installing it if needed.

    If ``import graphify`` fails, the ``graphifyy`` distribution is installed
    with pip for the running interpreter. When that install fails it is
    retried once with ``--break-system-packages`` for externally managed
    environments. The process exits with status 1 if the package still cannot
    be imported afterwards.
    """
    import importlib

    try:
        import graphify  # noqa: F401
        return
    except ImportError:
        pass

    log.info("graphify not found — installing...")
    pip_cmd = [sys.executable, "-m", "pip", "install", "graphifyy", "-q"]
    ret = subprocess.run(pip_cmd, capture_output=True, text=True)
    if ret.returncode != 0:
        # Try with --break-system-packages for managed environments
        ret = subprocess.run(
            pip_cmd + ["--break-system-packages"],
            capture_output=True, text=True,
        )

    # The package was installed while this interpreter was already running;
    # the import system's finders may have cached directory listings that
    # predate it, so drop those caches before trying the import again.
    importlib.invalidate_caches()
    try:
        import graphify  # noqa: F401
    except ImportError:
        log.error(f"Failed to install graphify. pip output:\n{ret.stderr}")
        log.error("Try manually: pip install graphifyy")
        sys.exit(1)
def write_ignore(project_dir: Path):
    """Write the default ``.graphifyignore`` into *project_dir* unless one exists.

    An existing ignore file is never overwritten.
    """
    ignore_file = project_dir / ".graphifyignore"
    if not ignore_file.exists():
        ignore_file.write_text(IGNORE_PATTERNS)
        log.info("Created .graphifyignore")
        return
    log.info(".graphifyignore exists, skipping")
def detect_files(project_dir: Path) -> dict:
    """Run graphify's file detection on *project_dir* and persist the result.

    Creates ``graphify-out/`` when missing, stores the detection result as
    ``graphify-out/.graphify_detect.json`` and returns it.
    """
    from graphify.detect import detect

    output_root = project_dir / "graphify-out"
    output_root.mkdir(parents=True, exist_ok=True)

    detection = detect(project_dir)
    snapshot = json.dumps(detection, indent=2)
    (output_root / ".graphify_detect.json").write_text(snapshot)
    return detection
def run_ast(project_dir: Path, detection: dict) -> dict:
    """Extract an AST-based graph fragment from the detected code files.

    Entries under ``detection["files"]["code"]`` that are directories are
    expanded with graphify's ``collect_files``; entries that do not exist are
    dropped. The extraction (or an empty placeholder when there is no code)
    is written to ``graphify-out/.graphify_ast.json`` and returned.
    """
    from graphify.extract import collect_files, extract

    sources = []
    for entry in detection.get("files", {}).get("code", []):
        candidate = Path(entry)
        if candidate.is_dir():
            sources.extend(collect_files(candidate))
        elif candidate.exists():
            sources.append(candidate)

    ast_file = project_dir / "graphify-out" / ".graphify_ast.json"

    if sources:
        extraction = extract(sources)
        ast_file.write_text(json.dumps(extraction, indent=2))
        log.info(f"AST: {len(extraction['nodes'])} nodes, {len(extraction['edges'])} edges")
        return extraction

    placeholder = {"nodes": [], "edges": [], "input_tokens": 0, "output_tokens": 0}
    ast_file.write_text(json.dumps(placeholder))
    log.info("No code files — skipping AST")
    return placeholder
def run_cache_and_semantic(project_dir: Path, detection: dict):
    """Load cached semantic results and report which files are not cached.

    Every detected file (all categories) is passed to graphify's semantic
    cache. The raw cache hit is saved as ``.graphify_cached.json``; a copy
    with duplicate node ids removed (first occurrence wins) and zero token
    counts is saved as ``.graphify_semantic.json``.

    Returns the collection of uncached files reported by the cache.
    """
    from graphify.cache import check_semantic_cache

    every_file = []
    for group in detection["files"].values():
        every_file.extend(group)

    hit_nodes, hit_edges, hit_hyperedges, uncached = check_semantic_cache(every_file)
    output_root = project_dir / "graphify-out"

    cached = {
        "nodes": hit_nodes or [],
        "edges": hit_edges or [],
        "hyperedges": hit_hyperedges or [],
    }
    (output_root / ".graphify_cached.json").write_text(json.dumps(cached))

    hit = len(every_file) - len(uncached)
    log.info(f"Cache: {hit} hit, {len(uncached)} uncached")

    # Keep only the first node seen for each id, preserving order.
    first_by_id = {}
    for node in cached["nodes"]:
        first_by_id.setdefault(node["id"], node)
    unique_nodes = list(first_by_id.values())

    semantic = {
        "nodes": unique_nodes,
        "edges": cached["edges"],
        "hyperedges": cached["hyperedges"],
        "input_tokens": 0,
        "output_tokens": 0,
    }
    (output_root / ".graphify_semantic.json").write_text(json.dumps(semantic, indent=2))
    log.info(f"Semantic: {len(unique_nodes)} nodes from cache")
    return uncached
def merge_extractions(project_dir: Path) -> dict:
    """Combine the AST and semantic extractions into one extraction file.

    Reads ``.graphify_ast.json`` and ``.graphify_semantic.json`` from
    ``graphify-out/``. AST nodes come first; a semantic node is appended only
    if its id has not been seen yet. Edges are concatenated (AST first).
    Hyperedges and token counts are taken from the semantic file. The result
    is written to ``.graphify_extract.json`` and returned.
    """
    output_root = project_dir / "graphify-out"

    def _load(filename):
        return json.loads((output_root / filename).read_text())

    ast_part = _load(".graphify_ast.json")
    sem_part = _load(".graphify_semantic.json")

    nodes = list(ast_part["nodes"])
    known_ids = {node["id"] for node in nodes}
    for node in sem_part["nodes"]:
        if node["id"] in known_ids:
            continue
        known_ids.add(node["id"])
        nodes.append(node)

    merged = {
        "nodes": nodes,
        "edges": ast_part["edges"] + sem_part["edges"],
        "hyperedges": sem_part.get("hyperedges", []),
        "input_tokens": sem_part.get("input_tokens", 0),
        "output_tokens": sem_part.get("output_tokens", 0),
    }
    (output_root / ".graphify_extract.json").write_text(json.dumps(merged, indent=2))
    log.info(f"Merged: {len(nodes)} nodes, {len(merged['edges'])} edges")
    return merged
def build_graph(project_dir: Path, detection: dict):
    """Build the graph from the merged extraction, cluster it and analyze it.

    Reads ``graphify-out/.graphify_extract.json`` and exits with status 1 if
    the resulting graph has no nodes. Otherwise writes ``GRAPH_REPORT.md``,
    ``graph.json``, ``.graphify_analysis.json`` and ``.graphify_labels.json``.

    Returns a tuple ``(G, communities, cohesion, gods, labels)``.
    """
    from graphify.build import build_from_json
    from graphify.cluster import cluster, score_all
    from graphify.analyze import god_nodes, surprising_connections, suggest_questions
    from graphify.report import generate
    from graphify.export import to_json

    def _str_keys(mapping):
        # JSON object keys must be strings, so community ids are stringified.
        return {str(key): value for key, value in mapping.items()}

    output_root = project_dir / "graphify-out"
    extraction = json.loads((output_root / ".graphify_extract.json").read_text())

    G = build_from_json(extraction)
    if G.number_of_nodes() == 0:
        log.error("Graph is empty — no nodes extracted. Check that files are supported.")
        sys.exit(1)

    communities = cluster(G)
    cohesion = score_all(G, communities)
    tokens = {
        "input": extraction.get("input_tokens", 0),
        "output": extraction.get("output_tokens", 0),
    }
    gods = god_nodes(G)
    surprises = surprising_connections(G, communities)

    # Community names are derived from the node names they contain.
    labels = _label_communities(G, communities)
    questions = suggest_questions(G, communities, labels)

    report_text = generate(
        G, communities, cohesion, labels, gods, surprises,
        detection, tokens, str(project_dir), suggested_questions=questions,
    )
    (output_root / "GRAPH_REPORT.md").write_text(report_text)
    to_json(G, communities, str(output_root / "graph.json"))

    # Analysis and labels are kept on disk for refresh-claude and status.
    analysis = {
        "communities": _str_keys(communities),
        "cohesion": _str_keys(cohesion),
        "gods": gods,
        "surprises": surprises,
        "questions": questions,
    }
    (output_root / ".graphify_analysis.json").write_text(json.dumps(analysis, indent=2))
    (output_root / ".graphify_labels.json").write_text(json.dumps(_str_keys(labels)))

    log.info(f"Graph: {G.number_of_nodes()} nodes, {G.number_of_edges()} edges, {len(communities)} communities")
    return G, communities, cohesion, gods, labels
# Names considered too generic to identify a community on their own.
# _label_communities() compares a candidate against this set after stripping
# trailing parentheses and lower-casing it, so entries must be lower-case.
_GENERIC_NAMES = frozenset({
    "main", "init", "setup", "fixture", "test", "run", "get", "set",
    "open", "close", "read", "write", "print", "check", "load", "save",
    "start", "stop", "create", "delete", "update", "parse", "format",
    "sort", "filter", "map", "reduce", "zip", "sum", "max", "min",
    "abs", "len", "str", "int", "list", "dict", "bytes", "hash",
    "decode", "encode", "match", "split", "join", "pack", "unpack",
    "constructor", "middleware", "wrapper", "handler", "callback",
})
def _label_communities(G, communities: dict) -> dict:
    """Derive a human-readable label for every community.

    For each community the label is chosen in this order:
      1. the longest symbol name (a node label without "." or "/") that is
         not comment/string-like, not longer than 60 characters and not in
         ``_GENERIC_NAMES``;
      2. otherwise the first symbol name, prefixed with its source file stem
         when it has one;
      3. otherwise the stem of the first file-level node;
      4. otherwise ``"Module <id>"``.

    When a label was already claimed by an earlier community, both get a
    ``" (<context>)"`` suffix: the source file stem where one is available,
    and for the current community its id as a fallback.

    Returns a dict mapping integer community id to label.
    """
    labels = {}
    # base label -> id of the community that first used it; the value becomes
    # None once that first community has been given its disambiguating suffix.
    first_owner = {}
    skip_prefixes = ("#", "//", "'", '"')

    for cid, members in communities.items():
        cid_int = int(cid) if isinstance(cid, str) else cid

        symbols = []      # (label, source_file) of function/class-like nodes
        file_stems = []   # stems of file-level nodes, used as a fallback
        for nid in members:
            if nid not in G.nodes:
                continue
            attrs = G.nodes[nid]
            text = attrs.get("label", nid)
            origin = attrs.get("source_file", "")
            if "." in text:
                file_stems.append(Path(text).stem)
            elif "/" in text:
                file_stems.append(Path(origin).stem)
            else:
                symbols.append((text, origin))

        # Longest acceptable, non-generic symbol name wins.
        chosen = None
        by_length = sorted(symbols, key=lambda pair: len(pair[0]), reverse=True)
        for name, origin in by_length:
            bare = name.rstrip("()")
            if bare.startswith(skip_prefixes) or len(bare) > 60:
                continue
            if bare.lower() in _GENERIC_NAMES:
                continue
            chosen = (name, origin)
            break

        if chosen is None and symbols:
            # Nothing specific enough: take the first symbol and qualify it
            # with its source file stem when available.
            name, origin = symbols[0]
            stem = Path(origin).stem if origin else ""
            chosen = (f"{stem}/{name}" if stem else name, origin)
        if chosen is None and file_stems:
            chosen = (file_stems[0], "")
        if chosen is None:
            chosen = (f"Module {cid_int}", "")

        label, origin = chosen

        if label in first_owner:
            earlier = first_owner[label]
            if earlier is not None:
                # Retro-fit the first claimant with its own file context.
                earlier_nodes = communities.get(earlier, communities.get(str(earlier), []))
                earlier_src = ""
                for nid in earlier_nodes:
                    if nid not in G.nodes:
                        continue
                    earlier_src = G.nodes[nid].get("source_file", "")
                    if earlier_src:
                        break
                if earlier_src:
                    labels[int(earlier)] = f"{labels[int(earlier)]} ({Path(earlier_src).stem})"
                first_owner[label] = None
            suffix = Path(origin).stem if origin else str(cid_int)
            label = f"{label} ({suffix})"

        first_owner.setdefault(label.split(" (")[0], cid_int)
        labels[cid_int] = label

    return labels
def generate_outputs(project_dir: Path, G, communities, cohesion, labels, *, obsidian=True, wiki=True, html=True):
    """Write the optional output artifacts into ``graphify-out/``.

    * ``html``     — ``graph.html``; skipped with a warning above 5000 nodes.
    * ``obsidian`` — an ``obsidian/`` vault plus ``graph.canvas``.
    * ``wiki``     — the markdown wiki (see ``_generate_wiki``).
    """
    output_root = project_dir / "graphify-out"
    label_arg = labels or None

    if html:
        if G.number_of_nodes() <= 5000:
            from graphify.export import to_html

            to_html(G, communities, str(output_root / "graph.html"), community_labels=label_arg)
            log.info("graph.html written")
        else:
            log.warning(f"{G.number_of_nodes()} nodes — skipping HTML, use Obsidian")

    if obsidian:
        from graphify.export import to_obsidian, to_canvas

        vault = str(output_root / "obsidian")
        note_count = to_obsidian(G, communities, vault, community_labels=label_arg, cohesion=cohesion)
        to_canvas(G, communities, f"{vault}/graph.canvas", community_labels=label_arg)
        log.info(f"Obsidian: {note_count} notes")

    if wiki:
        _generate_wiki(project_dir, G, communities, labels)
def _generate_wiki(project_dir: Path, G, communities, labels):
    """Generate the markdown wiki under ``graphify-out/wiki/``.

    Writes ``index.md`` (communities with 3+ nodes, up to 20 of the smaller
    ones, and the top god nodes from ``.graphify_analysis.json`` when that
    file exists) and one article per community listing its nodes and their
    outgoing relationships.

    Every community gets a unique article file name. ``index`` is reserved,
    so a community labelled "index" cannot overwrite the index page, and two
    labels that slugify identically no longer overwrite each other. A slug
    that collides gets the community id appended.
    """
    out_dir = project_dir / "graphify-out"
    wiki_dir = out_dir / "wiki"
    wiki_dir.mkdir(parents=True, exist_ok=True)

    # Resolve titles and unique slugs once so the index links and the article
    # file names are guaranteed to agree.
    titles = {cid: labels.get(cid, f"Community {cid}") for cid in communities}
    slugs = {}
    taken = {"index"}
    for cid in sorted(communities.keys()):
        base = _slug(titles[cid]) or f"community-{cid}"
        candidate = base
        attempt = 0
        while candidate in taken:
            candidate = f"{base}-{cid}" if attempt == 0 else f"{base}-{cid}-{attempt}"
            attempt += 1
        taken.add(candidate)
        slugs[cid] = candidate

    # Sort communities by size (largest first)
    sorted_cids = sorted(communities.keys(), key=lambda c: len(communities[c]), reverse=True)
    # Split into significant (3+ nodes) and small (1-2 nodes)
    significant = [(c, communities[c]) for c in sorted_cids if len(communities[c]) >= 3]
    small = [(c, communities[c]) for c in sorted_cids if len(communities[c]) < 3]

    # Index page — only list significant communities + summary of small ones
    lines = [
        "# Project Knowledge Graph Wiki",
        "",
        f"**{G.number_of_nodes()} nodes** | **{G.number_of_edges()} edges** | **{len(communities)} communities**",
        "", "## Key Modules", "",
    ]
    for cid, nodes in significant:
        lines.append(f"- [{titles[cid]}]({slugs[cid]}.md) ({len(nodes)} nodes)")

    if small:
        lines += [
            "",
            f"## Small Modules ({len(small)} communities with 1-2 nodes)",
            "",
            "These are isolated files or functions with few connections. Articles still exist for each.",
            "",
        ]
        # List first 20 small modules, then summarize the rest
        for cid, nodes in small[:20]:
            lines.append(f"- [{titles[cid]}]({slugs[cid]}.md) ({len(nodes)} nodes)")
        if len(small) > 20:
            lines.append(f"- ... and {len(small) - 20} more small modules")

    # God nodes from analysis
    analysis_path = out_dir / ".graphify_analysis.json"
    if analysis_path.exists():
        analysis = json.loads(analysis_path.read_text())
        gods = analysis.get("gods", [])
        if gods:
            lines += ["", "## God Nodes (most connected)", ""]
            for g in gods[:10]:
                lines.append(f"- **{g['label']}** ({g['edges']} edges)")

    (wiki_dir / "index.md").write_text("\n".join(lines))

    # Per-community articles
    for cid in sorted(communities.keys()):
        node_ids = communities[cid]
        node_set = set(node_ids)
        art = [f"# {titles[cid]}", "", "## Nodes", ""]
        for nid in node_ids:
            if nid in G.nodes:
                nd = G.nodes[nid]
                art.append(f"- **{nd.get('label', nid)}** — `{nd.get('source_file', '')}`")

        art += ["", "## Relationships", ""]
        for nid in node_ids:
            if nid not in G.nodes:
                continue
            for neighbor in G.neighbors(nid):
                e = G.edges[nid, neighbor]
                nl = G.nodes[nid].get("label", nid)
                tl = G.nodes[neighbor].get("label", neighbor)
                # Flag edges that leave this community.
                cross = "" if neighbor in node_set else " *(cross-community)*"
                art.append(f"- {nl} --**{e.get('relation', '?')}**--> {tl} [{e.get('confidence', '')}]{cross}")

        (wiki_dir / f"{slugs[cid]}.md").write_text("\n".join(art))

    log.info(f"Wiki: {len(communities) + 1} articles")
def _slug(label: str) -> str:
    """Turn a community label into a file-name slug.

    The label is lower-cased, spaces and "/" become "-", and parentheses
    are removed. All other characters are kept unchanged.
    """
    table = str.maketrans({" ": "-", "/": "-", "(": None, ")": None})
    return label.lower().translate(table)
| # ── architecture snapshot for CLAUDE.md ─────────────────────────────── | |
def _build_architecture_snapshot(project_dir: Path) -> str:
    """Render the compact architecture summary embedded in CLAUDE.md.

    Reads ``graph.json`` plus, when present, ``.graphify_labels.json`` and
    ``.graphify_analysis.json`` from ``graphify-out/``. Anything missing is
    recovered from the graph itself: communities from each node's
    ``community`` attribute, labels via ``_label_communities`` and god nodes
    from the five highest node degrees (degree > 1 only).

    Returns a hint to run ``init`` when no graph exists yet, otherwise a
    newline-joined summary with a stats line, up to 20 communities of 3+
    nodes, up to 5 god nodes and up to 3 surprises.
    """
    output_root = project_dir / "graphify-out"
    graph_file = output_root / "graph.json"
    if not graph_file.exists():
        return "No graph built yet. Run: python graphify-setup.py init"

    from networkx.readwrite import json_graph

    G = json_graph.node_link_graph(json.loads(graph_file.read_text()), edges="links")

    # Load whatever was persisted by build_graph.
    labels_file = output_root / ".graphify_labels.json"
    analysis_file = output_root / ".graphify_analysis.json"

    labels = {}
    if labels_file.exists():
        stored = json.loads(labels_file.read_text())
        labels = {int(key): text for key, text in stored.items()}

    communities, gods, surprises = {}, [], []
    if analysis_file.exists():
        analysis = json.loads(analysis_file.read_text())
        communities = analysis.get("communities", {})
        gods = analysis.get("gods", [])
        surprises = analysis.get("surprises", [])

    # Rebuild missing pieces from graph.json alone.
    if not communities:
        grouped = {}
        for nid, attrs in G.nodes(data=True):
            grouped.setdefault(str(attrs.get("community", 0)), []).append(nid)
        communities = grouped
    if not labels:
        labels = _label_communities(G, {int(key): members for key, members in communities.items()})
    if not gods:
        by_degree = sorted(G.degree(), key=lambda pair: pair[1], reverse=True)[:5]
        gods = [
            {"label": G.nodes[nid].get("label", nid), "edges": degree}
            for nid, degree in by_degree
            if degree > 1
        ]

    sig_count = sum(1 for members in communities.values() if len(members) >= 3)
    small_count = len(communities) - sig_count
    lines = [
        f"{G.number_of_nodes()} nodes, {G.number_of_edges()} edges, {sig_count} key modules (+{small_count} small)",
        "",
    ]

    # Largest communities first; only those with 3+ nodes, at most 20 lines.
    ranked = sorted(communities.items(), key=lambda item: len(item[1]), reverse=True)
    shown = 0
    for cid, members in ranked:
        if len(members) < 3:
            break
        title = labels.get(int(cid), f"Module {cid}")

        # Symbol-like labels only: non-empty and without "." or "/".
        names = []
        for nid in members:
            if nid not in G.nodes:
                continue
            text = G.nodes[nid].get("label", "")
            if "." not in text and "/" not in text and text:
                names.append(text)

        if names:
            lines.append(f"- **{title}**: {', '.join(names[:5])}")
            shown += 1
        if shown >= 20:
            remaining = sig_count - shown
            if remaining > 0:
                lines.append(f"- ... and {remaining} more (see wiki/index.md)")
            break

    if gods:
        top = ", ".join(f"{g['label']} ({g['edges']})" for g in gods[:5])
        lines.extend(["", "God nodes: " + top])

    if surprises:
        lines.append("")
        for item in surprises[:3]:
            if isinstance(item, dict) and item.get("description"):
                lines.append(f"- Surprise: {item['description']}")

    return "\n".join(lines)
def write_claude_md(project_dir: Path):
    """Write or refresh the graphify section of ``<project_dir>/CLAUDE.md``.

    The section is ``CLAUDE_MD_TEMPLATE`` rendered with the current
    architecture snapshot. Content already in the file is preserved:

    * No graphify section yet: the section is placed first and the existing
      file content follows it.
    * Section present: only the span from the ``# graphify`` heading line to
      the template's closing line is replaced. Text before and after that
      span is kept, so user content that an earlier run placed after the
      section is no longer deleted on the next run.
    * Section present without the closing line (hand-edited or written by a
      different template): everything from the heading to the end of the
      file is treated as the section and replaced.
    """
    architecture = _build_architecture_snapshot(project_dir)
    section = CLAUDE_MD_TEMPLATE.format(architecture=architecture)
    target = project_dir / "CLAUDE.md"

    content = section
    if target.exists():
        existing_lines = target.read_text().splitlines(keepends=True)
        # The last non-blank template line marks where the section ends.
        closing_line = CLAUDE_MD_TEMPLATE.rstrip().splitlines()[-1].rstrip()

        # Match the heading as a whole line so that text merely containing
        # "# graphify" (e.g. "## graphify notes") is not mistaken for it.
        # A capitalised heading only counts on the first line of the file.
        start = None
        for idx, line in enumerate(existing_lines):
            heading = line.rstrip()
            if heading == "# graphify" or (idx == 0 and heading == "# Graphify"):
                start = idx
                break

        if start is None:
            before = ""
            after = "".join(existing_lines)
        else:
            end = len(existing_lines)
            for idx in range(start + 1, len(existing_lines)):
                if existing_lines[idx].rstrip() == closing_line:
                    end = idx + 1
                    break
            before = "".join(existing_lines[:start]).rstrip()
            # Strip the separator newline added on a previous run so that
            # repeated refreshes do not accumulate blank lines.
            after = "".join(existing_lines[end:]).lstrip("\n")

        if before:
            content = before + "\n\n" + content
        if after.strip():
            content = content + "\n" + after

    target.write_text(content)
    log.info("CLAUDE.md written")
| # ── manifest & cleanup ──────────────────────────────────────────────── | |
def save_manifest(project_dir: Path, detection: dict):
    """Hand the detected file lists to graphify's manifest writer.

    ``project_dir`` is accepted for symmetry with the other helpers but is
    not used; only ``detection["files"]`` is passed on.
    """
    from graphify.detect import save_manifest as write_manifest

    detected_files = detection["files"]
    write_manifest(detected_files)
def update_cost(project_dir: Path, detection: dict):
    """Append this run's token usage to ``graphify-out/cost.json``.

    Does nothing when ``.graphify_extract.json`` is absent. Otherwise a run
    record (UTC timestamp, token counts, file count) is appended and the
    running totals are increased. The ledger is created on first use.
    """
    output_root = project_dir / "graphify-out"
    extract_file = output_root / ".graphify_extract.json"
    if not extract_file.exists():
        return

    usage = json.loads(extract_file.read_text())
    tokens_in = usage.get("input_tokens", 0)
    tokens_out = usage.get("output_tokens", 0)

    ledger_file = output_root / "cost.json"
    if ledger_file.exists():
        ledger = json.loads(ledger_file.read_text())
    else:
        ledger = {"runs": [], "total_input_tokens": 0, "total_output_tokens": 0}

    run_record = {
        "date": datetime.now(timezone.utc).isoformat(),
        "input_tokens": tokens_in,
        "output_tokens": tokens_out,
        "files": detection.get("total_files", 0),
    }
    ledger["runs"].append(run_record)
    ledger["total_input_tokens"] += tokens_in
    ledger["total_output_tokens"] += tokens_out

    ledger_file.write_text(json.dumps(ledger, indent=2))
def cleanup_temp(project_dir: Path):
    """Delete the intermediate pipeline files from ``graphify-out/``.

    Only the names listed below are removed; everything else in the
    directory (including the analysis and labels files) is left in place.
    Files that do not exist are skipped.
    """
    temp_names = (
        ".graphify_detect.json",
        ".graphify_extract.json",
        ".graphify_ast.json",
        ".graphify_semantic.json",
        ".graphify_cached.json",
        ".graphify_uncached.txt",
        ".needs_update",
    )
    output_root = project_dir / "graphify-out"
    for leftover in (output_root / name for name in temp_names):
        if leftover.exists():
            leftover.unlink()
| # ── commands ────────────────────────────────────────────────────────── | |
def cmd_init(project_dir: Path, *, obsidian=True, wiki=True):
    """Run the full indexing pipeline for *project_dir*.

    Steps, in order: ensure graphify is installed, write the ignore file,
    detect files, AST extraction, cached semantic merge, graph build, output
    generation, manifest/cost bookkeeping, CLAUDE.md, temp-file cleanup.
    Exits with status 1 when detection finds no supported files.

    ``obsidian`` and ``wiki`` switch the corresponding outputs on or off.
    """
    elapsed = _timer()
    ensure_graphify()
    write_ignore(project_dir)

    log.info(f"Indexing {project_dir} (deep mode)...")
    detection = detect_files(project_dir)
    file_total = detection.get("total_files", 0)
    word_total = detection.get("total_words", 0)
    log.info(f"Corpus: {file_total} files, ~{word_total:,} words")
    for category, members in detection.get("files", {}).items():
        if not members:
            continue
        log.info(f" {category}: {len(members)}")

    if file_total == 0:
        log.error("No supported files found.")
        sys.exit(1)

    # Large corpora get a heads-up but are still processed.
    if file_total > 200 or word_total > 2_000_000:
        log.warning(f"Large corpus ({file_total} files, ~{word_total:,} words). This may take a while.")

    # Code files: deterministic AST pass.
    run_ast(project_dir, detection)

    # Everything else: reuse cached semantic results; the rest is only reported.
    uncached = run_cache_and_semantic(project_dir, detection)
    if uncached:
        log.info(f"{len(uncached)} files need semantic extraction (docs/papers/images).")
        log.info("Run `/graphify . --mode deep` in Claude to extract these with subagents.")
        log.info("Proceeding with AST + cached data for now.")
    merge_extractions(project_dir)

    # Graph and artifacts.
    G, communities, cohesion, _gods, labels = build_graph(project_dir, detection)
    generate_outputs(project_dir, G, communities, cohesion, labels, obsidian=obsidian, wiki=wiki)

    # Bookkeeping and cleanup.
    save_manifest(project_dir, detection)
    update_cost(project_dir, detection)
    write_claude_md(project_dir)
    cleanup_temp(project_dir)

    summary = [
        "",
        f"Done in {elapsed()}s. Outputs in graphify-out/:",
        " graph.html — interactive visualization",
        " graph.json — queryable graph data",
        " GRAPH_REPORT.md — audit report with god nodes",
    ]
    if obsidian:
        summary.append(" obsidian/ — Obsidian vault + canvas")
    if wiki:
        summary.append(" wiki/ — Claude memory layer")
    summary.append(" ../CLAUDE.md — Graphify-first rules for Claude")
    if uncached:
        summary += [
            "",
            f"NOTE: {len(uncached)} files still need deep semantic extraction.",
            " Run in Claude: /graphify . --mode deep --update",
        ]
    for message in summary:
        log.info(message)
def cmd_update(project_dir: Path):
    """Rebuild the graph when graphify reports changed files.

    Falls back to a full ``cmd_init`` when no ``graph.json`` exists yet, and
    returns early when ``detect_incremental`` reports no new files.

    The Obsidian vault and the wiki are regenerated only if they already
    exist in ``graphify-out/``. An update therefore keeps honouring
    ``--no-obsidian`` / ``--no-wiki`` chosen at init time instead of silently
    re-creating artifacts the user opted out of. ``graph.html`` is always
    regenerated, subject to the size limit in ``generate_outputs``.
    """
    ensure_graphify()
    out_dir = project_dir / "graphify-out"
    if not (out_dir / "graph.json").exists():
        log.warning("No existing graph — running full init.")
        cmd_init(project_dir)
        return

    from graphify.detect import detect_incremental

    result = detect_incremental(project_dir)
    new_total = result.get("new_total", 0)
    if new_total == 0:
        log.info("No files changed. Graph is current.")
        return

    elapsed = _timer()
    log.info(f"{new_total} changed file(s), rebuilding...")

    # Decide which optional artifacts to refresh before anything is rewritten.
    want_obsidian = (out_dir / "obsidian").is_dir()
    want_wiki = (out_dir / "wiki").is_dir()

    detection = detect_files(project_dir)
    run_ast(project_dir, detection)
    run_cache_and_semantic(project_dir, detection)
    merge_extractions(project_dir)
    G, communities, cohesion, _, labels = build_graph(project_dir, detection)
    generate_outputs(
        project_dir, G, communities, cohesion, labels,
        obsidian=want_obsidian, wiki=want_wiki,
    )
    save_manifest(project_dir, detection)
    update_cost(project_dir, detection)
    write_claude_md(project_dir)
    cleanup_temp(project_dir)
    log.info(f"Update complete in {elapsed()}s.")
def cmd_status(project_dir: Path):
    """Print a summary of the existing graph and which outputs are present.

    Prints a hint and returns when ``graphify-out/graph.json`` is missing.
    The community count shown is the number of stored labels.
    """
    ensure_graphify()
    output_root = project_dir / "graphify-out"
    graph_file = output_root / "graph.json"
    if not graph_file.exists():
        print("No graph found. Run: python graphify-setup.py init")
        return

    from networkx.readwrite import json_graph

    def _load_json(path, fallback):
        # Optional side files: use the fallback when they are absent.
        return json.loads(path.read_text()) if path.exists() else fallback

    G = json_graph.node_link_graph(json.loads(graph_file.read_text()), edges="links")
    labels = _load_json(output_root / ".graphify_labels.json", {})

    analysis_file = output_root / ".graphify_analysis.json"
    gods = []
    if analysis_file.exists():
        gods = json.loads(analysis_file.read_text()).get("gods", [])

    cost = _load_json(output_root / "cost.json", {})
    built_at = datetime.fromtimestamp(graph_file.stat().st_mtime).strftime("%Y-%m-%d %H:%M")

    print(f"Graph: {G.number_of_nodes()} nodes, {G.number_of_edges()} edges, {len(labels)} communities")
    print(f"Last built: {built_at}")
    print(f"God nodes: {', '.join(g['label'] for g in gods[:5]) if gods else 'none'}")
    print(f"Total runs: {len(cost.get('runs', []))}")
    print(f"Total tokens: {cost.get('total_input_tokens', 0):,} in / {cost.get('total_output_tokens', 0):,} out")
    print()

    artifacts = (
        ("graph.html", "Interactive viz"),
        ("graph.json", "Raw graph"),
        ("GRAPH_REPORT.md", "Audit report"),
        ("obsidian/", "Obsidian vault"),
        ("wiki/index.md", "Wiki memory"),
    )
    for path, desc in artifacts:
        # A directory that exists also satisfies exists(), so one check suffices.
        mark = "ok" if (output_root / path).exists() else "missing"
        print(f" [{mark:>7}] {path:<20} {desc}")

    mark = "ok" if (project_dir / "CLAUDE.md").exists() else "missing"
    print(f" [{mark:>7}] {'CLAUDE.md':<20} Graphify-first rules")
def cmd_refresh_claude(project_dir: Path):
    """Rewrite CLAUDE.md from the graph that is already on disk.

    Exits with status 1 when ``graphify-out/graph.json`` does not exist.
    """
    ensure_graphify()
    graph_file = project_dir / "graphify-out" / "graph.json"
    if graph_file.exists():
        write_claude_md(project_dir)
        return
    log.error("No graph found. Run: python graphify-setup.py init")
    sys.exit(1)
def cmd_watch(project_dir: Path, debounce_sec: int = 5):
    """Watch *project_dir* and run ``cmd_update`` when files are modified.

    Requires the ``watchdog`` package; exits with status 1 when it is not
    installed. Runs an initial ``cmd_init`` if no graph exists yet, then
    blocks until the observer stops or SIGINT/SIGTERM is received.

    Directory entries from ``.graphifyignore`` (lines ending in "/") are
    skipped. Matching is done on the path relative to *project_dir*, split
    into components, so it is independent of the platform's path separator
    and of the names of directories above the project. Each component is
    also matched with ``fnmatch`` so glob entries such as ``*.egg-info/``
    work.

    Args:
        project_dir: Project root to watch (recursively).
        debounce_sec: Minimum number of seconds between two update runs.
    """
    import fnmatch

    try:
        from watchdog.observers import Observer
        from watchdog.events import FileSystemEventHandler
    except ImportError:
        log.error("watchdog required for watch mode. Run: pip install watchdog")
        sys.exit(1)

    ensure_graphify()
    write_ignore(project_dir)
    if not (project_dir / "graphify-out" / "graph.json").exists():
        log.info("No graph — running initial index...")
        cmd_init(project_dir)

    # Load ignore patterns
    ignore_path = project_dir / ".graphifyignore"
    ignore_dirs = set()
    if ignore_path.exists():
        for line in ignore_path.read_text().splitlines():
            line = line.strip()
            if line and not line.startswith("#") and line.endswith("/"):
                ignore_dirs.add(line.rstrip("/"))

    def _is_ignored(src: str) -> bool:
        """Return True when *src* lies inside an ignored directory."""
        path = Path(src)
        try:
            parts = path.relative_to(project_dir).parts
        except ValueError:
            # Not under the project root (e.g. reached via a symlink).
            parts = path.parts
        # Separator-normalised form, so multi-segment entries ("a/b/") still
        # match literally on every platform.
        rel = "/" + "/".join(parts) + "/"
        for pattern in ignore_dirs:
            if f"/{pattern}/" in rel:
                return True
            if any(fnmatch.fnmatch(part, pattern) for part in parts):
                return True
        return False

    class Handler(FileSystemEventHandler):
        def __init__(self):
            super().__init__()
            self.last_run = 0.0

        def on_modified(self, event):
            if event.is_directory:
                return
            src = os.fsdecode(event.src_path)
            # Skip ignored directories
            if _is_ignored(src):
                return
            # Debounce
            now = time.time()
            if (now - self.last_run) < debounce_sec:
                return
            self.last_run = now
            log.info(f"Changed: {Path(src).name}")
            try:
                cmd_update(project_dir)
            except (Exception, SystemExit) as e:
                # The pipeline helpers report fatal problems with sys.exit();
                # SystemExit is not an Exception, so without catching it here
                # it would silently terminate the watcher thread.
                log.error(f"Update failed: {e}")

    observer = Observer()
    observer.schedule(Handler(), str(project_dir), recursive=True)
    observer.start()
    log.info(f"Watching {project_dir} (debounce={debounce_sec}s, Ctrl+C to stop)")

    def _stop(*_):
        observer.stop()
        log.info("Stopped.")
        sys.exit(0)

    signal.signal(signal.SIGINT, _stop)
    signal.signal(signal.SIGTERM, _stop)
    # Join with a timeout so the main thread stays responsive to signals.
    while observer.is_alive():
        observer.join(timeout=1)
| # ── main ────────────────────────────────────────────────────────────── | |
def main():
    """Parse the command line and dispatch to the selected command.

    The project path is resolved and must be an existing directory
    (otherwise the process exits with status 1). The working directory is
    changed to it before the command runs.
    """
    usage_examples = """\
examples:
python graphify-setup.py init . # index current directory
python graphify-setup.py init /path/to/project # index any project
python graphify-setup.py init --no-obsidian # skip Obsidian vault
python graphify-setup.py update # incremental rebuild
python graphify-setup.py status # show graph stats
python graphify-setup.py refresh-claude # regenerate CLAUDE.md only
python graphify-setup.py watch --debounce 10 # auto-rebuild on changes
"""
    parser = argparse.ArgumentParser(
        description="graphify-setup — one-command project indexing for AI development",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=usage_examples,
    )
    parser.add_argument("cmd", choices=["init", "update", "status", "refresh-claude", "watch"])
    parser.add_argument("path", nargs="?", default=".", help="Project directory (default: .)")
    parser.add_argument("--no-obsidian", action="store_true", help="Skip Obsidian vault generation")
    parser.add_argument("--no-wiki", action="store_true", help="Skip wiki generation")
    parser.add_argument("--debounce", type=int, default=5, help="Watch debounce in seconds (default: 5)")
    args = parser.parse_args()

    project_dir = Path(args.path).resolve()
    if not project_dir.is_dir():
        log.error(f"Not a directory: {project_dir}")
        sys.exit(1)
    os.chdir(project_dir)

    # argparse's `choices` guarantees that exactly one branch matches.
    command = args.cmd
    if command == "init":
        cmd_init(project_dir, obsidian=not args.no_obsidian, wiki=not args.no_wiki)
    elif command == "update":
        cmd_update(project_dir)
    elif command == "status":
        cmd_status(project_dir)
    elif command == "refresh-claude":
        cmd_refresh_claude(project_dir)
    elif command == "watch":
        cmd_watch(project_dir, debounce_sec=args.debounce)


if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment