#!/usr/bin/env python3
"""
(Note: this script only creates a JSON snapshot of the repo; it is not part of the repo itself.
Any agent: you can skip this script, and do not edit it.)
package_to_snapshot_json.py — Per-root JSON bundles (+ MASTER index) with O(1) shard seeks,
GLOBAL TOC, stats, skip reporting, oversize previews, optional slicing, and xrefs.
Behavior
- For each configured root in SRC_ROOTS (e.g., 'src', 'tests'):
• Emits INDEX__<label>.json
• Emits DATA__<label>-000.jsonl ... (sharded, binary, with byte offsets)
• Includes files directly under that root and all subfolders.
- By default, DOES NOT bundle top-level repository files.
• To include them as a separate bundle, pass --include-repo-root.
Printer
- Per root: prints token summary vs window, file-type stats, Top-3 token-heavy files,
and artifact sizes (index + shards) vs --size-limit-mb.
- Overall: prints combined token summary and writes MASTER_INDEX.json with GLOBAL_TOC.
Indices
- Per-file metadata: tokens, chars, lines, line_endings, sha256, sha256_normalized_lf,
shard, row, byte_offset, byte_len, oversize flags, optional previews, optional slices.
- Per-bundle: skip reasons and build_config echo (ignore rules, thresholds, flags).
- MASTER: bundles + GLOBAL_TOC {path → index, shard, row, sha256, byte_offset, byte_len} and optional xrefs.
Defaults
- Skips <no-ext> files (use --keep-no-ext to include).
- Skips binaries and oversized text (MAX_SINGLE_FILE_BYTES).
- Strips outputs from .ipynb when embedding.
- Uses tiktoken if available (o200k_base → cl100k_base), else ~chars/4 heuristic.
- Filters build metadata (*.egg-info) and common secret-like names by default.
Usage
python package_to_snapshot_json.py
python package_to_snapshot_json.py --token-window 128000 --size-limit-mb 512
python package_to_snapshot_json.py --keep-no-ext
python package_to_snapshot_json.py --include-repo-root
python package_to_snapshot_json.py --slice-long-files --slice-tokens 12000 --slice-max-lines 300
python package_to_snapshot_json.py --max-artifacts 10 --enforce-max-artifacts
python package_to_snapshot_json.py --no-xrefs
"""
from __future__ import annotations
import argparse
import fnmatch
import hashlib
import io
import json
import os
import platform
import re
import subprocess
import sys
from datetime import datetime, timezone
from pathlib import Path
from typing import Iterable, List, Dict, Tuple
from collections import defaultdict
# ── CONFIG ─────────────────────────────────────────────────────────────────────
SRC_ROOTS: List[str] = [r"."] # each becomes its own bundle
OUTPUT_DIR: str = r"repo-snapshot-json" # output directory at repo root
# Optional separate bundle for top-level repository files (off by default)
INCLUDE_REPO_ROOT_FILES_DEFAULT: bool = False
ROOT_FILE_EXT_ALLOW = [".py", ".toml", ".md", ".yml", ".yaml", ".json", ".txt", ".ipynb"]
# Size policy per-root
TARGET_SHARD_SIZE_BYTES: int = 16 * 1024 * 1024 # ~16 MB per shard
MAX_SHARDS: int = 9 # index + ≤9 shards per root
# Ignore rules (dirs + patterns)
IGNORE_DIRS = {
".git",
".venv",
"__pycache__",
".mypy_cache",
".pytest_cache",
".idea",
".vscode",
"node_modules",
"dist",
"build",
".next",
".cache",
".turbo",
".pnpm",
".vite",
"corpus",
".hrm-venv",
}
IGNORE_DIR_PATTERNS = ["*backup*", "tmp*", ".tmp_*", "*@*", "*.egg-info"] # added *.egg-info
# File name patterns to skip (security-minded defaults)
SKIP_FILE_PATTERNS = [
"package_to_snapshot_*",
"snapshot_*",
"id_rsa*",
"*.pem",
"*.key",
"*.p12",
"*.crt",
".env",
".env.*",
"*secret*",
"*credential*",
"service_account*.json",
".js",
".css",
".jsonl",
".sh",
".ps1",
".patch",
]
# Fully skipped file types
SKIP_EXT_FULL = {
".zip",
".tar",
".gz",
".tgz",
".7z",
".rar",
".parquet",
".db",
".sqlite",
".so",
".dll",
".exe",
".bin",
".pdf",
".png",
".jpg",
".jpeg",
".svg",
".gif",
".webp",
".ico",
".ttf",
".otf",
".woff",
".woff2",
".mp3",
".mp4",
}
# Cap single embedded text file size (increase to include huge minified assets)
MAX_SINGLE_FILE_BYTES: int = 512 * 1024
# Oversize flags (previews shown when exceeded)
OVERSIZE_TOKENS_THRESHOLD: int = 12000
OVERSIZE_BYTES_THRESHOLD: int = 200_000
PREVIEW_HEAD_CHARS: int = 300
PREVIEW_TAIL_CHARS: int = 300
# Notebook handling
IPYNB_STRIP_OUTPUTS: bool = True
# Tokenization & stats
TOKENIZER_HINTS = ("o200k_base", "cl100k_base") # try in this order
FALLBACK_CHARS_PER_TOKEN: int = 4
TOP_N_HEAVIEST: int = 3
# Skips by default
SKIP_NO_EXT_DEFAULT: bool = True
SKIP_EMPTY_DEFAULT: bool = True # skip zero-byte / whitespace-only files
# Optional slicing of long files
SLICE_LONG_FILES_DEFAULT: bool = False
SLICE_TOKENS_THRESHOLD_DEFAULT: int = 12000
SLICE_MAX_LINES_DEFAULT: int = 300
# ── Path helpers ──────────────────────────────────────────────────────────────
def repo_root() -> Path:
return Path(__file__).resolve().parent
def to_abs(p: str) -> Path:
base = repo_root()
pp = Path(p)
return pp if pp.is_absolute() else base / pp
def normalized_posix(path: Path, base: Path) -> str:
return path.relative_to(base).as_posix()
def sanitize_label(root_path: Path, base: Path) -> str:
"""Stable label for filenames: relative-to-repo path with separators → '_'."""
try:
rel = root_path.relative_to(base).as_posix()
except Exception:
rel = root_path.as_posix()
if rel in (".", ""):
rel = "repo_root"
for ch in ("/", "\\", ":", "*", "?", '"', "<", ">", "|", " "):
rel = rel.replace(ch, "_")
return rel.strip("_") or "repo_root"
# ── Safe filesystem probes (avoid following symlinks/reparse points) ─────────
def safe_lstat(p: Path):
"""
Best-effort stat that does not follow symlinks/reparse points.
Returns os.stat_result or None when inaccessible.
"""
try:
return p.lstat() # do not follow
except OSError:
return None
# ── Git helpers (respect .gitignore when possible) ────────────────────────────
def _git_is_repo(base: Path) -> bool:
try:
r = subprocess.run(
["git", "rev-parse", "--is-inside-work-tree"],
cwd=str(base),
stdout=subprocess.PIPE,
stderr=subprocess.DEVNULL,
text=True,
check=False,
)
return r.returncode == 0 and (r.stdout or "").strip() == "true"
except Exception:
return False
def git_list_files_not_ignored(base: Path, target: Path) -> List[Path] | None:
"""
Returns tracked + untracked (not ignored) files under `target` by consulting Git.
Respects .gitignore via `--exclude-standard`. Returns None if Git is unavailable.
Paths are absolute.
"""
if not _git_is_repo(base):
return None
try:
rel = target.relative_to(base).as_posix() or "."
except Exception:
return None
# Ask Git for tracked (-c) + untracked not ignored (-o) files under rel
try:
r = subprocess.run(
[
"git",
"ls-files",
"-z",
"-c",
"-o",
"--exclude-standard",
"--",
rel,
],
cwd=str(base),
stdout=subprocess.PIPE,
stderr=subprocess.DEVNULL,
check=False,
)
except Exception:
return None
if r.returncode != 0:
return None
out = r.stdout or b""
if not out:
return []
parts = [p for p in out.split(b"\0") if p]
files: List[Path] = []
for b in parts:
try:
s = b.decode("utf-8")
except Exception:
# best-effort fallback
try:
s = b.decode(sys.getfilesystemencoding() or "utf-8", errors="ignore")
except Exception:
continue
files.append(base / s)
return files
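# For reference, the subprocess above is equivalent to running, from the repo root:
#   git ls-files -z -c -o --exclude-standard -- <rel>
# i.e. tracked (-c) plus untracked-but-not-ignored (-o) paths, NUL-separated (-z).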
# ── Filters ───────────────────────────────────────────────────────────────────
def matches_any(name: str, patterns: Iterable[str]) -> bool:
low = name.lower()
return any(fnmatch.fnmatch(low, pat.lower()) or (pat.lower() in low) for pat in patterns)
def should_skip_dir(dirname: str) -> bool:
low = dirname.lower()
if low in {d.lower() for d in IGNORE_DIRS}:
return True
return matches_any(low, IGNORE_DIR_PATTERNS)
def is_binary_bytes(sample: bytes) -> bool:
if b"\x00" in sample:
return True
textchars = bytearray({7, 8, 9, 10, 12, 13, 27} | set(range(0x20, 0x100)))
return bool(sample.translate(None, textchars))
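# Sketch of how the heuristic above behaves (not an exhaustive binary detector):
#   is_binary_bytes(b"\x00\x01")                  -> True   (NUL byte)
#   is_binary_bytes("h\u00e9llo".encode("utf-8")) -> False  (bytes 0x20-0xFF count as text)
#   is_binary_bytes(b"\x01\x02")                  -> True   (control bytes outside the text set)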
def decide_skip_file(p: Path, *, skip_no_ext: bool) -> Tuple[bool, str | None]:
name = p.name
if matches_any(name, SKIP_FILE_PATTERNS):
return True, "name_pattern"
ext = p.suffix.lower()
if skip_no_ext and ext == "":
return True, "no_ext"
if ext in SKIP_EXT_FULL:
return True, "ext_filtered"
return False, None
# ── Text reading & metadata ───────────────────────────────────────────────────
def read_text_safely(p: Path) -> Tuple[str, int]:
raw = p.read_bytes()
if is_binary_bytes(raw[:4096]):
raise ValueError("binary-like content")
if len(raw) > MAX_SINGLE_FILE_BYTES:
raise ValueError(f"too_large ({len(raw)} bytes)")
ext = p.suffix.lower()
if ext == ".ipynb" and IPYNB_STRIP_OUTPUTS:
try:
nb = json.loads(raw.decode("utf-8"))
for cell in nb.get("cells", []):
cell.pop("outputs", None)
cell.pop("attachments", None)
cell["execution_count"] = None
s = json.dumps(nb, ensure_ascii=False, indent=2)
return s, s.count("\n") + 1
except Exception:
pass
try:
s = raw.decode("utf-8")
except UnicodeDecodeError:
raise ValueError("non_utf8")
return s, s.count("\n") + 1
def detect_line_endings(s: str) -> str:
    has_crlf = "\r\n" in s
    # remove CRLF pairs so any remaining \n / \r are lone LF / lone CR
    rest = s.replace("\r\n", "")
    has_lf = "\n" in rest
    has_cr = "\r" in rest
    if sum((has_crlf, has_lf, has_cr)) > 1:
        return "MIXED"
    if has_crlf:
        return "CRLF"
    if has_lf:
        return "LF"
    if has_cr:
        return "CR"
    return "NONE"
def sha256_text(s: str) -> str:
return hashlib.sha256(s.encode("utf-8")).hexdigest()
def sha256_text_normalized_lf(s: str) -> str:
normalized = s.replace("\r\n", "\n").replace("\r", "\n")
return hashlib.sha256(normalized.encode("utf-8")).hexdigest()
def compute_sha256_stream(fp: Path, chunk: int = 1024 * 1024) -> str:
h = hashlib.sha256()
with open(fp, "rb") as f:
while True:
b = f.read(chunk)
if not b:
break
h.update(b)
return h.hexdigest()
def make_token_counter():
try:
import tiktoken # type: ignore
enc = None
for name in TOKENIZER_HINTS:
try:
enc = tiktoken.get_encoding(name)
break
except Exception:
continue
if enc is None:
raise RuntimeError("No suitable tiktoken encoding found")
return lambda s: len(enc.encode(s))
except Exception:
return lambda s: max(1, (len(s) + FALLBACK_CHARS_PER_TOKEN - 1) // FALLBACK_CHARS_PER_TOKEN)
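# Usage sketch for the counter returned above: with tiktoken installed it reports real
# token counts; otherwise it falls back to ceil(len / FALLBACK_CHARS_PER_TOKEN), e.g.
#   count = make_token_counter()
#   count("x" * 10)  # == 3 under the chars/4 fallback; tokenizer-dependent otherwise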
def copy_to_clipboard(text: str) -> bool:
try:
import pyperclip # type: ignore
pyperclip.copy(text)
return True
except Exception:
pass
try:
system = platform.system()
if system == "Windows":
p = subprocess.Popen(["clip"], stdin=subprocess.PIPE, close_fds=True)
p.stdin.write(text.encode("utf-8"))
p.stdin.close()
return p.wait() == 0
elif system == "Darwin":
p = subprocess.Popen(["pbcopy"], stdin=subprocess.PIPE)
p.communicate(text.encode("utf-8"))
return p.returncode == 0
else:
for cmd in (["wl-copy"], ["xclip", "-selection", "clipboard"]):
try:
p = subprocess.Popen(cmd, stdin=subprocess.PIPE)
p.communicate(text.encode("utf-8"))
if p.returncode == 0:
return True
except Exception:
continue
except Exception:
pass
return False
# ── Stats printer ─────────────────────────────────────────────────────────────
def print_filetype_stats(entries: List[Dict], top_n: int = TOP_N_HEAVIEST) -> None:
if not entries:
print("[Stats] No files embedded; nothing to report.")
return
by_ext: Dict[str, Dict[str, int]] = {}
for e in entries:
ext = e.get("lang") or ""
if ext == "":
continue
agg = by_ext.setdefault(ext, {"count": 0, "tokens": 0})
agg["count"] += 1
agg["tokens"] += int(e.get("tokens", 0))
sorted_exts = sorted(by_ext.items(), key=lambda kv: (-kv[1]["tokens"], kv[0]))
print("File types (count | tokens):")
for ext, s in sorted_exts:
print(f" {ext}: {s['count']:,} | {s['tokens']:,}")
heavy = sorted(entries, key=lambda e: int(e.get("tokens", 0)), reverse=True)[:top_n]
print(f"Top {top_n} token-heavy files:")
for e in heavy:
print(f" • {e['path']} — {int(e['tokens']):,} tokens")
def _folder_key_from_path(full_path: str, *, depth: int | None, label: str) -> str:
"""
full_path is like 'repo_root/path/to/file.py' or 'src/hydraedge/...'.
We strip the leading '<label>/' and then optionally collapse to first N components.
"""
# strip bundle label
prefix = f"{label}/"
if full_path.startswith(prefix):
rel = full_path[len(prefix) :]
else:
rel = full_path.split("/", 1)[1] if "/" in full_path else ""
if rel == "":
# a file that lives directly at the label root -> treat as '.'
return "."
parts = rel.split("/")
# last element is file name; we want the directory only
if len(parts) == 1:
# file directly under label root
return "."
dirs = parts[:-1]
if depth is not None and depth > 0:
dirs = dirs[: min(depth, len(dirs))]
return "/".join(dirs) if dirs else "."
def print_top_folders(entries: list[dict], label: str, *, top_k: int, depth: int | None) -> None:
"""
Aggregate per-folder stats from entry rows (embedded files only).
We use: file count, sum(tokens), sum(size_bytes).
"""
if not entries:
return
agg = defaultdict(lambda: {"files": 0, "tokens": 0, "bytes": 0})
for e in entries:
path = e.get("path") or ""
key = _folder_key_from_path(path, depth=depth, label=label)
agg[key]["files"] += 1
agg[key]["tokens"] += int(e.get("tokens", 0))
agg[key]["bytes"] += int(e.get("size_bytes", 0) or 0)
# sort primarily by files desc, then tokens desc, then key asc
items = sorted(agg.items(), key=lambda kv: (-kv[1]["files"], -kv[1]["tokens"], kv[0]))
print(f"Top folders (by files) [depth={'None' if depth is None else depth}, top={top_k}]")
for i, (k, v) in enumerate(items[: max(0, top_k)], start=1):
mb = v["bytes"] / (1024 * 1024) if v["bytes"] else 0.0
print(f" {i:>2}) {k:<45} files={v['files']:>4} tokens={v['tokens']:>8,} bytes={mb:>6.1f} MB")
# ── Collectors ────────────────────────────────────────────────────────────────
def collect_entries_for_root(
root_path: Path,
label: str,
*,
skip_no_ext: bool,
skip_empty: bool,
slice_long: bool,
slice_tokens: int,
slice_max_lines: int,
    tok_counter,
    explicit_files: List[Path] | None = None,
) -> Tuple[List[Dict], List[Dict]]:
    """
    Returns (entries, skipped). 'entries' may include slices for long files (if enabled).
    If `explicit_files` is given, those paths are used as-is and directory discovery is skipped.
    Each entry has keys: path, lang, size_bytes, chars, lines, line_endings, sha256,
    sha256_normalized_lf, tokens, content, (optional) slice{}, parent
    """
entries: List[Dict] = []
skipped: List[Dict] = []
    # Discover files
    if explicit_files is not None:
        # Caller supplied a pre-filtered file list (e.g., top-level repo files only).
        fps = [p for p in explicit_files if p.is_file()]
    elif root_path.is_file():
        fps = [root_path]
    elif root_path.is_dir():
fps = []
base = repo_root()
git_files = git_list_files_not_ignored(base, root_path)
if git_files is not None:
# Use Git’s view (tracked + untracked not ignored). Filter with decide_skip_file below.
fps = [p for p in git_files if p.is_file()]
else:
# Fallback: walk filesystem and apply static ignore dirs/patterns.
# Ignore directory read errors (e.g., permission/reparse anomalies)
for cur, dirnames, files in os.walk(root_path, topdown=True, onerror=lambda e: None):
dirnames[:] = [d for d in dirnames if not should_skip_dir(d)]
for fname in files:
fp = Path(cur) / fname
fps.append(fp)
else:
return entries, skipped # missing
# Process files
for fp in sorted(fps, key=lambda p: p.as_posix().lower()):
# Apply file-level skip rules early (name/ext-based)
skip, reason = decide_skip_file(fp, skip_no_ext=skip_no_ext)
if skip:
st = safe_lstat(fp)
try:
try_rel = fp.relative_to(root_path).as_posix()
except Exception:
try_rel = fp.name
skipped.append(
{
"path": f"{label}/{try_rel}",
"reason": reason,
"size_bytes_fs": int(st.st_size) if st else None,
"ext": fp.suffix.lower(),
}
)
continue
try:
try_rel = fp.relative_to(root_path).as_posix()
except Exception:
try_rel = fp.name
st = safe_lstat(fp)
if st is None:
skipped.append(
{
"path": f"{label}/{try_rel}",
"reason": "inaccessible",
"size_bytes_fs": None,
"ext": fp.suffix.lower(),
}
)
continue
if skip_empty and st.st_size == 0:
skipped.append(
{
"path": f"{label}/{try_rel}",
"reason": "empty",
"size_bytes_fs": 0,
"ext": fp.suffix.lower(),
}
)
continue
if st.st_size > MAX_SINGLE_FILE_BYTES:
# Too large to embed; record hash & size
skipped.append(
{
"path": f"{label}/{try_rel}",
"reason": "too_large",
"size_bytes_fs": int(st.st_size),
"sha256_full": compute_sha256_stream(fp),
"ext": fp.suffix.lower(),
}
)
continue
# Try to read as text
try:
content, nlines = read_text_safely(fp)
except Exception as e:
reason = str(e)
if "binary-like" in reason:
reason = "binary_like"
elif "non_utf8" in reason or "non-UTF8" in reason:
reason = "non_utf8"
skipped.append(
{
"path": f"{label}/{try_rel}",
"reason": reason,
"size_bytes_fs": int(st.st_size),
"ext": fp.suffix.lower(),
}
)
continue
# Metadata
ext = fp.suffix.lstrip(".").lower()
chars = len(content)
n_tokens = tok_counter(content)
le = detect_line_endings(content)
norm_sha = sha256_text_normalized_lf(content)
sha = sha256_text(content)
oversize = (n_tokens >= OVERSIZE_TOKENS_THRESHOLD) or (st.st_size >= OVERSIZE_BYTES_THRESHOLD)
def make_entry(
slice_info: Dict | None,
text: str,
lines_count: int,
start_line: int | None,
end_line: int | None,
):
path = f"{label}/{try_rel}"
if slice_info:
path = f"{path}#L{start_line}-{end_line}"
e = {
"path": path,
"parent": f"{label}/{try_rel}" if slice_info else None,
"lang": ext,
"size_bytes": len(text.encode("utf-8")),
"chars": len(text),
"lines": lines_count,
"line_endings": detect_line_endings(text),
"sha256": sha256_text(text),
"sha256_normalized_lf": sha256_text_normalized_lf(text),
"tokens": tok_counter(text),
"content": text,
}
if slice_info:
e["slice"] = slice_info
return e
# Optional slicing
if slice_long and n_tokens >= slice_tokens and nlines > slice_max_lines:
# Slice by lines for simplicity
lines = content.splitlines(keepends=True)
start = 0
while start < len(lines):
end = min(start + slice_max_lines, len(lines))
seg = "".join(lines[start:end])
slice_info = {"start_line": start + 1, "end_line": end, "parent": f"{label}/{try_rel}"}
entries.append(make_entry(slice_info, seg, end - start, start + 1, end))
start = end
else:
ent = {
"path": f"{label}/{try_rel}",
"lang": ext,
"size_bytes": len(content.encode("utf-8")),
"chars": chars,
"lines": nlines,
"line_endings": le,
"sha256": sha,
"sha256_normalized_lf": norm_sha,
"tokens": n_tokens,
"content": content,
}
# Previews for oversize (stored in index rows later)
ent["_oversize_flag"] = oversize
if oversize:
head = content[:PREVIEW_HEAD_CHARS]
tail = content[-PREVIEW_TAIL_CHARS:] if len(content) > PREVIEW_TAIL_CHARS else ""
ent["_preview_head"] = head
ent["_preview_tail"] = tail
entries.append(ent)
return entries, skipped
def collect_entries_for_repo_root(
*,
skip_no_ext: bool,
skip_empty: bool,
slice_long: bool,
slice_tokens: int,
slice_max_lines: int,
tok_counter,
):
"""Optional: top-level repository files (only if --include-repo-root is passed)."""
base = repo_root()
fps = []
for fp in sorted(base.iterdir(), key=lambda p: p.name.lower()):
# Guard against inaccessible items at repo root (e.g., reparse points)
try:
st = safe_lstat(fp)
except Exception:
st = None
if st is None:
continue
try:
is_file = fp.is_file()
except OSError:
is_file = False
if not is_file:
continue
skip, reason = decide_skip_file(fp, skip_no_ext=skip_no_ext)
if skip:
continue
if fp.suffix.lower() not in ROOT_FILE_EXT_ALLOW:
continue
fps.append(fp)
    # Reuse the per-root collector with label 'repo_root', restricted to the top-level
    # files gathered above (otherwise the whole repository would be rescanned).
    return collect_entries_for_root(
        base,
        "repo_root",
        skip_no_ext=skip_no_ext,
        skip_empty=skip_empty,
        slice_long=slice_long,
        slice_tokens=slice_tokens,
        slice_max_lines=slice_max_lines,
        tok_counter=tok_counter,
        explicit_files=fps,
    )
# ── XREFS (naive crosswalk: sources → tests) ──────────────────────────────────
IMPORT_RE = re.compile(
r"^\s*(?:from\s+([a-zA-Z0-9_\.]+)\s+import|import\s+([a-zA-Z0-9_\.]+))", re.M
)
def build_xrefs(all_entries_by_label: Dict[str, List[Dict]]) -> Dict[str, List[str]]:
"""
Returns {source_path: [test_paths...]}.
    Heuristic:
    - A "test" file is any *.py under a label whose name starts with 'test' (e.g., 'tests').
    - A "source" file is any *.py elsewhere.
    - A source matches a test if the test imports a module whose dotted path matches the
      source path, or if the test mentions the source basename (without .py).
"""
tests: List[Dict] = []
sources: List[Dict] = []
for label, ents in all_entries_by_label.items():
is_test_label = label.lower().startswith("test")
for e in ents:
if (e.get("lang") or "") != "py":
continue
if is_test_label:
tests.append(e)
else:
sources.append(e)
# Pre-extract imports from tests
test_imports: Dict[str, set] = {}
for t in tests:
content = t.get("content") or ""
mods = set()
for m1, m2 in IMPORT_RE.findall(content):
if m1:
mods.add(m1)
if m2:
mods.add(m2)
test_imports[t["path"]] = mods
xrefs: Dict[str, List[str]] = {}
for s in sources:
spath = s["path"] # e.g., "src/hydraedge/extractor/core/config.py"
# candidates
rel = spath.split("/", 1)[1] if "/" in spath else spath # drop label
dotted_full = rel.replace("/", ".").removesuffix(".py")
dotted_no_src = dotted_full
if dotted_no_src.startswith("src."):
dotted_no_src = dotted_no_src[4:]
base = Path(rel).stem
hits: List[str] = []
for t in tests:
tmods = test_imports.get(t["path"], set())
if dotted_full in tmods or dotted_no_src in tmods:
hits.append(t["path"])
continue
# fallback: substring occurrences of basename (coarse)
if re.search(rf"\b{re.escape(base)}\b", t.get("content") or ""):
hits.append(t["path"])
if hits:
xrefs[spath] = sorted(set(hits))
return xrefs
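# Worked example of the heuristic above (module/function names are illustrative only):
# a test containing "from hydraedge.extractor.core.config import load_config" is linked
# to "src/hydraedge/extractor/core/config.py", because the source's dotted path with the
# leading "src." stripped matches the imported module; failing that, a \bconfig\b mention
# anywhere in the test body is accepted as a coarse fallback.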
# ── Sharding + per-root index ─────────────────────────────────────────────────
def write_bundle(
label: str,
entries: List[Dict],
skipped: List[Dict],
out_dir: Path,
token_window: int,
size_limit_mb: float,
build_config: Dict,
*,
top_folders: int,
folder_depth: int | None,
) -> Dict:
"""
Writes shards (binary JSONL) + per-root index (with byte offsets).
Returns bundle manifest with index_rows for GLOBAL_TOC.
"""
entries = sorted(entries, key=lambda e: e["path"])
# Summary line
total_tokens = sum(int(e.get("tokens", 0)) for e in entries)
pct = 100.0 * total_tokens / max(1, token_window)
print(f"≈ {total_tokens:,} tokens of {token_window:,} ({pct:,.2f}% )")
print_filetype_stats(entries)
# Folder histogram
if top_folders and top_folders > 0:
print_top_folders(entries, label, top_k=top_folders, depth=folder_depth)
# Shard writing (binary, track byte offsets)
shard_names: List[str] = []
shard_acc = 0
shard_idx = -1
rows_in_shard = 0
index_rows: List[Dict] = []
def shard_file(i: int) -> str:
return f"DATA__{label}-{i:03d}.jsonl"
writer: io.BufferedWriter | None = None
try:
for ent in entries:
payload = {
"path": ent["path"],
"lang": ent["lang"],
"sha256": ent["sha256"],
"lines": ent["lines"],
"content": ent["content"],
}
# Optional: keep slice meta in payload for slices
if ent.get("slice"):
payload["slice"] = ent["slice"]
payload["parent"] = ent.get("parent")
payload_bytes = json.dumps(payload, ensure_ascii=False).encode("utf-8")
bsz = len(payload_bytes) + 1 # newline
if writer is None or shard_acc + bsz > TARGET_SHARD_SIZE_BYTES or rows_in_shard > 10000:
if writer is not None:
writer.close()
shard_idx += 1
if shard_idx >= MAX_SHARDS:
raise RuntimeError(
f"[{label}] Shard limit exceeded ({MAX_SHARDS}). "
f"Increase MAX_SHARDS or TARGET_SHARD_SIZE_BYTES."
)
sname = shard_file(shard_idx)
shard_names.append(sname)
writer = open(out_dir / sname, "wb")
shard_acc = 0
rows_in_shard = 0
byte_offset = writer.tell()
writer.write(payload_bytes + b"\n")
byte_len = bsz
# Index row
row = {
"path": ent["path"],
"parent": ent.get("parent"),
"lang": ent["lang"],
"size_bytes": ent["size_bytes"],
"chars": ent.get("chars"),
"lines": ent["lines"],
"line_endings": ent.get("line_endings"),
"sha256": ent["sha256"],
"sha256_normalized_lf": ent.get("sha256_normalized_lf"),
"tokens": ent.get("tokens", 0),
"shard": shard_idx,
"row": rows_in_shard,
"byte_offset": int(byte_offset),
"byte_len": int(byte_len),
"oversize_tokens": bool(ent.get("_oversize_flag", False)),
"oversize_bytes": bool(ent.get("size_bytes", 0) >= OVERSIZE_BYTES_THRESHOLD),
}
# Previews (if any)
if ent.get("_preview_head") is not None:
row["preview_head"] = ent["_preview_head"]
if ent.get("_preview_tail") is not None:
row["preview_tail"] = ent["_preview_tail"]
# Slice meta (if any)
if ent.get("slice"):
row["slice"] = ent["slice"]
index_rows.append(row)
shard_acc += bsz
rows_in_shard += 1
finally:
if writer is not None:
writer.close()
# Per-root index
index_name = f"INDEX__{label}.json"
per_index = {
"schema_version": "1.1",
"bundle": {
"label": label,
"generated_utc": datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%SZ"),
"target_shard_size_bytes": TARGET_SHARD_SIZE_BYTES,
"max_shards": MAX_SHARDS,
"shards": shard_names,
},
"build_config": build_config,
"agent_howto": {
"steps": [
"Load this per-root index (JSON).",
"Locate the target by exact `path` in `files[]`.",
"Use `byte_offset` and `byte_len` to seek into `bundle.shards[entry.shard]` and read only that JSON line.",
"If offsets are missing, fall back to scanning for matching `path` AND `sha256`.",
"Parse the JSON line; use `content` as canonical source.",
"Optionally verify `sha256`.",
],
"python": (
"def load_file(index_path, target_path):\n"
" import json\n"
" from pathlib import Path\n"
" idx = json.loads(Path(index_path).read_text('utf-8'))\n"
" table = {e['path']: e for e in idx['files']}\n"
" ent = table[target_path]\n"
" shard_name = idx['bundle']['shards'][ent['shard']]\n"
" shard_path = Path(index_path).parent / shard_name\n"
" with open(shard_path, 'rb') as f:\n"
" f.seek(ent['byte_offset'])\n"
" buf = f.read(ent['byte_len'])\n"
" obj = json.loads(buf.decode('utf-8'))\n"
" assert obj.get('sha256') == ent['sha256']\n"
" return obj['content']\n"
),
},
"files": index_rows,
"skipped": skipped,
}
(out_dir / index_name).write_text(
json.dumps(per_index, ensure_ascii=False, indent=2), encoding="utf-8"
)
# Artifact sizes vs limit
limit_bytes = int(size_limit_mb * 1024 * 1024)
def human_mb(n: int) -> str:
return f"{(n / (1024 * 1024)):.2f} MB"
items = [(index_name, (out_dir / index_name).stat().st_size)]
for s in shard_names:
sp = out_dir / s
if sp.exists():
items.append((s, sp.stat().st_size))
print("Upload artifacts (size | limit):")
for name, size in items:
status = "OK" if size <= limit_bytes else "EXCEEDS"
print(f" • {name} — {human_mb(size)} of {size_limit_mb:.2f} MB ({status})")
return {
"label": label,
"index": index_name,
"shards": shard_names,
"files_count": len(index_rows),
"tokens": total_tokens,
"artifact_sizes": dict(items),
"index_rows": index_rows,
}
# ── MASTER index (with GLOBAL_TOC and optional XREFS) ─────────────────────────
def write_master_index(
out_dir: Path,
bundles: List[Dict],
overall_tokens: int,
token_window: int,
copy_summary: bool,
include_xrefs: bool,
xrefs: Dict[str, List[str]] | None,
):
master = {
"schema_version": "1.1",
"generated_utc": datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%SZ"),
"bundles": [
{
"label": b["label"],
"index": b["index"],
"shards": b["shards"],
"files_count": b["files_count"],
"tokens": b["tokens"],
}
for b in bundles
],
"agent_howto": {
"steps": [
"Load MASTER_INDEX.json.",
"Resolve path via GLOBAL_TOC to get index, shard, byte offsets.",
"Open the per-root index if you need richer metadata (previews, slices, etc.).",
"If GLOBAL_TOC is missing an entry, open the appropriate INDEX__*.json and locate it there.",
]
},
"GLOBAL_TOC": {},
}
# Build GLOBAL_TOC
for b in bundles:
idx = b["index"]
for row in b.get("index_rows", []):
master["GLOBAL_TOC"][row["path"]] = {
"label": b["label"],
"index": idx,
"shard": row["shard"],
"row": row["row"],
"sha256": row["sha256"],
"byte_offset": row["byte_offset"],
"byte_len": row["byte_len"],
}
if include_xrefs and xrefs:
master["xrefs"] = xrefs
master_name = "MASTER_INDEX.json"
(out_dir / master_name).write_text(
json.dumps(master, ensure_ascii=False, indent=2), encoding="utf-8"
)
pct = 100.0 * overall_tokens / max(1, token_window)
summary = f"≈ {overall_tokens:,} tokens of {token_window:,} ({pct:,.2f}% )"
msg = summary + " | "
msg += (
"Copied to clipboard." if (copy_summary and copy_to_clipboard(summary)) else "Copy skipped."
)
print("\nOVERALL:", msg)
print(f"[Done] {len(bundles)} bundle(s) + {master_name} in {out_dir.resolve()}")
# ── Main ──────────────────────────────────────────────────────────────────────
def main() -> None:
p = argparse.ArgumentParser(
description="Per-root repo snapshot → JSON indices + JSONL shards with stats and O(1) seeks."
)
p.add_argument(
"--keep-no-ext",
action="store_true",
help="Include files without an extension (e.g., LICENSE, Makefile). Default: skip.",
)
p.add_argument("--keep-empty", action="store_true", help="Include empty files (default: skip).")
p.add_argument(
"--token-window",
type=int,
default=128_000,
help="Usable chat token window for summary lines (default: 128000).",
)
p.add_argument(
"--size-limit-mb",
type=float,
default=512.0,
help="Per-file upload size limit for the size checker (default: 512 MB).",
)
p.add_argument(
"--copy-summary",
action=argparse.BooleanOptionalAction,
default=True,
help="Copy the OVERALL summary line to clipboard (default: on).",
)
p.add_argument(
"--include-repo-root",
action="store_true",
help="Also create a separate bundle for top-level repository files (default: off).",
)
# Slicing options
p.add_argument(
"--slice-long-files",
action="store_true",
help="Slice long files into line chunks for retrieval (default: off).",
)
p.add_argument(
"--slice-tokens",
type=int,
default=SLICE_TOKENS_THRESHOLD_DEFAULT,
help="Token threshold to trigger slicing (default: 12000).",
)
p.add_argument(
"--slice-max-lines",
type=int,
default=SLICE_MAX_LINES_DEFAULT,
help="Max lines per slice (default: 300).",
)
# XREFS
p.add_argument(
"--no-xrefs", action="store_true", help="Disable building source→test crosswalk (xrefs)."
)
# Artifact count cap
p.add_argument(
"--max-artifacts",
type=int,
default=None,
help="Warn if total number of output files exceeds this cap.",
)
p.add_argument(
"--enforce-max-artifacts",
action="store_true",
help="Exit with non-zero status if --max-artifacts is exceeded.",
)
p.add_argument(
"--top-folders",
type=int,
default=10,
help="Print the Top-K folders by embedded file count per bundle (default: 10).",
)
p.add_argument(
"--folder-depth",
type=int,
default=None,
help="Collapse folder paths to first N components for stats (default: None = full path).",
)
args = p.parse_args()
base = repo_root()
out_dir = to_abs(OUTPUT_DIR)
out_dir.mkdir(parents=True, exist_ok=True)
skip_no_ext = not args.keep_no_ext
skip_empty = not args.keep_empty
# Tokenizer once
tok = make_token_counter()
# Phase 1: collect entries + skips for each root (in memory)
collected: Dict[str, Dict] = {}
for root in SRC_ROOTS:
rp = to_abs(root)
if not rp.exists():
continue
label = sanitize_label(rp, base)
print(f"\n== Scanning: {label} ==")
entries, skipped = collect_entries_for_root(
rp,
label,
skip_no_ext=skip_no_ext,
skip_empty=skip_empty,
slice_long=args.slice_long_files,
slice_tokens=args.slice_tokens,
slice_max_lines=args.slice_max_lines,
tok_counter=tok,
)
if not entries and not skipped:
print("[Info] No embeddable text files found (after filters).")
continue
collected[label] = {"entries": entries, "skipped": skipped}
# Optional: repo_root bundle
if args.include_repo_root:
entries, skipped = collect_entries_for_repo_root(
skip_no_ext=skip_no_ext,
skip_empty=skip_empty,
slice_long=args.slice_long_files,
slice_tokens=args.slice_tokens,
slice_max_lines=args.slice_max_lines,
tok_counter=tok,
)
if entries or skipped:
collected["repo_root"] = {"entries": entries, "skipped": skipped}
# Phase 2: xrefs (optional)
xrefs = None
if not args.no_xrefs:
all_entries_by_label = {lbl: v["entries"] for lbl, v in collected.items()}
xrefs = build_xrefs(all_entries_by_label)
# Phase 3: write bundles
bundles: List[Dict] = []
overall_tokens = 0
for label, data in collected.items():
print(f"\n== Bundle: {label} ==")
build_config = {
"ignore_rules": {
"dirs": sorted(IGNORE_DIRS),
"dir_patterns": IGNORE_DIR_PATTERNS,
"skip_file_patterns": SKIP_FILE_PATTERNS,
"skip_ext_full": sorted(SKIP_EXT_FULL),
},
"limits": {
"max_single_file_bytes": MAX_SINGLE_FILE_BYTES,
"target_shard_size_bytes": TARGET_SHARD_SIZE_BYTES,
"oversize_tokens_threshold": OVERSIZE_TOKENS_THRESHOLD,
"oversize_bytes_threshold": OVERSIZE_BYTES_THRESHOLD,
},
"flags": {
"skip_no_ext": skip_no_ext,
"skip_empty": skip_empty,
"slice_long_files": args.slice_long_files,
"slice_tokens": args.slice_tokens,
"slice_max_lines": args.slice_max_lines,
},
}
b = write_bundle(
label,
data["entries"],
data["skipped"],
out_dir,
args.token_window,
args.size_limit_mb,
build_config,
top_folders=args.top_folders,
folder_depth=args.folder_depth,
)
bundles.append(b)
overall_tokens += b["tokens"]
# Phase 4: MASTER index with GLOBAL_TOC (+xrefs) and overall summary
write_master_index(
out_dir,
bundles,
overall_tokens,
args.token_window,
args.copy_summary,
include_xrefs=(not args.no_xrefs),
xrefs=xrefs,
)
# Phase 5: artifacts cap check
if args.max_artifacts is not None:
total_artifacts = 1 # MASTER_INDEX.json
for b in bundles:
total_artifacts += 1 # per-root INDEX__*.json
total_artifacts += len(b["shards"])
if total_artifacts > args.max_artifacts:
msg = (
f"WARNING: produced {total_artifacts} artifacts "
f"(cap {args.max_artifacts}). Consider increasing shard size, "
f"disabling repo_root, or consolidating roots."
)
print(msg)
if args.enforce_max_artifacts:
sys.exit(2)
if __name__ == "__main__":
main()