Last active
December 18, 2025 16:30
-
-
Save ranjian0/2ca5e936698764eba645f8a19594ce81 to your computer and use it in GitHub Desktop.
Postgres DocumentStore
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| from contextlib import asynccontextmanager | |
| from fastapi import FastAPI, Depends | |
| from pydantic import BaseModel, ConfigDict | |
| from documentstore import DocumentStore | |
# 1. Define your Document Schema
class Ticket(BaseModel):
    """Support-ticket schema; undeclared JSON keys are kept as-is."""

    # Behave like Mongo: retain fields that are not declared below.
    model_config = ConfigDict(extra="allow")

    subject: str
    priority: str = "low"
    status: str = "open"
# Global placeholder for our store.
# In a real app, you might use a dictionary for multiple collections.
# The store only exists once the FastAPI lifespan has run, so the
# annotation must admit None; the string form keeps it lazy (it is never
# evaluated at import time).
ticket_store: "DocumentStore[Ticket] | None" = None
# 2. Real-time Callback Logic
async def alert_on_high_priority(action, ticket: Ticket):
    """Emit an operator-facing console alert for a high-priority ticket event."""
    alert_line = f"🚨 REAL-TIME ALERT: {ticket.subject} (Priority: {ticket.priority})"
    print(alert_line)
# 3. Lifespan: The "Brain" of the integration
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Bring the DocumentStore up for the application's lifetime.

    Everything before ``yield`` runs at startup; everything after runs at
    shutdown, when the ``async with`` also tears the store down.
    """
    global ticket_store
    database_url = "postgresql://postgres:password@localhost:5432/postgres"
    ticket_store = DocumentStore("tickets", Ticket, database_url)
    async with ticket_store as store:
        # Route high-priority change events to the alert callback.
        store.subscribe("priority:high", alert_on_high_priority)
        print("🚀 DocumentStore is ready and listening...")
        yield
        print("🛑 Shutting down DocumentStore...")
# Wire the lifespan handler into the application instance.
app = FastAPI(lifespan=lifespan)
# 4. Dependency Injection Helper
async def get_tickets():
    # Returns the process-wide store created during lifespan startup;
    # None if called before startup has completed.
    return ticket_store
# 5. The Request Handler (The "Mongo" Way)
@app.post("/tickets")
async def create_ticket(data: dict, store: DocumentStore[Ticket] = Depends(get_tickets)):
    """
    Accept arbitrary JSON, validate it against the Ticket model,
    and persist it to Postgres JSONB.
    """
    new_id = await store.insert_one(data)
    return {"id": new_id, "status": "created"}
@app.get("/tickets/urgent")
async def get_urgent(store: DocumentStore[Ticket] = Depends(get_tickets)):
    """Return every ticket whose priority field equals "high"."""
    urgent_filter = {"priority": "high"}
    return await store.find(query=urgent_filter)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import json | |
| import asyncio | |
| from typing import Type, TypeVar, List, Optional, Generic, Dict, Any, Union | |
| from pydantic import BaseModel, Field, ConfigDict | |
| import psycopg | |
| from psycopg.rows import dict_row | |
# Generic document type: any Pydantic model may back a store.
T = TypeVar("T", bound=BaseModel)
class DocumentStore(Generic[T]):
    """Mongo-style document store backed by a Postgres JSONB table.

    Documents are validated through a Pydantic model on write and
    re-hydrated into model instances on read.  A table trigger plus
    LISTEN/NOTIFY provides real-time change events via ``subscribe``.

    Requires: pip install psycopg[binary] pydantic
    """

    def __init__(self, table_name: str, model: Type[T], db_url: str):
        """Validate config, remember it, and set up the schema.

        Args:
            table_name: Name of the backing table.  It is interpolated
                into DDL and queries, so only bare identifiers are
                accepted (guards against SQL injection).
            model: Pydantic model class used to validate documents.
            db_url: Postgres connection string.

        Raises:
            ValueError: If ``table_name`` is not a plain identifier.
        """
        # table_name is spliced into SQL text below; refuse anything that
        # could smuggle in extra SQL.
        if not table_name.isidentifier():
            raise ValueError(f"invalid table name: {table_name!r}")
        self.table_name = table_name
        self.model = model
        self.db_url = db_url
        self._init_db()

    def _init_db(self):
        """Sets up the table, indices, full-text search, and real-time triggers."""
        with psycopg.connect(self.db_url) as conn:
            with conn.cursor() as cur:
                # 1. Create Table & JSONB Index
                # NOTE: to_tsvector(config, jsonb) requires Postgres >= 10.
                cur.execute(f"""
                    CREATE TABLE IF NOT EXISTS {self.table_name} (
                        id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
                        data JSONB NOT NULL,
                        tsv tsvector GENERATED ALWAYS AS (to_tsvector('english', data)) STORED
                    );
                """)
                cur.execute(f"CREATE INDEX IF NOT EXISTS idx_{self.table_name}_data ON {self.table_name} USING GIN (data);")
                cur.execute(f"CREATE INDEX IF NOT EXISTS idx_{self.table_name}_tsv ON {self.table_name} USING GIN (tsv);")
                # 2. Create the Trigger Function for Subscriptions
                cur.execute(f"""
                    CREATE OR REPLACE FUNCTION notify_{self.table_name}_change() RETURNS trigger AS $$
                    BEGIN
                        PERFORM pg_notify('{self.table_name}', json_build_object(
                            'action', TG_OP,
                            'id', NEW.id,
                            'data', NEW.data
                        )::text);
                        RETURN NEW;
                    END;
                    $$ LANGUAGE plpgsql;
                """)
                # 3. Attach Trigger (INSERT/UPDATE only, so NEW is always set)
                cur.execute(f"DROP TRIGGER IF EXISTS {self.table_name}_changed ON {self.table_name};")
                cur.execute(f"""
                    CREATE TRIGGER {self.table_name}_changed
                    AFTER INSERT OR UPDATE ON {self.table_name}
                    FOR EACH ROW EXECUTE FUNCTION notify_{self.table_name}_change();
                """)
                conn.commit()

    # --- WRITING DATA ---
    def insert_one(self, document: Union[dict, T]) -> str:
        """Validates and inserts a document. Returns the new ID."""
        doc_obj = self.model(**document) if isinstance(document, dict) else document
        with psycopg.connect(self.db_url) as conn:
            with conn.cursor() as cur:
                cur.execute(
                    f"INSERT INTO {self.table_name} (data) VALUES (%s) RETURNING id",
                    [doc_obj.model_dump_json()]
                )
                return str(cur.fetchone()[0])

    def upsert(self, query: dict, update_data: dict) -> str:
        """Mongo-style upsert: find a doc by key/value and update it, or create it.

        Returns the id of the updated or newly inserted row.
        """
        with psycopg.connect(self.db_url) as conn:
            with conn.cursor() as cur:
                # Check if a matching document exists (first match wins).
                cur.execute(f"SELECT id, data FROM {self.table_name} WHERE data @> %s", [json.dumps(query)])
                row = cur.fetchone()
                if row:
                    # Update: shallow-merge the new keys over the stored doc,
                    # re-validating the result before writing it back.
                    new_data = {**row[1], **update_data}
                    validated = self.model(**new_data)
                    cur.execute(f"UPDATE {self.table_name} SET data = %s WHERE id = %s", [validated.model_dump_json(), row[0]])
                    return str(row[0])
                else:
                    # Insert: query keys + update keys form one fresh document.
                    full_doc = {**query, **update_data}
                    return self.insert_one(full_doc)

    # --- READING DATA ---
    def find(self,
             query: Optional[dict] = None,
             limit: int = 10,
             offset: int = 0,
             sort_by: Optional[str] = None,
             desc: bool = True) -> List[T]:
        """Find documents with support for filtering, pagination, and sorting.

        ``sort_by`` names a top-level JSON key.  It is bound as a query
        parameter rather than spliced into the SQL text, so caller-supplied
        values cannot inject SQL.  Sorting compares the ``->>`` text form
        of the value (same as before).
        """
        with psycopg.connect(self.db_url, row_factory=dict_row) as conn:
            with conn.cursor() as cur:
                sql = f"SELECT data FROM {self.table_name}"
                params = []
                if query:
                    sql += " WHERE data @> %s"
                    params.append(json.dumps(query))
                if sort_by:
                    order = "DESC" if desc else "ASC"
                    # Parameterized sort key: previously sort_by was f-string
                    # interpolated, which allowed SQL injection.
                    sql += f" ORDER BY data->>%s {order}"
                    params.append(sort_by)
                sql += " LIMIT %s OFFSET %s"
                params.extend([limit, offset])
                cur.execute(sql, params)
                return [self.model(**row['data']) for row in cur.fetchall()]

    def search(self, text_query: str, limit: int = 10) -> List[T]:
        """Full-text search using Postgres tsvector ranking."""
        with psycopg.connect(self.db_url, row_factory=dict_row) as conn:
            with conn.cursor() as cur:
                sql = f"""
                    SELECT data, ts_rank(tsv, websearch_to_tsquery('english', %s)) as rank
                    FROM {self.table_name}
                    WHERE tsv @@ websearch_to_tsquery('english', %s)
                    ORDER BY rank DESC LIMIT %s;
                """
                cur.execute(sql, [text_query, text_query, limit])
                return [self.model(**row['data']) for row in cur.fetchall()]

    # --- REAL-TIME ---
    async def subscribe(self):
        """Async generator yielding (action, document) changes in real-time.

        NOTE: pg_notify payloads are capped (~8000 bytes); very large
        documents may not fit -- TODO confirm against expected doc sizes.
        """
        async with await psycopg.AsyncConnection.connect(self.db_url, autocommit=True) as conn:
            await conn.execute(f"LISTEN {self.table_name}")
            async for notify in conn.notifies():
                payload = json.loads(notify.payload)
                yield payload['action'], self.model(**payload['data'])
# --- USAGE EXAMPLE ---
class Task(BaseModel):
    """Example document schema for the demo below."""

    # Allow extra fields like Mongo
    model_config = ConfigDict(extra="allow")

    title: str
    status: str = "pending"
    priority: int = 1
    tags: List[str] = []
async def run_demo():
    """End-to-end tour: upsert, find, full-text search, then live-tail changes."""
    database_url = "postgresql://postgres:password@localhost:5432/postgres"
    store = DocumentStore("tasks", Task, database_url)

    # 1. Insert/Upsert
    store.upsert({"title": "Clean Room"}, {"priority": 5, "tags": ["home", "urgent"]})
    store.insert_one({"title": "Buy Milk", "status": "pending", "tags": ["grocery"]})

    # 2. Complex Find (urgent home tasks, sorted by priority)
    tasks = store.find(query={"tags": ["home"]}, sort_by="priority")
    print(f"Found {len(tasks)} tasks.")

    # 3. Full Text Search
    search_results = store.search("clean")
    print(f"Search Result: {search_results[0].title if search_results else 'None'}")

    # 4. Start a Subscriber in the background
    print("Listening for changes (Ctrl+C to stop)...")
    async for action, doc in store.subscribe():
        print(f"LIVE UPDATE: {action} - {doc.title}")
if __name__ == "__main__":
    # Ctrl+C is the expected way to stop the live subscriber loop; exit quietly.
    try:
        asyncio.run(run_demo())
    except KeyboardInterrupt:
        pass
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import json | |
| import asyncio | |
| from typing import Type, TypeVar, List, Optional, Generic, Callable, Dict, Any | |
| from pydantic import BaseModel, ConfigDict | |
| import psycopg | |
| from psycopg_pool import AsyncConnectionPool | |
| from psycopg.rows import dict_row | |
# Generic document type: any Pydantic model may back a store.
T = TypeVar("T", bound=BaseModel)
class DocumentStore(Generic[T]):
    """Async, connection-pooled document store over Postgres JSONB.

    Adds topic-based pub/sub on top of LISTEN/NOTIFY: callbacks are
    registered per topic ("action:INSERT", "<key>:<value>", or "all")
    and fired by a background listener task started in ``__aenter__``.
    """

    def __init__(self, table_name: str, model: Type[T], db_url: str, min_size: int = 1, max_size: int = 10):
        """Configure the store; no connections are opened until ``__aenter__``.

        Raises:
            ValueError: If ``table_name`` is not a bare identifier (it is
                interpolated into SQL, so anything else risks injection).
        """
        if not table_name.isidentifier():
            raise ValueError(f"invalid table name: {table_name!r}")
        self.table_name = table_name
        self.model = model
        self.db_url = db_url
        # 1. Connection Pooling (Shared across all methods)
        self.pool = AsyncConnectionPool(conninfo=db_url, min_size=min_size, max_size=max_size, open=False)
        # 2. Callback Registry: { topic_string: [callback_funcs] }
        self._callbacks: Dict[str, List[Callable[[str, T], Any]]] = {}
        self._listener_task: Optional[asyncio.Task] = None

    async def __aenter__(self):
        await self.pool.open()
        # Schema setup uses blocking psycopg; push it off the event loop
        # so startup does not stall other tasks.
        await asyncio.to_thread(self._setup_db_sync)
        # Start background listener
        self._listener_task = asyncio.create_task(self._listen_forever())
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        if self._listener_task:
            self._listener_task.cancel()
            # Await the cancelled task so its LISTEN connection is closed
            # cleanly; cancelling without awaiting leaks a pending task
            # (and triggers "Task was destroyed but it is pending!").
            try:
                await self._listener_task
            except asyncio.CancelledError:
                pass
        await self.pool.close()

    def _setup_db_sync(self):
        """Initial DB setup using a temporary blocking connection."""
        with psycopg.connect(self.db_url) as conn:
            with conn.cursor() as cur:
                cur.execute(f"CREATE TABLE IF NOT EXISTS {self.table_name} (id UUID PRIMARY KEY DEFAULT gen_random_uuid(), data JSONB NOT NULL, tsv tsvector GENERATED ALWAYS AS (to_tsvector('english', data)) STORED);")
                cur.execute(f"CREATE INDEX IF NOT EXISTS idx_{self.table_name}_data ON {self.table_name} USING GIN (data);")
                cur.execute(f"CREATE INDEX IF NOT EXISTS idx_{self.table_name}_tsv ON {self.table_name} USING GIN (tsv);")
                # Trigger for real-time notifications
                cur.execute(f"""
                    CREATE OR REPLACE FUNCTION notify_{self.table_name}_change() RETURNS trigger AS $$
                    BEGIN
                        PERFORM pg_notify('{self.table_name}', json_build_object('action', TG_OP, 'data', NEW.data)::text);
                        RETURN NEW;
                    END;
                    $$ LANGUAGE plpgsql;
                """)
                cur.execute(f"DROP TRIGGER IF EXISTS {self.table_name}_changed ON {self.table_name};")
                cur.execute(f"CREATE TRIGGER {self.table_name}_changed AFTER INSERT OR UPDATE ON {self.table_name} FOR EACH ROW EXECUTE FUNCTION notify_{self.table_name}_change();")
                conn.commit()

    # --- MONGO-LIKE OPERATORS ---
    async def insert_one(self, document: dict) -> str:
        """Validate ``document`` against the model, insert it, return the new id."""
        doc_json = self.model(**document).model_dump_json()
        async with self.pool.connection() as conn:
            async with conn.cursor() as cur:
                await cur.execute(f"INSERT INTO {self.table_name} (data) VALUES (%s) RETURNING id", [doc_json])
                res = await cur.fetchone()
                return str(res[0])

    async def update_one(self, query: dict, update: dict):
        """
        Merge ``update`` into every document matching ``query`` without
        overwriting the whole doc.  Example update: {"status": "done"}
        """
        async with self.pool.connection() as conn:
            async with conn.cursor() as cur:
                # JSONB `||` merges the new keys into the existing blob.
                await cur.execute(
                    f"UPDATE {self.table_name} SET data = data || %s WHERE data @> %s",
                    [json.dumps(update), json.dumps(query)]
                )

    async def find(self, query: dict = None) -> List[T]:
        """Return every document matching ``query`` (all documents when None)."""
        async with self.pool.connection() as conn:
            # Per-cursor row factory: assigning conn.row_factory would stick
            # to the pooled connection after it is returned to the pool and
            # silently change result shapes for other methods.
            async with conn.cursor(row_factory=dict_row) as cur:
                sql = f"SELECT data FROM {self.table_name}"
                params = []
                if query:
                    sql += " WHERE data @> %s"
                    params.append(json.dumps(query))
                await cur.execute(sql, params)
                rows = await cur.fetchall()
                return [self.model(**row['data']) for row in rows]

    # --- TOPIC SUBSCRIPTIONS ---
    def subscribe(self, topic: str, callback: Callable[[str, T], Any]):
        """
        Register a callback for a topic.
        Topic format: "action:INSERT", "status:complete", or "all"
        """
        if topic not in self._callbacks:
            self._callbacks[topic] = []
        self._callbacks[topic].append(callback)
        print(f"✅ Subscribed to topic: {topic}")

    async def _listen_forever(self):
        """Background loop that routes notifications to callbacks."""
        # Use a dedicated connection for listening; the async-with closes
        # it when the task is cancelled.
        async with await psycopg.AsyncConnection.connect(self.db_url, autocommit=True) as conn:
            await conn.execute(f"LISTEN {self.table_name}")
            async for notify in conn.notifies():
                payload = json.loads(notify.payload)
                action = payload['action']  # INSERT, UPDATE
                data_dict = payload['data']
                doc = self.model(**data_dict)
                # Route to appropriate callbacks
                await self._dispatch(action, doc, data_dict)

    async def _dispatch(self, action: str, doc: T, raw_data: dict):
        """Fan one change event out to every callback whose topic matches."""
        # 1. Action-based topics (e.g., "action:INSERT")
        topic_action = f"action:{action}"
        # 2. Key-value based topics (e.g., "status:pending"): every key in
        # the document is a candidate topic.
        possible_topics = ["all", topic_action]
        for key, value in raw_data.items():
            possible_topics.append(f"{key}:{value}")
        for topic in possible_topics:
            if topic in self._callbacks:
                for cb in self._callbacks[topic]:
                    if asyncio.iscoroutinefunction(cb):
                        await cb(action, doc)
                    else:
                        cb(action, doc)
# --- EXAMPLE USAGE ---
class Ticket(BaseModel):
    """Demo schema; undeclared JSON keys are retained (Mongo-style)."""

    model_config = ConfigDict(extra="allow")

    subject: str
    priority: str
    status: str = "open"
async def on_new_ticket(action, ticket: Ticket):
    """Callback fired for every INSERT, regardless of ticket contents."""
    notice = f"📣 ALERT: New Ticket Created: {ticket.subject}"
    print(notice)
async def on_high_priority(action, ticket: Ticket):
    """Callback fired only for documents carrying priority == "high"."""
    warning = f"🔥 URGENT: High priority ticket detected: {ticket.subject}"
    print(warning)
async def main():
    """Demo: register topic callbacks, insert two tickets, let events fire."""
    database_url = "postgresql://postgres:password@localhost:5432/postgres"
    async with DocumentStore("tickets", Ticket, database_url) as store:
        # Subscribe to specific topics
        store.subscribe("action:INSERT", on_new_ticket)
        store.subscribe("priority:high", on_high_priority)
        # Insert some data
        await store.insert_one({"subject": "Printer broken", "priority": "low"})
        await store.insert_one({"subject": "DATABASE DOWN!", "priority": "high"})
        # Give the LISTEN/NOTIFY round-trip a moment to deliver.
        await asyncio.sleep(1)
if __name__ == "__main__":
    # Entry point for the demo; requires a reachable Postgres instance.
    asyncio.run(main())
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment