-
-
Save ngetahun/1499e5adf8d49cc6798f37525eae9206 to your computer and use it in GitHub Desktop.
really bad queue simulating sqs backed by sqlite
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import contextlib | |
import fcntl | |
import json | |
import sqlite3 | |
import time | |
import uuid | |
from typing import Any | |
from typing import Generator | |
from typing import Optional | |
from typing import Tuple | |
@contextlib.contextmanager | |
def lock() -> Generator[None, None, None]: | |
with open('queue.lock', 'a+') as f: | |
fcntl.flock(f.fileno(), fcntl.LOCK_EX) | |
try: | |
yield | |
finally: | |
fcntl.flock(f.fileno(), fcntl.LOCK_UN) | |
def _XXX_init_schema(db: sqlite3.Connection) -> None: | |
db.execute( | |
'CREATE TABLE IF NOT EXISTS queue_items (' | |
' queue_name TEXT NOT NULL,' | |
' group_key TEXT NOT NULL,' | |
' data_json TEXT NOT NULL,' | |
' visibility_time_end INT NULL,' | |
' receipt_handle TEXT NULL' | |
');', | |
) | |
@contextlib.contextmanager | |
def _db() -> Generator[sqlite3.Connection, None, None]: | |
with lock(): | |
with contextlib.closing(sqlite3.connect('queue.sqlite')) as ctx: | |
with ctx as db: | |
_XXX_init_schema(db) | |
yield db | |
def enqueue(queue_name: str, group_key: str, data: Any) -> None: | |
with _db() as db: | |
db.execute( | |
'INSERT INTO queue_items VALUES (?, ?, ?, NULL, NULL)', | |
(queue_name, group_key, json.dumps(data)), | |
) | |
def enqueue_with_delay( | |
queue_name: str, | |
group_key: str, | |
data: Any, | |
*, | |
delay: int, | |
) -> None: | |
time_end = int(time.time() + delay) | |
with _db() as db: | |
db.execute( | |
'INSERT INTO queue_items VALUES (?, ?, ?, ?, NULL)', | |
(queue_name, group_key, json.dumps(data), time_end), | |
) | |
def long_poll(queue_name: str, timeout: int) -> Optional[Tuple[str, Any]]: | |
receipt_query = ( | |
'UPDATE queue_items ' | |
'SET receipt_handle = ?, visibility_time_end = ? ' | |
'WHERE ROWID = ?' | |
) | |
end = time.time() + timeout | |
while time.time() < end: | |
with _db() as db: | |
queued = db.execute( | |
'SELECT ROWID, group_key, data_json, visibility_time_end ' | |
'FROM queue_items ' | |
'WHERE queue_name = ? ' | |
'ORDER BY ROWID ASC', | |
(queue_name,), | |
) | |
seen_group_keys = set() | |
for rowid, group_key, data_json, time_end in queued: | |
if group_key in seen_group_keys: | |
continue | |
elif time_end is not None and time_end > time.time(): | |
seen_group_keys.add(group_key) | |
continue | |
new_handle = uuid.uuid4().hex | |
# TODO: update expiration time based on queue | |
new_end_time = time.time() + 120 | |
db.execute(receipt_query, (new_handle, new_end_time, rowid)) | |
return new_handle, json.loads(data_json) | |
time.sleep(.1) | |
return None | |
def mark_completed(queue_name: str, receipt_handle: str) -> None: | |
query = ( | |
'DELETE FROM queue_items ' | |
'WHERE queue_name = ? AND receipt_handle = ?' | |
) | |
with _db() as db: | |
db.execute(query, (queue_name, receipt_handle)) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment