Skip to content

Instantly share code, notes, and snippets.

@pakkinlau
Last active November 24, 2025 18:47
Show Gist options
  • Select an option

  • Save pakkinlau/2bfa8eeac0795f8d0d767833ccf4e9d4 to your computer and use it in GitHub Desktop.

Select an option

Save pakkinlau/2bfa8eeac0795f8d0d767833ccf4e9d4 to your computer and use it in GitHub Desktop.
Reads a repository snapshot bundle created by "package_to_snapshot_json.py" (https://gist.github.com/pakkinlau/342eede3f7f969b0f2d6b33ce6aa503a)
#!/usr/bin/env python3
"""
repo_snapshot_query.py
Utilities for querying a repo snapshot bundle produced by the
`agentspine` snapshot tool (or any tool that emits the same layout).
Snapshot layout (on disk)
-------------------------
At the top level there is a directory containing at least:
- ``MASTER_INDEX.json``:
Global table of contents across all bundles.
- One or more per-root index files, referenced from ``MASTER_INDEX["bundles"]``.
For example::
{
"label": "repo_root",
"index": "INDEX__repo_root.json",
"shards": ["DATA__repo_root-000.jsonl"],
...
}
- One or more shard files (newline-delimited JSON), e.g.::
DATA__repo_root-000.jsonl
Each per-root index file (``INDEX__*.json``) contains:
- ``bundle``: metadata including the shard filenames.
- ``files``: a list of file entries with byte offsets into the shard.
- ``agent_howto``: short instructions, which are codified in this module.
Each shard line looks like:
{
"path": "repo_root/path/to/file.py",
"lang": "py",
"sha256": "...",
"lines": 123,
"content": "Entire file contents as a single string"
}
Goal
----
This module wraps the on-disk structure into a small, LLM-friendly
API so that an agent does not need to re-derive the layout on every
invocation.
LLM / agent usage
-----------------
From within a tool-using environment:
- Construct :class:`RepoSnapshot` pointing at the directory that
contains ``MASTER_INDEX.json``.
- Use :meth:`RepoSnapshot.read_text` to load a file's contents by its
repository-style path (e.g. ``"repo_root/agentspine/__init__.py"``).
- Use :meth:`RepoSnapshot.search` or :meth:`RepoSnapshot.grep` for
simple full-text search across the snapshot.
The implementation is intentionally conservative: it only uses JSON and
filesystem primitives, and avoids any external dependencies.
"""
from __future__ import annotations
import argparse
from dataclasses import dataclass
import json
from pathlib import Path
from typing import Any, Dict, Iterable, Iterator, List, Optional, Sequence, Tuple
# ---------------------------------------------------------------------------
# Data structures
# ---------------------------------------------------------------------------
@dataclass
class FileEntry:
    """
    Metadata record for one file inside a snapshot bundle.

    Attributes
    ----------
    label:
        Bundle label from ``MASTER_INDEX["bundles"][i]["label"]``,
        e.g. ``"repo_root"``.
    path:
        Full stored path, e.g. ``"repo_root/agentspine/__init__.py"``.
    lang:
        Short language tag (``"py"``, ``"md"``, ...) when the index has one.
    meta:
        The raw JSON entry from the per-root index's ``files`` list.
    """

    label: str
    path: str
    lang: Optional[str]
    meta: Dict[str, Any]

    def short_path(self) -> str:
        """Path with the leading ``"<label>/"`` stripped, when present."""
        prefix = self.label + "/"
        if self.path.startswith(prefix):
            return self.path[len(prefix):]
        return self.path

    def dirname(self) -> str:
        """
        Directory component of :meth:`short_path`; empty string for files
        that sit at the snapshot root.
        """
        head, sep, _ = self.short_path().rpartition("/")
        return head if sep else ""

    def basename(self) -> str:
        """Final path component (the filename) of :meth:`short_path`."""
        return self.short_path().rpartition("/")[2]

    @property
    def lines(self) -> Optional[int]:
        """Line count from ``meta["lines"]``, or ``None`` when absent/invalid."""
        raw = self.meta.get("lines")
        if raw is None:
            return None
        try:
            return int(raw)
        except (TypeError, ValueError):
            return None
class RootIndex:
    """
    Wrapper around one per-root index file (``INDEX__*.json``).

    Normally obtained via :class:`RepoSnapshot` rather than constructed
    directly.
    """

    def __init__(self, root_dir: Path, index_path: Path, label: str):
        self.root_dir = root_dir
        self.index_path = index_path
        self.label = label
        with index_path.open("r", encoding="utf-8") as handle:
            payload = json.load(handle)
        self._bundle_meta: Dict[str, Any] = payload["bundle"]
        self._files_raw: List[Dict[str, Any]] = list(payload["files"])
        # Exact-path lookup table over the raw file records.
        self._file_by_path: Dict[str, Dict[str, Any]] = {
            record["path"]: record for record in self._files_raw
        }
        # Optional agent-specific instructions carried by the index.
        self._agent_howto: Any = payload.get("agent_howto")
        # Shard files live alongside the index, named in the bundle metadata.
        self._shards: List[Path] = [
            root_dir / shard for shard in self._bundle_meta["shards"]
        ]

    # ------------------------------------------------------------------
    @property
    def files(self) -> List[FileEntry]:
        """All files in this root, as :class:`FileEntry` objects."""
        return [
            FileEntry(
                label=self.label,
                path=record["path"],
                lang=record.get("lang"),
                meta=record,
            )
            for record in self._files_raw
        ]

    @property
    def agent_howto(self) -> Any:
        """The per-root ``agent_howto`` payload, or ``None`` when absent."""
        return self._agent_howto

    # ------------------------------------------------------------------
    def get_entry(self, path: str) -> FileEntry:
        """Return the entry whose stored path equals *path* exactly."""
        try:
            record = self._file_by_path[path]
        except KeyError as exc:
            raise KeyError(f"Path not found in index: {path!r}") from exc
        return FileEntry(
            label=self.label,
            path=path,
            lang=record.get("lang"),
            meta=record,
        )

    # ------------------------------------------------------------------
    def _load_raw_object(self, entry: FileEntry) -> Dict[str, Any]:
        """
        Fetch the raw shard record for *entry*.

        Fast path: (shard, byte_offset, byte_len) from the index.
        Fallback: linear scan of every shard matching ``path`` (and
        ``sha256`` when the index records one).
        """
        meta = entry.meta
        shard_no = meta.get("shard")
        offset = meta.get("byte_offset")
        length = meta.get("byte_len")
        if shard_no is not None and offset is not None and length is not None:
            with self._shards[shard_no].open("rb") as fh:
                fh.seek(int(offset))
                raw = fh.read(int(length))
            record = json.loads(raw.decode("utf-8"))
            # Optional integrity check against the digest recorded in the index.
            expected = meta.get("sha256")
            if expected and record.get("sha256") and record["sha256"] != expected:
                raise ValueError(
                    f"SHA mismatch for {entry.path}: index={expected} shard={record['sha256']}"
                )
            return record
        # Slow path: scan every shard line-by-line (robust if offsets missing).
        expected = meta.get("sha256")
        for shard_path in self._shards:
            with shard_path.open("r", encoding="utf-8") as fh:
                for line in fh:
                    if not line.strip():
                        continue
                    record = json.loads(line)
                    if record.get("path") != entry.path:
                        continue
                    if expected and record.get("sha256") != expected:
                        continue
                    return record
        raise FileNotFoundError(
            f"Could not locate object for {entry.path!r} in any shard."
        )

    # ------------------------------------------------------------------
    def read_text(self, path: str) -> str:
        """
        Return the ``content`` string stored for *path*.

        *path* must match the stored index path exactly, e.g.
        ``"repo_root/agentspine/__init__.py"``.
        """
        record = self._load_raw_object(self.get_entry(path))
        content = record.get("content")
        if not isinstance(content, str):
            raise TypeError(f"Unexpected content type for {path!r}: {type(content)!r}")
        return content
# ---------------------------------------------------------------------------
# High-level snapshot wrapper
# ---------------------------------------------------------------------------
class RepoSnapshot:
    """
    Wrapper around a whole snapshot directory: one ``MASTER_INDEX.json``
    plus any number of per-root indices.

    Typical usage
    -------------
    >>> snap = RepoSnapshot.from_dir("path/to/snapshot_dir")
    >>> for f in snap.iter_files(lang="py"):
    ...     print(f.short_path())
    >>> text = snap.read_text("repo_root/agentspine/__init__.py")

    LLM-focused tips
    ----------------
    - Given only a relative path such as ``"agentspine/__init__.py"``,
      :meth:`resolve_path` maps it to the fully-qualified stored form.
    - :meth:`search` and :meth:`grep` cover small ad-hoc searches; for
      heavier code navigation, build a dedicated index in the host
      environment instead.
    """

    def __init__(self, root_dir: Path, master_index: Dict[str, Any]):
        self.root_dir = root_dir
        self.master_index = master_index
        self._roots: Dict[str, RootIndex] = {}
        for bundle in master_index.get("bundles") or []:
            name = bundle["label"]
            self._roots[name] = RootIndex(
                root_dir=root_dir,
                index_path=root_dir / bundle["index"],
                label=name,
            )

    # ------------------------------------------------------------------
    @classmethod
    def from_dir(cls, root_dir: str | Path) -> "RepoSnapshot":
        """
        Load a snapshot rooted at *root_dir*, which must contain
        ``MASTER_INDEX.json``.
        """
        base = Path(root_dir)
        with (base / "MASTER_INDEX.json").open("r", encoding="utf-8") as handle:
            master = json.load(handle)
        return cls(root_dir=base, master_index=master)

    @classmethod
    def from_default_dir(cls) -> "RepoSnapshot":
        """Load ``MASTER_INDEX.json`` from the current working directory."""
        return cls.from_dir(".")

    # ------------------------------------------------------------------
    def root_labels(self) -> List[str]:
        """Bundle labels present in this snapshot."""
        return list(self._roots)

    def agent_howto_by_root(self) -> Dict[str, Any]:
        """
        Mapping of bundle label to its ``agent_howto`` payload; roots
        without one are omitted.
        """
        return {
            name: root.agent_howto
            for name, root in self._roots.items()
            if root.agent_howto is not None
        }

    # ------------------------------------------------------------------
    def iter_files(
        self,
        *,
        label: Optional[str] = None,
        lang: Optional[str] = None,
        path_prefix: Optional[str] = None,
    ) -> Iterator[FileEntry]:
        """
        Yield file entries, optionally filtered by bundle label, language
        and/or path prefix.

        When *label* is set and *path_prefix* does not start with
        ``"<label>/"``, the prefix is interpreted relative to that label.
        """
        if label is not None:
            selected: List[Tuple[str, RootIndex]] = [(label, self._roots[label])]
        else:
            selected = list(self._roots.items())
        for name, root in selected:
            full_prefix: Optional[str] = None
            if path_prefix:
                if label is None:
                    # Caller is responsible for a fully-qualified prefix here.
                    full_prefix = path_prefix
                elif path_prefix.startswith(name + "/"):
                    full_prefix = path_prefix
                else:
                    full_prefix = name + "/" + path_prefix.lstrip("/")
            for entry in root.files:
                if lang and entry.lang != lang:
                    continue
                if full_prefix and not entry.path.startswith(full_prefix):
                    continue
                yield entry

    # ------------------------------------------------------------------
    def resolve_path(self, spec: str, *, label: Optional[str] = None) -> str:
        """
        Resolve a human-friendly *spec* to an exact stored path.

        Strategy: (1) exact stored-path or label-stripped short-path match;
        (2) fall back to a path-suffix match. Raises :class:`KeyError`
        when nothing matches and :class:`ValueError` when the match is
        ambiguous.
        """
        hits: List[str] = []

        # Pass 1: exact full-path or short-path matches.
        if label is not None:
            first_scope: List[Tuple[str, "RootIndex"]] = (
                [(label, self._roots[label])] if label in self._roots else []
            )
        else:
            first_scope = list(self._roots.items())
        for _name, root in first_scope:
            for entry in root.files:
                if entry.path == spec or entry.short_path() == spec:
                    hits.append(entry.path)

        # Pass 2: suffix matches, only when pass 1 found nothing.
        if not hits:
            scope = (
                [(label, self._roots[label])]
                if label is not None
                else list(self._roots.items())
            )
            for _name, root in scope:
                for entry in root.files:
                    if entry.path == spec or entry.path.endswith("/" + spec):
                        hits.append(entry.path)

        if not hits:
            raise KeyError(f"No file matches spec {spec!r}")
        unique = sorted(set(hits))
        if len(unique) > 1:
            joined = ", ".join(unique[:5])
            raise ValueError(f"Ambiguous spec {spec!r}; matches: {joined} ...")
        return unique[0]

    # ------------------------------------------------------------------
    def read_text(self, path_or_spec: str, *, label: Optional[str] = None) -> str:
        """
        Load file contents given either an exact stored path or a
        human-friendly spec (see :meth:`resolve_path`).
        """
        resolved = self.resolve_path(path_or_spec, label=label)
        # The label prefix normally identifies the owning root directly.
        for name, root in self._roots.items():
            if resolved.startswith(name + "/"):
                return root.read_text(resolved)
        # Rare fallback: probe every root in turn.
        for root in self._roots.values():
            try:
                return root.read_text(resolved)
            except KeyError:
                continue
        raise KeyError(f"Unable to locate path {resolved!r} in any root.")

    # ------------------------------------------------------------------
    def search(
        self,
        needle: str,
        *,
        label: Optional[str] = None,
        lang: Optional[str] = None,
        case_sensitive: bool = False,
        path_prefix: Optional[str] = None,
        max_files: int = 20,
    ) -> List[Tuple[FileEntry, List[str]]]:
        """
        Simple full-text search across snapshot files.

        Parameters
        ----------
        needle:
            Substring to look for (must be non-empty).
        label:
            Optional bundle label to restrict the search.
        lang:
            Optional language filter.
        case_sensitive:
            If ``False`` (default), matching is case-insensitive.
        path_prefix:
            Optional path prefix filter applied to file paths.
        max_files:
            Stop after this many files have at least one match.

        Returns
        -------
        List of ``(FileEntry, snippets)`` pairs, where each snippet is a
        short excerpt (newlines flattened) around a match.
        """
        if not needle:
            raise ValueError("needle must be a non-empty string")
        probe = needle if case_sensitive else needle.lower()
        found: List[Tuple[FileEntry, List[str]]] = []
        for entry in self.iter_files(label=label, lang=lang, path_prefix=path_prefix):
            try:
                # Go through the owning root directly; skip unreadable entries.
                text = self._roots[entry.label].read_text(entry.path)
            except Exception:
                continue
            haystack = text if case_sensitive else text.lower()
            if probe not in haystack:
                continue
            excerpts: List[str] = []
            cursor = 0
            # Collect up to five excerpts around the first occurrences.
            while len(excerpts) < 5:
                pos = haystack.find(probe, cursor)
                if pos == -1:
                    break
                lo = max(0, pos - 60)
                hi = min(len(text), pos + len(needle) + 60)
                excerpts.append(text[lo:hi].replace("\n", " "))
                cursor = pos + len(probe)
            found.append((entry, excerpts))
            if len(found) >= max_files:
                break
        return found

    # ------------------------------------------------------------------
    def grep(
        self,
        needle: str,
        *,
        label: Optional[str] = None,
        lang: Optional[str] = None,
        path_prefix: Optional[str] = None,
        case_sensitive: bool = False,
        max_hits: int = 200,
    ) -> List[Tuple[FileEntry, List[Tuple[int, str]]]]:
        """
        Line-oriented full-text search across snapshot files.

        Parameters
        ----------
        needle:
            Substring to look for (must be non-empty).
        label:
            Optional bundle label to restrict the search.
        lang:
            Optional language filter.
        path_prefix:
            Optional path prefix filter applied to file paths.
        case_sensitive:
            If ``False`` (default), matching is case-insensitive.
        max_hits:
            Stop after this many individual line matches across all files.

        Returns
        -------
        List of ``(FileEntry, hits)`` pairs, where *hits* holds
        ``(line_number, line_text)`` tuples (line numbers are 1-based).
        """
        if not needle:
            raise ValueError("needle must be a non-empty string")
        probe = needle if case_sensitive else needle.lower()
        found: List[Tuple[FileEntry, List[Tuple[int, str]]]] = []
        seen = 0
        for entry in self.iter_files(label=label, lang=lang, path_prefix=path_prefix):
            try:
                text = self._roots[entry.label].read_text(entry.path)
            except Exception:
                continue
            per_file: List[Tuple[int, str]] = []
            for line_no, line in enumerate(text.splitlines(), start=1):
                candidate = line if case_sensitive else line.lower()
                if probe in candidate:
                    per_file.append((line_no, line))
                    seen += 1
                    if seen >= max_hits:
                        break
            if per_file:
                found.append((entry, per_file))
            if seen >= max_hits:
                break
        return found
# ---------------------------------------------------------------------------
# Command-line interface
# ---------------------------------------------------------------------------
def _cmd_list(args: argparse.Namespace) -> int:
    """CLI 'list': print the stored path of every matching file, one per line."""
    snapshot = RepoSnapshot.from_dir(args.snapshot_dir)
    entries = snapshot.iter_files(
        label=args.label, lang=args.lang, path_prefix=args.path_prefix
    )
    for item in entries:
        print(item.path)
    return 0
def _cmd_show(args: argparse.Namespace) -> int:
    """CLI 'show': resolve *args.path* and print the file's full contents."""
    snapshot = RepoSnapshot.from_dir(args.snapshot_dir)
    print(snapshot.read_text(args.path, label=args.label))
    return 0
def _cmd_search(args: argparse.Namespace) -> int:
    """CLI 'search': snippet-style full-text search; prints '# path' then excerpts."""
    snapshot = RepoSnapshot.from_dir(args.snapshot_dir)
    hits = snapshot.search(
        args.pattern,
        label=args.label,
        lang=args.lang,
        case_sensitive=args.case_sensitive,
        path_prefix=args.path_prefix,
        max_files=args.max_files,
    )
    for entry, excerpts in hits:
        print(f"# {entry.path}")
        for excerpt in excerpts:
            print(" ...", excerpt)
        print()
    return 0
def _cmd_grep(args: argparse.Namespace) -> int:
    """CLI 'grep': line-oriented search printed as 'path:line_no: text'."""
    snapshot = RepoSnapshot.from_dir(args.snapshot_dir)
    hits = snapshot.grep(
        args.pattern,
        label=args.label,
        lang=args.lang,
        path_prefix=args.path_prefix,
        case_sensitive=args.case_sensitive,
        max_hits=args.max_hits,
    )
    for entry, lines in hits:
        for line_no, line in lines:
            print(f"{entry.path}:{line_no}: {line}")
    return 0
def _cmd_manifest(args: argparse.Namespace) -> int:
    """
    CLI 'manifest': print a tab-separated manifest of files:
    label, path, lang, lines.
    """
    snapshot = RepoSnapshot.from_dir(args.snapshot_dir)
    for entry in snapshot.iter_files(
        label=args.label,
        lang=args.lang,
        path_prefix=args.path_prefix,
    ):
        # Missing language/line-count fields render as empty columns.
        line_count = entry.lines if entry.lines is not None else ""
        print(f"{entry.label}\t{entry.path}\t{entry.lang or ''}\t{line_count}")
    return 0
def _cmd_describe(args: argparse.Namespace) -> int:
    """
    CLI 'describe': print a high-level summary of the snapshot contents
    (bundle labels, file totals, language breakdown, agent_howto previews).
    """
    snapshot = RepoSnapshot.from_dir(args.snapshot_dir)
    print(f"Snapshot dir : {snapshot.root_dir}")
    labels = sorted(snapshot.root_labels())
    print(f"Bundles : {', '.join(labels) if labels else '(none)'}")
    total = 0
    tally: Dict[str, int] = {}
    for entry in snapshot.iter_files():
        total += 1
        if entry.lang:
            tally[entry.lang] = tally.get(entry.lang, 0) + 1
    print(f"Total files : {total}")
    if tally:
        print("Languages :")
        # Most common language first; ties broken alphabetically.
        for tag, count in sorted(tally.items(), key=lambda kv: (-kv[1], kv[0])):
            print(f" {tag:8s} {count:5d}")
    howtos = snapshot.agent_howto_by_root()
    if howtos:
        print("agent_howto :")
        for name in sorted(howtos):
            payload = howtos[name]
            preview = str(payload).replace("\n", " ")
            if len(preview) > 60:
                preview = preview[:57] + "..."
            print(f" {name}: ({type(payload).__name__}) {preview}")
    return 0
def _build_arg_parser() -> argparse.ArgumentParser:
    """Build the CLI parser: global --snapshot-dir plus one subcommand each
    for list/show/search/grep/manifest/describe."""
    parser = argparse.ArgumentParser(
        description="Query a repo snapshot built with MASTER_INDEX + INDEX__*.json.",
    )
    parser.add_argument(
        "--snapshot-dir",
        default=".",
        help="Directory containing MASTER_INDEX.json (default: current directory).",
    )
    sub = parser.add_subparsers(dest="cmd", required=True)

    def add_filters(sp: argparse.ArgumentParser) -> None:
        # Shared --label/--lang/--path-prefix filters used by most subcommands.
        sp.add_argument("--label", help="Restrict to a particular bundle label.")
        sp.add_argument("--lang", help="Restrict to a language tag (e.g. 'py').")
        sp.add_argument(
            "--path-prefix",
            help="Restrict to files whose paths start with this prefix.",
        )

    def add_case_flag(sp: argparse.ArgumentParser) -> None:
        # Shared case-sensitivity switch for the search-style subcommands.
        sp.add_argument(
            "--case-sensitive",
            action="store_true",
            help="Perform a case-sensitive search.",
        )

    # list
    p_list = sub.add_parser("list", help="List files in the snapshot.")
    add_filters(p_list)
    p_list.set_defaults(func=_cmd_list)

    # show
    p_show = sub.add_parser("show", help="Print the contents of a file.")
    p_show.add_argument("path", help="File path or spec to read.")
    p_show.add_argument("--label", help="Optional bundle label hint.")
    p_show.set_defaults(func=_cmd_show)

    # search
    p_search = sub.add_parser("search", help="Full-text search across files (snippet-style).")
    p_search.add_argument("pattern", help="Substring to search for.")
    add_filters(p_search)
    add_case_flag(p_search)
    p_search.add_argument(
        "--max-files",
        type=int,
        default=20,
        help="Stop after this many files have matches (default: 20).",
    )
    p_search.set_defaults(func=_cmd_search)

    # grep
    p_grep = sub.add_parser(
        "grep",
        help="Line-oriented full-text search (file:line: text).",
    )
    p_grep.add_argument("pattern", help="Substring to search for.")
    add_filters(p_grep)
    add_case_flag(p_grep)
    p_grep.add_argument(
        "--max-hits",
        type=int,
        default=200,
        help="Stop after this many individual line matches (default: 200).",
    )
    p_grep.set_defaults(func=_cmd_grep)

    # manifest
    p_manifest = sub.add_parser(
        "manifest",
        help="Print a tab-separated manifest of files (label, path, lang, lines).",
    )
    add_filters(p_manifest)
    p_manifest.set_defaults(func=_cmd_manifest)

    # describe
    p_describe = sub.add_parser(
        "describe",
        help="Print a high-level summary of the snapshot.",
    )
    p_describe.set_defaults(func=_cmd_describe)

    return parser
def main(argv: Optional[Sequence[str]] = None) -> int:
    """Script entry point: parse *argv* and dispatch to the chosen subcommand."""
    namespace = _build_arg_parser().parse_args(argv)
    return namespace.func(namespace)


if __name__ == "__main__":  # pragma: no cover
    raise SystemExit(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment