#!/usr/bin/env python3
"""
(Note: this script only creates a JSON snapshot of the repo; it is not part of the repo itself.
Agents: you can skip this script, and please don't edit it!)
package_to_snapshot_json.py — Per-root JSON bundles (+ MASTER index) with O(1) shard seeks,
GLOBAL TOC, stats, skip reporting, oversize previews, optional slicing, and xrefs.
Behavior
- For each configured root in SRC_ROOTS (e.g., 'src', 'tests'):
  • Emits INDEX__<label>.json
  • Emits DATA__<label>-000.jsonl ... (sharded, binary, with byte offsets)
  • Includes files directly under that root and all subfolders.
- By default, DOES NOT bundle top-level repository files.
  • To include them as a separate bundle, pass --include-repo-root.
Printer
- Per root: prints token summary vs window, file-type stats, Top-3 token-heavy files,
  and artifact sizes (index + shards) vs --size-limit-mb.
- Overall: prints combined token summary and writes MASTER_INDEX.json with GLOBAL_TOC.
Indices
- Per-file metadata: tokens, chars, lines, line_endings, sha256, sha256_normalized_lf,
  shard, row, byte_offset, byte_len, oversize flags, optional previews, optional slices.
- Per-bundle: skip reasons and build_config echo (ignore rules, thresholds, flags).
- MASTER: bundles + GLOBAL_TOC {path → index, shard, row, sha256, byte_offset, byte_len} and optional xrefs.
Defaults
- Skips <no-ext> files (use --keep-no-ext to include).
- Skips binaries and oversized text (MAX_SINGLE_FILE_BYTES).
- Strips outputs from .ipynb when embedding.
- Uses tiktoken if available (o200k_base → cl100k_base), else ~chars/4 heuristic.
- Filters build metadata (*.egg-info) and common secret-like names by default.
Usage
    python package_to_snapshot_json.py
    python package_to_snapshot_json.py --token-window 128000 --size-limit-mb 512
    python package_to_snapshot_json.py --keep-no-ext
    python package_to_snapshot_json.py --include-repo-root
    python package_to_snapshot_json.py --slice-long-files --slice-tokens 12000 --slice-max-lines 300
    python package_to_snapshot_json.py --max-artifacts 10 --enforce-max-artifacts
    python package_to_snapshot_json.py --no-xrefs
"""
from __future__ import annotations
import argparse
import fnmatch
import hashlib
import io
import json
import os
import platform
import re
import subprocess
import sys
from datetime import datetime
from pathlib import Path
from typing import Iterable, List, Dict, Tuple
from collections import defaultdict
# ── CONFIG ───────────────────────────────────────────────────────────────────── | |
SRC_ROOTS: List[str] = [r"."] # each becomes its own bundle | |
OUTPUT_DIR: str = r"repo-snapshot-json" # output directory at repo root | |
# Optional separate bundle for top-level repository files (off by default) | |
INCLUDE_REPO_ROOT_FILES_DEFAULT: bool = False | |
ROOT_FILE_EXT_ALLOW = [".py", ".toml", ".md", ".yml", ".yaml", ".json", ".txt", ".ipynb"] | |
# Size policy per-root | |
TARGET_SHARD_SIZE_BYTES: int = 16 * 1024 * 1024 # ~16 MB per shard | |
MAX_SHARDS: int = 9 # index + ≤9 shards per root | |
# Ignore rules (dirs + patterns) | |
IGNORE_DIRS = { | |
".git", | |
".venv", | |
"__pycache__", | |
".mypy_cache", | |
".pytest_cache", | |
".idea", | |
".vscode", | |
"node_modules", | |
"dist", | |
"build", | |
".next", | |
".cache", | |
".turbo", | |
".pnpm", | |
".vite", | |
"corpus", | |
".hrm-venv", | |
} | |
IGNORE_DIR_PATTERNS = ["*backup*", "tmp*", ".tmp_*", "*@*", "*.egg-info"] # added *.egg-info | |
# File name patterns to skip (security-minded defaults) | |
SKIP_FILE_PATTERNS = [ | |
"package_to_snapshot_*", | |
"snapshot_*", | |
"id_rsa*", | |
"*.pem", | |
"*.key", | |
"*.p12", | |
"*.crt", | |
".env", | |
".env.*", | |
"*secret*", | |
"*credential*", | |
"service_account*.json", | |
".js", | |
".css", | |
".jsonl", | |
".sh", | |
".ps1", | |
".patch", | |
] | |
# Fully skipped file types | |
SKIP_EXT_FULL = { | |
".zip", | |
".tar", | |
".gz", | |
".tgz", | |
".7z", | |
".rar", | |
".parquet", | |
".db", | |
".sqlite", | |
".so", | |
".dll", | |
".exe", | |
".bin", | |
".pdf", | |
".png", | |
".jpg", | |
".jpeg", | |
".svg", | |
".gif", | |
".webp", | |
".ico", | |
".ttf", | |
".otf", | |
".woff", | |
".woff2", | |
".mp3", | |
".mp4", | |
} | |
# Cap single embedded text file size (increase to include huge minified assets) | |
MAX_SINGLE_FILE_BYTES: int = 512 * 1024 | |
# Oversize flags (previews shown when exceeded) | |
OVERSIZE_TOKENS_THRESHOLD: int = 12000 | |
OVERSIZE_BYTES_THRESHOLD: int = 200_000 | |
PREVIEW_HEAD_CHARS: int = 300 | |
PREVIEW_TAIL_CHARS: int = 300 | |
# Notebook handling | |
IPYNB_STRIP_OUTPUTS: bool = True | |
# Tokenization & stats | |
TOKENIZER_HINTS = ("o200k_base", "cl100k_base") # try in this order | |
FALLBACK_CHARS_PER_TOKEN: int = 4 | |
TOP_N_HEAVIEST: int = 3 | |
# Skips by default | |
SKIP_NO_EXT_DEFAULT: bool = True | |
SKIP_EMPTY_DEFAULT: bool = True # skip zero-byte / whitespace-only files | |
# Optional slicing of long files | |
SLICE_LONG_FILES_DEFAULT: bool = False | |
SLICE_TOKENS_THRESHOLD_DEFAULT: int = 12000 | |
SLICE_MAX_LINES_DEFAULT: int = 300 | |
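# Illustrative output layout (a sketch, assuming the defaults above with SRC_ROOTS = ["."];
# extra DATA__ shards appear only when a bundle exceeds TARGET_SHARD_SIZE_BYTES):
#   repo-snapshot-json/
#     MASTER_INDEX.json
#     INDEX__repo_root.json
#     DATA__repo_root-000.jsonl
#     DATA__repo_root-001.jsonl   (only for larger bundles)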
# ── Path helpers ────────────────────────────────────────────────────────────── | |
def repo_root() -> Path: | |
return Path(__file__).resolve().parent | |
def to_abs(p: str) -> Path: | |
base = repo_root() | |
pp = Path(p) | |
return pp if pp.is_absolute() else base / pp | |
def normalized_posix(path: Path, base: Path) -> str: | |
return path.relative_to(base).as_posix() | |
def sanitize_label(root_path: Path, base: Path) -> str: | |
"""Stable label for filenames: relative-to-repo path with separators → '_'.""" | |
try: | |
rel = root_path.relative_to(base).as_posix() | |
except Exception: | |
rel = root_path.as_posix() | |
if rel in (".", ""): | |
rel = "repo_root" | |
for ch in ("/", "\\", ":", "*", "?", '"', "<", ">", "|", " "): | |
rel = rel.replace(ch, "_") | |
return rel.strip("_") or "repo_root" | |
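# Illustrative label derivations (not executed by this script):
#   sanitize_label(repo_root() / "src", repo_root())             -> "src"
#   sanitize_label(repo_root() / "src" / "my pkg", repo_root())  -> "src_my_pkg"
#   sanitize_label(repo_root(), repo_root())                     -> "repo_root"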
# ── Safe filesystem probes (avoid following symlinks/reparse points) ───────── | |
def safe_lstat(p: Path): | |
""" | |
Best-effort stat that does not follow symlinks/reparse points. | |
Returns os.stat_result or None when inaccessible. | |
""" | |
try: | |
return p.lstat() # do not follow | |
except OSError: | |
return None | |
# ── Git helpers (respect .gitignore when possible) ──────────────────────────── | |
def _git_is_repo(base: Path) -> bool: | |
try: | |
r = subprocess.run( | |
["git", "rev-parse", "--is-inside-work-tree"], | |
cwd=str(base), | |
stdout=subprocess.PIPE, | |
stderr=subprocess.DEVNULL, | |
text=True, | |
check=False, | |
) | |
return r.returncode == 0 and (r.stdout or "").strip() == "true" | |
except Exception: | |
return False | |
def git_list_files_not_ignored(base: Path, target: Path) -> List[Path] | None: | |
""" | |
Returns tracked + untracked (not ignored) files under `target` by consulting Git. | |
Respects .gitignore via `--exclude-standard`. Returns None if Git is unavailable. | |
Paths are absolute. | |
""" | |
if not _git_is_repo(base): | |
return None | |
try: | |
rel = target.relative_to(base).as_posix() or "." | |
except Exception: | |
return None | |
# Ask Git for tracked (-c) + untracked not ignored (-o) files under rel | |
try: | |
r = subprocess.run( | |
[ | |
"git", | |
"ls-files", | |
"-z", | |
"-c", | |
"-o", | |
"--exclude-standard", | |
"--", | |
rel, | |
], | |
cwd=str(base), | |
stdout=subprocess.PIPE, | |
stderr=subprocess.DEVNULL, | |
check=False, | |
) | |
except Exception: | |
return None | |
if r.returncode != 0: | |
return None | |
out = r.stdout or b"" | |
if not out: | |
return [] | |
parts = [p for p in out.split(b"\0") if p] | |
files: List[Path] = [] | |
for b in parts: | |
try: | |
s = b.decode("utf-8") | |
except Exception: | |
# best-effort fallback | |
try: | |
s = b.decode(sys.getfilesystemencoding() or "utf-8", errors="ignore") | |
except Exception: | |
continue | |
files.append(base / s) | |
return files | |
# ── Filters ─────────────────────────────────────────────────────────────────── | |
def matches_any(name: str, patterns: Iterable[str]) -> bool: | |
low = name.lower() | |
return any(fnmatch.fnmatch(low, pat.lower()) or (pat.lower() in low) for pat in patterns) | |
def should_skip_dir(dirname: str) -> bool: | |
low = dirname.lower() | |
if low in {d.lower() for d in IGNORE_DIRS}: | |
return True | |
return matches_any(low, IGNORE_DIR_PATTERNS) | |
def is_binary_bytes(sample: bytes) -> bool: | |
if b"\x00" in sample: | |
return True | |
textchars = bytearray({7, 8, 9, 10, 12, 13, 27} | set(range(0x20, 0x100))) | |
return bool(sample.translate(None, textchars)) | |
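# Illustrative: any sample containing a NUL byte is treated as binary, as are samples
# with control bytes outside the allowed text set (tab, newlines, form feed, escape, etc.).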
def decide_skip_file(p: Path, *, skip_no_ext: bool) -> Tuple[bool, str | None]: | |
name = p.name | |
if matches_any(name, SKIP_FILE_PATTERNS): | |
return True, "name_pattern" | |
ext = p.suffix.lower() | |
if skip_no_ext and ext == "": | |
return True, "no_ext" | |
if ext in SKIP_EXT_FULL: | |
return True, "ext_filtered" | |
return False, None | |
# ── Text reading & metadata ─────────────────────────────────────────────────── | |
def read_text_safely(p: Path) -> Tuple[str, int]: | |
raw = p.read_bytes() | |
if is_binary_bytes(raw[:4096]): | |
raise ValueError("binary-like content") | |
if len(raw) > MAX_SINGLE_FILE_BYTES: | |
raise ValueError(f"too_large ({len(raw)} bytes)") | |
ext = p.suffix.lower() | |
if ext == ".ipynb" and IPYNB_STRIP_OUTPUTS: | |
try: | |
nb = json.loads(raw.decode("utf-8")) | |
for cell in nb.get("cells", []): | |
cell.pop("outputs", None) | |
cell.pop("attachments", None) | |
cell["execution_count"] = None | |
s = json.dumps(nb, ensure_ascii=False, indent=2) | |
return s, s.count("\n") + 1 | |
except Exception: | |
pass | |
try: | |
s = raw.decode("utf-8") | |
except UnicodeDecodeError: | |
raise ValueError("non_utf8") | |
return s, s.count("\n") + 1 | |
def detect_line_endings(s: str) -> str: | |
has_crlf = "\r\n" in s | |
# remove CRLF to check for lone LFs | |
lf_only = s.replace("\r\n", "") | |
has_lf = "\n" in lf_only | |
has_cr = "\r" in lf_only | |
if has_crlf and (has_lf or has_cr): | |
return "MIXED" | |
if has_crlf: | |
return "CRLF" | |
if "\n" in s: | |
return "LF" | |
return "NONE" | |
def sha256_text(s: str) -> str: | |
return hashlib.sha256(s.encode("utf-8")).hexdigest() | |
def sha256_text_normalized_lf(s: str) -> str: | |
normalized = s.replace("\r\n", "\n").replace("\r", "\n") | |
return hashlib.sha256(normalized.encode("utf-8")).hexdigest() | |
def compute_sha256_stream(fp: Path, chunk: int = 1024 * 1024) -> str: | |
h = hashlib.sha256() | |
with open(fp, "rb") as f: | |
while True: | |
b = f.read(chunk) | |
if not b: | |
break | |
h.update(b) | |
return h.hexdigest() | |
def make_token_counter(): | |
try: | |
import tiktoken # type: ignore | |
enc = None | |
for name in TOKENIZER_HINTS: | |
try: | |
enc = tiktoken.get_encoding(name) | |
break | |
except Exception: | |
continue | |
if enc is None: | |
raise RuntimeError("No suitable tiktoken encoding found") | |
return lambda s: len(enc.encode(s)) | |
except Exception: | |
return lambda s: max(1, (len(s) + FALLBACK_CHARS_PER_TOKEN - 1) // FALLBACK_CHARS_PER_TOKEN) | |
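# Illustrative: when tiktoken is unavailable, the fallback reports ceil(len(s) / 4) tokens,
# e.g. a 10-character string counts as max(1, (10 + 4 - 1) // 4) == 3 tokens.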
def copy_to_clipboard(text: str) -> bool: | |
try: | |
import pyperclip # type: ignore | |
pyperclip.copy(text) | |
return True | |
except Exception: | |
pass | |
try: | |
system = platform.system() | |
if system == "Windows": | |
p = subprocess.Popen(["clip"], stdin=subprocess.PIPE, close_fds=True) | |
p.stdin.write(text.encode("utf-8")) | |
p.stdin.close() | |
return p.wait() == 0 | |
elif system == "Darwin": | |
p = subprocess.Popen(["pbcopy"], stdin=subprocess.PIPE) | |
p.communicate(text.encode("utf-8")) | |
return p.returncode == 0 | |
else: | |
for cmd in (["wl-copy"], ["xclip", "-selection", "clipboard"]): | |
try: | |
p = subprocess.Popen(cmd, stdin=subprocess.PIPE) | |
p.communicate(text.encode("utf-8")) | |
if p.returncode == 0: | |
return True | |
except Exception: | |
continue | |
except Exception: | |
pass | |
return False | |
# ── Stats printer ───────────────────────────────────────────────────────────── | |
def print_filetype_stats(entries: List[Dict], top_n: int = TOP_N_HEAVIEST) -> None: | |
if not entries: | |
print("[Stats] No files embedded; nothing to report.") | |
return | |
by_ext: Dict[str, Dict[str, int]] = {} | |
for e in entries: | |
ext = e.get("lang") or "" | |
if ext == "": | |
continue | |
agg = by_ext.setdefault(ext, {"count": 0, "tokens": 0}) | |
agg["count"] += 1 | |
agg["tokens"] += int(e.get("tokens", 0)) | |
sorted_exts = sorted(by_ext.items(), key=lambda kv: (-kv[1]["tokens"], kv[0])) | |
print("File types (count | tokens):") | |
for ext, s in sorted_exts: | |
print(f" {ext}: {s['count']:,} | {s['tokens']:,}") | |
heavy = sorted(entries, key=lambda e: int(e.get("tokens", 0)), reverse=True)[:top_n] | |
print(f"Top {top_n} token-heavy files:") | |
for e in heavy: | |
print(f" • {e['path']} — {int(e['tokens']):,} tokens") | |
def _folder_key_from_path(full_path: str, *, depth: int | None, label: str) -> str: | |
""" | |
full_path is like 'repo_root/path/to/file.py' or 'src/hydraedge/...'. | |
We strip the leading '<label>/' and then optionally collapse to first N components. | |
""" | |
# strip bundle label | |
prefix = f"{label}/" | |
if full_path.startswith(prefix): | |
rel = full_path[len(prefix) :] | |
else: | |
rel = full_path.split("/", 1)[1] if "/" in full_path else "" | |
if rel == "": | |
# a file that lives directly at the label root -> treat as '.' | |
return "." | |
parts = rel.split("/") | |
# last element is file name; we want the directory only | |
if len(parts) == 1: | |
# file directly under label root | |
return "." | |
dirs = parts[:-1] | |
if depth is not None and depth > 0: | |
dirs = dirs[: min(depth, len(dirs))] | |
return "/".join(dirs) if dirs else "." | |
def print_top_folders(entries: list[dict], label: str, *, top_k: int, depth: int | None) -> None: | |
""" | |
Aggregate per-folder stats from entry rows (embedded files only). | |
We use: file count, sum(tokens), sum(size_bytes). | |
""" | |
if not entries: | |
return | |
agg = defaultdict(lambda: {"files": 0, "tokens": 0, "bytes": 0}) | |
for e in entries: | |
path = e.get("path") or "" | |
key = _folder_key_from_path(path, depth=depth, label=label) | |
agg[key]["files"] += 1 | |
agg[key]["tokens"] += int(e.get("tokens", 0)) | |
agg[key]["bytes"] += int(e.get("size_bytes", 0) or 0) | |
# sort primarily by files desc, then tokens desc, then key asc | |
items = sorted(agg.items(), key=lambda kv: (-kv[1]["files"], -kv[1]["tokens"], kv[0])) | |
print(f"Top folders (by files) [depth={'None' if depth is None else depth}, top={top_k}]") | |
for i, (k, v) in enumerate(items[: max(0, top_k)], start=1): | |
mb = v["bytes"] / (1024 * 1024) if v["bytes"] else 0.0 | |
print(f" {i:>2}) {k:<45} files={v['files']:>4} tokens={v['tokens']:>8,} bytes={mb:>6.1f} MB") | |
# ── Collectors ──────────────────────────────────────────────────────────────── | |
def collect_entries_for_root(
    root_path: Path,
    label: str,
    *,
    skip_no_ext: bool,
    skip_empty: bool,
    slice_long: bool,
    slice_tokens: int,
    slice_max_lines: int,
    tok_counter,
    explicit_files: List[Path] | None = None,
) -> Tuple[List[Dict], List[Dict]]:
""" | |
Returns (entries, skipped). 'entries' may include slices for long files (if enabled). | |
Each entry has keys: path, lang, size_bytes, chars, lines, line_endings, sha256, | |
sha256_normalized_lf, tokens, content, (optional) slice{}, parent | |
""" | |
entries: List[Dict] = [] | |
skipped: List[Dict] = [] | |
    # Discover files (or use an explicit pre-filtered list when one is provided,
    # e.g. the top-level files selected by collect_entries_for_repo_root)
    if explicit_files is not None:
        fps = [p for p in explicit_files if p.is_file()]
    elif root_path.is_file():
        fps = [root_path]
elif root_path.is_dir(): | |
fps = [] | |
base = repo_root() | |
git_files = git_list_files_not_ignored(base, root_path) | |
if git_files is not None: | |
# Use Git’s view (tracked + untracked not ignored). Filter with decide_skip_file below. | |
fps = [p for p in git_files if p.is_file()] | |
else: | |
# Fallback: walk filesystem and apply static ignore dirs/patterns. | |
# Ignore directory read errors (e.g., permission/reparse anomalies) | |
for cur, dirnames, files in os.walk(root_path, topdown=True, onerror=lambda e: None): | |
dirnames[:] = [d for d in dirnames if not should_skip_dir(d)] | |
for fname in files: | |
fp = Path(cur) / fname | |
fps.append(fp) | |
else: | |
return entries, skipped # missing | |
# Process files | |
for fp in sorted(fps, key=lambda p: p.as_posix().lower()): | |
# Apply file-level skip rules early (name/ext-based) | |
skip, reason = decide_skip_file(fp, skip_no_ext=skip_no_ext) | |
if skip: | |
st = safe_lstat(fp) | |
try: | |
try_rel = fp.relative_to(root_path).as_posix() | |
except Exception: | |
try_rel = fp.name | |
skipped.append( | |
{ | |
"path": f"{label}/{try_rel}", | |
"reason": reason, | |
"size_bytes_fs": int(st.st_size) if st else None, | |
"ext": fp.suffix.lower(), | |
} | |
) | |
continue | |
try: | |
try_rel = fp.relative_to(root_path).as_posix() | |
except Exception: | |
try_rel = fp.name | |
st = safe_lstat(fp) | |
if st is None: | |
skipped.append( | |
{ | |
"path": f"{label}/{try_rel}", | |
"reason": "inaccessible", | |
"size_bytes_fs": None, | |
"ext": fp.suffix.lower(), | |
} | |
) | |
continue | |
if skip_empty and st.st_size == 0: | |
skipped.append( | |
{ | |
"path": f"{label}/{try_rel}", | |
"reason": "empty", | |
"size_bytes_fs": 0, | |
"ext": fp.suffix.lower(), | |
} | |
) | |
continue | |
if st.st_size > MAX_SINGLE_FILE_BYTES: | |
# Too large to embed; record hash & size | |
skipped.append( | |
{ | |
"path": f"{label}/{try_rel}", | |
"reason": "too_large", | |
"size_bytes_fs": int(st.st_size), | |
"sha256_full": compute_sha256_stream(fp), | |
"ext": fp.suffix.lower(), | |
} | |
) | |
continue | |
# Try to read as text | |
try: | |
content, nlines = read_text_safely(fp) | |
except Exception as e: | |
reason = str(e) | |
if "binary-like" in reason: | |
reason = "binary_like" | |
elif "non_utf8" in reason or "non-UTF8" in reason: | |
reason = "non_utf8" | |
skipped.append( | |
{ | |
"path": f"{label}/{try_rel}", | |
"reason": reason, | |
"size_bytes_fs": int(st.st_size), | |
"ext": fp.suffix.lower(), | |
} | |
) | |
continue | |
# Metadata | |
ext = fp.suffix.lstrip(".").lower() | |
chars = len(content) | |
n_tokens = tok_counter(content) | |
le = detect_line_endings(content) | |
norm_sha = sha256_text_normalized_lf(content) | |
sha = sha256_text(content) | |
oversize = (n_tokens >= OVERSIZE_TOKENS_THRESHOLD) or (st.st_size >= OVERSIZE_BYTES_THRESHOLD) | |
def make_entry( | |
slice_info: Dict | None, | |
text: str, | |
lines_count: int, | |
start_line: int | None, | |
end_line: int | None, | |
): | |
path = f"{label}/{try_rel}" | |
if slice_info: | |
path = f"{path}#L{start_line}-{end_line}" | |
e = { | |
"path": path, | |
"parent": f"{label}/{try_rel}" if slice_info else None, | |
"lang": ext, | |
"size_bytes": len(text.encode("utf-8")), | |
"chars": len(text), | |
"lines": lines_count, | |
"line_endings": detect_line_endings(text), | |
"sha256": sha256_text(text), | |
"sha256_normalized_lf": sha256_text_normalized_lf(text), | |
"tokens": tok_counter(text), | |
"content": text, | |
} | |
if slice_info: | |
e["slice"] = slice_info | |
return e | |
# Optional slicing | |
if slice_long and n_tokens >= slice_tokens and nlines > slice_max_lines: | |
# Slice by lines for simplicity | |
lines = content.splitlines(keepends=True) | |
start = 0 | |
while start < len(lines): | |
end = min(start + slice_max_lines, len(lines)) | |
seg = "".join(lines[start:end]) | |
slice_info = {"start_line": start + 1, "end_line": end, "parent": f"{label}/{try_rel}"} | |
entries.append(make_entry(slice_info, seg, end - start, start + 1, end)) | |
start = end | |
else: | |
ent = { | |
"path": f"{label}/{try_rel}", | |
"lang": ext, | |
"size_bytes": len(content.encode("utf-8")), | |
"chars": chars, | |
"lines": nlines, | |
"line_endings": le, | |
"sha256": sha, | |
"sha256_normalized_lf": norm_sha, | |
"tokens": n_tokens, | |
"content": content, | |
} | |
# Previews for oversize (stored in index rows later) | |
ent["_oversize_flag"] = oversize | |
if oversize: | |
head = content[:PREVIEW_HEAD_CHARS] | |
tail = content[-PREVIEW_TAIL_CHARS:] if len(content) > PREVIEW_TAIL_CHARS else "" | |
ent["_preview_head"] = head | |
ent["_preview_tail"] = tail | |
entries.append(ent) | |
return entries, skipped | |
def collect_entries_for_repo_root( | |
*, | |
skip_no_ext: bool, | |
skip_empty: bool, | |
slice_long: bool, | |
slice_tokens: int, | |
slice_max_lines: int, | |
tok_counter, | |
): | |
"""Optional: top-level repository files (only if --include-repo-root is passed).""" | |
base = repo_root() | |
fps = [] | |
for fp in sorted(base.iterdir(), key=lambda p: p.name.lower()): | |
# Guard against inaccessible items at repo root (e.g., reparse points) | |
try: | |
st = safe_lstat(fp) | |
except Exception: | |
st = None | |
if st is None: | |
continue | |
try: | |
is_file = fp.is_file() | |
except OSError: | |
is_file = False | |
if not is_file: | |
continue | |
skip, reason = decide_skip_file(fp, skip_no_ext=skip_no_ext) | |
if skip: | |
continue | |
if fp.suffix.lower() not in ROOT_FILE_EXT_ALLOW: | |
continue | |
fps.append(fp) | |
    # Reuse the per-root collector with label 'repo_root', restricted to the
    # filtered top-level files gathered above (without `explicit_files` the
    # collector would re-walk the whole repo, not just the top level).
    return collect_entries_for_root(
        base,
        "repo_root",
        skip_no_ext=skip_no_ext,
        skip_empty=skip_empty,
        slice_long=slice_long,
        slice_tokens=slice_tokens,
        slice_max_lines=slice_max_lines,
        tok_counter=tok_counter,
        explicit_files=fps,
    )
# ── XREFS (naive crosswalk: sources → tests) ────────────────────────────────── | |
IMPORT_RE = re.compile( | |
r"^\s*(?:from\s+([a-zA-Z0-9_\.]+)\s+import|import\s+([a-zA-Z0-9_\.]+))", re.M | |
) | |
def build_xrefs(all_entries_by_label: Dict[str, List[Dict]]) -> Dict[str, List[str]]: | |
""" | |
Returns {source_path: [test_paths...]}. | |
Heuristic: | |
- A "test" file is any *.py under labels whose name startswith 'test' or == 'tests'. | |
- A "source" file is any *.py elsewhere. | |
- Match if test content includes module import that looks like source dotted path OR | |
mentions the source basename (w/o .py). | |
""" | |
tests: List[Dict] = [] | |
sources: List[Dict] = [] | |
for label, ents in all_entries_by_label.items(): | |
is_test_label = label.lower().startswith("test") | |
for e in ents: | |
if (e.get("lang") or "") != "py": | |
continue | |
if is_test_label: | |
tests.append(e) | |
else: | |
sources.append(e) | |
# Pre-extract imports from tests | |
test_imports: Dict[str, set] = {} | |
for t in tests: | |
content = t.get("content") or "" | |
mods = set() | |
for m1, m2 in IMPORT_RE.findall(content): | |
if m1: | |
mods.add(m1) | |
if m2: | |
mods.add(m2) | |
test_imports[t["path"]] = mods | |
xrefs: Dict[str, List[str]] = {} | |
for s in sources: | |
spath = s["path"] # e.g., "src/hydraedge/extractor/core/config.py" | |
# candidates | |
rel = spath.split("/", 1)[1] if "/" in spath else spath # drop label | |
dotted_full = rel.replace("/", ".").removesuffix(".py") | |
dotted_no_src = dotted_full | |
if dotted_no_src.startswith("src."): | |
dotted_no_src = dotted_no_src[4:] | |
base = Path(rel).stem | |
hits: List[str] = [] | |
for t in tests: | |
tmods = test_imports.get(t["path"], set()) | |
if dotted_full in tmods or dotted_no_src in tmods: | |
hits.append(t["path"]) | |
continue | |
# fallback: substring occurrences of basename (coarse) | |
if re.search(rf"\b{re.escape(base)}\b", t.get("content") or ""): | |
hits.append(t["path"]) | |
if hits: | |
xrefs[spath] = sorted(set(hits)) | |
return xrefs | |
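# Illustrative shape of the returned mapping (hypothetical paths), as later stored
# under the optional "xrefs" key of MASTER_INDEX.json:
#   {
#       "src/hydraedge/extractor/core/config.py": ["tests/test_config.py"],
#       "src/hydraedge/cli.py": ["tests/test_cli.py", "tests/test_end_to_end.py"],
#   }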
# ── Sharding + per-root index ───────────────────────────────────────────────── | |
def write_bundle( | |
label: str, | |
entries: List[Dict], | |
skipped: List[Dict], | |
out_dir: Path, | |
token_window: int, | |
size_limit_mb: float, | |
build_config: Dict, | |
*, | |
top_folders: int, | |
folder_depth: int | None, | |
) -> Dict: | |
""" | |
Writes shards (binary JSONL) + per-root index (with byte offsets). | |
Returns bundle manifest with index_rows for GLOBAL_TOC. | |
""" | |
entries = sorted(entries, key=lambda e: e["path"]) | |
# Summary line | |
total_tokens = sum(int(e.get("tokens", 0)) for e in entries) | |
pct = 100.0 * total_tokens / max(1, token_window) | |
print(f"≈ {total_tokens:,} tokens of {token_window:,} ({pct:,.2f}% )") | |
print_filetype_stats(entries) | |
# Folder histogram | |
if top_folders and top_folders > 0: | |
print_top_folders(entries, label, top_k=top_folders, depth=folder_depth) | |
# Shard writing (binary, track byte offsets) | |
shard_names: List[str] = [] | |
shard_acc = 0 | |
shard_idx = -1 | |
rows_in_shard = 0 | |
index_rows: List[Dict] = [] | |
def shard_file(i: int) -> str: | |
return f"DATA__{label}-{i:03d}.jsonl" | |
writer: io.BufferedWriter | None = None | |
try: | |
for ent in entries: | |
payload = { | |
"path": ent["path"], | |
"lang": ent["lang"], | |
"sha256": ent["sha256"], | |
"lines": ent["lines"], | |
"content": ent["content"], | |
} | |
# Optional: keep slice meta in payload for slices | |
if ent.get("slice"): | |
payload["slice"] = ent["slice"] | |
payload["parent"] = ent.get("parent") | |
payload_bytes = json.dumps(payload, ensure_ascii=False).encode("utf-8") | |
bsz = len(payload_bytes) + 1 # newline | |
if writer is None or shard_acc + bsz > TARGET_SHARD_SIZE_BYTES or rows_in_shard > 10000: | |
if writer is not None: | |
writer.close() | |
shard_idx += 1 | |
if shard_idx >= MAX_SHARDS: | |
raise RuntimeError( | |
f"[{label}] Shard limit exceeded ({MAX_SHARDS}). " | |
f"Increase MAX_SHARDS or TARGET_SHARD_SIZE_BYTES." | |
) | |
sname = shard_file(shard_idx) | |
shard_names.append(sname) | |
writer = open(out_dir / sname, "wb") | |
shard_acc = 0 | |
rows_in_shard = 0 | |
byte_offset = writer.tell() | |
writer.write(payload_bytes + b"\n") | |
byte_len = bsz | |
# Index row | |
row = { | |
"path": ent["path"], | |
"parent": ent.get("parent"), | |
"lang": ent["lang"], | |
"size_bytes": ent["size_bytes"], | |
"chars": ent.get("chars"), | |
"lines": ent["lines"], | |
"line_endings": ent.get("line_endings"), | |
"sha256": ent["sha256"], | |
"sha256_normalized_lf": ent.get("sha256_normalized_lf"), | |
"tokens": ent.get("tokens", 0), | |
"shard": shard_idx, | |
"row": rows_in_shard, | |
"byte_offset": int(byte_offset), | |
"byte_len": int(byte_len), | |
"oversize_tokens": bool(ent.get("_oversize_flag", False)), | |
"oversize_bytes": bool(ent.get("size_bytes", 0) >= OVERSIZE_BYTES_THRESHOLD), | |
} | |
# Previews (if any) | |
if ent.get("_preview_head") is not None: | |
row["preview_head"] = ent["_preview_head"] | |
if ent.get("_preview_tail") is not None: | |
row["preview_tail"] = ent["_preview_tail"] | |
# Slice meta (if any) | |
if ent.get("slice"): | |
row["slice"] = ent["slice"] | |
index_rows.append(row) | |
shard_acc += bsz | |
rows_in_shard += 1 | |
finally: | |
if writer is not None: | |
writer.close() | |
# Per-root index | |
index_name = f"INDEX__{label}.json" | |
per_index = { | |
"schema_version": "1.1", | |
"bundle": { | |
"label": label, | |
"generated_utc": datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%SZ"), | |
"target_shard_size_bytes": TARGET_SHARD_SIZE_BYTES, | |
"max_shards": MAX_SHARDS, | |
"shards": shard_names, | |
}, | |
"build_config": build_config, | |
"agent_howto": { | |
"steps": [ | |
"Load this per-root index (JSON).", | |
"Locate the target by exact `path` in `files[]`.", | |
"Use `byte_offset` and `byte_len` to seek into `bundle.shards[entry.shard]` and read only that JSON line.", | |
"If offsets are missing, fall back to scanning for matching `path` AND `sha256`.", | |
"Parse the JSON line; use `content` as canonical source.", | |
"Optionally verify `sha256`.", | |
], | |
"python": ( | |
"def load_file(index_path, target_path):\n" | |
" import json\n" | |
" from pathlib import Path\n" | |
" idx = json.loads(Path(index_path).read_text('utf-8'))\n" | |
" table = {e['path']: e for e in idx['files']}\n" | |
" ent = table[target_path]\n" | |
" shard_name = idx['bundle']['shards'][ent['shard']]\n" | |
" shard_path = Path(index_path).parent / shard_name\n" | |
" with open(shard_path, 'rb') as f:\n" | |
" f.seek(ent['byte_offset'])\n" | |
" buf = f.read(ent['byte_len'])\n" | |
" obj = json.loads(buf.decode('utf-8'))\n" | |
" assert obj.get('sha256') == ent['sha256']\n" | |
" return obj['content']\n" | |
), | |
}, | |
"files": index_rows, | |
"skipped": skipped, | |
} | |
(out_dir / index_name).write_text( | |
json.dumps(per_index, ensure_ascii=False, indent=2), encoding="utf-8" | |
) | |
# Artifact sizes vs limit | |
limit_bytes = int(size_limit_mb * 1024 * 1024) | |
def human_mb(n: int) -> str: | |
return f"{(n / (1024 * 1024)):.2f} MB" | |
items = [(index_name, (out_dir / index_name).stat().st_size)] | |
for s in shard_names: | |
sp = out_dir / s | |
if sp.exists(): | |
items.append((s, sp.stat().st_size)) | |
print("Upload artifacts (size | limit):") | |
for name, size in items: | |
status = "OK" if size <= limit_bytes else "EXCEEDS" | |
print(f" • {name} — {human_mb(size)} of {size_limit_mb:.2f} MB ({status})") | |
return { | |
"label": label, | |
"index": index_name, | |
"shards": shard_names, | |
"files_count": len(index_rows), | |
"tokens": total_tokens, | |
"artifact_sizes": dict(items), | |
"index_rows": index_rows, | |
} | |
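# A minimal read-back sketch (not called by this script): given a per-root
# INDEX__<label>.json and an exact `path`, seek straight to the stored JSON line
# using `shard`, `byte_offset`, and `byte_len`. This mirrors the `agent_howto`
# recipe embedded in the index above.
def _example_load_file_from_bundle(index_path: Path, target_path: str) -> str:
    idx = json.loads(Path(index_path).read_text(encoding="utf-8"))
    row = {r["path"]: r for r in idx["files"]}[target_path]  # KeyError if absent
    shard_path = Path(index_path).parent / idx["bundle"]["shards"][row["shard"]]
    with open(shard_path, "rb") as f:
        f.seek(row["byte_offset"])
        obj = json.loads(f.read(row["byte_len"]).decode("utf-8"))
    if obj.get("sha256") != row["sha256"]:
        raise ValueError(f"sha256 mismatch for {target_path}")
    return obj["content"]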
# ── MASTER index (with GLOBAL_TOC and optional XREFS) ───────────────────────── | |
def write_master_index( | |
out_dir: Path, | |
bundles: List[Dict], | |
overall_tokens: int, | |
token_window: int, | |
copy_summary: bool, | |
include_xrefs: bool, | |
xrefs: Dict[str, List[str]] | None, | |
): | |
master = { | |
"schema_version": "1.1", | |
"generated_utc": datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%SZ"), | |
"bundles": [ | |
{ | |
"label": b["label"], | |
"index": b["index"], | |
"shards": b["shards"], | |
"files_count": b["files_count"], | |
"tokens": b["tokens"], | |
} | |
for b in bundles | |
], | |
"agent_howto": { | |
"steps": [ | |
"Load MASTER_INDEX.json.", | |
"Resolve path via GLOBAL_TOC to get index, shard, byte offsets.", | |
"Open the per-root index if you need richer metadata (previews, slices, etc.).", | |
"If GLOBAL_TOC is missing an entry, open the appropriate INDEX__*.json and locate it there.", | |
] | |
}, | |
"GLOBAL_TOC": {}, | |
} | |
# Build GLOBAL_TOC | |
for b in bundles: | |
idx = b["index"] | |
for row in b.get("index_rows", []): | |
master["GLOBAL_TOC"][row["path"]] = { | |
"label": b["label"], | |
"index": idx, | |
"shard": row["shard"], | |
"row": row["row"], | |
"sha256": row["sha256"], | |
"byte_offset": row["byte_offset"], | |
"byte_len": row["byte_len"], | |
} | |
if include_xrefs and xrefs: | |
master["xrefs"] = xrefs | |
master_name = "MASTER_INDEX.json" | |
(out_dir / master_name).write_text( | |
json.dumps(master, ensure_ascii=False, indent=2), encoding="utf-8" | |
) | |
pct = 100.0 * overall_tokens / max(1, token_window) | |
summary = f"≈ {overall_tokens:,} tokens of {token_window:,} ({pct:,.2f}% )" | |
msg = summary + " | " | |
msg += ( | |
"Copied to clipboard." if (copy_summary and copy_to_clipboard(summary)) else "Copy skipped." | |
) | |
print("\nOVERALL:", msg) | |
print(f"[Done] {len(bundles)} bundle(s) + {master_name} in {out_dir.resolve()}") | |
# ── Main ────────────────────────────────────────────────────────────────────── | |
def main() -> None: | |
p = argparse.ArgumentParser( | |
description="Per-root repo snapshot → JSON indices + JSONL shards with stats and O(1) seeks." | |
) | |
p.add_argument( | |
"--keep-no-ext", | |
action="store_true", | |
help="Include files without an extension (e.g., LICENSE, Makefile). Default: skip.", | |
) | |
p.add_argument("--keep-empty", action="store_true", help="Include empty files (default: skip).") | |
p.add_argument( | |
"--token-window", | |
type=int, | |
default=128_000, | |
help="Usable chat token window for summary lines (default: 128000).", | |
) | |
p.add_argument( | |
"--size-limit-mb", | |
type=float, | |
default=512.0, | |
help="Per-file upload size limit for the size checker (default: 512 MB).", | |
) | |
p.add_argument( | |
"--copy-summary", | |
action=argparse.BooleanOptionalAction, | |
default=True, | |
help="Copy the OVERALL summary line to clipboard (default: on).", | |
) | |
p.add_argument( | |
"--include-repo-root", | |
action="store_true", | |
help="Also create a separate bundle for top-level repository files (default: off).", | |
) | |
# Slicing options | |
p.add_argument( | |
"--slice-long-files", | |
action="store_true", | |
help="Slice long files into line chunks for retrieval (default: off).", | |
) | |
p.add_argument( | |
"--slice-tokens", | |
type=int, | |
default=SLICE_TOKENS_THRESHOLD_DEFAULT, | |
help="Token threshold to trigger slicing (default: 12000).", | |
) | |
p.add_argument( | |
"--slice-max-lines", | |
type=int, | |
default=SLICE_MAX_LINES_DEFAULT, | |
help="Max lines per slice (default: 300).", | |
) | |
# XREFS | |
p.add_argument( | |
"--no-xrefs", action="store_true", help="Disable building source→test crosswalk (xrefs)." | |
) | |
# Artifact count cap | |
p.add_argument( | |
"--max-artifacts", | |
type=int, | |
default=None, | |
help="Warn if total number of output files exceeds this cap.", | |
) | |
p.add_argument( | |
"--enforce-max-artifacts", | |
action="store_true", | |
help="Exit with non-zero status if --max-artifacts is exceeded.", | |
) | |
p.add_argument( | |
"--top-folders", | |
type=int, | |
default=10, | |
help="Print the Top-K folders by embedded file count per bundle (default: 10).", | |
) | |
p.add_argument( | |
"--folder-depth", | |
type=int, | |
default=None, | |
help="Collapse folder paths to first N components for stats (default: None = full path).", | |
) | |
args = p.parse_args() | |
base = repo_root() | |
out_dir = to_abs(OUTPUT_DIR) | |
out_dir.mkdir(parents=True, exist_ok=True) | |
skip_no_ext = not args.keep_no_ext | |
skip_empty = not args.keep_empty | |
# Tokenizer once | |
tok = make_token_counter() | |
# Phase 1: collect entries + skips for each root (in memory) | |
collected: Dict[str, Dict] = {} | |
for root in SRC_ROOTS: | |
rp = to_abs(root) | |
if not rp.exists(): | |
continue | |
label = sanitize_label(rp, base) | |
print(f"\n== Scanning: {label} ==") | |
entries, skipped = collect_entries_for_root( | |
rp, | |
label, | |
skip_no_ext=skip_no_ext, | |
skip_empty=skip_empty, | |
slice_long=args.slice_long_files, | |
slice_tokens=args.slice_tokens, | |
slice_max_lines=args.slice_max_lines, | |
tok_counter=tok, | |
) | |
if not entries and not skipped: | |
print("[Info] No embeddable text files found (after filters).") | |
continue | |
collected[label] = {"entries": entries, "skipped": skipped} | |
# Optional: repo_root bundle | |
if args.include_repo_root: | |
entries, skipped = collect_entries_for_repo_root( | |
skip_no_ext=skip_no_ext, | |
skip_empty=skip_empty, | |
slice_long=args.slice_long_files, | |
slice_tokens=args.slice_tokens, | |
slice_max_lines=args.slice_max_lines, | |
tok_counter=tok, | |
) | |
if entries or skipped: | |
collected["repo_root"] = {"entries": entries, "skipped": skipped} | |
# Phase 2: xrefs (optional) | |
xrefs = None | |
if not args.no_xrefs: | |
all_entries_by_label = {lbl: v["entries"] for lbl, v in collected.items()} | |
xrefs = build_xrefs(all_entries_by_label) | |
# Phase 3: write bundles | |
bundles: List[Dict] = [] | |
overall_tokens = 0 | |
for label, data in collected.items(): | |
print(f"\n== Bundle: {label} ==") | |
build_config = { | |
"ignore_rules": { | |
"dirs": sorted(IGNORE_DIRS), | |
"dir_patterns": IGNORE_DIR_PATTERNS, | |
"skip_file_patterns": SKIP_FILE_PATTERNS, | |
"skip_ext_full": sorted(SKIP_EXT_FULL), | |
}, | |
"limits": { | |
"max_single_file_bytes": MAX_SINGLE_FILE_BYTES, | |
"target_shard_size_bytes": TARGET_SHARD_SIZE_BYTES, | |
"oversize_tokens_threshold": OVERSIZE_TOKENS_THRESHOLD, | |
"oversize_bytes_threshold": OVERSIZE_BYTES_THRESHOLD, | |
}, | |
"flags": { | |
"skip_no_ext": skip_no_ext, | |
"skip_empty": skip_empty, | |
"slice_long_files": args.slice_long_files, | |
"slice_tokens": args.slice_tokens, | |
"slice_max_lines": args.slice_max_lines, | |
}, | |
} | |
b = write_bundle( | |
label, | |
data["entries"], | |
data["skipped"], | |
out_dir, | |
args.token_window, | |
args.size_limit_mb, | |
build_config, | |
top_folders=args.top_folders, | |
folder_depth=args.folder_depth, | |
) | |
bundles.append(b) | |
overall_tokens += b["tokens"] | |
# Phase 4: MASTER index with GLOBAL_TOC (+xrefs) and overall summary | |
write_master_index( | |
out_dir, | |
bundles, | |
overall_tokens, | |
args.token_window, | |
args.copy_summary, | |
include_xrefs=(not args.no_xrefs), | |
xrefs=xrefs, | |
) | |
# Phase 5: artifacts cap check | |
if args.max_artifacts is not None: | |
total_artifacts = 1 # MASTER_INDEX.json | |
for b in bundles: | |
total_artifacts += 1 # per-root INDEX__*.json | |
total_artifacts += len(b["shards"]) | |
if total_artifacts > args.max_artifacts: | |
msg = ( | |
f"WARNING: produced {total_artifacts} artifacts " | |
f"(cap {args.max_artifacts}). Consider increasing shard size, " | |
f"disabling repo_root, or consolidating roots." | |
) | |
print(msg) | |
if args.enforce_max_artifacts: | |
sys.exit(2) | |
if __name__ == "__main__": | |
main() |