Similarity Reranker

The Similarity Reranker is a Python module for analyzing and ranking the similarity between documents. It combines BERT embeddings, BM25 scoring, and large language model (LLM) refinement into a single multi-stage analysis. The module is configurable and handles large document sets efficiently, making it suitable for use cases such as document retrieval, comparison, and clustering.

Key Features

  • Multi-stage Similarity Analysis: Combines embeddings, BM25, and LLM scoring to provide refined similarity rankings.
  • Dimensionality Reduction: Uses random projection to reduce the dimensionality of BERT embeddings, improving computational efficiency.
  • LLM-Based Refinement: Ranks document similarity using LLMs, with configurable models and parameters.
  • Efficient Handling of Large Document Sets: Supports chunking and multiprocessing to handle large datasets.
  • ArangoDB Integration: Stores similarity results and allows retrieval and upserts for efficient data management.
  • Highly Configurable: Parameters for embeddings, LLM models, and other analysis techniques can be customized via configuration files.

Technologies Used

  • BERT Embeddings: Represent documents as dense vectors in a high-dimensional space.
  • BM25: A keyword-based similarity algorithm commonly used in document retrieval (a short standalone example follows this list).
  • OpenAI/LLM: Refines and ranks document similarity.
  • Random Projection: A dimensionality reduction technique that makes embedding computation more scalable.
  • ArangoDB: A NoSQL database used for storing and retrieving similarity data.
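
To make the BM25 stage concrete before diving into the code, here is a tiny standalone example using rank_bm25 (the same library the module uses); the documents and scores are illustrative only:

from rank_bm25 import BM25Okapi

docs = [
    "reactor coolant pump failure analysis",
    "coolant system heat transfer simulation",
    "employee training records for safety drills",
]
tokenized = [doc.split() for doc in docs]
bm25 = BM25Okapi(tokenized)

# Score every document against the tokens of the first one;
# higher scores mean more keyword overlap
scores = bm25.get_scores(tokenized[0])
print(scores)  # the coolant-related document should outscore the unrelated one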

Directory Structure

similarity_reranker/
├── config_model.py            # Configuration schemas for embeddings, LLM, and database settings
├── embeddings/
│   ├── __init__.py
│   ├── bert_embeddings.py     # Manages BERT embedding generation
│   └── projection.py          # Implements random projection for dimensionality reduction
├── llm/
│   ├── __init__.py
│   ├── llm_helpers.py         # Helper functions for LLM interactions (API calls, response handling)
│   └── llm_ranking.py         # LLM-based ranking logic for document similarity refinement
├── main.py                    # Entry point for running similarity analysis
├── requirements.txt           # Project dependencies
├── similarity/
│   ├── __init__.py
│   ├── bm25_similarity.py     # Implements BM25-based keyword similarity scoring
│   ├── similarity_combiner.py # Combines various similarity metrics into a final score
│   ├── similarity_ranking.py  # Converts similarity scores to discrete rankings
│   └── similarity_refinement.py # Refines top-ranked similarities using embeddings
├── similarity_ranker_all.py   # Comprehensive script that integrates all similarity ranking stages
└── utils/
    └── __init__.py            # Placeholder for utility functions

Installation

To set up the project, follow these steps:

  1. Clone the repository:

    git clone https://github.com/your-username/similarity-reranker.git
    cd similarity-reranker
  2. Create a virtual environment:

    python3 -m venv venv
    source venv/bin/activate   # For Linux/Mac
    venv\Scripts\activate      # For Windows
  3. Install dependencies:

    pip install -r requirements.txt
  4. Set up environment variables:

    • Create a .env file in the root directory and set your OpenAI and ArangoDB credentials (the variable names must match the ones read by config_model.py):
    OPENAI_API_KEY=your-openai-key
    ARANGO_HOST=http://localhost:8529
    ARANGO_DB_NAME=nuclear
    ARANGO_USERNAME=root
    ARANGO_PASSWORD=openSesame
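
As a quick sanity check that these values are picked up, the snippet below connects with python-arango (assumed to be the ArangoDB driver in requirements.txt) and lists the collections; it raises if the host or credentials are wrong:

import os

from arango import ArangoClient
from dotenv import load_dotenv

load_dotenv()

client = ArangoClient(hosts=os.getenv("ARANGO_HOST", "http://localhost:8529"))
db = client.db(
    os.getenv("ARANGO_DB_NAME", "nuclear"),
    username=os.getenv("ARANGO_USERNAME", "root"),
    password=os.getenv("ARANGO_PASSWORD", ""),
)
print(db.collections())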

Source Files

The modules below are included in full. config_model.py defines the configuration schema for the project; the embeddings/ and similarity/ packages implement the scoring stages; main.py ties the pipeline together. (The llm/ helpers are not included in the gist; a sketch follows main.py.)

embeddings/bert_embeddings.py

import numpy as np
import torch
from transformers import BertTokenizer, BertModel
from pathlib import Path
from loguru import logger

from verifaix.similarity_reranker.embeddings.projection import random_projection_similarity


# Step 1: Download or load the BERT model from HuggingFace
def get_bert_model(embedder, model_dir):
    """
    Download or load a BERT model from HuggingFace.

    Parameters:
    - embedder: The name of the model to download.
    - model_dir: Directory where the model will be stored or loaded from.

    Returns:
    - tokenizer, model: Loaded tokenizer and BERT model.
    """
    model_dir_path = Path(model_dir)
    try:
        if not model_dir_path.exists():
            logger.info(f"Downloading BERT model: {embedder} to {model_dir}")
            model_dir_path.mkdir(parents=True, exist_ok=True)
            tokenizer = BertTokenizer.from_pretrained(embedder)
            model = BertModel.from_pretrained(embedder)
            # Save in a layout that from_pretrained(model_dir) can load directly on later runs
            tokenizer.save_pretrained(model_dir)
            model.save_pretrained(model_dir)
        else:
            logger.info(f"Loading BERT model from {model_dir}")
            tokenizer = BertTokenizer.from_pretrained(model_dir)
            model = BertModel.from_pretrained(model_dir)
        return tokenizer, model
    except Exception as e:
        logger.error(f"Error loading or downloading BERT model: {e}")
        raise e


# Step 2: Generate BERT embeddings in smaller chunks
def generate_bert_embeddings(docs, embedder, model_dir, batch_size=32, chunk_size=500,
                             temp_file_prefix="bert_embeddings_chunk_"):
    """
    Generate BERT embeddings in smaller chunks to reduce memory pressure.
    Stores embeddings temporarily on disk if memory is a concern.

    Parameters:
    - docs: List of documents to embed.
    - embedder: The BERT model name from HuggingFace.
    - model_dir: Directory to store/load the model from.
    - batch_size: Batch size for BERT embedding.
    - chunk_size: Number of documents processed at once to avoid memory issues.
    - temp_file_prefix: Prefix for temporary file storage of chunked embeddings.

    Returns:
    - A list of file paths where embeddings are stored.
    """
    tokenizer, model = get_bert_model(embedder, model_dir)
    temp_file_paths = []

    # Process documents in chunks to avoid memory pressure
    for chunk_start in range(0, len(docs), chunk_size):
        chunk = docs[chunk_start:chunk_start + chunk_size]
        chunk_embeddings = []

        # Process each chunk in batches
        for i in range(0, len(chunk), batch_size):
            batch = chunk[i:i + batch_size]
            try:
                inputs = tokenizer(batch, return_tensors='pt', padding=True, truncation=True)
                with torch.no_grad():
                    outputs = model(**inputs)
                # Append the [CLS] token embedding for each document in the batch
                chunk_embeddings.append(outputs.last_hidden_state[:, 0, :].numpy())
            except Exception as e:
                logger.error(f"Error generating embeddings for chunk {chunk_start}-{chunk_start + chunk_size}: {e}")
                raise e

        # Combine the embeddings for the current chunk and write to a temporary file
        chunk_embeddings = np.vstack(chunk_embeddings)
        temp_file_path = f"{temp_file_prefix}{chunk_start}.npy"
        np.save(temp_file_path, chunk_embeddings)
        temp_file_paths.append(temp_file_path)
        logger.info(f"Chunk {chunk_start} to {chunk_start + chunk_size} stored at {temp_file_path}")

    return temp_file_paths


# Step 3: Load embeddings incrementally from disk
def load_embeddings_from_disk(temp_file_paths):
    """
    Load embeddings incrementally from disk to avoid memory issues.

    Parameters:
    - temp_file_paths: List of file paths where embeddings are stored.

    Returns:
    - A numpy array of concatenated embeddings loaded incrementally.
    """
    embeddings_list = []
    for file_path in temp_file_paths:
        chunk_embeddings = np.load(file_path)
        embeddings_list.append(chunk_embeddings)
        logger.info(f"Loaded embeddings from {file_path}")

    # Combine all embeddings incrementally
    return np.vstack(embeddings_list)


def process_embeddings_and_projection(documents, config):
    """
    Generate embeddings for the input documents and apply random projection
    to reduce dimensionality.
    """
    # Step 1: Generate original BERT embeddings in chunks, storing them temporarily
    logger.info("Generating BERT embeddings for the documents")
    temp_file_paths = generate_bert_embeddings(documents, config.embedder, config.bert_model_dir)

    # Step 2: Load embeddings incrementally from disk to avoid memory pressure
    original_embeddings = load_embeddings_from_disk(temp_file_paths)

    # Step 3: Apply random projection; note that random_projection_similarity
    # returns a similarity matrix computed on the reduced embeddings
    logger.info(f"Applying random projection with {config.random_projection_n_components} components")
    compressed_similarity = random_projection_similarity(original_embeddings, config.random_projection_n_components)

    return original_embeddings, compressed_similarity
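
A minimal way to exercise this module on its own; the SimpleNamespace below stands in for the validated Pydantic config, and the field values are placeholders:

from types import SimpleNamespace

config = SimpleNamespace(
    embedder="bert-base-uncased",
    bert_model_dir="./models/bert",
    random_projection_n_components=128,
)
docs = ["fatigue cracks in turbine blades", "thermal stress in electronic circuits"]
original, reduced_sim = process_embeddings_and_projection(docs, config)
print(original.shape)     # (2, 768) raw [CLS] embeddings
print(reduced_sim.shape)  # (2, 2) similarity matrix from the projected embeddings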

similarity/bm25_similarity.py

import numpy as np
from rank_bm25 import BM25Okapi as BM25
from concurrent.futures import ThreadPoolExecutor
from loguru import logger

from verifaix.utils.loguru_setup import setup_logger

setup_logger()


def bm25_similarity(docs, config):
    """
    Calculate BM25 keyword-based similarity in parallel while respecting the max_threads limit.

    Parameters:
    - docs: List of documents.
    - config: Configuration dictionary containing `max_threads`.

    Returns:
    - A similarity matrix where each element is the BM25 similarity score between two documents.
    """
    logger.info("Calculating BM25 keyword-based similarity")

    # Tokenize documents for BM25
    tokenized_docs = [doc.split() for doc in docs]

    # Create BM25 object
    bm25 = BM25(tokenized_docs)

    # Initialize an empty similarity matrix
    similarity_matrix = np.zeros((len(docs), len(docs)))

    # Ensure max_threads is not more than the number of docs
    max_threads = min(config.get("max_threads", 4), len(docs))

    def calculate_bm25_for_doc(i):
        """Calculate BM25 scores for a given document index."""
        return bm25.get_scores(tokenized_docs[i])

    try:
        # Use ThreadPoolExecutor with max_workers set to max_threads
        with ThreadPoolExecutor(max_workers=max_threads) as executor:
            results = list(executor.map(calculate_bm25_for_doc, range(len(docs))))

        # Populate the similarity matrix with the BM25 scores
        for i, scores in enumerate(results):
            similarity_matrix[i] = scores
    except Exception as e:
        logger.error(f"Error calculating BM25 similarity: {e}")
        raise e

    return similarity_matrix
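
Called on its own, the function only needs a plain dict for config, for example:

docs = [
    "reactor core temperature exceeds threshold",
    "core temperature threshold alarm triggered",
    "quarterly waste management report",
]
matrix = bm25_similarity(docs, {"max_threads": 2})
print(matrix.shape)                 # (3, 3)
print(matrix[0, 1] > matrix[0, 2])  # related documents should score higher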

config_model.py

import os
from typing import List

from dotenv import load_dotenv
from loguru import logger
from pydantic import BaseModel, HttpUrl, ValidationError, Field, PositiveInt

from verifaix.utils.loguru_setup import setup_logger

setup_logger()

# Load environment variables from .env file
load_dotenv('../../.env')


class ArangoConfig(BaseModel):
    """
    Pydantic class for validating ArangoDB-related configurations.
    Uses environment variables as fallbacks.
    """
    host: HttpUrl = Field(os.getenv("ARANGO_HOST", "http://localhost:8529"), description="URL for ArangoDB server.")
    db_name: str = Field(os.getenv("ARANGO_DB_NAME", "nuclear"), description="ArangoDB database name.")
    username: str = Field(os.getenv("ARANGO_USERNAME", "default_username"), description="ArangoDB username.")
    password: str = Field(os.getenv("ARANGO_PASSWORD", "default_password"), description="ArangoDB password.")


class LLMConfig(BaseModel):
    """
    Pydantic class for validating LLM-related configurations.
    """
    model: str = Field(os.getenv("LLM_MODEL", "openai/gpt-4o-mini"), description="LLM model to use.")
    # Parse the env var explicitly: bool() of any non-empty string is truthy
    json_mode: bool = Field(os.getenv("LLM_JSON_MODE", "true").lower() in ("1", "true", "yes"),
                            description="Whether to use JSON mode for LLM responses.")
    max_tokens: PositiveInt = Field(
        default=4096,
        ge=1000,
        le=8192,
        description="The maximum number of tokens for the LLM queries. Must be between 1000 and 8192."
    )


class EmbeddingConfig(BaseModel):
    """
    Pydantic class for BERT embedding-related configurations.
    """
    embedder: str = Field(os.getenv("EMBEDDER", "bert-base-uncased"), description="The HuggingFace BERT model to use for embeddings.")
    bert_model_dir: str = Field(os.getenv("BERT_MODEL_DIR", "./models/bert"), description="Directory for saving/loading the HuggingFace model.")
    max_threads: PositiveInt = Field(
        default=int(os.getenv("MAX_THREADS", 4)),
        ge=1,
        description="Maximum number of threads to use for embedding processing."
    )


class ConfigSchema(BaseModel):
    """
    Pydantic class for the full configuration schema.
    """
    arango_config: ArangoConfig
    llm_config: LLMConfig
    embedding_config: EmbeddingConfig
    included_collections: List[str] = Field(
        default=os.getenv("INCLUDED_COLLECTIONS", "").split(","),
        description="List of collections to include in processing."
    )

    class Config:
        extra = 'allow'  # Allow extra fields in the configuration if present.


# Merge user-provided config with environment variable fallbacks
def load_and_validate_config(config_data: dict) -> ConfigSchema:
    try:
        validated_config = ConfigSchema(**config_data)
        return validated_config
    except ValidationError as e:
        logger.error(f"Validation error: {e}")
        raise


# Validate inputs using Pydantic for documents and config validation
def validate_inputs(documents, config):
    validated_config = load_and_validate_config(config)
    if not documents:
        raise ValueError("The documents list is empty.")
    # Warn (rather than fail) when documents differ in length, since similarity
    # scores are most comparable across similarly sized documents
    if len({len(doc) for doc in documents}) != 1:
        logger.warning("Documents differ in length; similarity scores may be less comparable.")
    logger.info("Inputs successfully validated.")
    return validated_config


# Usage example
if __name__ == "__main__":
    # User-provided config data, can be merged with environment variables
    config = {
        "arango_config": {
            "host": "http://localhost:8529",
            "db_name": "nuclear",
            "username": "root",
            "collection_name": "emergency_protocols",
            "password": "openSesame",
        },
        "llm_config": {
            "model": "openai/gpt-4o-mini",
            "json_mode": True,
            "max_tokens": 4200
        },
        "embedding_config": {
            "embedder": "bert-base-uncased",
            "bert_model_dir": "./models/bert",
            "max_threads": 4
        },
        "included_collections": [
            'emergency_protocols', 'employee_records', 'radiation_levels',
            'reactor_data', 'waste_management'
        ],
        "thing_to_not_validate": "this is a test"  # Extra field to test the 'allow' option in Pydantic
    }

    # Load and validate the configuration
    validated_config = load_and_validate_config(config)
    print(validated_config.model_dump())

main.py

import asyncio
import json

import pandas as pd
from loguru import logger

from verifaix.arangodb_helper.arango_client import connect_to_arango, retrieve_dataframe_from_arango, setup_arango_client
from verifaix.arangodb_helper.utils.bulk_insert import upsert_json_list_in_batches
from verifaix.similarity_reranker.config_model import validate_inputs
from verifaix.similarity_reranker.similarity.bm25_similarity import bm25_similarity
from verifaix.similarity_reranker.similarity.similarity_combiner import combine_similarity_scores
from verifaix.similarity_reranker.similarity.similarity_ranking import rank_similarities
from verifaix.similarity_reranker.similarity.similarity_refinement import refine_similarity_with_original_embeddings
from verifaix.similarity_reranker.llm.llm_ranking import llm_first_ranking, llm_reranker
from verifaix.similarity_reranker.embeddings.bert_embeddings import process_embeddings_and_projection


async def run_similarity_analysis(documents, config, similarity_df, output_format='dataframe'):
    logger.info("Starting similarity analysis with refinement and LLM ranking")

    # Step 1: Validate inputs (returns the validated Pydantic config)
    validated_config = validate_inputs(documents, config)

    # Step 2: Connect to ArangoDB and check existing results
    client, arango_config = setup_arango_client(config)
    db = connect_to_arango(client, arango_config)
    collection = db[config.get('arango_collection')]
    existing_data_df = retrieve_dataframe_from_arango(collection)
    if existing_data_df is not None:
        similarity_df = similarity_df.combine_first(existing_data_df)

    # Steps 3-4: Generate original embeddings plus the similarity matrix from the
    # randomly projected embeddings (the second return value is already a similarity
    # matrix, so no further projection call is needed here); assumes the config
    # exposes embedder, bert_model_dir and random_projection_n_components
    original_embeddings, random_sim = process_embeddings_and_projection(documents, validated_config)

    # Step 5: Compute BM25 keyword-based similarity
    bm25_sim = bm25_similarity(documents, config)

    # Step 6: Combine the similarities from compressed embeddings and BM25
    combined_sim = combine_similarity_scores(random_sim, bm25_sim)

    # Step 7: Rank the similarities from 1 to 5
    ranked_sim = rank_similarities(combined_sim)

    # Step 8: Refine similarities for rankings of 4+ using original embeddings
    refined_similarities = refine_similarity_with_original_embeddings(ranked_sim, documents, original_embeddings, similarity_df)

    # Step 9: Run the first LLM ranking for 4+ similarities (1 to 5 scale)
    high_similarity_pairs = await llm_first_ranking(refined_similarities, documents, config, similarity_df)

    # Step 10: Re-rank the 4+ similarities on a 1-100 scale with the second LLM call
    top_document_pairs = await llm_reranker(high_similarity_pairs, config, similarity_df, top_n=10)

    # Step 11: Upsert results to ArangoDB in batches
    similarity_data = similarity_df.to_dict(orient='index')
    upsert_json_list_in_batches(db, config['arango_collection'], similarity_data, batch_size=500)

    # Step 12: Return the results based on output_format
    if output_format == 'dataframe':
        return pd.DataFrame(top_document_pairs).sort_values(by='llm_score', ascending=False)
    elif output_format == 'json':
        return sorted(top_document_pairs, key=lambda x: x['llm_score'], reverse=True)


if __name__ == "__main__":
    documents = [
        "The failure analysis of mechanical components revealed fatigue cracks.",
        "Thermal stress analysis shows overheating of electronic circuits under heavy load.",
        "Finite element simulations predict stress concentration at critical points.",
        "Material fatigue in aerospace structures requires real-time monitoring systems.",
        "Heat transfer simulations indicate inefficiencies in the cooling system design."
    ]
    similarity_df = pd.DataFrame(index=range(len(documents)), columns=range(len(documents)))

    # run_similarity_analysis validates the config itself, so pass the raw dict
    with open("path_to_config.json") as f:
        config = json.load(f)

    result = asyncio.run(run_similarity_analysis(documents, config, similarity_df))
    logger.info(result)
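
llm/llm_ranking.py is imported above but not included in this gist. The sketch below shows one way llm_first_ranking and llm_reranker could be written with LiteLLM's async acompletion (consistent with the "openai/gpt-4o-mini" model string in the config). The prompt wording, the JSON response shape, and the candidate_pairs helper are all assumptions for illustration, not the author's implementation:

import json

from litellm import acompletion


async def _llm_similarity_score(text1, text2, config, scale_description):
    """Ask the LLM for one similarity score; prompt and JSON shape are assumed."""
    response = await acompletion(
        model=config["llm_config"]["model"],
        max_tokens=config["llm_config"]["max_tokens"],
        response_format={"type": "json_object"},
        messages=[{
            "role": "user",
            "content": (
                f"Rate the similarity of the two documents on a scale of {scale_description}. "
                'Answer as JSON: {"score": <number>}.\n'
                f"Document 1: {text1}\nDocument 2: {text2}"
            ),
        }],
    )
    return json.loads(response.choices[0].message.content)["score"]


async def llm_first_ranking(refined_similarities, documents, config, similarity_df):
    """First pass: score the refined pairs on a 1-5 scale and keep the 4+ ones."""
    high_similarity_pairs = []
    # candidate_pairs() is a hypothetical helper that yields the (i, j) pairs
    # whose refined score survived similarity_refinement.py
    for i, j in candidate_pairs(refined_similarities):
        score = await _llm_similarity_score(documents[i], documents[j], config, "1 to 5")
        if score >= 4:
            high_similarity_pairs.append({
                "doc1": i, "doc2": j,
                "text1": documents[i], "text2": documents[j],
                "llm_score": score,
            })
    return high_similarity_pairs


async def llm_reranker(high_similarity_pairs, config, similarity_df, top_n=10):
    """Second pass: re-score the surviving pairs on a 1-100 scale and keep the top_n."""
    for pair in high_similarity_pairs:
        pair["llm_score"] = await _llm_similarity_score(
            pair["text1"], pair["text2"], config, "1 to 100"
        )
    return sorted(high_similarity_pairs, key=lambda p: p["llm_score"], reverse=True)[:top_n]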

embeddings/projection.py

import numpy as np
from sklearn.random_projection import GaussianRandomProjection, SparseRandomProjection
from sklearn.metrics.pairwise import cosine_similarity
from loguru import logger


# Apply randomized projection to reduce dimensionality
def random_projection_similarity(embeddings, n_components, projection_type='gaussian'):
    """
    Apply randomized projection to reduce dimensionality and compute a similarity matrix.

    Parameters:
    - embeddings: A numpy array of document embeddings.
    - n_components: The number of components to reduce the dimensionality to.
    - projection_type: The type of random projection to apply ('gaussian' or 'sparse').

    Returns:
    - A similarity matrix calculated using cosine similarity on reduced embeddings.
    """
    try:
        # Select the type of random projection
        if projection_type == 'sparse':
            projector = SparseRandomProjection(n_components=n_components)
        else:
            projector = GaussianRandomProjection(n_components=n_components, random_state=42)

        # Apply the projection to reduce dimensionality
        reduced_embeddings = projector.fit_transform(embeddings)

        # Compute the cosine similarity matrix on the reduced embeddings
        similarity_matrix = cosine_similarity(reduced_embeddings)
        logger.info(f"Random projection (type: {projection_type}) and similarity calculation complete.")
        return similarity_matrix
    except ValueError as e:
        logger.error(f"Invalid value for random projection: {e}")
        raise e
    except Exception as e:
        logger.error(f"Error applying Random Projection: {e}")
        raise e
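
A quick standalone check with synthetic embeddings (the sizes are arbitrary):

import numpy as np

rng = np.random.default_rng(0)
embeddings = rng.normal(size=(10, 768))  # 10 fake 768-dim document vectors
sim = random_projection_similarity(embeddings, n_components=64)
print(sim.shape)                       # (10, 10)
print(np.allclose(np.diag(sim), 1.0))  # self-similarity survives the projection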

similarity/similarity_combiner.py

import numpy as np
from loguru import logger

from verifaix.utils.loguru_setup import setup_logger

setup_logger()


def combine_similarity_scores(random_sim, bm25_sim, weight_random=0.5, weight_bm25=0.5):
    """
    Combines BM25 and Random Projection similarity scores with custom weighting.

    Parameters:
    - random_sim: Similarity matrix from random projection.
    - bm25_sim: Similarity matrix from BM25.
    - weight_random: Weight for random projection similarity (default: 0.5).
    - weight_bm25: Weight for BM25 similarity (default: 0.5).

    Returns:
    - A combined similarity matrix.
    """
    logger.info("Combining BM25 and Random Projection similarity scores")

    # Min-max normalize both similarity matrices to [0, 1]
    random_sim_normalized = (random_sim - np.min(random_sim)) / (np.max(random_sim) - np.min(random_sim))
    bm25_sim_normalized = (bm25_sim - np.min(bm25_sim)) / (np.max(bm25_sim) - np.min(bm25_sim))

    # Combine the similarities using the given weights
    combined_sim = (weight_random * random_sim_normalized + weight_bm25 * bm25_sim_normalized)
    return combined_sim
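
For intuition: with the default 0.5/0.5 weights, the combined score is just the mean of the two min-max-normalized matrices. A small worked example:

import numpy as np

random_sim = np.array([[1.0, 0.2], [0.2, 1.0]])
bm25_sim = np.array([[9.0, 3.0], [3.0, 9.0]])
combined = combine_similarity_scores(random_sim, bm25_sim)
print(combined)  # both inputs normalize to [[1, 0], [0, 1]], so the mean is [[1, 0], [0, 1]]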

similarity/similarity_ranking.py

import numpy as np
from loguru import logger

from verifaix.utils.loguru_setup import setup_logger

setup_logger()


def rank_similarities(similarity_matrix):
    """
    Ranks the similarity scores in the matrix from 1 to 5.

    Parameters:
    - similarity_matrix: A similarity matrix with continuous values.

    Returns:
    - A ranked similarity matrix with values from 1 to 5.
    """
    logger.info("Ranking similarities from 1 to 5")

    # Initialize an empty ranked similarity matrix
    ranked_similarities = np.zeros_like(similarity_matrix)

    # Calculate thresholds based on the min and max values of the matrix
    min_sim = np.min(similarity_matrix)
    max_sim = np.max(similarity_matrix)
    thresholds = np.linspace(min_sim, max_sim, 6)  # Divides the range into 5 buckets

    # Assign ranks based on the thresholds
    for i in range(similarity_matrix.shape[0]):
        for j in range(similarity_matrix.shape[1]):
            score = similarity_matrix[i, j]
            if score <= thresholds[1]:
                ranked_similarities[i, j] = 1
            elif score <= thresholds[2]:
                ranked_similarities[i, j] = 2
            elif score <= thresholds[3]:
                ranked_similarities[i, j] = 3
            elif score <= thresholds[4]:
                ranked_similarities[i, j] = 4
            else:
                ranked_similarities[i, j] = 5

    return ranked_similarities
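
The nested loops are easy to follow, but the same bucketing can be expressed in one vectorized call. A sketch that mirrors the threshold logic above with np.digitize:

import numpy as np

def rank_similarities_vectorized(similarity_matrix):
    thresholds = np.linspace(similarity_matrix.min(), similarity_matrix.max(), 6)
    # digitize against the four interior thresholds yields bins 0-4; +1 gives ranks 1-5
    return np.digitize(similarity_matrix, thresholds[1:5], right=True) + 1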

similarity/similarity_refinement.py

import pandas as pd
from sklearn.metrics.pairwise import cosine_similarity
from loguru import logger

from verifaix.utils.loguru_setup import setup_logger

setup_logger()


def refine_similarity_with_original_embeddings(df_similarity, documents, original_embeddings, similarity_df):
    """
    Refine similarities for sentence pairs with rank 4+ using original embeddings.

    Parameters:
    - df_similarity: Initial rank matrix (numpy array or DataFrame) with 1-5 scores.
    - documents: List of all documents.
    - original_embeddings: The original document embeddings.
    - similarity_df: DataFrame to store refined similarities.

    Returns:
    - A DataFrame with refined similarity scores for rank 4+ pairs.
    """
    logger.info("Refining similarities for sentence pairs with rank 4+ using original embeddings")

    # Accept the numpy rank matrix produced by rank_similarities as well as a DataFrame
    df_similarity = pd.DataFrame(df_similarity)
    refined_similarities = df_similarity.copy()

    for i, row in df_similarity.iterrows():
        for j in df_similarity.columns:
            if df_similarity.loc[i, j] >= 4 and i != j:
                # Skip pairs that already have a stored similarity (cells start as NaN)
                if pd.notna(similarity_df.loc[i, j]):
                    continue
                original_embedding1 = original_embeddings[i]
                original_embedding2 = original_embeddings[j]
                refined_similarity = cosine_similarity([original_embedding1], [original_embedding2])[0][0]

                # Only keep scores of 4 or 5, omit those that don't meet the threshold
                if refined_similarity >= 0.8:  # Example threshold to verify it remains a 4 or 5
                    refined_similarities.loc[i, j] = refined_similarity
                    similarity_df.loc[i, j] = refined_similarity
                else:
                    refined_similarities.loc[i, j] = None  # Omit if the refined score doesn't qualify

    return refined_similarities