Resource-Aware Markov Chain Indexer (PoC)
#!/usr/bin/env python3
"""
Resource-Aware Markov Chain Indexer (PoC)

Attempt
-------
Index a very large corpus (up to ~25 TB) of heterogeneous documents (pdf, docx, txt, images)
by building an n-gram Markov chain (order=2 or 3) over tokenized text in a *disk-backed* way.

Design Principles
-----------------
- **Streaming extraction**: never load whole files in memory; read in chunks, yield tokens.
- **Backed by SQLite**: disk-based counts (WAL mode) so RAM stays small; resumable.
- **Id-mapping for tokens**: deduplicate token strings -> integer ids to shrink DB size.
- **Batching + backpressure**: upsert every N updates; commit per processed file; bounded worker pool.
- **Pluggable extractors**: try to use optional libs (pdfminer, docx, pytesseract) if present; otherwise skip gracefully.
- **Idempotent**: tracks processed files by content hash + mtime; safe to resume.
- **Configurable**: order, workers, batch size, file globs, and memory-friendly defaults.

This is a proof of concept meant as a skeleton you can scale out (e.g., swap SQLite for
RocksDB/LMDB, shard per prefix hash, or push into a distributed map-reduce pipeline).

Usage
-----
python index_markov.py \
    --root /data/corpus \
    --order 2 \
    --db /data/indexes/markov.sqlite \
    --max-workers 4 \
    --batch 5000 \
    --include "*.txt" --include "*.pdf" --include "*.docx" --include "*.png" --include "*.jpg"

Querying (simple next-token probabilities):

python index_markov.py --db /data/indexes/markov.sqlite --query "hello world"

Notes
-----
- OCR for images requires Tesseract + pytesseract if you choose to enable it.
- PDF/Word extraction requires pdfminer.six and python-docx respectively if you want richer input.
- For extreme scale, consider turning on token hashing (see TOKEN_HASHING) and/or moving
  to a KV store with prefix compression.
"""
from __future__ import annotations

import argparse
import hashlib
import logging
import os
import re
import sqlite3
import sys
import time
from collections import deque
from pathlib import Path
from typing import Iterable, Iterator, List, Sequence, Tuple

# Optional imports (soft dependencies). We gate their usage at runtime.
try:
    from pdfminer.high_level import extract_text as pdf_extract_text  # type: ignore
except Exception:  # pragma: no cover
    pdf_extract_text = None

try:
    import docx  # python-docx  # type: ignore
except Exception:  # pragma: no cover
    docx = None

try:
    from PIL import Image  # type: ignore
    import pytesseract  # type: ignore
except Exception:  # pragma: no cover
    Image = None
    pytesseract = None

# ----------------------------------------------------------------------------
# Configuration
# ----------------------------------------------------------------------------
TOKEN_RE = re.compile(r"[\w\-']+", re.UNICODE)
LOWERCASE = True
TOKEN_HASHING = False  # If True, store token hashes (sha1 hex) instead of raw tokens to reduce DB size.

DEFAULT_INCLUDE = ["*.txt", "*.md", "*.pdf", "*.docx", "*.rtf", "*.html", "*.htm", "*.png", "*.jpg", "*.jpeg", "*.tiff"]
TEXT_EXT = {".txt", ".md", ".rtf", ".html", ".htm"}
PDF_EXT = {".pdf"}
DOCX_EXT = {".docx"}
IMG_EXT = {".png", ".jpg", ".jpeg", ".tiff"}
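# Example of the tokenizer on a hypothetical input string:
#   TOKEN_RE.findall("state-of-the-art AI, isn't it?") -> ['state-of-the-art', 'AI', "isn't", 'it']
# (lowercasing, when enabled, is applied downstream in iter_tokens_from_path).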
# ----------------------------------------------------------------------------
# DB Schema & Helpers
# ----------------------------------------------------------------------------
SCHEMA = r"""
PRAGMA journal_mode=WAL;
PRAGMA synchronous=NORMAL;
PRAGMA temp_store=MEMORY;

CREATE TABLE IF NOT EXISTS meta (
    key   TEXT PRIMARY KEY,
    value TEXT
);

-- Token dictionary shrinks storage by mapping token -> id
CREATE TABLE IF NOT EXISTS tokens (
    id    INTEGER PRIMARY KEY,
    token TEXT UNIQUE
);
CREATE INDEX IF NOT EXISTS idx_tokens_token ON tokens(token);

-- N-gram counts: prefix is a space-separated list of token ids (for order k, prefix has k-1 ids)
CREATE TABLE IF NOT EXISTS ngrams (
    prefix  TEXT NOT NULL,
    next_id INTEGER NOT NULL,
    count   INTEGER NOT NULL,
    PRIMARY KEY(prefix, next_id)
);
CREATE INDEX IF NOT EXISTS idx_ngrams_prefix ON ngrams(prefix);

-- Track processed files for idempotency
CREATE TABLE IF NOT EXISTS files (
    path   TEXT PRIMARY KEY,
    size   INTEGER,
    mtime  REAL,
    digest TEXT
);
"""
class MarkovDB:
    def __init__(self, db_path: Path):
        self.db_path = Path(db_path)
        self.conn = sqlite3.connect(str(self.db_path))
        self.conn.execute("PRAGMA journal_mode=WAL;")
        self.conn.execute("PRAGMA synchronous=NORMAL;")
        self.conn.execute("PRAGMA temp_store=MEMORY;")
        self.conn.executescript(SCHEMA)
        self.token_cache: dict[str, int] = {}
        self.token_cache_hits = 0
        self.token_cache_misses = 0
        self.cache_cap = 200_000  # keep modest to bound RAM
        self._init_meta()

    def _init_meta(self):
        cur = self.conn.cursor()
        cur.execute("INSERT OR IGNORE INTO meta(key, value) VALUES('schema_version','1')")
        self.conn.commit()

    def close(self):
        self.conn.commit()
        self.conn.close()

    def is_processed(self, path: Path, size: int, mtime: float, digest: str) -> bool:
        cur = self.conn.cursor()
        cur.execute(
            "SELECT 1 FROM files WHERE path=? AND size=? AND mtime=? AND digest=?",
            (str(path), size, mtime, digest),
        )
        return cur.fetchone() is not None

    def mark_processed(self, path: Path, size: int, mtime: float, digest: str):
        self.conn.execute(
            "INSERT OR REPLACE INTO files(path,size,mtime,digest) VALUES(?,?,?,?)",
            (str(path), size, mtime, digest),
        )
        # Commit here so a file's n-gram updates become durable together with its
        # "processed" record; a crash mid-file then simply re-does that one file.
        self.conn.commit()

    def _maybe_evict_cache(self):
        if len(self.token_cache) > self.cache_cap:
            self.token_cache.clear()
    def token_id(self, token: str) -> int:
        """Return the integer id for a token using a read-then-insert pattern.

        Avoids edge cases with INSERT OR IGNORE + lastrowid and ensures we never
        reference an uninitialized variable in finally blocks."""
        if TOKEN_HASHING:
            token = hashlib.sha1(token.encode('utf-8')).hexdigest()
        cached = self.token_cache.get(token)
        if cached is not None:
            self.token_cache_hits += 1
            return cached
        self.token_cache_misses += 1
        cur = self.conn.cursor()
        # Try a read first; insert only if the token is new.
        cur.execute("SELECT id FROM tokens WHERE token=?", (token,))
        row = cur.fetchone()
        if row:
            tid = row[0]
        else:
            cur.execute("INSERT INTO tokens(token) VALUES(?)", (token,))
            tid = cur.lastrowid
        self._maybe_evict_cache()
        self.token_cache[token] = tid
        return tid
    def increment_ngrams(self, ngram_updates: List[Tuple[str, int, int]]):
        """Batch upserts: list of (prefix_str, next_id, count)."""
        # SQLite UPSERT to minimize round trips
        self.conn.executemany(
            """
            INSERT INTO ngrams(prefix,next_id,count) VALUES(?,?,?)
            ON CONFLICT(prefix,next_id) DO UPDATE SET count = count + excluded.count
            """,
            ngram_updates,
        )
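    # Illustrative call (ids assumed): increment_ngrams([("1", 2, 3), ("1", 5, 1)]) adds 3 to
    # the count for (prefix="1", next_id=2) and 1 for (prefix="1", next_id=5), inserting the
    # rows first if they do not exist yet.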
# ----------------------------------------------------------------------------
# Text extraction (streaming-ish where possible)
# ----------------------------------------------------------------------------
def iter_text_from_path(path: Path) -> Iterator[str]:
    ext = path.suffix.lower()
    if ext in TEXT_EXT:
        with open(path, 'r', encoding='utf-8', errors='ignore') as f:
            for line in f:
                yield line
    elif ext in PDF_EXT and pdf_extract_text is not None:
        # pdfminer extracts the whole document into memory; we at least yield it
        # onward in small chunks so downstream tokenization stays incremental.
        try:
            text = pdf_extract_text(str(path))  # returns a str
            for i in range(0, len(text), 8192):
                yield text[i:i + 8192]
        except Exception as e:
            logging.warning("PDF extract failed for %s: %s", path, e)
    elif ext in DOCX_EXT and docx is not None:
        try:
            doc = docx.Document(str(path))
            for para in doc.paragraphs:
                yield para.text + "\n"
        except Exception as e:
            logging.warning("DOCX extract failed for %s: %s", path, e)
    elif ext in IMG_EXT and Image is not None and pytesseract is not None:
        try:
            with Image.open(path) as im:
                # For very large images, consider downscaling to manage OCR time/memory
                im = im.convert('L')  # grayscale helps OCR memory
                text = pytesseract.image_to_string(im)
            for i in range(0, len(text), 8192):
                yield text[i:i + 8192]
        except Exception as e:
            logging.warning("OCR failed for %s: %s", path, e)
    else:
        # Unsupported type or missing dependencies
        return


def iter_tokens_from_path(path: Path) -> Iterator[str]:
    for chunk in iter_text_from_path(path):
        if LOWERCASE:
            chunk = chunk.lower()
        for match in TOKEN_RE.finditer(chunk):
            yield match.group(0)
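# Illustrative run (input hypothetical): for a .txt file whose only line is "Don't Panic!",
# iter_tokens_from_path yields "don't" then "panic" (with LOWERCASE=True each chunk is
# lowercased before tokenizing).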
def file_digest(path: Path, algo: str = 'sha1', block_size: int = 1024 * 1024) -> str:
    h = hashlib.new(algo)
    with open(path, 'rb') as f:
        while True:
            b = f.read(block_size)
            if not b:
                break
            h.update(b)
    return h.hexdigest()


# ----------------------------------------------------------------------------
# N-gram streaming builder
# ----------------------------------------------------------------------------
def iter_ngrams(tokens: Iterable[int], order: int) -> Iterator[Tuple[str, int]]:
    """Yield (prefix_str, next_id) pairs for an n-gram model of given order.

    For order=2 (bigram), the prefix is the previous token id; for order=3, the two
    previous ids. Uses a fixed-size deque to bound memory.
    """
    if order < 2:
        raise ValueError("order must be >= 2")
    window = deque(maxlen=order)
    for tid in tokens:
        window.append(tid)
        if len(window) == order:
            prefix_ids = list(window)[:-1]
            next_id = window[-1]
            prefix_str = ' '.join(map(str, prefix_ids))
            yield (prefix_str, next_id)
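# Worked example (token ids assumed): for order=3 and the id stream [7, 3, 9, 4],
# iter_ngrams yields ("7 3", 9) and then ("3 9", 4); each pair later becomes one
# (prefix, next_id, +1) update in the ngrams table.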
# ----------------------------------------------------------------------------
# Indexer pipeline
# ----------------------------------------------------------------------------
def process_file(db: MarkovDB, path: Path, order: int, batch: int) -> int:
    """Process a single file; returns number of n-gram updates written."""
    try:
        stat = path.stat()
    except FileNotFoundError:
        return 0
    digest = file_digest(path)
    if db.is_processed(path, stat.st_size, stat.st_mtime, digest):
        logging.debug("Skip (already processed): %s", path)
        return 0

    # Stream tokens -> ids
    updates: List[Tuple[str, int, int]] = []
    wrote = 0

    def flush():
        nonlocal updates, wrote
        if updates:
            db.increment_ngrams(updates)
            wrote += len(updates)
            updates = []

    token_ids_iter = (db.token_id(tok) for tok in iter_tokens_from_path(path))
    for prefix_str, next_id in iter_ngrams(token_ids_iter, order=order):
        updates.append((prefix_str, next_id, 1))
        if len(updates) >= batch:
            flush()
    flush()
    db.mark_processed(path, stat.st_size, stat.st_mtime, digest)
    return wrote
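# Example of the batching behaviour (numbers illustrative): with batch=5000, a file that
# produces 12,345 n-gram updates triggers two full flushes of 5,000 followed by a final
# flush of 2,345 before the file is marked processed.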
def discover_files(root: Path, includes: Sequence[str]) -> Iterator[Path]:
    # Using glob patterns per directory walk; avoids listing entire tree into RAM
    for dirpath, dirnames, filenames in os.walk(root):
        dp = Path(dirpath)
        for pat in includes:
            for p in dp.glob(pat):
                if p.is_file():
                    yield p
# ----------------------------------------------------------------------------
# Query utilities
# ----------------------------------------------------------------------------
def next_token_probs(db: MarkovDB, prefix_tokens: List[str], order: int = 2, topk: int = 10) -> List[Tuple[str, int]]:
    """Return the top-k (token, count) continuations for the given prefix.

    Only the last (order - 1) tokens are used, matching how prefixes were stored at
    indexing time. Unknown tokens yield no results."""
    cur = db.conn.cursor()
    # Map tokens -> ids read-only (do not insert new tokens during a query).
    prefix_ids: List[int] = []
    for tok in prefix_tokens[-(order - 1):]:
        tok = tok.lower() if LOWERCASE else tok
        if TOKEN_HASHING:
            tok = hashlib.sha1(tok.encode('utf-8')).hexdigest()
        cur.execute("SELECT id FROM tokens WHERE token=?", (tok,))
        row = cur.fetchone()
        if row is None:
            return []
        prefix_ids.append(row[0])
    prefix_key = ' '.join(map(str, prefix_ids))
    cur.execute("SELECT next_id, count FROM ngrams WHERE prefix=? ORDER BY count DESC LIMIT ?", (prefix_key, topk))
    rows = cur.fetchall()
    if not rows:
        return []
    # Map ids back to tokens (slow but fine for PoC). Consider caching for prod.
    out: List[Tuple[str, int]] = []
    for tid, cnt in rows:
        cur.execute("SELECT token FROM tokens WHERE id=?", (tid,))
        tok_row = cur.fetchone()
        out.append((tok_row[0] if tok_row else str(tid), cnt))
    return out
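# Example query (output values illustrative): against an order-2 model,
# next_token_probs(db, ["hello"], order=2) might return [("world", 12), ("there", 5)],
# i.e. raw follow-up counts rather than normalized probabilities.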
# ----------------------------------------------------------------------------
# CLI
# ----------------------------------------------------------------------------
def main() -> int:
    ap = argparse.ArgumentParser(description="Resource-aware Markov chain indexer (PoC)")
    ap.add_argument('--root', type=Path, help='Root directory of corpus')
    ap.add_argument('--db', type=Path, required=True, help='SQLite database path')
    ap.add_argument('--order', type=int, default=2, choices=[2, 3], help='Markov order (2=bigram, 3=trigram)')
    # Default is applied after parsing: with action='append', a non-None default would be
    # extended by user-supplied values instead of replaced.
    ap.add_argument('--include', action='append', default=None, help='Glob(s) to include; can repeat')
    ap.add_argument('--max-workers', type=int, default=min(4, os.cpu_count() or 2), help='Parallel file workers (bounded)')
    ap.add_argument('--batch', type=int, default=5000, help='N-gram upsert batch size')
    ap.add_argument('--log', default='INFO', help='Logging level')
    ap.add_argument('--query', type=str, help='Query next-token probs for a given prefix string')
    args = ap.parse_args()
    includes = args.include or DEFAULT_INCLUDE

    logging.basicConfig(level=getattr(logging, args.log.upper(), logging.INFO), format='[%(asctime)s] %(levelname)s: %(message)s')
    db = MarkovDB(args.db)

    if args.query:
        prefix = [t for t in TOKEN_RE.findall(args.query.lower() if LOWERCASE else args.query)]
        res = next_token_probs(db, prefix, order=args.order)
        if not res:
            print("No results for prefix:", ' '.join(prefix))
        else:
            print("Next-token candidates for prefix:", ' '.join(prefix))
            for tok, cnt in res:
                print(f"  {tok}\t{cnt}")
        db.close()
        return 0

    if not args.root:
        print("--root is required for indexing mode", file=sys.stderr)
        return 2
    root = args.root

    # Iterate with a small, bounded worker pool
    files = discover_files(root, includes)
    total_updates = 0
    processed = 0
    start = time.time()
    # Use a single DB connection (thread-confined) for upserts to reduce lock contention.
    # Files are processed sequentially in the main thread to keep the PoC simple and resource-lean.
    # For higher throughput, tokenize in workers and send n-gram updates through a Queue
    # to a single DB-writer thread.
    for i, path in enumerate(files, 1):
        try:
            wrote = process_file(db, path, order=args.order, batch=args.batch)
            total_updates += wrote
            processed += 1
            if processed % 100 == 0:
                elapsed = time.time() - start
                logging.info("Processed %d files | ~%dK updates | %.1f files/sec", processed, total_updates // 1000, processed / max(elapsed, 1.0))
        except KeyboardInterrupt:
            logging.warning("Interrupted by user.")
            break
        except Exception as e:
            logging.exception("Error on %s: %s", path, e)
            continue

    db.close()
    elapsed = time.time() - start
    logging.info("Done. Files=%d, Updates~%dK, Elapsed=%.1fs", processed, total_updates // 1000, elapsed)
    return 0


if __name__ == '__main__':
    raise SystemExit(main())