Skip to content

Instantly share code, notes, and snippets.

@sawa2d2
Last active February 24, 2026 01:11
Show Gist options
  • Select an option

  • Save sawa2d2/b3fa8960b7fab5b7eeff2b02b7a7a9ef to your computer and use it in GitHub Desktop.

Select an option

Save sawa2d2/b3fa8960b7fab5b7eeff2b02b7a7a9ef to your computer and use it in GitHub Desktop.
import sqlite3
import socket
from dataclasses import dataclass
from typing import Optional, Callable
# ----------------------------------------------------------------------
# Data model
# ----------------------------------------------------------------------
@dataclass(frozen=True)
class Job:
    """A claimed queue entry.

    Attributes:
        id: Row id of the job in the ``queue`` table.
        payload: Opaque payload string supplied by the producer.
        lease: Fencing token. Incremented on every claim, so a stale
            owner (one whose job was reclaimed and re-claimed) fails the
            ``lease = ?`` check in ``ack``/``fail``.
    """
    id: int
    payload: str
    lease: int


# ----------------------------------------------------------------------
# Queue implementation
# ----------------------------------------------------------------------
class SqliteQueue:
    """
    Persistent queue implemented on SQLite.

    Properties:
    - WAL disabled (rollback journal)
    - Safe under multi-writer / multi-reader
    - Atomic claim
    - Fencing via monotonically increasing lease
    - Crash-tolerant with reclaim
    """

    def __init__(
        self,
        db_path: str,
        *,
        node_id: Optional[str] = None,
        busy_timeout_ms: int = 5000,
    ) -> None:
        """
        Args:
            db_path: Path to the SQLite database file.
            node_id: Identifier recorded in ``locked_by`` when claiming;
                defaults to the local hostname.
            busy_timeout_ms: How long SQLite waits on a locked database
                before raising ``sqlite3.OperationalError``.
        """
        self.db_path = db_path
        self.node_id = node_id or socket.gethostname()
        self.busy_timeout_ms = busy_timeout_ms

    # ------------------------------------------------------------------
    # Connection / setup
    # ------------------------------------------------------------------
    def connect(self) -> sqlite3.Connection:
        """
        Open a SQLite connection configured for GPFS safety.

        Returns:
            A connection in autocommit mode (``isolation_level=None``);
            all transactions below use explicit BEGIN/COMMIT.
        """
        conn = sqlite3.connect(
            self.db_path,
            timeout=self.busy_timeout_ms / 1000.0,
            isolation_level=None,  # explicit BEGIN/COMMIT
        )
        conn.row_factory = sqlite3.Row
        # Safety-first pragmas: rollback journal + full fsync, because WAL
        # is unsafe on network filesystems (its shared-memory index
        # requires all readers/writers on the same host).
        conn.execute("PRAGMA journal_mode=DELETE;")  # WAL disabled
        conn.execute("PRAGMA synchronous=FULL;")     # strong durability
        conn.execute(f"PRAGMA busy_timeout={self.busy_timeout_ms};")
        return conn

    def init_schema(self, conn: sqlite3.Connection) -> None:
        """
        Initialize database schema (idempotent).
        """
        conn.execute("""
            CREATE TABLE IF NOT EXISTS queue (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                payload TEXT NOT NULL,
                status TEXT NOT NULL CHECK (
                    status IN ('pending', 'processing', 'done', 'failed')
                ),
                created_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
                locked_at DATETIME,
                locked_by TEXT,
                lease INTEGER NOT NULL DEFAULT 0
            );
        """)
        # Covering index for the FIFO claim query (status = 'pending' ORDER BY id).
        conn.execute(
            "CREATE INDEX IF NOT EXISTS idx_queue_status_id ON queue(status, id);"
        )
        # Index for the stale-job reclaim scan (status = 'processing' AND locked_at < ...).
        conn.execute(
            "CREATE INDEX IF NOT EXISTS idx_queue_locked_at ON queue(status, locked_at);"
        )

    # ------------------------------------------------------------------
    # Producer
    # ------------------------------------------------------------------
    def enqueue(self, conn: sqlite3.Connection, payload: str) -> int:
        """
        Enqueue a new job.

        Returns:
            The new job's row id.
        """
        conn.execute("BEGIN")
        try:
            cur = conn.execute(
                "INSERT INTO queue (payload, status) VALUES (?, 'pending')",
                (payload,),
            )
            job_id = int(cur.lastrowid)
            conn.execute("COMMIT")
            return job_id
        except Exception:
            conn.execute("ROLLBACK")
            raise

    # ------------------------------------------------------------------
    # Consumer operations
    # ------------------------------------------------------------------
    def claim_one(self, conn: sqlite3.Connection) -> Optional[Job]:
        """
        Atomically claim one pending job (lowest id first).

        BEGIN IMMEDIATE takes the write lock up front so two consumers
        cannot both run the inner SELECT against the same pending row.

        Returns:
            Job if claimed, None if no pending job exists.
        """
        conn.execute("BEGIN IMMEDIATE")
        try:
            # fetchall() (rather than fetchone()) exhausts the RETURNING
            # cursor so the statement is finalized before COMMIT; a live
            # statement can otherwise make COMMIT fail with
            # "cannot commit transaction - SQL statements in progress".
            rows = conn.execute("""
                UPDATE queue
                SET status = 'processing',
                    locked_at = CURRENT_TIMESTAMP,
                    locked_by = ?,
                    lease = lease + 1
                WHERE id = (
                    SELECT id
                    FROM queue
                    WHERE status = 'pending'
                    ORDER BY id
                    LIMIT 1
                )
                RETURNING id, payload, lease;
            """, (self.node_id,)).fetchall()
            conn.execute("COMMIT")
            if not rows:
                return None
            row = rows[0]
            return Job(
                id=int(row["id"]),
                payload=str(row["payload"]),
                lease=int(row["lease"]),
            )
        except Exception:
            conn.execute("ROLLBACK")
            raise

    def ack(self, conn: sqlite3.Connection, job: Job) -> bool:
        """
        Mark a job as successfully processed.

        Lease is checked to fence stale owners.

        Returns:
            True if the job was marked done; False if it was already
            reclaimed (lease mismatch) or no longer processing.
        """
        conn.execute("BEGIN")
        try:
            cur = conn.execute("""
                UPDATE queue
                SET status = 'done'
                WHERE id = ?
                  AND status = 'processing'
                  AND lease = ?;
            """, (job.id, job.lease))
            conn.execute("COMMIT")
            return cur.rowcount == 1
        except Exception:
            conn.execute("ROLLBACK")
            raise

    def fail(self, conn: sqlite3.Connection, job: Job, *, retry: bool = True) -> None:
        """
        Mark a job as failed or return it to pending.

        retry=True  -> status becomes 'pending'
        retry=False -> status becomes 'failed'

        The lease check fences stale owners, same as ``ack``.
        """
        new_status = "pending" if retry else "failed"
        conn.execute("BEGIN")
        try:
            # Bind the status as a parameter instead of interpolating it
            # into the SQL string (the original used an f-string here).
            conn.execute("""
                UPDATE queue
                SET status = ?,
                    locked_at = NULL,
                    locked_by = NULL
                WHERE id = ?
                  AND status = 'processing'
                  AND lease = ?;
            """, (new_status, job.id, job.lease))
            conn.execute("COMMIT")
        except Exception:
            conn.execute("ROLLBACK")
            raise

    # ------------------------------------------------------------------
    # Reclaim
    # ------------------------------------------------------------------
    def reclaim_stale(self, conn: sqlite3.Connection, *, timeout_seconds: int) -> int:
        """
        Reclaim jobs stuck in processing state longer than timeout.

        Reclaimed jobs return to 'pending'; the lease is bumped again on
        the next claim, so the previous (stale) owner's ack/fail no-ops.

        Returns:
            Number of reclaimed jobs.
        """
        conn.execute("BEGIN")
        try:
            cur = conn.execute("""
                UPDATE queue
                SET status = 'pending',
                    locked_at = NULL,
                    locked_by = NULL
                WHERE status = 'processing'
                  AND locked_at IS NOT NULL
                  AND locked_at < datetime('now', ?);
            """, (f"-{timeout_seconds} seconds",))
            conn.execute("COMMIT")
            return int(cur.rowcount)
        except Exception:
            conn.execute("ROLLBACK")
            raise
# ----------------------------------------------------------------------
# Batch consumer (cron entry point)
# ----------------------------------------------------------------------
def run_batch_consumer(
    db_path: str,
    process_fn: Callable[[str], None],
    *,
    max_jobs: int = 100,
    reclaim_timeout_seconds: int = 3600,
) -> None:
    """
    Run a batch consumer.

    Intended to be executed periodically (e.g. via cron).

    Args:
        db_path: Path to the SQLite queue database.
        process_fn: Callback invoked with each job's payload. If it
            raises, the job is returned to 'pending' for a later retry.
        max_jobs: Upper bound on jobs processed in one invocation.
        reclaim_timeout_seconds: Jobs stuck in 'processing' longer than
            this are returned to 'pending' before consuming starts.
    """
    queue = SqliteQueue(db_path)
    conn = queue.connect()
    try:
        queue.init_schema(conn)
        # Recover jobs abandoned by crashed or stalled consumers first.
        queue.reclaim_stale(conn, timeout_seconds=reclaim_timeout_seconds)
        processed = 0
        while processed < max_jobs:
            job = queue.claim_one(conn)
            if job is None:
                break  # queue drained
            try:
                process_fn(job.payload)
                # ack() returning False means the job was reclaimed while
                # we were processing; the lease fence prevented a stale
                # 'done', so there is nothing further to do here.
                queue.ack(conn, job)
            except Exception:
                # Best-effort: return the job to 'pending' for retry.
                queue.fail(conn, job, retry=True)
            processed += 1
    finally:
        # The original leaked the connection; always close it so the
        # rollback journal and file locks are released promptly.
        conn.close()
# ----------------------------------------------------------------------
# Example usage
# ----------------------------------------------------------------------
def example_process(payload: str) -> None:
    """Example job handler; swap in real processing logic here."""
    message = f"Processing payload: {payload}"
    print(message)
if __name__ == "__main__":
    # Script entry point: drain up to 50 jobs from the queue database,
    # first reclaiming anything stuck in 'processing' for over an hour.
    run_batch_consumer(
        "/gpfs/path/to/queue.db",
        example_process,
        max_jobs=50,
        reclaim_timeout_seconds=3600,
    )
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment