Last active
February 24, 2026 01:11
-
-
Save sawa2d2/b3fa8960b7fab5b7eeff2b02b7a7a9ef to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import sqlite3 | |
| import socket | |
| from dataclasses import dataclass | |
| from typing import Optional, Callable | |
| # ---------------------------------------------------------------------- | |
| # Data model | |
| # ---------------------------------------------------------------------- | |
@dataclass(frozen=True)
class Job:
    """Immutable snapshot of a queue row as handed to a consumer."""

    # Primary key of the row in the ``queue`` table.
    id: int
    # Opaque job body, stored as TEXT.
    payload: str
    # Lease number the job carried when it was claimed; ack/fail compare it
    # against the stored value to reject stale owners.
    lease: int
| # ---------------------------------------------------------------------- | |
| # Queue implementation | |
| # ---------------------------------------------------------------------- | |
class SqliteQueue:
    """
    Persistent job queue stored in a single SQLite database.

    Properties:
    - WAL disabled (rollback journal), see connect()
    - Designed for multiple writers / readers: every write runs inside an
      explicit BEGIN IMMEDIATE transaction
    - Atomic claim
    - Fencing via a lease counter that is incremented on every claim;
      ack() and fail() only apply while the caller's lease still matches
    - Jobs stuck in 'processing' can be returned to 'pending' with
      reclaim_stale()

    The queue object holds configuration only. Connections are created by
    connect() and passed explicitly to every operation.
    """

    def __init__(
        self,
        db_path: str,
        *,
        node_id: Optional[str] = None,
        busy_timeout_ms: int = 5000,
    ) -> None:
        """
        Store the queue configuration; no database access happens here.

        Args:
            db_path: Path of the SQLite database file.
            node_id: Identifier written to ``locked_by`` when claiming a job.
                Defaults to the local host name.
            busy_timeout_ms: How long SQLite waits on a locked database, in
                milliseconds.
        """
        self.db_path = db_path
        self.node_id = node_id or socket.gethostname()
        self.busy_timeout_ms = busy_timeout_ms

    # ------------------------------------------------------------------
    # Connection / setup
    # ------------------------------------------------------------------
    def connect(self) -> sqlite3.Connection:
        """
        Open a SQLite connection with the queue's safety-first settings.

        The connection runs in autocommit mode (``isolation_level=None``) so
        that the methods of this class control transactions with explicit
        BEGIN / COMMIT / ROLLBACK statements.

        Returns:
            A configured connection. The caller is responsible for closing it.

        Raises:
            sqlite3.Error: If the database cannot be opened or configured.
        """
        conn = sqlite3.connect(
            self.db_path,
            timeout=self.busy_timeout_ms / 1000.0,
            isolation_level=None,  # explicit BEGIN/COMMIT
        )
        try:
            conn.row_factory = sqlite3.Row
            # Safety-first pragmas
            conn.execute("PRAGMA journal_mode=DELETE;")  # WAL disabled
            conn.execute("PRAGMA synchronous=FULL;")  # strong durability
            # PRAGMA values cannot be bound as parameters, so coerce to int
            # before interpolating.
            conn.execute(f"PRAGMA busy_timeout={int(self.busy_timeout_ms)};")
        except BaseException:
            # Do not leak the handle when configuration fails.
            conn.close()
            raise
        return conn

    def init_schema(self, conn: sqlite3.Connection) -> None:
        """
        Create the ``queue`` table and its indexes if they do not exist yet.

        Safe to call repeatedly: every statement uses IF NOT EXISTS.
        """
        conn.execute("""
            CREATE TABLE IF NOT EXISTS queue (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                payload TEXT NOT NULL,
                status TEXT NOT NULL CHECK (
                    status IN ('pending', 'processing', 'done', 'failed')
                ),
                created_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
                locked_at DATETIME,
                locked_by TEXT,
                lease INTEGER NOT NULL DEFAULT 0
            );
        """)
        conn.execute(
            "CREATE INDEX IF NOT EXISTS idx_queue_status_id ON queue(status, id);"
        )
        conn.execute(
            "CREATE INDEX IF NOT EXISTS idx_queue_locked_at ON queue(status, locked_at);"
        )

    @staticmethod
    def _rollback(conn: sqlite3.Connection) -> None:
        """Roll back the open transaction, if any, without raising."""
        if not conn.in_transaction:
            # SQLite may already have rolled back on its own; issuing
            # ROLLBACK now would raise and hide the original error.
            return
        try:
            conn.execute("ROLLBACK")
        except sqlite3.Error:
            # The caller re-raises the error that triggered the rollback;
            # a secondary failure here must not replace it.
            pass

    # ------------------------------------------------------------------
    # Producer
    # ------------------------------------------------------------------
    def enqueue(self, conn: sqlite3.Connection, payload: str) -> int:
        """
        Enqueue a new job in 'pending' state.

        Returns:
            The id of the inserted row.

        Raises:
            sqlite3.Error: If the insert or commit fails; the transaction is
                rolled back first.
        """
        conn.execute("BEGIN IMMEDIATE")
        try:
            cur = conn.execute(
                "INSERT INTO queue (payload, status) VALUES (?, 'pending')",
                (payload,),
            )
            job_id = int(cur.lastrowid)
            conn.execute("COMMIT")
        except BaseException:
            self._rollback(conn)
            raise
        return job_id

    # ------------------------------------------------------------------
    # Consumer operations
    # ------------------------------------------------------------------
    def claim_one(self, conn: sqlite3.Connection) -> "Optional[Job]":
        """
        Atomically claim the pending job with the lowest id.

        The row is switched to 'processing', stamped with the current time
        and this node's id, and its lease is incremented. Selection and
        update happen inside one BEGIN IMMEDIATE transaction, so no other
        writer can change the row in between. This avoids UPDATE ...
        RETURNING, which needs SQLite 3.35 or newer.

        Returns:
            Job if claimed, None if no pending job exists.

        Raises:
            sqlite3.Error: On database errors; the transaction is rolled back.
        """
        conn.execute("BEGIN IMMEDIATE")
        try:
            row = conn.execute("""
                SELECT id, payload, lease
                FROM queue
                WHERE status = 'pending'
                ORDER BY id
                LIMIT 1;
            """).fetchone()
            if row is not None:
                conn.execute("""
                    UPDATE queue
                    SET status = 'processing',
                        locked_at = CURRENT_TIMESTAMP,
                        locked_by = ?,
                        lease = lease + 1
                    WHERE id = ?;
                """, (self.node_id, row["id"]))
            conn.execute("COMMIT")
        except BaseException:
            self._rollback(conn)
            raise
        if row is None:
            return None
        return Job(
            id=int(row["id"]),
            payload=str(row["payload"]),
            # The selected value predates the UPDATE's increment.
            lease=int(row["lease"]) + 1,
        )

    def ack(self, conn: sqlite3.Connection, job: "Job") -> bool:
        """
        Mark a job as successfully processed.

        Lease is checked to fence stale owners.

        Returns:
            True if the row was updated, False if the job is no longer in
            'processing' with the caller's lease.

        Raises:
            sqlite3.Error: On database errors; the transaction is rolled back.
        """
        conn.execute("BEGIN IMMEDIATE")
        try:
            cur = conn.execute("""
                UPDATE queue
                SET status = 'done'
                WHERE id = ?
                  AND status = 'processing'
                  AND lease = ?;
            """, (job.id, job.lease))
            acknowledged = cur.rowcount == 1
            conn.execute("COMMIT")
        except BaseException:
            self._rollback(conn)
            raise
        return acknowledged

    def fail(self, conn: sqlite3.Connection, job: "Job", *, retry: bool = True) -> None:
        """
        Mark a job as failed or return it to pending.

        retry=True  -> status becomes 'pending'
        retry=False -> status becomes 'failed'

        The lock columns are cleared. Nothing is changed when the job is no
        longer in 'processing' with the caller's lease.

        Raises:
            sqlite3.Error: On database errors; the transaction is rolled back.
        """
        new_status = "pending" if retry else "failed"
        conn.execute("BEGIN IMMEDIATE")
        try:
            # The status is bound as a parameter rather than formatted into
            # the SQL text.
            conn.execute("""
                UPDATE queue
                SET status = ?,
                    locked_at = NULL,
                    locked_by = NULL
                WHERE id = ?
                  AND status = 'processing'
                  AND lease = ?;
            """, (new_status, job.id, job.lease))
            conn.execute("COMMIT")
        except BaseException:
            self._rollback(conn)
            raise

    # ------------------------------------------------------------------
    # Reclaim
    # ------------------------------------------------------------------
    def reclaim_stale(self, conn: sqlite3.Connection, *, timeout_seconds: int) -> int:
        """
        Reclaim jobs stuck in processing state longer than timeout.

        Matching rows go back to 'pending' with the lock columns cleared.
        The lease is left unchanged here; the next claim increments it.

        Args:
            timeout_seconds: Age of ``locked_at``, in seconds, beyond which a
                'processing' job is considered stale.

        Returns:
            Number of reclaimed jobs.

        Raises:
            sqlite3.Error: On database errors; the transaction is rolled back.
        """
        conn.execute("BEGIN IMMEDIATE")
        try:
            cur = conn.execute("""
                UPDATE queue
                SET status = 'pending',
                    locked_at = NULL,
                    locked_by = NULL
                WHERE status = 'processing'
                  AND locked_at IS NOT NULL
                  AND locked_at < datetime('now', ?);
            """, (f"-{timeout_seconds} seconds",))
            reclaimed = int(cur.rowcount)
            conn.execute("COMMIT")
        except BaseException:
            self._rollback(conn)
            raise
        return reclaimed
| # ---------------------------------------------------------------------- | |
| # Batch consumer (cron entry point) | |
| # ---------------------------------------------------------------------- | |
def run_batch_consumer(
    db_path: str,
    process_fn: Callable[[str], None],
    *,
    max_jobs: int = 100,
    reclaim_timeout_seconds: int = 3600,
) -> None:
    """
    Run a batch consumer.

    Intended to be executed periodically (e.g. via cron). Opens one
    connection, ensures the schema exists, reclaims stale jobs, then claims
    and processes jobs until the queue has no pending job or ``max_jobs``
    have been handled. The connection is always closed on exit.

    Args:
        db_path: Path of the SQLite queue database.
        process_fn: Called with each job's payload. Any ``Exception`` it
            raises is swallowed and the job is returned to 'pending'.
        max_jobs: Upper bound on jobs claimed in this run.
        reclaim_timeout_seconds: Age after which a 'processing' job is
            reclaimed before the batch starts.

    Raises:
        sqlite3.Error: If a queue operation fails.
    """
    queue = SqliteQueue(db_path)
    conn = queue.connect()
    try:
        queue.init_schema(conn)
        # Optional: reclaim stale jobs first
        queue.reclaim_stale(conn, timeout_seconds=reclaim_timeout_seconds)
        for _ in range(max_jobs):
            job = queue.claim_one(conn)
            if job is None:
                break
            try:
                process_fn(job.payload)
            except Exception:
                # NOTE(review): a job that always fails goes straight back to
                # 'pending' and can be claimed again within this same batch.
                queue.fail(conn, job, retry=True)
            else:
                # Kept outside the try so that an error while acknowledging
                # does not mark a successfully processed job for retry.
                # A False result means the job was reclaimed while processing.
                queue.ack(conn, job)
    finally:
        conn.close()
| # ---------------------------------------------------------------------- | |
| # Example usage | |
| # ---------------------------------------------------------------------- | |
def example_process(payload: str) -> None:
    """
    Demonstration handler: print the payload it was given.

    Swap in real job logic when wiring up a consumer.
    """
    message = f"Processing payload: {payload}"
    print(message)
if __name__ == "__main__":
    # Script entry point: drain up to 50 jobs from the example database with
    # the demonstration handler, reclaiming jobs locked for over an hour.
    DB_PATH = "/gpfs/path/to/queue.db"
    run_batch_consumer(DB_PATH, example_process, max_jobs=50, reclaim_timeout_seconds=3600)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment