Created
May 10, 2025 13:29
-
-
Save kashifulhaque/ebff4fed86b88fb5278fe6fdbc29cf2d to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
missioncontrol=> \d+ runbook
Table "missioncontrol.runbook"
Column | Type | Collation | Nullable | Default | Storage | Compression | Stats target | Description
----------------------------+--------------------------+-----------+----------+---------+----------+-------------+--------------+-------------
pg_id | text | | not null | | extended | | |
chunk_idx_no | integer | | not null | | plain | | |
runbook_url | text | | not null | | extended | | |
runbook_cont_tx | text | | not null | | extended | | |
vectorised_runbook_cont_tx | vector(1024) | | not null | | external | | |
creat_ts | timestamp with time zone | | not null | now() | plain | | |
lst_updt_ts | timestamp with time zone | | not null | now() | plain | | |
Indexes:
"runbook_pkey" PRIMARY KEY, btree (pg_id, chunk_idx_no)
"index_url" btree (runbook_url)
"runbook_vectorised_runbook_cont_tx_idx" hnsw (vectorised_runbook_cont_tx vector_cosine_ops) WITH (m='32', ef_construction='128')
Access method: heap
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
-- Schema for the runbook vector store. Every statement is idempotent so the
-- script can be re-run safely against an existing database.
CREATE EXTENSION IF NOT EXISTS vector;

-- One row per chunk of a runbook page; (pg_id, chunk_idx_no) identifies a chunk.
CREATE TABLE IF NOT EXISTS runbook (
    pg_id                      TEXT         NOT NULL,
    chunk_idx_no               INT          NOT NULL,
    runbook_url                TEXT         NOT NULL,
    runbook_cont_tx            TEXT         NOT NULL,
    -- 1024-dimensional embedding of runbook_cont_tx.
    vectorised_runbook_cont_tx VECTOR(1024) NOT NULL,
    creat_ts                   TIMESTAMPTZ  NOT NULL DEFAULT NOW(),
    lst_updt_ts                TIMESTAMPTZ  NOT NULL DEFAULT NOW(),
    PRIMARY KEY (pg_id, chunk_idx_no)
);

CREATE INDEX IF NOT EXISTS index_url ON runbook (runbook_url);

-- HNSW index for cosine-distance (<=>) nearest-neighbour search.
-- It must be built on the embedding column, vectorised_runbook_cont_tx
-- (there is no column named "embedding"). The explicit name plus IF NOT EXISTS
-- keeps re-runs from failing or creating duplicate indexes.
CREATE INDEX IF NOT EXISTS runbook_vectorised_runbook_cont_tx_idx ON runbook
    USING hnsw (vectorised_runbook_cont_tx vector_cosine_ops)
    WITH (m = 32, ef_construction = 128);

-- Query-time HNSW candidate-list size (larger = better recall, slower queries).
-- NOTE(review): SET only affects the session running this script; application
-- connections do not inherit it. Set it per connection, or via
-- ALTER DATABASE/ROLE ... SET, if the application should use it.
SET hnsw.ef_search = 20;
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import datetime
import json
import logging
from typing import Any

import asyncpg
import numpy as np
import pytz

from ..core.redaction import custom_redaction
from .constants import TABLE_NAME
class VectorStore:
    """Store and query runbook chunks and their embeddings in Postgres/pgvector.

    Rows are keyed by ``(pg_id, chunk_idx_no)``. Each row holds:

    - the redacted chunk text;
    - its source URL;
    - a unit-normalised embedding;
    - the source's last-modified timestamp in ``lst_updt_ts``.
    """

    # Identifier passed to ``safechain.lcel.amodel`` to obtain the embedding model.
    EMBEDDING_MODEL_ID = "10"

    def __init__(self, pool):
        """Bind the store to a connection pool.

        Args:
            pool: Pool whose ``acquire()`` is an async context manager yielding
                a connection. Presumably an ``asyncpg.Pool`` (confirm at caller).
        """
        self.pool = pool
        self.table = TABLE_NAME

    def _norm(self, vector: list) -> list:
        """Return *vector* scaled to unit L2 length.

        An all-zero vector is returned unchanged, which avoids a division by zero.
        """
        arr = np.array(vector)
        norm = np.linalg.norm(arr)
        normalized = arr / norm if norm != 0 else arr
        return normalized.tolist()

    def _vector_literal(self, vector: list) -> str:
        """Normalise *vector* and render it as a pgvector text literal ``[x1,x2,...]``."""
        return f"[{','.join(map(str, self._norm(vector)))}]"

    async def _embedding_model(self):
        """Return the embedding model.

        ``safechain`` is imported lazily, at call time rather than module import.
        """
        from safechain.lcel import amodel

        return await amodel(self.EMBEDDING_MODEL_ID)

    async def _process_chunk(self, page_id, chunk):
        """Embed and upsert one chunk unless the stored copy is already current.

        Args:
            page_id: Page identifier, stored in ``pg_id``.
            chunk: Mapping read via ``"chunk"`` (text), ``"url"``, ``"index"``
                (chunk number within the page) and ``"lastUpdated"`` (ISO-8601
                timestamp; a trailing ``Z`` is accepted).

        Returns:
            A human-readable status message: skipped, failed to embed, or upserted.
        """
        text = custom_redaction(chunk.get("chunk"))
        url = chunk.get("url")
        chunk_index = chunk.get("index")
        modified_at_dt = datetime.datetime.fromisoformat(
            chunk.get("lastUpdated").replace("Z", "+00:00")
        )
        if modified_at_dt.tzinfo is None:
            # Interpret a naive timestamp in the host's local zone, as the
            # previous astimezone()-based comparison did. NOTE(review): confirm
            # the source always sends an explicit offset.
            modified_at_dt = modified_at_dt.astimezone()

        skipped = f"Skipped Page ID: {page_id}, Chunk: {chunk_index} (No changes)"

        async with self.pool.acquire() as conn:
            current_modified_at = await conn.fetchval(
                f"SELECT lst_updt_ts FROM {self.table} WHERE pg_id = $1 and chunk_idx_no = $2",
                page_id, chunk_index,
            )
        # Aware datetimes compare by instant, so no time-zone conversion is needed.
        if current_modified_at is not None and current_modified_at == modified_at_dt:
            return skipped

        emb_model = await self._embedding_model()
        try:
            # NOTE(review): embed_query is called synchronously (not awaited).
            # If it performs network I/O it blocks the event loop while it runs.
            vector = emb_model.embed_query(text)
        except Exception:
            # Best-effort: one bad chunk must not abort the batch, but keep the
            # traceback in the log instead of only printing the message.
            logging.getLogger(__name__).exception(
                "Embedding failed for page %s, chunk %s", page_id, chunk_index
            )
            return f"Failed to embed page ID: {page_id}, Chunk: {chunk_index}"
        vector_str = self._vector_literal(vector)

        async with self.pool.acquire() as conn:
            status = await conn.execute(
                f"""
                INSERT INTO {self.table} (pg_id, chunk_idx_no, runbook_url, runbook_cont_tx, vectorised_runbook_cont_tx, lst_updt_ts)
                VALUES ($1, $2, $3, $4, $5, $6)
                ON CONFLICT (pg_id, chunk_idx_no) DO UPDATE
                SET runbook_url = EXCLUDED.runbook_url,
                    runbook_cont_tx = EXCLUDED.runbook_cont_tx,
                    vectorised_runbook_cont_tx = EXCLUDED.vectorised_runbook_cont_tx,
                    lst_updt_ts = EXCLUDED.lst_updt_ts
                WHERE {self.table}.lst_updt_ts IS DISTINCT FROM EXCLUDED.lst_updt_ts
                """,
                page_id, chunk_index, url, text, vector_str, modified_at_dt,
            )
        # The ON CONFLICT ... WHERE guard leaves the row untouched (command
        # status "INSERT 0 0") when a concurrent writer already stored this
        # timestamp; report that as a skip rather than an upsert.
        if status.endswith(" 0"):
            return skipped
        return f"Upserted Page ID: {page_id}, Chunk: {chunk_index}"

    async def upsert(self, chunks, task_data, task_id, total_chunks):
        """Process every chunk sequentially, publishing progress after each one.

        Args:
            chunks: Mapping of page id -> list of chunk mappings (see
                ``_process_chunk``).
            task_data: Mutable mapping; ``task_data[task_id]["progress"]`` is
                set to a percentage string such as ``"50.0%"`` after each chunk.
            task_id: Key of this job in *task_data*.
            total_chunks: Denominator for the progress percentage. When it is
                0, progress is not reported instead of raising ZeroDivisionError.

        Returns:
            The per-chunk status messages, in processing order.

        NOTE(review): only inserts/updates are done here. Rows whose chunk index
        no longer exists for a page (e.g. the page got shorter) are not removed.
        Confirm whether callers run ``delete(page_id)`` first.
        """
        status_messages = []
        pending = (
            (page_id, chunk)
            for page_id, chunk_list in chunks.items()
            for chunk in chunk_list
        )
        for done, (page_id, chunk) in enumerate(pending, start=1):
            status_messages.append(await self._process_chunk(page_id, chunk))
            if total_chunks:
                task_data[task_id]["progress"] = f"{round(done / total_chunks * 100, 2)}%"
        return status_messages

    async def search(self, query: str, k: int = 4):
        """Return the *k* stored chunks nearest to *query* by cosine distance.

        The query is redacted with ``custom_redaction`` before it is embedded.

        Returns:
            List of dicts ordered by ascending distance, each with keys:

            - ``pg_id``
            - ``chunk_idx_no``
            - ``runbook_url``
            - ``runbook_cont_tx``
            - ``distance`` (pgvector ``<=>``)
            - ``similarity`` (``1 - distance``)

            Values are returned as-is; any output escaping (HTML/JSON) is left to
            the caller.
        """
        emb_model = await self._embedding_model()
        redacted_query = custom_redaction(query)
        vector_str = self._vector_literal(emb_model.embed_query(redacted_query))
        async with self.pool.acquire() as conn:
            records = await conn.fetch(
                f"""
                SELECT
                    pg_id,
                    chunk_idx_no,
                    runbook_url,
                    runbook_cont_tx,
                    vectorised_runbook_cont_tx <=> $1 AS distance,
                    1 - (vectorised_runbook_cont_tx <=> $1) AS similarity
                FROM {self.table}
                ORDER BY distance
                LIMIT $2
                """,
                vector_str, k,
            )
        return [dict(record) for record in records]

    async def delete(self, page_id: str):
        """Delete every stored chunk of *page_id*.

        Returns:
            The driver's command-status string from ``execute``.
        """
        async with self.pool.acquire() as conn:
            result = await conn.execute(
                f"""
                DELETE FROM {self.table} WHERE pg_id = $1
                """,
                page_id,
            )
        return result
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment