
@well-it-wasnt-me
Created October 6, 2025 13:47
Resource-Aware Markov Chain Indexer (PoC)
#!/usr/bin/env python3
"""
Resource-Aware Markov Chain Indexer (PoC)
Attempt
-------
Index a very large corpus (up to ~25 TB) of heterogeneous documents (pdf, docx, txt, images)
by building an n-gram Markov chain (order=2 or 3) over tokenized text in a *disk-backed* way.
Design Principles
-----------------
- **Streaming extraction**: never load whole files in memory; read in chunks, yield tokens.
- **Backed by SQLite**: disk-based counts (WAL mode) so RAM stays small; resumable.
- **Id-mapping for tokens**: deduplicate token strings -> integer ids to shrink DB size.
- **Batching + backpressure**: commit every N updates; bounded worker pool.
- **Pluggable extractors**: try to use optional libs (pdfminer, docx, pytesseract) if present; otherwise skip gracefully.
- **Idempotent**: tracks processed files by content hash + mtime; safe to resume.
- **Configurable**: order, workers, batch size, file-globs, and memory-friendly defaults.
This is a proof-of-concept meant as a skeleton you can scale out (e.g., swap SQLite for
RocksDB/LMDB, shard per prefix hash as sketched after the configuration constants, or push
into a distributed map-reduce pipeline).
Usage
-----
python index_markov.py \
--root /data/corpus \
--order 2 \
--db /data/indexes/markov.sqlite \
--max-workers 4 \
--batch 5000 \
--include "*.txt" --include "*.pdf" --include "*.docx" --include "*.png" --include "*.jpg"
Querying (simple next-token probabilities):
python index_markov.py --db /data/indexes/markov.sqlite --query "hello world"
Notes
-----
- OCR for images requires Tesseract + pytesseract if you choose to enable it.
- PDF/Word extraction requires pdfminer.six and python-docx respectively if you want richer input.
- For extreme scale, consider turning on token hashing (see TOKEN_HASHING) and/or moving
to a KV store with prefix compression.
"""
from __future__ import annotations
import argparse
import hashlib
import logging
import os
import re
import sqlite3
import sys
import time
from collections import deque
from pathlib import Path
from typing import Iterable, Iterator, List, Sequence, Tuple
# Optional imports (soft dependencies). We gate their usage at runtime.
try:
from pdfminer.high_level import extract_text as pdf_extract_text # type: ignore
except Exception: # pragma: no cover
pdf_extract_text = None
try:
import docx # python-docx # type: ignore
except Exception: # pragma: no cover
docx = None
try:
from PIL import Image # type: ignore
import pytesseract # type: ignore
except Exception: # pragma: no cover
Image = None
pytesseract = None
# ----------------------------------------------------------------------------
# Configuration
# ----------------------------------------------------------------------------
TOKEN_RE = re.compile(r"[\w\-']+", re.UNICODE)
LOWERCASE = True
TOKEN_HASHING = False # If True, store token hashes (sha1 hex) instead of raw tokens to reduce DB size.
DEFAULT_INCLUDE = ["*.txt", "*.md", "*.pdf", "*.docx", "*.rtf", "*.html", "*.htm", "*.png", "*.jpg", "*.jpeg", "*.tiff"]
TEXT_EXT = {".txt", ".md", ".rtf", ".html", ".htm"}
PDF_EXT = {".pdf"}
DOCX_EXT = {".docx"}
IMG_EXT = {".png", ".jpg", ".jpeg", ".tiff"}
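# --- Illustration (not part of the pipeline) -------------------------------
# A minimal sketch of the "shard per prefix hash" scale-out idea mentioned in
# the module docstring: hash an n-gram prefix key and map it deterministically
# onto one of several stores. NUM_SHARDS and shard_for_prefix are illustrative
# names only and are not used elsewhere in this file.
NUM_SHARDS = 16

def shard_for_prefix(prefix_key: str, num_shards: int = NUM_SHARDS) -> int:
    """Map an n-gram prefix key (e.g. '17 42') to a shard index."""
    digest = hashlib.sha1(prefix_key.encode('utf-8')).hexdigest()
    return int(digest, 16) % num_shards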
# ----------------------------------------------------------------------------
# DB Schema & Helpers
# ----------------------------------------------------------------------------
SCHEMA = r"""
PRAGMA journal_mode=WAL;
PRAGMA synchronous=NORMAL;
PRAGMA temp_store=MEMORY;
CREATE TABLE IF NOT EXISTS meta (
key TEXT PRIMARY KEY,
value TEXT
);
-- Token dictionary shrinks storage by mapping token -> id
CREATE TABLE IF NOT EXISTS tokens (
id INTEGER PRIMARY KEY,
token TEXT UNIQUE
);
CREATE INDEX IF NOT EXISTS idx_tokens_token ON tokens(token);
-- N-gram counts: prefix is a space-separated list of token ids (for order k, prefix has k-1 ids)
CREATE TABLE IF NOT EXISTS ngrams (
prefix TEXT NOT NULL,
next_id INTEGER NOT NULL,
count INTEGER NOT NULL,
PRIMARY KEY(prefix, next_id)
);
CREATE INDEX IF NOT EXISTS idx_ngrams_prefix ON ngrams(prefix);
-- Track processed files for idempotency
CREATE TABLE IF NOT EXISTS files (
path TEXT PRIMARY KEY,
size INTEGER,
mtime REAL,
digest TEXT
);
"""
class MarkovDB:
def __init__(self, db_path: Path):
self.db_path = Path(db_path)
self.conn = sqlite3.connect(str(self.db_path))
self.conn.execute("PRAGMA journal_mode=WAL;")
self.conn.execute("PRAGMA synchronous=NORMAL;")
self.conn.execute("PRAGMA temp_store=MEMORY;")
self.conn.executescript(SCHEMA)
self.token_cache: dict[str, int] = {}
self.token_cache_hits = 0
self.token_cache_misses = 0
self.cache_cap = 200_000 # keep modest to bound RAM
self._init_meta()
def _init_meta(self):
cur = self.conn.cursor()
cur.execute("INSERT OR IGNORE INTO meta(key, value) VALUES('schema_version','1')")
self.conn.commit()
def close(self):
self.conn.commit()
self.conn.close()
def is_processed(self, path: Path, size: int, mtime: float, digest: str) -> bool:
cur = self.conn.cursor()
cur.execute("SELECT 1 FROM files WHERE path=? AND size=? AND mtime=? AND digest=?", (str(path), size, mtime, digest))
return cur.fetchone() is not None
    def mark_processed(self, path: Path, size: int, mtime: float, digest: str):
        self.conn.execute(
            "INSERT OR REPLACE INTO files(path,size,mtime,digest) VALUES(?,?,?,?)",
            (str(path), size, mtime, digest),
        )
        # Commit so the idempotency marker survives an interrupted run.
        self.conn.commit()
def _maybe_evict_cache(self):
if len(self.token_cache) > self.cache_cap:
self.token_cache.clear()
def token_id(self, token: str) -> int:
"""Return integer id for token using a read-then-insert pattern.
Avoids edge-cases with INSERT OR IGNORE + lastrowid and ensures
we never reference an uninitialized variable in finally blocks."""
if TOKEN_HASHING:
token = hashlib.sha1(token.encode('utf-8')).hexdigest()
cached = self.token_cache.get(token)
if cached is not None:
self.token_cache_hits += 1
return cached
self.token_cache_misses += 1
cur = self.conn.cursor()
# Try read first
cur.execute("SELECT id FROM tokens WHERE token=?", (token,))
row = cur.fetchone()
if row:
tid = row[0]
else:
cur.execute("INSERT INTO tokens(token) VALUES(?)", (token,))
tid = cur.lastrowid
self._maybe_evict_cache()
self.token_cache[token] = tid
return tid
    def increment_ngrams(self, ngram_updates: List[Tuple[str, int, int]]):
        """Batch upsert: list of (prefix_str, next_id, count) rows."""
        # SQLite UPSERT to minimize round trips
        self.conn.executemany(
            """
            INSERT INTO ngrams(prefix,next_id,count) VALUES(?,?,?)
            ON CONFLICT(prefix,next_id) DO UPDATE SET count = count + excluded.count
            """,
            ngram_updates,
        )
        # Commit per batch: keeps the WAL small and makes the run resumable,
        # matching the "commit every N updates" design principle above.
        self.conn.commit()
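# --- Illustration (not part of the pipeline) -------------------------------
# A minimal sketch of how MarkovDB stores counts: sending the same
# (prefix, next_id) pair twice is accumulated by the ON CONFLICT upsert rather
# than duplicated. Uses an in-memory database purely for demonstration.
def _demo_markov_db() -> None:
    db = MarkovDB(Path(':memory:'))
    the, cat = db.token_id('the'), db.token_id('cat')
    # Bigram "the cat": the prefix is the id of 'the', next_id is the id of 'cat'.
    db.increment_ngrams([(str(the), cat, 1)])
    db.increment_ngrams([(str(the), cat, 2)])  # upsert adds to the existing row
    row = db.conn.execute(
        "SELECT count FROM ngrams WHERE prefix=? AND next_id=?", (str(the), cat)
    ).fetchone()
    assert row == (3,)
    db.close()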
# ----------------------------------------------------------------------------
# Text extraction (streaming-ish where possible)
# ----------------------------------------------------------------------------
def iter_text_from_path(path: Path) -> Iterator[str]:
ext = path.suffix.lower()
if ext in TEXT_EXT:
with open(path, 'r', encoding='utf-8', errors='ignore') as f:
for line in f:
yield line
elif ext in PDF_EXT and pdf_extract_text is not None:
        # pdfminer extracts the whole document as one string; yield it in small
        # slices so downstream tokenization stays chunked.
try:
text = pdf_extract_text(str(path)) # returns a str
# Yield in reasonably small chunks
for i in range(0, len(text), 8192):
yield text[i:i+8192]
except Exception as e:
logging.warning("PDF extract failed for %s: %s", path, e)
elif ext in DOCX_EXT and docx is not None:
try:
doc = docx.Document(str(path))
for para in doc.paragraphs:
yield para.text + "\n"
except Exception as e:
logging.warning("DOCX extract failed for %s: %s", path, e)
elif ext in IMG_EXT and Image is not None and pytesseract is not None:
try:
with Image.open(path) as im:
# For very large images, consider downscaling to manage OCR time/memory
im = im.convert('L') # grayscale helps OCR memory
text = pytesseract.image_to_string(im)
for i in range(0, len(text), 8192):
yield text[i:i+8192]
except Exception as e:
logging.warning("OCR failed for %s: %s", path, e)
else:
# Unsupported type or missing dependencies
return
def iter_tokens_from_path(path: Path) -> Iterator[str]:
for chunk in iter_text_from_path(path):
if LOWERCASE:
chunk = chunk.lower()
for match in TOKEN_RE.finditer(chunk):
yield match.group(0)
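# --- Illustration (not part of the pipeline) -------------------------------
# A small sketch of the tokenization rules above: TOKEN_RE keeps word
# characters, hyphens and apostrophes, LOWERCASE folds case, and other
# punctuation is simply dropped.
def _demo_tokenize(text: str = "Don't panic: the well-known answer is 42!") -> List[str]:
    if LOWERCASE:
        text = text.lower()
    return [m.group(0) for m in TOKEN_RE.finditer(text)]
    # -> ["don't", "panic", "the", "well-known", "answer", "is", "42"]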
def file_digest(path: Path, algo: str = 'sha1', block_size: int = 1024 * 1024) -> str:
h = hashlib.new(algo)
with open(path, 'rb') as f:
while True:
b = f.read(block_size)
if not b:
break
h.update(b)
return h.hexdigest()
# ----------------------------------------------------------------------------
# N-gram streaming builder
# ----------------------------------------------------------------------------
def iter_ngrams(tokens: Iterable[int], order: int) -> Iterator[Tuple[str, int]]:
"""Yield (prefix_str, next_id) pairs for an n-gram model of given order.
For order=2 (bigram), prefix is the previous token id; for order=3, the two
previous ids. Uses a fixed-size deque to bound memory.
"""
if order < 2:
raise ValueError("order must be >= 2")
window = deque(maxlen=order)
for tid in tokens:
window.append(tid)
if len(window) == order:
prefix_ids = list(window)[:-1]
next_id = window[-1]
prefix_str = ' '.join(map(str, prefix_ids))
yield (prefix_str, next_id)
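# --- Illustration (not part of the pipeline) -------------------------------
# A worked sketch of iter_ngrams: for token ids [1, 2, 3, 4], order=2 keys each
# next id by the single previous id, while order=3 keys it by the two previous
# ids joined with a space (the same encoding the ngrams.prefix column uses).
def _demo_iter_ngrams() -> None:
    ids = [1, 2, 3, 4]
    assert list(iter_ngrams(ids, order=2)) == [('1', 2), ('2', 3), ('3', 4)]
    assert list(iter_ngrams(ids, order=3)) == [('1 2', 3), ('2 3', 4)]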
# ----------------------------------------------------------------------------
# Indexer pipeline
# ----------------------------------------------------------------------------
def process_file(db: MarkovDB, path: Path, order: int, batch: int) -> int:
"""Process a single file; returns number of n-gram updates written."""
try:
stat = path.stat()
except FileNotFoundError:
return 0
digest = file_digest(path)
if db.is_processed(path, stat.st_size, stat.st_mtime, digest):
logging.debug("Skip (already processed): %s", path)
return 0
# Stream tokens -> ids
updates: List[Tuple[str, int, int]] = []
def flush():
nonlocal updates, wrote
if updates:
db.increment_ngrams(updates)
wrote += len(updates)
updates = []
token_ids_iter = (db.token_id(tok) for tok in iter_tokens_from_path(path))
wrote = 0
for prefix_str, next_id in iter_ngrams(token_ids_iter, order=order):
updates.append((prefix_str, next_id, 1))
if len(updates) >= batch:
flush()
flush()
db.mark_processed(path, stat.st_size, stat.st_mtime, digest)
return wrote
def discover_files(root: Path, includes: Sequence[str]) -> Iterator[Path]:
# Using glob patterns per directory walk; avoids listing entire tree into RAM
for dirpath, dirnames, filenames in os.walk(root):
dp = Path(dirpath)
for pat in includes:
for p in dp.glob(pat):
if p.is_file():
yield p
# ----------------------------------------------------------------------------
# Query utilities
# ----------------------------------------------------------------------------
def next_token_probs(db: MarkovDB, prefix_tokens: List[str], topk: int = 10) -> List[Tuple[str, int]]:
    """Return the top-k (token, count) continuations for a prefix.
    The prefix must contain exactly order-1 tokens (1 for bigrams, 2 for trigrams)."""
    cur = db.conn.cursor()
    # Read-only token lookup: an unknown prefix token means no matches, and we
    # avoid inserting query tokens into the dictionary as a side effect.
    prefix_ids: List[int] = []
    for tok in prefix_tokens:
        tok = tok.lower() if LOWERCASE else tok
        if TOKEN_HASHING:
            tok = hashlib.sha1(tok.encode('utf-8')).hexdigest()
        cur.execute("SELECT id FROM tokens WHERE token=?", (tok,))
        row = cur.fetchone()
        if row is None:
            return []
        prefix_ids.append(row[0])
    prefix_key = ' '.join(map(str, prefix_ids))
    cur.execute("SELECT next_id, count FROM ngrams WHERE prefix=? ORDER BY count DESC LIMIT ?", (prefix_key, topk))
    rows = cur.fetchall()
    # Map ids back to tokens (slow but fine for a PoC; cache this for production).
    out: List[Tuple[str, int]] = []
    for tid, cnt in rows:
        cur.execute("SELECT token FROM tokens WHERE id=?", (tid,))
        tok = cur.fetchone()[0]
        out.append((tok, cnt))
    return out
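# --- Illustration (not part of the pipeline) -------------------------------
# next_token_probs returns raw counts; a minimal sketch of turning them into
# the probabilities the module docstring talks about, normalizing over the
# returned candidates (for exact probabilities, sum over all rows for the
# prefix rather than just the top-k).
def _demo_normalize(candidates: List[Tuple[str, int]]) -> List[Tuple[str, float]]:
    total = sum(cnt for _, cnt in candidates)
    return [(tok, cnt / total) for tok, cnt in candidates] if total else []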
# ----------------------------------------------------------------------------
# CLI
# ----------------------------------------------------------------------------
def main() -> int:
ap = argparse.ArgumentParser(description="Resource-aware Markov chain indexer (PoC)")
ap.add_argument('--root', type=Path, help='Root directory of corpus')
ap.add_argument('--db', type=Path, required=True, help='SQLite database path')
ap.add_argument('--order', type=int, default=2, choices=[2,3], help='Markov order (2=bigram, 3=trigram)')
    ap.add_argument('--include', action='append', default=None, help='Glob(s) to include; can repeat (default: common text/PDF/DOCX/image types)')
    ap.add_argument('--max-workers', type=int, default=min(4, os.cpu_count() or 2), help='Parallel file workers (reserved; this PoC currently processes files sequentially)')
ap.add_argument('--batch', type=int, default=5000, help='N-gram upsert batch size')
ap.add_argument('--log', default='INFO', help='Logging level')
ap.add_argument('--query', type=str, help='Query next-token probs for a given prefix string')
args = ap.parse_args()
logging.basicConfig(level=getattr(logging, args.log.upper(), logging.INFO), format='[%(asctime)s] %(levelname)s: %(message)s')
db = MarkovDB(args.db)
if args.query:
prefix = [t for t in TOKEN_RE.findall(args.query.lower() if LOWERCASE else args.query)]
res = next_token_probs(db, prefix)
if not res:
print("No results for prefix:", ' '.join(prefix))
else:
print("Next-token candidates for prefix:", ' '.join(prefix))
for tok, cnt in res:
print(f" {tok}\t{cnt}")
return 0
if not args.root:
print("--root is required for indexing mode", file=sys.stderr)
return 2
root = args.root
    # Iterate over discovered files (sequentially in this PoC; see the comment below)
    files = discover_files(root, args.include or DEFAULT_INCLUDE)
total_updates = 0
processed = 0
start = time.time()
    # Use a single DB connection (thread-confined) for upserts to reduce lock contention.
    # Files are processed sequentially in the main thread to keep this PoC simple and resource-lean.
    # For higher throughput, tokenize in workers and funnel n-gram updates through a Queue to a
    # single DB-writer thread (see _run_with_writer_thread below for a sketch).
for i, path in enumerate(files, 1):
try:
wrote = process_file(db, path, order=args.order, batch=args.batch)
total_updates += wrote
processed += 1
if processed % 100 == 0:
elapsed = time.time() - start
logging.info("Processed %d files | ~%dK updates | %.1f files/sec", processed, total_updates//1000, processed/max(elapsed,1.0))
except KeyboardInterrupt:
logging.warning("Interrupted by user.")
break
except Exception as e:
logging.exception("Error on %s: %s", path, e)
continue
db.close()
elapsed = time.time() - start
logging.info("Done. Files=%d, Updates~%dK, Elapsed=%.1fs", processed, total_updates//1000, elapsed)
return 0
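# --- Illustration (not part of the pipeline) -------------------------------
# A hypothetical sketch of the higher-throughput layout mentioned in main():
# tokenizing workers push (prefix, next_id, count) batches onto a bounded
# queue, and a single writer thread owns the SQLite connection and drains it.
# The names below are illustrative only and are not used by the PoC.
def _run_with_writer_thread(db_path: Path, batches: Iterable[List[Tuple[str, int, int]]]) -> None:
    import queue
    import threading
    sentinel = object()
    q = queue.Queue(maxsize=8)  # bounded queue provides backpressure

    def writer() -> None:
        # Create the connection inside the thread so sqlite3's default
        # check_same_thread guard is satisfied; this thread owns all writes.
        db = MarkovDB(db_path)
        try:
            while True:
                batch = q.get()
                if batch is sentinel:
                    break
                db.increment_ngrams(batch)
        finally:
            db.close()

    t = threading.Thread(target=writer, daemon=True)
    t.start()
    for batch in batches:   # in the real pipeline these would come from tokenizing workers
        q.put(batch)        # blocks when the writer falls behind
    q.put(sentinel)
    t.join()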
if __name__ == '__main__':
raise SystemExit(main())