Skip to content

Instantly share code, notes, and snippets.

@ranjian0
Last active December 18, 2025 16:30
Show Gist options
  • Select an option

  • Save ranjian0/2ca5e936698764eba645f8a19594ce81 to your computer and use it in GitHub Desktop.

Select an option

Save ranjian0/2ca5e936698764eba645f8a19594ce81 to your computer and use it in GitHub Desktop.
Postgres DocumentStore
from contextlib import asynccontextmanager
from fastapi import FastAPI, Depends
from pydantic import BaseModel, ConfigDict
from documentstore import DocumentStore
# 1. Define your Document Schema
class Ticket(BaseModel):
    """A support ticket stored as a JSONB document.

    Extra fields are permitted so documents behave like Mongo records.
    """

    model_config = ConfigDict(extra='allow')

    subject: str
    priority: str = "low"
    status: str = "open"
# Global placeholder for our store
# In a real app, you might use a dictionary for multiple collections
ticket_store: DocumentStore[Ticket] = None
# 2. Real-time Callback Logic
async def alert_on_high_priority(action, ticket: Ticket):
    """Callback fired for documents matching the 'priority:high' topic."""
    message = f"🚨 REAL-TIME ALERT: {ticket.subject} (Priority: {ticket.priority})"
    print(message)
# 3. Lifespan: The "Brain" of the integration
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Manage the DocumentStore's lifecycle alongside the FastAPI app.

    Opens the store (and its background listener) before the app starts
    serving requests, and tears it down on shutdown.
    """
    global ticket_store

    DB_URL = "postgresql://postgres:password@localhost:5432/postgres"
    # Initialize the store and publish it through the module-level global
    # so the dependency helper can hand it to request handlers.
    ticket_store = DocumentStore("tickets", Ticket, DB_URL)

    async with ticket_store as store:
        # Register real-time callbacks here
        store.subscribe("priority:high", alert_on_high_priority)
        print("🚀 DocumentStore is ready and listening...")
        yield
        print("🛑 Shutting down DocumentStore...")
app = FastAPI(lifespan=lifespan)


# 4. Dependency Injection Helper
async def get_tickets():
    """FastAPI dependency returning the shared DocumentStore instance."""
    return ticket_store
# 5. The Request Handler (The "Mongo" Way)
@app.post("/tickets")
async def create_ticket(data: dict, store: DocumentStore[Ticket] = Depends(get_tickets)):
    """Accept arbitrary JSON, validate it via Pydantic, persist to JSONB.

    Returns the generated document id plus a status marker.
    """
    new_id = await store.insert_one(data)
    return {"id": new_id, "status": "created"}
@app.get("/tickets/urgent")
async def get_urgent(store: DocumentStore[Ticket] = Depends(get_tickets)):
    """List every ticket whose priority field equals 'high'."""
    urgent_filter = {"priority": "high"}
    return await store.find(query=urgent_filter)
import json
import asyncio
from typing import Type, TypeVar, List, Optional, Generic, Dict, Any, Union
from pydantic import BaseModel, Field, ConfigDict
import psycopg
from psycopg.rows import dict_row
# Document type handled by a store; must be a Pydantic model so the
# store can validate and (de)serialize documents.
T = TypeVar("T", bound=BaseModel)
class DocumentStore(Generic[T]):
    """Mongo-flavoured document store backed by a Postgres JSONB table.

    Documents are validated through a Pydantic model and stored in a single
    ``data`` JSONB column.  The table carries a GIN index for containment
    queries, a generated ``tsvector`` column for full-text search, and a
    trigger that publishes INSERT/UPDATE events via ``pg_notify`` for the
    :meth:`subscribe` real-time stream.
    """

    def __init__(self, table_name: str, model: Type[T], db_url: str):
        """
        Postgres-backed Document Store.
        Requires: pip install psycopg[binary] pydantic

        Raises:
            ValueError: if ``table_name`` is not a plain identifier.  The
                name is interpolated into SQL statements below, so this
                check guards against SQL injection via the table name.
        """
        # SECURITY: table_name is spliced into DDL/DML with f-strings; it
        # must not contain quotes, spaces, or semicolons.
        if not table_name.isidentifier():
            raise ValueError(f"Invalid table name: {table_name!r}")
        self.table_name = table_name
        self.model = model
        self.db_url = db_url
        self._init_db()

    def _init_db(self):
        """Sets up the table, indices, full-text search, and real-time triggers."""
        with psycopg.connect(self.db_url) as conn:
            with conn.cursor() as cur:
                # 1. Create Table & JSONB Index
                cur.execute(f"""
                    CREATE TABLE IF NOT EXISTS {self.table_name} (
                        id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
                        data JSONB NOT NULL,
                        tsv tsvector GENERATED ALWAYS AS (to_tsvector('english', data)) STORED
                    );
                """)
                cur.execute(f"CREATE INDEX IF NOT EXISTS idx_{self.table_name}_data ON {self.table_name} USING GIN (data);")
                cur.execute(f"CREATE INDEX IF NOT EXISTS idx_{self.table_name}_tsv ON {self.table_name} USING GIN (tsv);")
                # 2. Create the Trigger Function for Subscriptions.
                # NOTE: pg_notify payloads are limited to ~8000 bytes, so
                # very large documents will make this trigger fail.
                cur.execute(f"""
                    CREATE OR REPLACE FUNCTION notify_{self.table_name}_change() RETURNS trigger AS $$
                    BEGIN
                        PERFORM pg_notify('{self.table_name}', json_build_object(
                            'action', TG_OP,
                            'id', NEW.id,
                            'data', NEW.data
                        )::text);
                        RETURN NEW;
                    END;
                    $$ LANGUAGE plpgsql;
                """)
                # 3. Attach Trigger (drop first so re-runs are idempotent)
                cur.execute(f"DROP TRIGGER IF EXISTS {self.table_name}_changed ON {self.table_name};")
                cur.execute(f"""
                    CREATE TRIGGER {self.table_name}_changed
                    AFTER INSERT OR UPDATE ON {self.table_name}
                    FOR EACH ROW EXECUTE FUNCTION notify_{self.table_name}_change();
                """)
                conn.commit()

    # --- WRITING DATA ---

    def insert_one(self, document: Union[dict, T]) -> str:
        """Validates and inserts a document. Returns the new ID."""
        doc_obj = self.model(**document) if isinstance(document, dict) else document
        with psycopg.connect(self.db_url) as conn:
            with conn.cursor() as cur:
                cur.execute(
                    f"INSERT INTO {self.table_name} (data) VALUES (%s) RETURNING id",
                    [doc_obj.model_dump_json()]
                )
                return str(cur.fetchone()[0])

    def upsert(self, query: dict, update_data: dict) -> str:
        """Mongo-style upsert: find a doc by key/value and update it, or create it.

        The update is a *shallow* merge: top-level keys in ``update_data``
        replace the stored document's keys; nested dicts are overwritten
        wholesale, not merged.
        """
        with psycopg.connect(self.db_url) as conn:
            with conn.cursor() as cur:
                # Containment (@>) matches the first doc containing every
                # key/value pair of `query`.
                cur.execute(f"SELECT id, data FROM {self.table_name} WHERE data @> %s", [json.dumps(query)])
                row = cur.fetchone()
                if row:
                    # Update (shallow merge); re-validate the merged result.
                    new_data = {**row[1], **update_data}
                    validated = self.model(**new_data)
                    cur.execute(f"UPDATE {self.table_name} SET data = %s WHERE id = %s", [validated.model_dump_json(), row[0]])
                    return str(row[0])
                # Insert: the query keys become part of the new document.
                full_doc = {**query, **update_data}
                return self.insert_one(full_doc)

    # --- READING DATA ---

    def find(self,
             query: Optional[dict] = None,
             limit: int = 10,
             offset: int = 0,
             sort_by: Optional[str] = None,
             desc: bool = True) -> List[T]:
        """Find documents with support for filtering, pagination, and sorting.

        Raises:
            ValueError: if ``sort_by`` is not a plain identifier (it is
                interpolated into the ORDER BY clause).

        NOTE: sorting uses the ``->>`` text operator, so numeric fields
        sort lexicographically ("10" < "9").
        """
        with psycopg.connect(self.db_url, row_factory=dict_row) as conn:
            with conn.cursor() as cur:
                sql = f"SELECT data FROM {self.table_name}"
                params = []
                if query:
                    sql += " WHERE data @> %s"
                    params.append(json.dumps(query))
                if sort_by:
                    # SECURITY: sort_by is spliced into the SQL string;
                    # restrict it to a safe identifier to block injection.
                    if not sort_by.isidentifier():
                        raise ValueError(f"Invalid sort key: {sort_by!r}")
                    order = "DESC" if desc else "ASC"
                    sql += f" ORDER BY data->>'{sort_by}' {order}"
                sql += " LIMIT %s OFFSET %s"
                params.extend([limit, offset])
                cur.execute(sql, params)
                return [self.model(**row['data']) for row in cur.fetchall()]

    def search(self, text_query: str, limit: int = 10) -> List[T]:
        """Full-text search using Postgres tsvector ranking."""
        with psycopg.connect(self.db_url, row_factory=dict_row) as conn:
            with conn.cursor() as cur:
                sql = f"""
                    SELECT data, ts_rank(tsv, websearch_to_tsquery('english', %s)) as rank
                    FROM {self.table_name}
                    WHERE tsv @@ websearch_to_tsquery('english', %s)
                    ORDER BY rank DESC LIMIT %s;
                """
                cur.execute(sql, [text_query, text_query, limit])
                return [self.model(**row['data']) for row in cur.fetchall()]

    # --- REAL-TIME ---

    async def subscribe(self):
        """Async generator yielding (action, document) changes in real-time.

        Uses a dedicated autocommit connection with LISTEN/NOTIFY and never
        returns on its own, so run it in its own task.
        """
        async with await psycopg.AsyncConnection.connect(self.db_url, autocommit=True) as conn:
            await conn.execute(f"LISTEN {self.table_name}")
            async for notify in conn.notifies():
                payload = json.loads(notify.payload)
                yield payload['action'], self.model(**payload['data'])
# --- USAGE EXAMPLE ---
class Task(BaseModel):
    """Example document model; extra fields are allowed, Mongo-style."""

    model_config = ConfigDict(extra='allow')  # Allow extra fields like Mongo

    title: str
    status: str = "pending"
    priority: int = 1
    tags: List[str] = []
async def run_demo():
    """End-to-end demo: upsert/insert, query, full-text search, then subscribe."""
    DB_URL = "postgresql://postgres:password@localhost:5432/postgres"
    store = DocumentStore("tasks", Task, DB_URL)

    # 1. Insert/Upsert
    store.upsert({"title": "Clean Room"}, {"priority": 5, "tags": ["home", "urgent"]})
    store.insert_one({"title": "Buy Milk", "status": "pending", "tags": ["grocery"]})

    # 2. Complex Find (Find urgent home tasks)
    tasks = store.find(query={"tags": ["home"]}, sort_by="priority")
    print(f"Found {len(tasks)} tasks.")

    # 3. Full Text Search
    search_results = store.search("clean")
    top_title = search_results[0].title if search_results else 'None'
    print(f"Search Result: {top_title}")

    # 4. Start a Subscriber in the background
    print("Listening for changes (Ctrl+C to stop)...")
    async for action, doc in store.subscribe():
        print(f"LIVE UPDATE: {action} - {doc.title}")
# Entry point: run the demo loop until the user interrupts it.
if __name__ == "__main__":
    try:
        asyncio.run(run_demo())
    except KeyboardInterrupt:
        pass  # graceful exit on Ctrl+C
import json
import asyncio
from typing import Type, TypeVar, List, Optional, Generic, Callable, Dict, Any
from pydantic import BaseModel, ConfigDict
import psycopg
from psycopg_pool import AsyncConnectionPool
from psycopg.rows import dict_row
# Document type handled by a store; must be a Pydantic model so the
# store can validate documents and route notifications.
T = TypeVar("T", bound=BaseModel)
class DocumentStore(Generic[T]):
    """Async, Mongo-flavoured document store on Postgres JSONB.

    Features:
      * Connection pooling (psycopg_pool) shared by all operations.
      * Pydantic validation of every document on insert.
      * GIN-indexed containment queries and a full-text search column.
      * Topic-based real-time callbacks driven by LISTEN/NOTIFY.

    Use as an async context manager::

        async with DocumentStore("tickets", Ticket, DB_URL) as store:
            ...
    """

    def __init__(self, table_name: str, model: Type[T], db_url: str, min_size: int = 1, max_size: int = 10):
        """Create the store (no I/O happens until ``__aenter__``).

        Raises:
            ValueError: if ``table_name`` is not a plain identifier; the
                name is interpolated into SQL, so this guards injection.
        """
        if not table_name.isidentifier():
            raise ValueError(f"Invalid table name: {table_name!r}")
        self.table_name = table_name
        self.model = model
        self.db_url = db_url
        # 1. Connection Pooling (shared across all methods); opened lazily
        # in __aenter__ so construction stays synchronous and side-effect free.
        self.pool = AsyncConnectionPool(conninfo=db_url, min_size=min_size, max_size=max_size, open=False)
        # 2. Callback Registry: { topic_string: [callback_funcs] }
        self._callbacks: Dict[str, List[Callable[[str, T], Any]]] = {}
        self._listener_task: Optional[asyncio.Task] = None

    async def __aenter__(self):
        """Open the pool, ensure the schema exists, start the LISTEN loop."""
        await self.pool.open()
        self._setup_db_sync()  # Initial schema setup
        # Start background listener
        self._listener_task = asyncio.create_task(self._listen_forever())
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Stop the listener cleanly, then close the pool."""
        if self._listener_task:
            self._listener_task.cancel()
            # BUGFIX: await the cancelled task so it actually unwinds and
            # releases its connection; bare cancel() left a pending task
            # (and a "task was destroyed" warning) at shutdown.
            try:
                await self._listener_task
            except asyncio.CancelledError:
                pass
        await self.pool.close()

    def _setup_db_sync(self):
        """Initial DB setup using a temporary synchronous connection.

        NOTE(review): this blocks the event loop briefly; acceptable as a
        one-off at startup, move to an executor if startup latency matters.
        """
        with psycopg.connect(self.db_url) as conn:
            with conn.cursor() as cur:
                cur.execute(f"CREATE TABLE IF NOT EXISTS {self.table_name} (id UUID PRIMARY KEY DEFAULT gen_random_uuid(), data JSONB NOT NULL, tsv tsvector GENERATED ALWAYS AS (to_tsvector('english', data)) STORED);")
                cur.execute(f"CREATE INDEX IF NOT EXISTS idx_{self.table_name}_data ON {self.table_name} USING GIN (data);")
                cur.execute(f"CREATE INDEX IF NOT EXISTS idx_{self.table_name}_tsv ON {self.table_name} USING GIN (tsv);")
                # Trigger for real-time notifications.
                # NOTE: pg_notify payloads are capped at ~8000 bytes.
                cur.execute(f"""
                    CREATE OR REPLACE FUNCTION notify_{self.table_name}_change() RETURNS trigger AS $$
                    BEGIN
                        PERFORM pg_notify('{self.table_name}', json_build_object('action', TG_OP, 'data', NEW.data)::text);
                        RETURN NEW;
                    END;
                    $$ LANGUAGE plpgsql;
                """)
                cur.execute(f"DROP TRIGGER IF EXISTS {self.table_name}_changed ON {self.table_name};")
                cur.execute(f"CREATE TRIGGER {self.table_name}_changed AFTER INSERT OR UPDATE ON {self.table_name} FOR EACH ROW EXECUTE FUNCTION notify_{self.table_name}_change();")
                conn.commit()

    # --- MONGO-LIKE OPERATORS ---

    async def insert_one(self, document: dict) -> str:
        """Validate ``document`` against the model and insert it; return the new id."""
        doc_json = self.model(**document).model_dump_json()
        async with self.pool.connection() as conn:
            async with conn.cursor() as cur:
                await cur.execute(f"INSERT INTO {self.table_name} (data) VALUES (%s) RETURNING id", [doc_json])
                res = await cur.fetchone()
                return str(res[0])

    async def update_one(self, query: dict, update: dict):
        """
        Shallow-merge ``update`` into every document matching ``query``.

        Uses the JSONB ``||`` concatenation operator, so only top-level keys
        are overwritten (nested objects are replaced wholesale).
        Example update: {"status": "done"}

        NOTE(review): the merged result is NOT re-validated against the
        model before being written.
        """
        async with self.pool.connection() as conn:
            async with conn.cursor() as cur:
                # Merge the new dict into the existing JSONB blob in-place.
                await cur.execute(
                    f"UPDATE {self.table_name} SET data = data || %s WHERE data @> %s",
                    [json.dumps(update), json.dumps(query)]
                )

    async def find(self, query: dict = None) -> List[T]:
        """Return all documents containing ``query`` (all of them if None)."""
        async with self.pool.connection() as conn:
            # BUGFIX: use a cursor-level row factory rather than mutating the
            # pooled connection.  Setting conn.row_factory leaked dict rows
            # into later reuses of the same pooled connection, where e.g.
            # insert_one's `res[0]` would index a dict and raise KeyError.
            async with conn.cursor(row_factory=dict_row) as cur:
                sql = f"SELECT data FROM {self.table_name}"
                params = []
                if query:
                    sql += " WHERE data @> %s"
                    params.append(json.dumps(query))
                await cur.execute(sql, params)
                rows = await cur.fetchall()
                return [self.model(**row['data']) for row in rows]

    # --- TOPIC SUBSCRIPTIONS ---

    def subscribe(self, topic: str, callback: Callable[[str, T], Any]):
        """
        Register a callback for a topic.
        Topic format: "action:INSERT", "status:complete", or "all"
        """
        if topic not in self._callbacks:
            self._callbacks[topic] = []
        self._callbacks[topic].append(callback)
        print(f"✅ Subscribed to topic: {topic}")

    async def _listen_forever(self):
        """Background loop that routes Postgres notifications to callbacks."""
        # Use a dedicated autocommit connection for listening; pooled
        # connections are transactional and would buffer notifications.
        async with await psycopg.AsyncConnection.connect(self.db_url, autocommit=True) as conn:
            await conn.execute(f"LISTEN {self.table_name}")
            async for notify in conn.notifies():
                payload = json.loads(notify.payload)
                action = payload['action']  # INSERT, UPDATE
                data_dict = payload['data']
                doc = self.model(**data_dict)
                # Route to appropriate callbacks
                await self._dispatch(action, doc, data_dict)

    async def _dispatch(self, action: str, doc: T, raw_data: dict):
        """Invoke every callback whose topic matches this change event."""
        # 1. Action-based topics (e.g., "action:INSERT")
        topic_action = f"action:{action}"
        # 2. Key-value based topics (e.g., "status:pending"): every
        # top-level key of the document becomes a candidate topic.
        possible_topics = ["all", topic_action]
        for key, value in raw_data.items():
            possible_topics.append(f"{key}:{value}")
        for topic in possible_topics:
            if topic in self._callbacks:
                for cb in self._callbacks[topic]:
                    # Support both sync and async subscribers.
                    if asyncio.iscoroutinefunction(cb):
                        await cb(action, doc)
                    else:
                        cb(action, doc)
# --- EXAMPLE USAGE ---
class Ticket(BaseModel):
    """Example ticket document; extra fields are allowed, Mongo-style."""

    model_config = ConfigDict(extra='allow')

    subject: str
    priority: str
    status: str = "open"
async def on_new_ticket(action, ticket: Ticket):
    """Subscriber fired on every INSERT, regardless of ticket contents."""
    print(f"📣 ALERT: New Ticket Created: {ticket.subject}")


async def on_high_priority(action, ticket: Ticket):
    """Subscriber fired whenever a document carries priority == 'high'."""
    print(f"🔥 URGENT: High priority ticket detected: {ticket.subject}")
async def main():
    """Demo: open the store, register topic callbacks, insert sample data."""
    DB_URL = "postgresql://postgres:password@localhost:5432/postgres"
    async with DocumentStore("tickets", Ticket, DB_URL) as store:
        # Subscribe to specific topics
        store.subscribe("action:INSERT", on_new_ticket)
        store.subscribe("priority:high", on_high_priority)

        # Insert some data
        await store.insert_one({"subject": "Printer broken", "priority": "low"})
        await store.insert_one({"subject": "DATABASE DOWN!", "priority": "high"})

        # Give the background listener a moment to process notifications
        await asyncio.sleep(1)
# Entry point: run the async demo when executed as a script.
if __name__ == "__main__":
    asyncio.run(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment