@kashifulhaque
Created May 10, 2025 13:29
missioncontrol=> \d+ runbook
                                                        Table "missioncontrol.runbook"
           Column           |           Type           | Collation | Nullable | Default | Storage  | Compression | Stats target | Description
----------------------------+--------------------------+-----------+----------+---------+----------+-------------+--------------+-------------
 pg_id                      | text                     |           | not null |         | extended |             |              |
 chunk_idx_no               | integer                  |           | not null |         | plain    |             |              |
 runbook_url                | text                     |           | not null |         | extended |             |              |
 runbook_cont_tx            | text                     |           | not null |         | extended |             |              |
 vectorised_runbook_cont_tx | vector(1024)             |           | not null |         | external |             |              |
 creat_ts                   | timestamp with time zone |           | not null | now()   | plain    |             |              |
 lst_updt_ts                | timestamp with time zone |           | not null | now()   | plain    |             |              |
Indexes:
    "runbook_pkey" PRIMARY KEY, btree (pg_id, chunk_idx_no)
    "index_url" btree (runbook_url)
    "runbook_vectorised_runbook_cont_tx_idx" hnsw (vectorised_runbook_cont_tx vector_cosine_ops) WITH (m='32', ef_construction='128')
Access method: heap
CREATE EXTENSION IF NOT EXISTS vector;

CREATE TABLE IF NOT EXISTS runbook (
    pg_id TEXT NOT NULL,
    chunk_idx_no INT NOT NULL,
    runbook_url TEXT NOT NULL,
    runbook_cont_tx TEXT NOT NULL,
    vectorised_runbook_cont_tx VECTOR(1024) NOT NULL,
    creat_ts TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    lst_updt_ts TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    PRIMARY KEY (pg_id, chunk_idx_no)
);

CREATE INDEX IF NOT EXISTS index_url ON runbook (runbook_url);

-- The vector column is vectorised_runbook_cont_tx (there is no "embedding"
-- column); the default index name becomes runbook_vectorised_runbook_cont_tx_idx,
-- matching the \d+ output above.
CREATE INDEX ON runbook
    USING hnsw (vectorised_runbook_cont_tx vector_cosine_ops)
    WITH (m = 32, ef_construction = 128);

SET hnsw.ef_search = 20;
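
Since hnsw.ef_search is session-scoped, the SET above only affects the current session. A minimal sketch (assuming asyncpg and a placeholder DSN) that reapplies it on every pooled connection via asyncpg's init hook:

import asyncpg

async def _init_conn(conn):
    # hnsw.ef_search is per-session, so apply it to each new connection.
    await conn.execute("SET hnsw.ef_search = 20")

async def make_pool():
    # Placeholder DSN; point it at the database holding the runbook table.
    return await asyncpg.create_pool(
        dsn="postgresql://user:pass@localhost/missioncontrol",
        init=_init_conn,
    )
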
import datetime
import json

import asyncpg
import numpy as np
import pytz

from ..core.redaction import custom_redaction
from .constants import TABLE_NAME
class VectorStore:
    def __init__(self, pool):
        self.pool = pool
        self.table = TABLE_NAME

    def _norm(self, vector: list) -> list:
        # Scale the embedding to unit length; guard against the zero vector.
        arr = np.array(vector)
        norm = np.linalg.norm(arr)
        normalized = arr / norm if norm != 0 else arr
        return normalized.tolist()
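
    # Illustrative worked example for _norm: [3.0, 4.0] has norm 5, so it
    # normalizes to [0.6, 0.8], a unit vector. With unit-length embeddings,
    # pgvector's cosine distance (<=>) equals 1 - dot product, which is what
    # search() below reports as "similarity".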
    async def _process_chunk(self, page_id, chunk):
        text = chunk.get("chunk")
        text = custom_redaction(text)
        url = chunk.get("url")
        chunk_index = chunk.get("index")
        modified_at = chunk.get("lastUpdated")
        modified_at_dt = datetime.datetime.fromisoformat(modified_at.replace("Z", "+00:00"))
        tz = pytz.timezone("America/Phoenix")
        modified_at_tz = modified_at_dt.astimezone(tz)

        # Skip the chunk if its stored timestamp already matches the source's.
        async with self.pool.acquire() as conn:
            current_modified_at = await conn.fetchval(
                f"SELECT lst_updt_ts FROM {self.table} WHERE pg_id = $1 AND chunk_idx_no = $2",
                page_id, chunk_index
            )
        if current_modified_at:
            current_modified_at_tz = current_modified_at.astimezone(tz)
            if current_modified_at_tz == modified_at_tz:
                return f"Skipped Page ID: {page_id}, Chunk: {chunk_index} (No changes)"

        from safechain.lcel import amodel
        emb_model = await amodel("10")
        try:
            vector = emb_model.embed_query(text)
        except Exception as e:
            print(e)
            return f"Failed to embed page ID: {page_id}, Chunk: {chunk_index}"

        vector = self._norm(vector)
        vector_str = f"[{','.join(map(str, vector))}]"
        # Upsert; the WHERE guard skips the UPDATE when the stored timestamp
        # is already identical to the incoming one.
        async with self.pool.acquire() as conn:
            await conn.execute(
                f"""
                INSERT INTO {self.table} (pg_id, chunk_idx_no, runbook_url, runbook_cont_tx, vectorised_runbook_cont_tx, lst_updt_ts)
                VALUES ($1, $2, $3, $4, $5, $6)
                ON CONFLICT (pg_id, chunk_idx_no) DO UPDATE
                SET runbook_url = EXCLUDED.runbook_url,
                    runbook_cont_tx = EXCLUDED.runbook_cont_tx,
                    vectorised_runbook_cont_tx = EXCLUDED.vectorised_runbook_cont_tx,
                    lst_updt_ts = EXCLUDED.lst_updt_ts
                WHERE {self.table}.lst_updt_ts IS DISTINCT FROM EXCLUDED.lst_updt_ts
                """,
                page_id, chunk_index, url, text, vector_str, modified_at_dt
            )
        return f"Upserted Page ID: {page_id}, Chunk: {chunk_index}"
    async def upsert(self, chunks, task_data, task_id, total_chunks):
        status_messages = []
        i = 0
        for page_id, chunk_list in chunks.items():
            for chunk in chunk_list:
                message = await self._process_chunk(page_id, chunk)
                status_messages.append(message)
                # Report percentage progress back to the shared task registry.
                progress = f"{round(((i + 1) / total_chunks) * 100, 2)}%"
                task_data[task_id]["progress"] = progress
                i += 1
        return status_messages
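
    # upsert() expects chunks keyed by page id, each chunk carrying the keys
    # read in _process_chunk. Illustrative payload (values are placeholders):
    #     {"PAGE-123": [{"chunk": "...text...", "url": "https://wiki/PAGE-123",
    #                    "index": 0, "lastUpdated": "2025-05-10T13:29:00Z"}]}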
    async def search(self, query: str, k: int = 4):
        from safechain.lcel import amodel
        emb_model = await amodel("10")
        query = custom_redaction(query)
        q_emb = emb_model.embed_query(query)
        vector = self._norm(q_emb)
        vector_str = f"[{','.join(map(str, vector))}]"
        async with self.pool.acquire() as conn:
            records = await conn.fetch(
                f"""
                SELECT
                    pg_id,
                    chunk_idx_no,
                    runbook_url,
                    runbook_cont_tx,
                    vectorised_runbook_cont_tx <=> $1 AS distance,
                    1 - (vectorised_runbook_cont_tx <=> $1) AS similarity
                FROM {self.table}
                ORDER BY distance
                LIMIT $2
                """,
                vector_str, k
            )
        # Round-trip through JSON to coerce Records into plain dicts; note that
        # json.loads undoes the "</" escaping, which only matters if the raw
        # JSON string were embedded in HTML.
        values = [dict(record) for record in records]
        result = json.dumps(values).replace("</", "<\\/")
        return json.loads(result)
    async def delete(self, page_id: str):
        async with self.pool.acquire() as conn:
            result = await conn.execute(
                f"DELETE FROM {self.table} WHERE pg_id = $1",
                page_id
            )
        return result
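
A minimal end-to-end sketch (an illustration, not part of the gist): the DSN and query string are placeholders, the relative imports above only resolve inside the real package, and safechain's amodel("10") must be reachable for the embedding call inside search().

import asyncio

async def _demo():
    pool = await asyncpg.create_pool(dsn="postgresql://user:pass@localhost/missioncontrol")
    store = VectorStore(pool)
    hits = await store.search("how do I restart the ingestion service?", k=4)
    for hit in hits:
        # Each row carries the columns selected in search(), incl. similarity.
        print(hit["runbook_url"], round(hit["similarity"], 3))
    await pool.close()

if __name__ == "__main__":
    asyncio.run(_demo())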