Skip to content

Instantly share code, notes, and snippets.

@SolarianZ
Created April 17, 2026 16:00
Show Gist options
  • Select an option

  • Save SolarianZ/db5ae2981f15c97d5c9fe12a11b7d212 to your computer and use it in GitHub Desktop.

Select an option

Save SolarianZ/db5ae2981f15c97d5c9fe12a11b7d212 to your computer and use it in GitHub Desktop.
Rebuild Codex local conversation indexes from rollout files and SQLite state.
#!/usr/bin/env python3
"""Rebuild Codex local conversation indexes from rollout files and SQLite state.

This utility regenerates `session_index.jsonl` and/or `history.jsonl` under
`CODEX_HOME` (defaults to `~/.codex`) from local rollout/session files.

Default behavior is intentionally conservative:
- rebuild the conversation list from non-archived threads in `state_5.sqlite`
- restore only the most recent 100 conversations (`--max-threads 0` means no limit)
- rebuild `history.jsonl` only for the restored conversation set
- exclude orphan session files unless `--include-orphans` is provided
- honor `[history].max_bytes` from `config.toml` unless `--ignore-history-max-bytes` is used

Usage examples:
    python refresh_codex_history.py --dry-run
    python refresh_codex_history.py --max-threads 200
    python refresh_codex_history.py --include-orphans --ignore-history-max-bytes

Main parameters:
    --max-threads N
        Maximum number of most-recent conversations to restore. Default: 100.
        Use 0 to disable the limit.
    --include-orphans
        Include session files that exist under `sessions/` but do not have a
        matching thread row in `state_5.sqlite`.
    --only {session-index,history,both}
        Choose which output file(s) to rebuild.
    --dry-run
        Show what would be written without modifying any files.
    --ignore-history-max-bytes
        Ignore `[history].max_bytes` from `config.toml` while rebuilding
        `history.jsonl`.
"""
from __future__ import annotations
import argparse
import json
import os
import shutil
import sqlite3
import tempfile
import traceback
from dataclasses import dataclass
from datetime import datetime, timezone
from pathlib import Path
from textwrap import dedent
from typing import Iterable
try:
import tomllib
except ModuleNotFoundError: # pragma: no cover - fallback for older Python
import tomli as tomllib # type: ignore[no-redef]
# Fraction of the configured history byte limit that trimming shrinks the
# rebuilt history down to once the limit is exceeded; it is multiplied with
# `max_bytes` in `trim_history_entries_to_max_bytes`.
HISTORY_SOFT_CAP_RATIO = 0.8
def parse_nonnegative_int(value: str) -> int:
    """Argparse `type=` converter that accepts only integers >= 0.

    Args:
        value: Raw command-line token.

    Returns:
        The parsed integer.

    Raises:
        argparse.ArgumentTypeError: If `value` is not an integer or is negative.
    """
    try:
        parsed = int(value)
    except ValueError:
        # A bare ValueError makes argparse print "invalid parse_nonnegative_int
        # value", which exposes the converter name instead of the problem.
        raise argparse.ArgumentTypeError(
            f"invalid integer value: {value!r}"
        ) from None
    if parsed < 0:
        raise argparse.ArgumentTypeError("value must be >= 0")
    return parsed
def parse_args() -> argparse.Namespace:
    """Declare the command-line options and parse the process arguments.

    Returns:
        The populated `argparse.Namespace` with attributes `codex_home`,
        `only`, `max_threads`, `include_orphans`, `ignore_history_max_bytes`,
        `dry_run` and `no_backup`.
    """
    examples = dedent(
        """\
        Examples:
        python refresh_codex_history.py --dry-run
        python refresh_codex_history.py --max-threads 200
        python refresh_codex_history.py --only session-index --include-orphans
        """
    )
    cli = argparse.ArgumentParser(
        description="Rebuild Codex session_index.jsonl and history.jsonl from local state.",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=examples,
    )

    # Options are registered in this order so `--help` lists them the same way.
    option_specs: tuple[tuple[str, dict[str, object]], ...] = (
        (
            "--codex-home",
            {
                "type": Path,
                "default": None,
                "help": "Codex home directory. Defaults to CODEX_HOME or ~/.codex.",
            },
        ),
        (
            "--only",
            {
                "choices": ("session-index", "history", "both"),
                "default": "both",
                "help": "Choose which file(s) to rebuild.",
            },
        ),
        (
            "--max-threads",
            {
                "type": parse_nonnegative_int,
                "default": 100,
                "help": (
                    "Maximum number of most recently updated conversations to restore. "
                    "Use 0 for no limit. Default: 100."
                ),
            },
        ),
        (
            "--include-orphans",
            {
                "action": "store_true",
                "help": (
                    "Include session files found under sessions/ that do not have a "
                    "matching thread row in state_5.sqlite."
                ),
            },
        ),
        (
            "--ignore-history-max-bytes",
            {
                "action": "store_true",
                "help": (
                    "Ignore [history].max_bytes from config.toml when rebuilding "
                    "history.jsonl."
                ),
            },
        ),
        (
            "--dry-run",
            {
                "action": "store_true",
                "help": "Show what would be written without modifying files.",
            },
        ),
        (
            "--no-backup",
            {
                "action": "store_true",
                "help": "Do not create timestamped backups before overwriting files.",
            },
        ),
    )
    for flag, settings in option_specs:
        cli.add_argument(flag, **settings)

    return cli.parse_args()
def resolve_codex_home(explicit: Path | None) -> Path:
    """Determine the Codex home directory as an absolute path.

    Precedence: the explicit argument, then the `CODEX_HOME` environment
    variable, then `~/.codex`.

    Args:
        explicit: Directory given on the command line, or None.

    Returns:
        The chosen directory with `~` expanded and the path resolved.
    """
    if explicit is not None:
        return explicit.expanduser().resolve()

    configured = os.environ.get("CODEX_HOME")
    # An empty or blank CODEX_HOME must count as unset: Path("") is "." and
    # would silently redirect every read and write to the current directory.
    if configured and configured.strip():
        base = Path(configured)
    else:
        base = Path.home() / ".codex"
    return base.expanduser().resolve()
def parse_iso8601(value: str) -> datetime:
    """Parse an ISO 8601 timestamp and convert it to UTC.

    Every "Z" in the input is rewritten to "+00:00" before parsing so that a
    trailing "Z" designator is accepted by `datetime.fromisoformat`.

    Args:
        value: Timestamp text such as "2024-01-02T03:04:05.123Z".

    Returns:
        A timezone-aware datetime in UTC.

    Raises:
        ValueError: If `value` is not a valid ISO 8601 timestamp.
    """
    return datetime.fromisoformat(value.replace("Z", "+00:00")).astimezone(timezone.utc)


def parse_session_timestamp(value: str | None) -> tuple[datetime | None, str | None]:
    """Parse a timestamp taken from a rollout file, tolerating bad input.

    Args:
        value: Raw timestamp text, or None.

    Returns:
        `(parsed_utc_datetime, original_text)` on success, or `(None, None)`
        when `value` is empty or cannot be parsed.
    """
    if not value:
        return None, None
    try:
        dt = parse_iso8601(value)
    except (ValueError, OverflowError):
        # Rollout files are untrusted local data; one malformed timestamp
        # must not abort the whole rebuild, so report it as missing instead.
        return None, None
    return dt, value
def format_iso_from_dt(dt: datetime) -> str:
    """Render `dt` as a UTC ISO 8601 string that ends in "Z"."""
    as_utc = dt.astimezone(timezone.utc)
    rendered = as_utc.isoformat()
    return rendered.replace("+00:00", "Z")
def format_iso_from_ms(epoch_ms: int | None) -> str | None:
    """Convert epoch milliseconds to a UTC ISO 8601 string with millisecond precision.

    Returns None when `epoch_ms` is None.
    """
    if epoch_ms is not None:
        moment = datetime.fromtimestamp(epoch_ms / 1000, tz=timezone.utc)
        rendered = moment.isoformat(timespec="milliseconds")
        return rendered.replace("+00:00", "Z")
    return None
def to_epoch_seconds(dt: datetime) -> int:
    """Return the POSIX timestamp of `dt` with the fractional part truncated."""
    posix_seconds = dt.timestamp()
    return int(posix_seconds)
def sanitize_user_text(text: str | None) -> str | None:
    """Strip trailing carriage returns and newlines from a user message.

    Args:
        text: Message text, or None. Values that are not strings are treated
            like None because callers pass fields read directly from JSON.

    Returns:
        The text without trailing "\\r" / "\\n" characters, or None when no
        usable string was supplied.
    """
    # Decoded JSON can carry any type here (dict, list, number); calling
    # `.rstrip` on those would raise AttributeError and abort the scan.
    if not isinstance(text, str):
        return None
    return text.rstrip("\r\n")
def first_line(text: str | None) -> str:
    """Return the first non-blank line of `text`, stripped of surrounding whitespace.

    Empty or None input yields an empty string; if no line has visible
    content the whole text is stripped and returned.
    """
    if not text:
        return ""
    trimmed_lines = (raw.strip() for raw in text.splitlines())
    return next((candidate for candidate in trimmed_lines if candidate), text.strip())
@dataclass
class SessionScan:
    """Metadata extracted from one rollout file by `scan_session_file`."""

    # Id taken from the payload of the file's `session_meta` entry.
    session_id: str
    # Path of the rollout file that was scanned.
    rollout_path: Path
    # Timestamp from the `session_meta` payload, when present.
    created_at: datetime | None = None
    # Timestamp of the last entry in the file that carried a parseable
    # "timestamp" field, converted to UTC.
    updated_at: datetime | None = None
    # Original text of the timestamp stored in `updated_at`.
    updated_at_raw: str | None = None
    # First user message found in the file, if any.
    first_user_message: str | None = None
    # "source" value from the `session_meta` payload, if any.
    source: str | None = None
@dataclass
class ThreadRow:
    """One row of the `threads` table as loaded by `load_thread_rows`."""

    # Value of the `id` column; also used as the session id.
    session_id: str
    # `rollout_path` column as a Path, or None when the column is empty.
    rollout_path: Path | None
    # `title` column; empty string when the column is NULL or empty.
    title: str
    # `first_user_message` column; empty string when NULL or empty.
    first_user_message: str
    # `created_at_ms` column, passed through unchanged.
    created_at_ms: int | None
    # `updated_at_ms` column, passed through unchanged.
    updated_at_ms: int | None
    # `archived` column coerced to bool.
    archived: bool
@dataclass
class SessionIndexBuild:
    """Result returned by `build_session_index`."""

    # Index records, newest first; each has "id", "thread_name", "updated_at".
    entries: list[dict[str, str]]
    # Session ids of the conversations present in `entries`.
    restored_session_ids: set[str]
    # Number of scanned sessions left out because they have no thread row.
    skipped_orphans: int
    # True when at least one thread row was available as the source.
    used_thread_rows: bool
def iter_session_files(sessions_root: Path) -> Iterable[Path]:
    """List every `*.jsonl` path below `sessions_root` in sorted order.

    Returns an empty list when the directory does not exist.
    """
    if sessions_root.exists():
        found = list(sessions_root.rglob("*.jsonl"))
        found.sort()
        return found
    return []
def _parse_rollout_timestamp(value: object) -> tuple[datetime | None, str | None]:
    """Parse a rollout timestamp, treating non-string or malformed values as absent."""
    if not isinstance(value, str):
        return None, None
    try:
        return parse_session_timestamp(value)
    except (ValueError, OverflowError):
        return None, None


def _user_text_from_response_item(payload: dict[str, object]) -> str | None:
    """Join the text parts of a user `response_item` message payload with newlines."""
    content = payload.get("content")
    if not isinstance(content, list):
        return None
    parts: list[str] = []
    for item in content:
        if not isinstance(item, dict):
            continue
        # A tuple (not a set) is used for the membership test so that an
        # unhashable "type" value cannot raise TypeError.
        if item.get("type") in ("input_text", "output_text"):
            text = item.get("text")
            if isinstance(text, str) and text:
                parts.append(text)
    return sanitize_user_text("\n".join(parts))


def scan_session_file(path: Path) -> tuple[SessionScan | None, list[dict[str, object]]]:
    """Extract session metadata and user-message history from one rollout file.

    The file is read as JSON Lines. Blank lines, lines that are not valid
    JSON, and lines whose top-level value is not an object are skipped, and
    malformed individual fields are ignored, so one bad record does not
    abort the scan.

    Args:
        path: Rollout `.jsonl` file to read.

    Returns:
        `(scan, history_entries)`. `scan` is None when the file cannot be
        read or contains no `session_meta` id. Each history entry is a dict
        with "session_id", "ts" (epoch seconds of the latest timestamp seen
        so far) and "text", one per `user_message` event seen after the
        session id and a timestamp are known. An unreadable file yields
        `(None, [])`.
    """
    session_id: str | None = None
    created_at: datetime | None = None
    updated_at: datetime | None = None
    updated_at_raw: str | None = None
    first_user_message: str | None = None
    source: str | None = None
    history_entries: list[dict[str, object]] = []
    saw_event_user_message = False

    try:
        # errors="replace": a stray invalid byte would otherwise raise
        # UnicodeDecodeError (not an OSError) and abort the whole rebuild.
        with path.open("r", encoding="utf-8", errors="replace") as handle:
            for raw_line in handle:
                line = raw_line.strip()
                if not line:
                    continue
                try:
                    obj = json.loads(line)
                except json.JSONDecodeError:
                    continue
                if not isinstance(obj, dict):
                    # Valid JSON, but not a rollout record (list, string, ...).
                    continue

                # Every timestamped entry advances the "last updated" marker.
                dt, raw = _parse_rollout_timestamp(obj.get("timestamp"))
                if dt is not None:
                    updated_at = dt
                    updated_at_raw = raw

                payload = obj.get("payload")
                if not isinstance(payload, dict):
                    continue
                entry_type = obj.get("type")

                if entry_type == "session_meta":
                    maybe_id = payload.get("id")
                    if isinstance(maybe_id, str) and maybe_id:
                        session_id = maybe_id
                    created, _ = _parse_rollout_timestamp(payload.get("timestamp"))
                    if created is not None:
                        created_at = created
                    maybe_source = payload.get("source")
                    if isinstance(maybe_source, str) and maybe_source:
                        source = maybe_source
                elif entry_type == "event_msg":
                    if payload.get("type") != "user_message":
                        continue
                    raw_message = payload.get("message")
                    message = (
                        sanitize_user_text(raw_message)
                        if isinstance(raw_message, str)
                        else None
                    )
                    if not message:
                        continue
                    saw_event_user_message = True
                    if first_user_message is None:
                        first_user_message = message
                    if session_id and updated_at is not None:
                        history_entries.append(
                            {
                                "session_id": session_id,
                                "ts": to_epoch_seconds(updated_at),
                                "text": message,
                            }
                        )
                elif (
                    entry_type == "response_item"
                    # Only fall back to response items while no real
                    # user_message event has been seen in this file.
                    and not saw_event_user_message
                    and payload.get("type") == "message"
                    and payload.get("role") == "user"
                ):
                    fallback_text = _user_text_from_response_item(payload)
                    if fallback_text and first_user_message is None:
                        first_user_message = fallback_text
    except OSError:
        return None, []

    if session_id is None:
        return None, history_entries
    return (
        SessionScan(
            session_id=session_id,
            rollout_path=path,
            created_at=created_at,
            updated_at=updated_at,
            updated_at_raw=updated_at_raw,
            first_user_message=first_user_message,
            source=source,
        ),
        history_entries,
    )
def load_thread_rows(state_db: Path) -> dict[str, ThreadRow]:
    """Read every row of the `threads` table, keyed by thread id.

    Returns an empty dict when the database file does not exist.
    """
    if not state_db.exists():
        return {}

    query = """
        SELECT
            id,
            rollout_path,
            title,
            first_user_message,
            created_at_ms,
            updated_at_ms,
            archived
        FROM threads
    """
    connection = sqlite3.connect(str(state_db))
    connection.row_factory = sqlite3.Row
    try:
        records = connection.cursor().execute(query).fetchall()
    finally:
        connection.close()

    return {
        record["id"]: ThreadRow(
            session_id=record["id"],
            rollout_path=Path(record["rollout_path"]) if record["rollout_path"] else None,
            title=record["title"] or "",
            first_user_message=record["first_user_message"] or "",
            created_at_ms=record["created_at_ms"],
            updated_at_ms=record["updated_at_ms"],
            archived=bool(record["archived"]),
        )
        for record in records
    }
def load_history_max_bytes(codex_home: Path) -> int | None:
"""Read `[history].max_bytes` from `config.toml` when available."""
config_path = codex_home / "config.toml"
if not config_path.exists():
return None
try:
config = tomllib.loads(config_path.read_text(encoding="utf-8"))
except (OSError, tomllib.TOMLDecodeError):
return None
history = config.get("history")
if not isinstance(history, dict):
return None
max_bytes = history.get("max_bytes")
if isinstance(max_bytes, int) and max_bytes > 0:
return max_bytes
return None
def _pick_thread_name(
    session_id: str,
    thread: ThreadRow | None,
    scan: SessionScan | None,
) -> str:
    """Choose a display name: thread title, then a first user message, then the id."""
    if thread is not None and thread.title:
        name = thread.title
    elif thread is not None and thread.first_user_message:
        name = first_line(thread.first_user_message)
    elif scan is not None and scan.first_user_message:
        name = first_line(scan.first_user_message)
    else:
        name = ""
    return name or session_id


def _pick_last_activity(
    thread: ThreadRow | None,
    scan: SessionScan | None,
) -> tuple[datetime | None, str | None]:
    """Return the newest known activity time and its text form for a session."""
    when: datetime | None = None
    when_text: str | None = None

    if scan is not None and scan.updated_at is not None:
        when, when_text = scan.updated_at, scan.updated_at_raw

    if thread is not None and thread.updated_at_ms is not None:
        from_thread = datetime.fromtimestamp(
            thread.updated_at_ms / 1000, tz=timezone.utc
        )
        # The thread row wins only when it is strictly newer than the scan.
        if when is None or from_thread > when:
            when, when_text = from_thread, format_iso_from_ms(thread.updated_at_ms)

    if when is None:
        # No update time anywhere: fall back to a creation time.
        if thread is not None and thread.created_at_ms is not None:
            when = datetime.fromtimestamp(
                thread.created_at_ms / 1000, tz=timezone.utc
            )
            when_text = format_iso_from_ms(thread.created_at_ms)
        elif scan is not None and scan.created_at is not None:
            when = scan.created_at
            when_text = format_iso_from_dt(scan.created_at)

    return when, when_text


def build_session_index(
    thread_rows: dict[str, ThreadRow],
    scans: dict[str, SessionScan],
    *,
    max_threads: int,
    include_orphans: bool,
) -> SessionIndexBuild:
    """Assemble the newest-first session index and the set of restored ids.

    Thread rows are the source when any exist; scanned sessions without a
    thread row are then added only if `include_orphans` is set, otherwise
    counted as skipped. With no thread rows, every scanned session is used.
    Archived threads and sessions with no usable timestamp are left out.
    `max_threads` greater than zero keeps only that many newest entries.
    """
    have_threads = bool(thread_rows)
    orphan_total = 0
    if not have_threads:
        wanted = set(scans)
    else:
        wanted = set(thread_rows)
        orphans = set(scans) - wanted
        if include_orphans:
            wanted |= orphans
        else:
            orphan_total = len(orphans)

    ranked: list[tuple[datetime, str, dict[str, str]]] = []
    for sid in wanted:
        thread = thread_rows.get(sid)
        scan = scans.get(sid)
        if thread is not None and thread.archived:
            continue
        when, when_text = _pick_last_activity(thread, scan)
        if when is None or when_text is None:
            continue
        record = {
            "id": sid,
            "thread_name": _pick_thread_name(sid, thread, scan),
            "updated_at": when_text,
        }
        ranked.append((when, sid, record))

    # Newest first; the session id breaks ties between equal timestamps.
    ranked.sort(key=lambda cand: cand[:2], reverse=True)
    if max_threads > 0:
        del ranked[max_threads:]

    return SessionIndexBuild(
        entries=[record for _, _, record in ranked],
        restored_session_ids={sid for _, sid, _ in ranked},
        skipped_orphans=orphan_total,
        used_thread_rows=have_threads,
    )
def build_history_entries(
    session_files: Iterable[Path],
    *,
    allowed_session_ids: set[str] | None,
) -> list[dict[str, object]]:
    """Build history entries, optionally limited to the restored conversation set.

    Args:
        session_files: Rollout files to scan.
        allowed_session_ids: When not None, only entries whose session id is
            in this set are kept.

    Returns:
        History dicts with "session_id", "ts" and "text", de-duplicated on
        that triple and ordered by timestamp, then session id, then text.
    """
    # Each record is (session_id, ts, text); the same tuple is the dedup key.
    records: list[tuple[str, int, str]] = []
    seen: set[tuple[str, int, str]] = set()
    for path in session_files:
        _, history_items = scan_session_file(path)
        for entry in history_items:
            session_id = str(entry["session_id"])
            if (
                allowed_session_ids is not None
                and session_id not in allowed_session_ids
            ):
                continue
            key = (session_id, int(entry["ts"]), str(entry["text"]))
            if key in seen:
                continue
            seen.add(key)
            records.append(key)

    # Oldest first, matching the order of an append-only history file.
    records.sort(key=lambda item: (item[1], item[0], item[2]))
    return [
        {"session_id": session_id, "ts": ts, "text": text}
        for session_id, ts, text in records
    ]
def trim_history_entries_to_max_bytes(
    entries: list[dict[str, object]],
    max_bytes: int | None,
) -> list[dict[str, object]]:
    """Drop the oldest entries until the serialised history fits the soft cap.

    Nothing is trimmed when there are no entries, when no positive limit is
    given, or when the serialised size is already within `max_bytes`.
    Otherwise entries are removed from the front until the size is at most
    the soft cap (`max_bytes * HISTORY_SOFT_CAP_RATIO`, clamped to
    1..max_bytes) or the size of the newest line, whichever is larger. The
    newest entry is always kept.
    """
    if not entries or max_bytes is None or max_bytes <= 0:
        return entries

    # Byte size of each entry exactly as it would be written: compact JSON + "\n".
    sizes = [
        len(
            (json.dumps(entry, ensure_ascii=False, separators=(",", ":")) + "\n").encode(
                "utf-8"
            )
        )
        for entry in entries
    ]
    remaining = sum(sizes)
    if remaining <= max_bytes:
        return entries

    soft_cap = int(max_bytes * HISTORY_SOFT_CAP_RATIO)
    soft_cap = max(1, min(max_bytes, soft_cap))
    target = max(soft_cap, sizes[-1])

    dropped = 0
    # Only the entries before the last one are candidates for removal.
    for size in sizes[:-1]:
        if remaining <= target:
            break
        remaining -= size
        dropped += 1
    return entries[dropped:]
def write_jsonl(
    path: Path, entries: list[dict[str, object]], backup: bool
) -> Path | None:
    """Atomically replace `path` with `entries` serialised as JSON Lines.

    The data is written to a temporary file in the same directory and then
    renamed over `path`, so readers never observe a half-written file.

    Args:
        path: Destination file; parent directories are created if needed.
        entries: Records to write, one compact JSON object per line.
        backup: When true and `path` already exists, copy it to
            `<name>.bak.<UTC timestamp>` first.

    Returns:
        The backup path when a backup was created, otherwise None.

    Raises:
        OSError: If the backup, write or rename fails.
        TypeError: If an entry is not JSON serialisable.
    """
    path.parent.mkdir(parents=True, exist_ok=True)
    backup_path: Path | None = None
    if backup and path.exists():
        stamp = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%SZ")
        backup_path = path.with_name(f"{path.name}.bak.{stamp}")
        shutil.copy2(path, backup_path)

    temp_path: Path | None = None
    try:
        with tempfile.NamedTemporaryFile(
            "w",
            encoding="utf-8",
            newline="\n",
            delete=False,
            dir=str(path.parent),
            prefix=f".{path.name}.",
            suffix=".tmp",
        ) as handle:
            temp_path = Path(handle.name)
            for entry in entries:
                handle.write(json.dumps(entry, ensure_ascii=False, separators=(",", ":")))
                handle.write("\n")
        temp_path.replace(path)
    except BaseException:
        # delete=False means nothing removes the temp file for us; clean it
        # up so a failed run does not leave hidden `.tmp` files behind.
        if temp_path is not None:
            try:
                temp_path.unlink()
            except OSError:
                # Best effort only: the original error is the one to report.
                pass
        raise
    return backup_path
def _write_or_preview(
    path: Path,
    entries: list[dict[str, object]],
    *,
    dry_run: bool,
    backup: bool,
    report: list[str],
) -> None:
    """Write `entries` to `path`, or only describe the write in dry-run mode."""
    if dry_run:
        report.append(f"file: would update {path}")
        return
    backup_path = write_jsonl(path, entries, backup=backup)
    report.append(f"file: updated {path}")
    if backup_path is not None:
        report.append(f"file: created {backup_path}")


def main() -> int:
    """Run the rebuild and print a line-oriented report.

    Scans the rollout files, loads thread rows, builds the session index and
    (depending on `--only`) rewrites `session_index.jsonl` and/or
    `history.jsonl`, or only reports what would be written with `--dry-run`.

    Returns:
        0 on success, 1 when any step raised an exception.
    """
    args = parse_args()
    output_lines: list[str] = []
    try:
        codex_home = resolve_codex_home(args.codex_home)
        sessions_root = codex_home / "sessions"
        session_index_path = codex_home / "session_index.jsonl"
        history_path = codex_home / "history.jsonl"
        state_db = codex_home / "state_5.sqlite"

        session_files = list(iter_session_files(sessions_root))
        scans: dict[str, SessionScan] = {}
        for session_file in session_files:
            scan, _ = scan_session_file(session_file)
            if scan is not None:
                # Files are visited in sorted order; a later file with the
                # same session id replaces the earlier scan.
                scans[scan.session_id] = scan

        thread_rows = load_thread_rows(state_db)
        session_index = build_session_index(
            thread_rows,
            scans,
            max_threads=args.max_threads,
            include_orphans=args.include_orphans,
        )
        session_index_entries = session_index.entries

        output_lines.append(f"conversations refreshed: {len(session_index_entries)}")
        if session_index.used_thread_rows:
            output_lines.append("source: active threads from state_5.sqlite")
        else:
            output_lines.append(
                "source: sessions/ fallback (state_5.sqlite missing or empty)"
            )
        if session_index.used_thread_rows and not args.include_orphans:
            output_lines.append(
                f"orphan sessions skipped: {session_index.skipped_orphans}"
            )
        if args.max_threads == 0:
            output_lines.append("max conversations: unlimited")
        else:
            output_lines.append(f"max conversations: {args.max_threads}")

        if args.only in {"session-index", "both"}:
            _write_or_preview(
                session_index_path,
                session_index_entries,
                dry_run=args.dry_run,
                backup=not args.no_backup,
                report=output_lines,
            )

        if args.only in {"history", "both"}:
            history_entries = build_history_entries(
                session_files,
                allowed_session_ids=session_index.restored_session_ids,
            )
            history_max_bytes = None
            if not args.ignore_history_max_bytes:
                history_max_bytes = load_history_max_bytes(codex_home)
                history_entries = trim_history_entries_to_max_bytes(
                    history_entries,
                    history_max_bytes,
                )
            if history_max_bytes is not None:
                output_lines.append(f"history max bytes: applied {history_max_bytes}")
            elif args.ignore_history_max_bytes:
                output_lines.append("history max bytes: ignored by flag")
            else:
                output_lines.append("history max bytes: not configured")
            _write_or_preview(
                history_path,
                history_entries,
                dry_run=args.dry_run,
                backup=not args.no_backup,
                report=output_lines,
            )
    except Exception as exc:
        # Show the progress made before the failure: a file may already have
        # been rewritten (and a backup created) and the user needs to know.
        for line in output_lines:
            print(line)
        print("status: failed")
        print(f"error: {type(exc).__name__}: {exc}")
        traceback.print_exc()
        return 1

    for line in output_lines:
        print(line)
    print("status: success")
    if not args.dry_run:
        print("next step: restart Codex")
    return 0
# Run the CLI only when executed as a script; the integer returned by main()
# is passed to SystemExit and becomes the process exit status.
if __name__ == "__main__":
    raise SystemExit(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment