Skip to content

Instantly share code, notes, and snippets.

@sawa2d2
Last active March 9, 2026 03:24
Show Gist options
  • Select an option

  • Save sawa2d2/9b39359d976e1ea975f12bf5abeb25a2 to your computer and use it in GitHub Desktop.

Select an option

Save sawa2d2/9b39359d976e1ea975f12bf5abeb25a2 to your computer and use it in GitHub Desktop.
queue

Producer 側

def enqueue(payload: bytes, logical_hash: str, priority: int) -> int:
    """
    BEGIN IMMEDIATE;
    INSERT INTO requests (...);
    COMMIT;
    return request_id
    """

Consumer 側

Step 0. (TX内) Consume (lease をとる)

BEGIN IMMEDIATE;

UPDATE requests
SET lease_until = now + timeout
WHERE request_id = ?
  AND (lease_until IS NULL OR lease_until < now);

-- rowcount == 1 なら成功

COMMIT;

Step 1. (TX内) task_state に NULL INSERT

BEGIN IMMEDIATE;
INSERT request_id, task_id=NULL
COMMIT;

Step 2: 外部呼び出し(TX外)

call external app

Step 3. ACK (TX内)

BEGIN IMMEDIATE;
UPDATE task_state SET task_id = ? WHERE request_id = ?;
DELETE FROM requests WHERE request_id = ?;
COMMIT;

TIPS

CREATE TABLE kv ( k TEXT PRIMARY KEY, v TEXT );

ある場合は足す、ない場合は作る、というのを sqlite3 3.24+ でできる

INSERT INTO kv (k, v) VALUES (?, ?) ON CONFLICT(k) DO UPDATE SET v = excluded.v;

class RequestRepository:
def enqueue(self, conn, payload, logical_hash, priority):
cur = conn.execute("""
INSERT INTO requests (payload, logical_hash, priority)
VALUES (?, ?, ?)
""", (payload, logical_hash, priority))
return cur.lastrowid
def claim(self, conn, lease_seconds):
row = conn.execute("""
SELECT request_id
FROM requests
WHERE lease_until IS NULL
OR lease_until < CURRENT_TIMESTAMP
ORDER BY priority DESC,
(lease_until IS NOT NULL),
lease_until ASC,
request_id ASC
LIMIT 1
""").fetchone()
if not row:
return None
request_id = row["request_id"]
updated = conn.execute("""
UPDATE requests
SET lease_until = datetime(CURRENT_TIMESTAMP, ?)
WHERE request_id = ?
AND (lease_until IS NULL
OR lease_until < CURRENT_TIMESTAMP)
""", (f"+{lease_seconds} seconds", request_id)).rowcount
if updated != 1:
return None
return conn.execute("""
SELECT * FROM requests WHERE request_id = ?
""", (request_id,)).fetchone()
def delete(self, conn, request_id):
conn.execute("""
DELETE FROM requests WHERE request_id = ?
""", (request_id,))
class TaskRepository:
def insert_invoking(self, conn, request_id):
conn.execute("""
INSERT INTO task_state (request_id, task_id)
VALUES (?, NULL)
""", (request_id,))
def mark_submitted(self, conn, request_id, task_id):
conn.execute("""
UPDATE task_state
SET task_id = ?
WHERE request_id = ?
""", (task_id, request_id))
def process_one(db_path):
conn = sqlite3.connect(db_path)
conn.row_factory = sqlite3.Row
try:
conn.execute("BEGIN IMMEDIATE")
req = request_repo.claim(conn, lease_seconds=30)
if not req:
conn.commit()
return
task_repo.insert_invoking(conn, req["request_id"])
conn.commit() # Step 0 完了
finally:
conn.close()
# 外部呼び出し(TX外)
task_id = ext_app(req)
conn = sqlite3.connect(db_path)
try:
conn.execute("BEGIN IMMEDIATE")
task_repo.mark_submitted(conn, req["request_id"], task_id)
request_repo.delete(conn, req["request_id"])
conn.commit()
finally:
conn.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment