Created
August 7, 2025 20:25
-
-
Save pamelafox/3d512b424ccb12dcc14049ce1c6eec0e to your computer and use it in GitHub Desktop.
Hybrid search with RRF and re-ranker with Azure Postgres
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
from pathlib import Path | |
import dotenv | |
import psycopg2 | |
from azure.identity import DefaultAzureCredential | |
from dotenv import load_dotenv | |
from pgvector.psycopg2 import register_vector | |
dotenv.load_dotenv(override=True) | |
EMBEDDING_MODEL_DEPLOYMENT = os.environ["EMBEDDING_MODEL_DEPLOYMENT_NAME"] | |
# Note: Using Cohere-rerank-v3.5 as default reranker (configured via azure_ml settings) | |
POSTGRES_HOST = os.environ["POSTGRES_SERVER_FQDN"] | |
POSTGRES_USERNAME = os.environ["POSTGRES_SERVER_USERNAME"] | |
POSTGRES_DATABASE = "zava" | |
if POSTGRES_HOST.endswith(".database.azure.com"): | |
print("Authenticating to Azure Database for PostgreSQL using Azure Identity...") | |
azure_credential = DefaultAzureCredential() | |
token = azure_credential.get_token("https://ossrdbms-aad.database.windows.net/.default") | |
POSTGRES_PASSWORD = token.token | |
else: | |
POSTGRES_PASSWORD = os.environ["POSTGRES_SERVER_PASSWORD"] | |
extra_params = {} | |
if POSTGRES_SSL := os.environ.get("POSTGRES_SSL"): | |
extra_params["sslmode"] = POSTGRES_SSL | |
conn = psycopg2.connect( | |
database=POSTGRES_DATABASE, | |
user=POSTGRES_USERNAME, | |
password=POSTGRES_PASSWORD, | |
host=POSTGRES_HOST, | |
**extra_params, | |
) | |
conn.autocommit = True | |
cur = conn.cursor() | |
# Create pgvector extension | |
cur.execute("CREATE EXTENSION IF NOT EXISTS vector") | |
register_vector(conn) | |
# Enable iterative index scans to ensure we get the full LIMIT count | |
cur.execute("SET hnsw.iterative_scan = strict_order") | |
# Search query | |
search_query = "garden watering supplies" | |
print(f"RRF Search with Cohere Reranker for: '{search_query}'") | |
# Create embedding using PostgreSQL azure_openai extension | |
cur.execute("SELECT azure_openai.create_embeddings(%s, %s)", (EMBEDDING_MODEL_DEPLOYMENT, search_query)) | |
embedding_result = cur.fetchone() | |
embedding = embedding_result[0] | |
# Convert to OR-based search for broader matching (like TF-IDF/BM25) | |
tsquery = ' | '.join(search_query.split()) | |
print(f"✓ Using OR-based query for broader matching: '{tsquery}'") | |
# RRF (Reciprocal Rank Fusion) parameter - controls the weighting | |
k = 60 # Standard RRF parameter value | |
# Triple RRF SQL query combining vector search, keyword search, and azure_ai.rank | |
triple_rrf_sql = """ | |
WITH base_candidates AS ( | |
-- Get a broader set of candidates using proper ranking from both searches | |
( | |
SELECT | |
p.product_id, | |
p.sku, | |
p.product_name, | |
p.product_description | |
FROM retail.products p | |
JOIN retail.product_description_embeddings pde ON p.product_id = pde.product_id | |
WHERE pde.description_embedding IS NOT NULL | |
ORDER BY pde.description_embedding <=> %(embedding)s::vector | |
LIMIT 15 | |
) | |
UNION | |
( | |
SELECT | |
p.product_id, | |
p.sku, | |
p.product_name, | |
p.product_description | |
FROM retail.products p | |
WHERE to_tsvector('english', p.product_name || ' ' || p.product_description) | |
@@ to_tsquery('english', %(tsquery)s) | |
ORDER BY ts_rank_cd( | |
to_tsvector('english', p.product_name || ' ' || p.product_description), | |
to_tsquery('english', %(tsquery)s), | |
2 | |
) DESC | |
LIMIT 15 | |
) | |
), | |
vector_search AS ( | |
SELECT | |
bc.product_id, | |
bc.sku, | |
bc.product_name, | |
bc.product_description, | |
RANK() OVER (ORDER BY pde.description_embedding <=> %(embedding)s::vector) AS rank | |
FROM base_candidates bc | |
JOIN retail.product_description_embeddings pde ON bc.product_id = pde.product_id | |
WHERE pde.description_embedding IS NOT NULL | |
ORDER BY pde.description_embedding <=> %(embedding)s::vector | |
LIMIT 20 | |
), | |
keyword_search AS ( | |
SELECT | |
bc.product_id, | |
bc.sku, | |
bc.product_name, | |
bc.product_description, | |
RANK() OVER (ORDER BY ts_rank_cd( | |
to_tsvector('english', bc.product_name || ' ' || bc.product_description), | |
to_tsquery('english', %(tsquery)s), | |
2 -- Normalize by document length (TF-IDF/BM25-like) | |
) DESC) AS rank | |
FROM base_candidates bc | |
WHERE to_tsvector('english', bc.product_name || ' ' || bc.product_description) | |
@@ to_tsquery('english', %(tsquery)s) | |
ORDER BY ts_rank_cd( | |
to_tsvector('english', bc.product_name || ' ' || bc.product_description), | |
to_tsquery('english', %(tsquery)s), | |
2 | |
) DESC | |
LIMIT 20 | |
), | |
rrf_combined AS ( | |
-- Step 1: RRF combination of vector and keyword search | |
SELECT | |
COALESCE(vs.product_id, ks.product_id) AS product_id, | |
COALESCE(vs.sku, ks.sku) AS sku, | |
COALESCE(vs.product_name, ks.product_name) AS product_name, | |
COALESCE(vs.product_description, ks.product_description) AS product_description, | |
COALESCE(1.0 / (%(k)s + vs.rank), 0.0) + | |
COALESCE(1.0 / (%(k)s + ks.rank), 0.0) AS rrf_score, | |
vs.rank AS vector_rank, | |
ks.rank AS keyword_rank, | |
ROW_NUMBER() OVER (ORDER BY | |
COALESCE(1.0 / (%(k)s + vs.rank), 0.0) + | |
COALESCE(1.0 / (%(k)s + ks.rank), 0.0) DESC | |
) AS rrf_rank | |
FROM vector_search vs | |
FULL OUTER JOIN keyword_search ks ON vs.product_id = ks.product_id | |
ORDER BY rrf_score DESC | |
LIMIT 50 -- Get top 50 for reranking | |
), | |
reranked AS ( | |
-- Step 2: Get ranker ranking for all RRF results, then join back | |
WITH ranker_results AS ( | |
SELECT id, rank, score | |
FROM azure_ai.rank( | |
query => %(query)s, | |
document_contents => ARRAY( | |
SELECT rrf2.product_name || ': ' || rrf2.product_description | |
FROM rrf_combined rrf2 | |
ORDER BY rrf2.rrf_score DESC | |
), | |
document_ids => ARRAY( | |
SELECT rrf2.product_id::text | |
FROM rrf_combined rrf2 | |
ORDER BY rrf2.rrf_score DESC | |
) | |
-- Using default Cohere-rerank-v3.5 model (no model parameter needed) | |
) | |
) | |
SELECT | |
rrf.*, | |
rr.rank AS ranker_rank, | |
rr.score AS ranker_score | |
FROM rrf_combined rrf | |
JOIN ranker_results rr ON rr.id = rrf.product_id::text | |
) | |
SELECT | |
r.product_id, | |
r.sku, | |
r.product_name, | |
r.product_description, | |
r.rrf_score, | |
r.ranker_score, | |
r.vector_rank, | |
r.keyword_rank, | |
r.rrf_rank, | |
r.ranker_rank | |
FROM reranked r | |
ORDER BY r.ranker_rank ASC -- Let reranker determine the final order | |
LIMIT 5; | |
""" | |
cur.execute(triple_rrf_sql, { | |
'embedding': embedding, | |
'tsquery': tsquery, | |
'query': search_query, | |
'k': k | |
}) | |
results = cur.fetchall() | |
for i, result in enumerate(results, 1): | |
product_id, sku, name, description, rrf_score, ranker_score, vector_rank, keyword_rank, rrf_rank, ranker_rank = result | |
print(f"{i}. {sku} - {name}") | |
print(f" RRF Score: {rrf_score:.4f} | Ranker Score: {ranker_score:.4f}") | |
# Show ranking progression | |
rankings = [] | |
if vector_rank is not None: | |
rankings.append(f"Vector: #{vector_rank}") | |
if keyword_rank is not None: | |
rankings.append(f"Keyword: #{keyword_rank}") | |
rankings.append(f"RRF: #{rrf_rank}") | |
rankings.append(f"Ranker: #{ranker_rank}") | |
print(f" Ranking Flow: {' → '.join(rankings)}") | |
print(f" Description: {description}") | |
print() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment