Created
April 17, 2026 16:00
-
-
Save SolarianZ/db5ae2981f15c97d5c9fe12a11b7d212 to your computer and use it in GitHub Desktop.
Rebuild Codex local conversation indexes from rollout files and SQLite state.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| """Rebuild Codex local conversation indexes from rollout files and SQLite state. | |
| This utility regenerates `session_index.jsonl` and/or `history.jsonl` under | |
| `CODEX_HOME` (defaults to `~/.codex`) from local rollout/session files. | |
| Default behavior is intentionally conservative: | |
| - rebuild the conversation list from non-archived threads in `state_5.sqlite` | |
| - restore only the most recent 100 conversations (`--max-threads 0` means no limit) | |
| - rebuild `history.jsonl` only for the restored conversation set | |
| - exclude orphan session files unless `--include-orphans` is provided | |
| - honor `[history].max_bytes` from `config.toml` unless `--ignore-history-max-bytes` is used | |
| Usage examples: | |
| python refresh_codex_history.py --dry-run | |
| python refresh_codex_history.py --max-threads 200 | |
| python refresh_codex_history.py --include-orphans --ignore-history-max-bytes | |
| Main parameters: | |
| --max-threads N | |
| Maximum number of most-recent conversations to restore. Default: 100. | |
| Use 0 to disable the limit. | |
| --include-orphans | |
| Include session files that exist under `sessions/` but do not have a | |
| matching thread row in `state_5.sqlite`. | |
| --only {session-index,history,both} | |
| Choose which output file(s) to rebuild. | |
| --dry-run | |
| Show what would be written without modifying any files. | |
| --ignore-history-max-bytes | |
| Ignore `[history].max_bytes` from `config.toml` while rebuilding | |
| `history.jsonl`. | |
| """ | |
| from __future__ import annotations | |
| import argparse | |
| import json | |
| import os | |
| import shutil | |
| import sqlite3 | |
| import tempfile | |
| import traceback | |
| from dataclasses import dataclass | |
| from datetime import datetime, timezone | |
| from pathlib import Path | |
| from textwrap import dedent | |
| from typing import Iterable | |
| try: | |
| import tomllib | |
| except ModuleNotFoundError: # pragma: no cover - fallback for older Python | |
| import tomli as tomllib # type: ignore[no-redef] | |
# Fraction of the configured history byte limit that the history is trimmed
# down to once the limit is exceeded (see trim_history_entries_to_max_bytes).
HISTORY_SOFT_CAP_RATIO = 0.8
def parse_nonnegative_int(value: str) -> int:
    """Convert a command-line token to an integer that must be >= 0.

    Intended as an argparse ``type=`` callable.

    Raises:
        argparse.ArgumentTypeError: if the parsed integer is negative.
        ValueError: if ``value`` is not an integer literal at all.
    """
    number = int(value)
    if number >= 0:
        return number
    raise argparse.ArgumentTypeError("value must be >= 0")
def parse_args() -> argparse.Namespace:
    """Declare the command-line interface and parse the process arguments.

    Returns:
        The parsed namespace with the attributes ``codex_home``, ``only``,
        ``max_threads``, ``include_orphans``, ``ignore_history_max_bytes``,
        ``dry_run`` and ``no_backup``.
    """
    examples = dedent(
        """\
        Examples:
          python refresh_codex_history.py --dry-run
          python refresh_codex_history.py --max-threads 200
          python refresh_codex_history.py --only session-index --include-orphans
        """
    )
    parser = argparse.ArgumentParser(
        description="Rebuild Codex session_index.jsonl and history.jsonl from local state.",
        # Raw formatter keeps the line breaks of the examples in the epilog.
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=examples,
    )
    add_option = parser.add_argument

    # Options that select inputs and outputs.
    add_option(
        "--codex-home",
        type=Path,
        default=None,
        help="Codex home directory. Defaults to CODEX_HOME or ~/.codex.",
    )
    add_option(
        "--only",
        choices=("session-index", "history", "both"),
        default="both",
        help="Choose which file(s) to rebuild.",
    )

    # Options that control how much is restored.
    max_threads_help = (
        "Maximum number of most recently updated conversations to restore. "
        "Use 0 for no limit. Default: 100."
    )
    add_option(
        "--max-threads",
        type=parse_nonnegative_int,
        default=100,
        help=max_threads_help,
    )
    include_orphans_help = (
        "Include session files found under sessions/ that do not have a "
        "matching thread row in state_5.sqlite."
    )
    add_option("--include-orphans", action="store_true", help=include_orphans_help)
    ignore_cap_help = (
        "Ignore [history].max_bytes from config.toml when rebuilding "
        "history.jsonl."
    )
    add_option("--ignore-history-max-bytes", action="store_true", help=ignore_cap_help)

    # Options that control writing.
    add_option(
        "--dry-run",
        action="store_true",
        help="Show what would be written without modifying files.",
    )
    add_option(
        "--no-backup",
        action="store_true",
        help="Do not create timestamped backups before overwriting files.",
    )
    return parser.parse_args()
def resolve_codex_home(explicit: Path | None) -> Path:
    """Return the absolute Codex home directory.

    Precedence: the explicit path (when given), then a non-empty
    ``CODEX_HOME`` environment variable, then ``~/.codex``.

    Args:
        explicit: Directory passed on the command line, or None.

    Returns:
        The chosen directory with ``~`` expanded and resolved to an absolute path.
    """
    if explicit is not None:
        return explicit.expanduser().resolve()
    # `or` instead of a .get() default: an empty CODEX_HOME would otherwise
    # become Path("") and silently resolve to the current working directory.
    configured = os.environ.get("CODEX_HOME") or ""
    if configured:
        return Path(configured).expanduser().resolve()
    return (Path.home() / ".codex").expanduser().resolve()
def parse_iso8601(value: str) -> datetime:
    """Parse an ISO-8601 timestamp and convert it to UTC.

    A ``Z`` suffix is rewritten to ``+00:00`` before parsing.

    Raises:
        ValueError: if ``value`` is not a valid ISO-8601 timestamp.
    """
    with_offset = value.replace("Z", "+00:00")
    parsed = datetime.fromisoformat(with_offset)
    return parsed.astimezone(timezone.utc)
def parse_session_timestamp(value: str | None) -> tuple[datetime | None, str | None]:
    """Parse a rollout timestamp, keeping the original text alongside it.

    Args:
        value: Timestamp text from a rollout line, or None.

    Returns:
        ``(utc_datetime, original_text)``, or ``(None, None)`` when the value
        is missing, empty, or not a parseable ISO-8601 timestamp.
    """
    if not value:
        return None, None
    try:
        dt = parse_iso8601(value)
    except ValueError:
        # Callers already skip entries whose datetime is None; one malformed
        # timestamp should not abort the whole rebuild.
        return None, None
    return dt, value
def format_iso_from_dt(dt: datetime) -> str:
    """Render a datetime as an ISO-8601 UTC string with a ``Z`` suffix."""
    utc_text = dt.astimezone(timezone.utc).isoformat()
    return utc_text.replace("+00:00", "Z")
def format_iso_from_ms(epoch_ms: int | None) -> str | None:
    """Render epoch milliseconds as an ISO-8601 UTC string with a ``Z`` suffix.

    The output has millisecond precision. ``None`` is passed through unchanged.
    """
    if epoch_ms is not None:
        moment = datetime.fromtimestamp(epoch_ms / 1000, tz=timezone.utc)
        text = moment.isoformat(timespec="milliseconds")
        return text.replace("+00:00", "Z")
    return None
def to_epoch_seconds(dt: datetime) -> int:
    """Return the POSIX timestamp of ``dt`` truncated to whole seconds."""
    seconds = dt.timestamp()
    return int(seconds)
def sanitize_user_text(text: str | None) -> str | None:
    """Strip trailing carriage returns and newlines; pass ``None`` through."""
    return None if text is None else text.rstrip("\r\n")
def first_line(text: str | None) -> str:
    """Return the first non-blank line of ``text`` with surrounding whitespace removed.

    Missing, empty or whitespace-only text yields an empty string.
    """
    if not text:
        return ""
    stripped_lines = (piece.strip() for piece in text.splitlines())
    # Fall back to the fully stripped text when every line is blank.
    return next((piece for piece in stripped_lines if piece), text.strip())
@dataclass
class SessionScan:
    """Metadata collected from a single rollout/session JSONL file."""

    # Session id taken from the file's `session_meta` payload.
    session_id: str
    # Path of the scanned JSONL file.
    rollout_path: Path
    # Timestamp from the `session_meta` payload, if one was parsed.
    created_at: datetime | None = None
    # Last line-level timestamp that parsed successfully, if any.
    updated_at: datetime | None = None
    # Original text of the timestamp stored in `updated_at`.
    updated_at_raw: str | None = None
    # First user message found in the file, if any.
    first_user_message: str | None = None
    # `source` value from the `session_meta` payload, if present.
    source: str | None = None
@dataclass
class ThreadRow:
    """One row of the `threads` table read from the SQLite state database."""

    # Value of the row's `id` column.
    session_id: str
    # `rollout_path` column as a Path, or None when the column is empty.
    rollout_path: Path | None
    # `title` column; empty string when the column is empty.
    title: str
    # `first_user_message` column; empty string when the column is empty.
    first_user_message: str
    # `created_at_ms` column (presumably epoch milliseconds, as the name suggests).
    created_at_ms: int | None
    # `updated_at_ms` column (presumably epoch milliseconds, as the name suggests).
    updated_at_ms: int | None
    # Truthiness of the `archived` column.
    archived: bool
@dataclass
class SessionIndexBuild:
    """Result of building the session index."""

    # Index entries (`id`, `thread_name`, `updated_at`), most recent first.
    entries: list[dict[str, str]]
    # Ids of the sessions that made it into `entries`.
    restored_session_ids: set[str]
    # Number of scanned sessions without a thread row that were left out.
    skipped_orphans: int
    # True when thread rows from the state database were available.
    used_thread_rows: bool
def iter_session_files(sessions_root: Path) -> Iterable[Path]:
    """Return every ``*.jsonl`` file below ``sessions_root`` in sorted order.

    A missing root directory yields an empty list.
    """
    if sessions_root.exists():
        return sorted(sessions_root.rglob("*.jsonl"))
    return []
def _fallback_user_text(payload: dict[str, object]) -> str | None:
    """Join the text parts of a `response_item` user message, or return None."""
    content = payload.get("content")
    if not isinstance(content, list):
        # Missing, null or malformed content carries no usable text.
        return None
    parts = [
        item["text"]
        for item in content
        if isinstance(item, dict)
        and item.get("type") in {"input_text", "output_text"}
        and isinstance(item.get("text"), str)
        and item["text"]
    ]
    if not parts:
        return None
    return sanitize_user_text("\n".join(parts)) or None


def scan_session_file(path: Path) -> tuple[SessionScan | None, list[dict[str, object]]]:
    """Scan one rollout JSONL file for session metadata and user messages.

    The scan is best-effort: blank lines, lines that are not valid JSON, JSON
    values that are not objects, undecodable bytes and fields of an unexpected
    type are skipped instead of aborting the scan.

    Args:
        path: The rollout/session file to read.

    Returns:
        ``(scan, history_entries)``. ``scan`` is None when the file cannot be
        opened or contains no `session_meta` id. Each history entry is a dict
        with ``session_id``, ``ts`` (epoch seconds) and ``text``; an entry is
        only recorded for `user_message` events seen after both a session id
        and a timestamp are known.
    """
    session_id: str | None = None
    created_at: datetime | None = None
    updated_at: datetime | None = None
    updated_at_raw: str | None = None
    first_user_message: str | None = None
    source: str | None = None
    history_entries: list[dict[str, object]] = []
    saw_event_user_message = False
    try:
        # errors="replace": a stray invalid byte should cost at most one line,
        # not the whole file (UnicodeDecodeError is not an OSError).
        with path.open("r", encoding="utf-8", errors="replace") as handle:
            for line in handle:
                line = line.strip()
                if not line:
                    continue
                try:
                    obj = json.loads(line)
                except json.JSONDecodeError:
                    continue
                if not isinstance(obj, dict):
                    # Valid JSON, but not an object: nothing to read from it.
                    continue

                # The most recent parseable line timestamp is the "updated" time.
                ts_text = obj.get("timestamp")
                if isinstance(ts_text, str):
                    dt, raw = parse_session_timestamp(ts_text)
                    if dt is not None:
                        updated_at = dt
                        updated_at_raw = raw

                entry_type = obj.get("type")
                payload = obj.get("payload", {})
                if not isinstance(payload, dict):
                    continue

                if entry_type == "session_meta":
                    maybe_id = payload.get("id")
                    if isinstance(maybe_id, str) and maybe_id:
                        session_id = maybe_id
                    created_text = payload.get("timestamp")
                    if isinstance(created_text, str):
                        dt, _ = parse_session_timestamp(created_text)
                        if dt is not None:
                            created_at = dt
                    maybe_source = payload.get("source")
                    if isinstance(maybe_source, str) and maybe_source:
                        source = maybe_source
                elif entry_type == "event_msg":
                    if payload.get("type") != "user_message":
                        continue
                    raw_message = payload.get("message")
                    message = (
                        sanitize_user_text(raw_message)
                        if isinstance(raw_message, str)
                        else None
                    )
                    if not message:
                        continue
                    saw_event_user_message = True
                    if first_user_message is None:
                        first_user_message = message
                    if session_id and updated_at is not None:
                        history_entries.append(
                            {
                                "session_id": session_id,
                                "ts": to_epoch_seconds(updated_at),
                                "text": message,
                            }
                        )
                elif (
                    entry_type == "response_item"
                    # Only used as a title source until a real user_message
                    # event has been seen.
                    and not saw_event_user_message
                    and first_user_message is None
                    and payload.get("type") == "message"
                    and payload.get("role") == "user"
                ):
                    first_user_message = _fallback_user_text(payload)
    except OSError:
        return None, []
    if session_id is None:
        return None, history_entries
    return (
        SessionScan(
            session_id=session_id,
            rollout_path=path,
            created_at=created_at,
            updated_at=updated_at,
            updated_at_raw=updated_at_raw,
            first_user_message=first_user_message,
            source=source,
        ),
        history_entries,
    )
def load_thread_rows(state_db: Path) -> dict[str, ThreadRow]:
    """Read every row of the ``threads`` table, keyed by thread id.

    Args:
        state_db: Path to the SQLite state database.

    Returns:
        A mapping from thread id to ``ThreadRow``; empty when the database
        file does not exist. Empty ``title`` / ``first_user_message`` columns
        become empty strings and an empty ``rollout_path`` becomes None.
    """
    threads: dict[str, ThreadRow] = {}
    if not state_db.exists():
        return threads
    query = """
        SELECT
            id,
            rollout_path,
            title,
            first_user_message,
            created_at_ms,
            updated_at_ms,
            archived
        FROM threads
    """
    connection = sqlite3.connect(str(state_db))
    # Row objects allow access to columns by name.
    connection.row_factory = sqlite3.Row
    try:
        cursor = connection.cursor()
        for record in cursor.execute(query):
            thread_id = record["id"]
            raw_path = record["rollout_path"]
            threads[thread_id] = ThreadRow(
                session_id=thread_id,
                rollout_path=Path(raw_path) if raw_path else None,
                title=record["title"] or "",
                first_user_message=record["first_user_message"] or "",
                created_at_ms=record["created_at_ms"],
                updated_at_ms=record["updated_at_ms"],
                archived=bool(record["archived"]),
            )
    finally:
        connection.close()
    return threads
| def load_history_max_bytes(codex_home: Path) -> int | None: | |
| """Read `[history].max_bytes` from `config.toml` when available.""" | |
| config_path = codex_home / "config.toml" | |
| if not config_path.exists(): | |
| return None | |
| try: | |
| config = tomllib.loads(config_path.read_text(encoding="utf-8")) | |
| except (OSError, tomllib.TOMLDecodeError): | |
| return None | |
| history = config.get("history") | |
| if not isinstance(history, dict): | |
| return None | |
| max_bytes = history.get("max_bytes") | |
| if isinstance(max_bytes, int) and max_bytes > 0: | |
| return max_bytes | |
| return None | |
def build_session_index(
    thread_rows: dict[str, ThreadRow],
    scans: dict[str, SessionScan],
    *,
    max_threads: int,
    include_orphans: bool,
) -> SessionIndexBuild:
    """Build the latest-first session index from thread rows and file scans.

    Args:
        thread_rows: Thread rows keyed by session id; when empty, the scanned
            sessions are used as the only source.
        scans: File scan results keyed by session id.
        max_threads: Keep at most this many entries; values <= 0 keep all.
        include_orphans: Whether scanned sessions without a thread row are
            included when thread rows are available.

    Returns:
        The index entries, the ids they cover, the number of orphan sessions
        that were left out, and whether thread rows were used.
    """
    used_thread_rows = bool(thread_rows)
    skipped_orphans = 0
    if not used_thread_rows:
        session_ids = set(scans)
    else:
        session_ids = set(thread_rows)
        orphan_ids = set(scans) - session_ids
        if include_orphans:
            session_ids |= orphan_ids
        else:
            skipped_orphans = len(orphan_ids)

    ranked: list[tuple[datetime, str, dict[str, str]]] = []
    for session_id in session_ids:
        thread = thread_rows.get(session_id)
        scan = scans.get(session_id)
        if thread is not None and thread.archived:
            continue

        # Title preference: stored title, then the first user message from the
        # thread row, then from the scan, and finally the bare session id.
        if thread is not None and thread.title:
            title = thread.title
        elif thread is not None and thread.first_user_message:
            title = first_line(thread.first_user_message)
        elif scan is not None and scan.first_user_message:
            title = first_line(scan.first_user_message)
        else:
            title = ""
        title = title or session_id

        # Start from the scan's last timestamp; a strictly newer thread-row
        # timestamp (or any, when the scan has none) takes over.
        stamp: datetime | None = None
        stamp_text: str | None = None
        if scan is not None and scan.updated_at is not None:
            stamp, stamp_text = scan.updated_at, scan.updated_at_raw
        if thread is not None and thread.updated_at_ms is not None:
            from_thread = datetime.fromtimestamp(
                thread.updated_at_ms / 1000, tz=timezone.utc
            )
            if stamp is None or from_thread > stamp:
                stamp = from_thread
                stamp_text = format_iso_from_ms(thread.updated_at_ms)

        # No "updated" time at all: fall back to a creation time.
        if stamp is None:
            if thread is not None and thread.created_at_ms is not None:
                stamp = datetime.fromtimestamp(
                    thread.created_at_ms / 1000, tz=timezone.utc
                )
                stamp_text = format_iso_from_ms(thread.created_at_ms)
            elif scan is not None and scan.created_at is not None:
                stamp = scan.created_at
                stamp_text = format_iso_from_dt(scan.created_at)

        # Sessions that cannot be dated cannot be ranked, so they are dropped.
        if stamp is None or stamp_text is None:
            continue
        entry = {"id": session_id, "thread_name": title, "updated_at": stamp_text}
        ranked.append((stamp, session_id, entry))

    # Newest first; the session id breaks ties deterministically.
    ranked = sorted(ranked, key=lambda candidate: candidate[:2], reverse=True)
    if max_threads > 0:
        ranked = ranked[:max_threads]

    return SessionIndexBuild(
        entries=[entry for _, _, entry in ranked],
        restored_session_ids={kept_id for _, kept_id, _ in ranked},
        skipped_orphans=skipped_orphans,
        used_thread_rows=used_thread_rows,
    )
def build_history_entries(
    session_files: Iterable[Path],
    *,
    allowed_session_ids: set[str] | None,
) -> list[dict[str, object]]:
    """Collect de-duplicated history entries from the given session files.

    Args:
        session_files: Rollout files to scan.
        allowed_session_ids: When not None, only entries of these sessions
            are kept.

    Returns:
        Entries with ``session_id``, ``ts`` and ``text`` keys, ordered by
        timestamp, then session id, then text.
    """
    unique: set[tuple[str, int, str]] = set()
    for path in session_files:
        _, items = scan_session_file(path)
        for item in items:
            owner = str(item["session_id"])
            if allowed_session_ids is not None and owner not in allowed_session_ids:
                continue
            # The set drops repeats of the same (session, second, text) triple.
            unique.add((owner, int(item["ts"]), str(item["text"])))
    ordered = sorted(unique, key=lambda record: (record[1], record[0], record[2]))
    return [
        {"session_id": owner, "ts": ts, "text": text}
        for owner, ts, text in ordered
    ]
def trim_history_entries_to_max_bytes(
    entries: list[dict[str, object]],
    max_bytes: int | None,
) -> list[dict[str, object]]:
    """Drop the oldest entries once the serialized history exceeds ``max_bytes``.

    Sizes are measured on the compact JSON line (including its newline) that
    would be written for each entry. When the total exceeds ``max_bytes``,
    leading entries are dropped until the total fits the soft cap
    (``HISTORY_SOFT_CAP_RATIO`` of ``max_bytes``) or only the newest entry is
    left; the newest entry is never dropped.

    Returns:
        ``entries`` itself when nothing has to be trimmed, otherwise a new
        list holding the retained tail.
    """
    if not entries or max_bytes is None or max_bytes <= 0:
        return entries

    sizes = [
        len(
            (json.dumps(entry, ensure_ascii=False, separators=(",", ":")) + "\n").encode(
                "utf-8"
            )
        )
        for entry in entries
    ]
    remaining = sum(sizes)
    if remaining <= max_bytes:
        return entries

    soft_cap = max(1, min(max_bytes, int(max_bytes * HISTORY_SOFT_CAP_RATIO)))
    # The newest entry is always kept, even when it alone exceeds the soft cap.
    target = max(soft_cap, sizes[-1])

    first_kept = 0
    for size in sizes[:-1]:
        if remaining <= target:
            break
        remaining -= size
        first_kept += 1
    return entries[first_kept:]
def write_jsonl(
    path: Path, entries: list[dict[str, object]], backup: bool
) -> Path | None:
    """Replace ``path`` with ``entries`` serialized as compact JSON lines.

    The data is first written to a temporary file in the target directory and
    then moved over ``path``, so a failed write never leaves a half-written
    target. The temporary file is removed when anything fails.

    Args:
        path: Destination file; parent directories are created as needed.
        entries: One JSON-serializable dict per output line.
        backup: When true and ``path`` already exists, copy it to
            ``<name>.bak.<UTC timestamp>`` next to it first.

    Returns:
        The path of the backup copy, or None when no backup was made.

    Raises:
        OSError: if a filesystem operation fails.
        TypeError, ValueError: if an entry cannot be serialized to JSON.
    """
    path.parent.mkdir(parents=True, exist_ok=True)
    backup_path: Path | None = None
    if backup and path.exists():
        stamp = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%SZ")
        backup_path = path.with_name(f"{path.name}.bak.{stamp}")
        shutil.copy2(path, backup_path)

    temp_path: Path | None = None
    replaced = False
    try:
        with tempfile.NamedTemporaryFile(
            "w",
            encoding="utf-8",
            newline="\n",
            # Kept after close so it can be moved over the target afterwards.
            delete=False,
            # Same directory as the target so the final move stays on one filesystem.
            dir=str(path.parent),
            prefix=f".{path.name}.",
            suffix=".tmp",
        ) as handle:
            temp_path = Path(handle.name)
            for entry in entries:
                handle.write(json.dumps(entry, ensure_ascii=False, separators=(",", ":")))
                handle.write("\n")
        temp_path.replace(path)
        replaced = True
    finally:
        # Do not leave a stray temporary file behind when writing or moving failed.
        if not replaced and temp_path is not None:
            temp_path.unlink(missing_ok=True)
    return backup_path
def main() -> int:
    """Run the rebuild and print a report.

    Report lines are buffered and only printed when everything succeeded; on
    any exception the failure and a traceback are printed instead.

    Returns:
        0 on success, 1 on failure.
    """
    args = parse_args()
    report: list[str] = []

    def publish(target: Path, entries: list[dict[str, object]]) -> None:
        # Write one output file, or only announce it in dry-run mode.
        if args.dry_run:
            report.append(f"file: would update {target}")
            return
        backup_path = write_jsonl(target, entries, backup=not args.no_backup)
        report.append(f"file: updated {target}")
        if backup_path is not None:
            report.append(f"file: created {backup_path}")

    try:
        codex_home = resolve_codex_home(args.codex_home)
        session_files = list(iter_session_files(codex_home / "sessions"))

        # First pass over the rollout files: per-session metadata only.
        scans: dict[str, SessionScan] = {}
        for session_file in session_files:
            scan, _ = scan_session_file(session_file)
            if scan is None:
                continue
            scans[scan.session_id] = scan

        thread_rows = load_thread_rows(codex_home / "state_5.sqlite")
        session_index = build_session_index(
            thread_rows,
            scans,
            max_threads=args.max_threads,
            include_orphans=args.include_orphans,
        )

        report.append(f"conversations refreshed: {len(session_index.entries)}")
        report.append(
            "source: active threads from state_5.sqlite"
            if session_index.used_thread_rows
            else "source: sessions/ fallback (state_5.sqlite missing or empty)"
        )
        if session_index.used_thread_rows and not args.include_orphans:
            report.append(f"orphan sessions skipped: {session_index.skipped_orphans}")
        limit_text = "unlimited" if args.max_threads == 0 else str(args.max_threads)
        report.append(f"max conversations: {limit_text}")

        if args.only in {"session-index", "both"}:
            publish(codex_home / "session_index.jsonl", session_index.entries)

        if args.only in {"history", "both"}:
            # History is limited to the conversations restored into the index.
            history_entries = build_history_entries(
                session_files,
                allowed_session_ids=session_index.restored_session_ids,
            )
            history_max_bytes = (
                None
                if args.ignore_history_max_bytes
                else load_history_max_bytes(codex_home)
            )
            history_entries = trim_history_entries_to_max_bytes(
                history_entries,
                history_max_bytes,
            )
            if history_max_bytes is not None:
                report.append(f"history max bytes: applied {history_max_bytes}")
            elif args.ignore_history_max_bytes:
                report.append("history max bytes: ignored by flag")
            else:
                report.append("history max bytes: not configured")
            publish(codex_home / "history.jsonl", history_entries)
    except Exception as exc:
        # Top-level boundary: report the failure and signal it via the exit code.
        print("status: failed")
        print(f"error: {type(exc).__name__}: {exc}")
        traceback.print_exc()
        return 1

    for line in report:
        print(line)
    print("status: success")
    if not args.dry_run:
        print("next step: restart Codex")
    return 0
# Script entry point: exit the process with the status code returned by main().
if __name__ == "__main__":
    raise SystemExit(main())
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment