@datavudeja
Forked from thistleknot/fibonacci_optimal_k.py
Created January 8, 2026 17:39
fibonacci clustered embeddings
#from gptcache import cache
#from transformers import BertModel, BertTokenizer
from datasets import load_dataset, concatenate_datasets
from sentence_transformers import SentenceTransformer
from sklearn.cluster import HDBSCAN, DBSCAN
from sklearn.metrics import pairwise_distances_argmin_min, silhouette_score, pairwise_distances
from sklearn.model_selection import ParameterGrid
from sklearn.neighbors import NearestNeighbors
from tqdm import tqdm
from transformers import AutoTokenizer#, BertModel, BertTokenizer, BertConfig, DistilBertModel, DistilBertTokenizer
import datasets
import json
import numpy as np
#import openai
import os
import pandas as pd
import requests
import sqlite3
import torch
import hnswlib
from sklearn.decomposition import PCA
from scipy.stats import kurtosis, skew
from statsmodels.robust.scale import mad
from sklearn.cluster import KMeans
from sklearn.model_selection import KFold
batch_token_limit = 16384  # for example
# Variables to track the best HDBSCAN hyperparameters found during a sweep
best_params = None
best_silhouette = -1  # initialize below any valid silhouette score (range is [-1, 1])
# HDBSCAN: hyperparameter grid to try
param_grid = {
    'min_cluster_size': [3, 5, 8, 13, 21, 34, 55],  # example values; adjust as needed
    'min_samples': [5, 8, 13, 21],  # example values; adjust to your dataset
    # add other parameters here if needed
}
# Path to the cached embeddings database
db_path = "sentence_embeddings.sqlite"
torch.cuda.is_available()  # probe CUDA availability (re-checked when selecting the device below)
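# Hedged sketch (not part of the original pipeline): one way the HDBSCAN grid above
# could be swept with ParameterGrid and silhouette_score. It runs on a tiny synthetic
# blob so it stays cheap; swap in real embeddings to use it for model selection.
_rng = np.random.default_rng(0)
_toy = np.vstack([_rng.normal(0.0, 0.05, (60, 8)), _rng.normal(1.0, 0.05, (60, 8))])
for _p in ParameterGrid(param_grid):
    _labels = HDBSCAN(**_p).fit_predict(_toy)
    if len(set(_labels)) > 1:  # silhouette needs at least two distinct labels
        _score = silhouette_score(_toy, _labels)
        if _score > best_silhouette:
            best_silhouette, best_params = _score, _p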
# Load dataset and model
dataset = load_dataset("Abirate/english_quotes")
quotes = [item['quote'] for item in dataset['train']]
# Set the device to GPU if available, otherwise use CPU
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
tokenizer = AutoTokenizer.from_pretrained("sentence-transformers/all-MiniLM-L6-v2")
# Initialize the model
model = SentenceTransformer('all-MiniLM-L6-v2') # or choose another model suited for your needs
#from llmlingua import PromptCompressor
#llm_lingua = PromptCompressor()
#compressed_prompt = llm_lingua.compress_prompt(prompt, instruction="", question="", target_token=200)
model.to(device)
# Function to calculate summary statistics for each embedding
def calculate_embedding_metrics(embeddings):
    # Vertical mean (mean of each feature across all embeddings)
    vertical_mean = np.mean(embeddings, axis=0)
    # Storage for the final metrics of each embedding
    embedding_metrics = []
    for embedding in embeddings:
        # Standard per-embedding statistics
        mean = np.mean(embedding)
        std_dev = np.std(embedding)
        skewness = skew(embedding)
        median = np.median(embedding)
        mad_val = mad(embedding)  # Median Absolute Deviation
        kurtosis_val = kurtosis(embedding)
        # Compile metrics for this embedding
        metrics = [mean, std_dev, skewness, median, mad_val, kurtosis_val]
        embedding_metrics.append(metrics)
    return np.array(embedding_metrics)
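# Quick illustration (assumption: not part of the original flow): the function returns
# one row of six summary statistics per embedding.
_demo_embeddings = np.random.default_rng(1).normal(size=(4, 384))
assert calculate_embedding_metrics(_demo_embeddings).shape == (4, 6)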
def reduce_dimensions_with_optimal_pca(embeddings, variance_ratio=0.99):
    """
    Reduce the dimensionality of the embeddings using PCA, selecting the number of components
    that retain a specified variance ratio (default is 99%).
    """
    # A float n_components asks PCA to keep enough components to explain that fraction of variance
    pca = PCA(n_components=variance_ratio)
    reduced_embeddings = pca.fit_transform(embeddings)
    # The actual number of components chosen is exposed as 'n_components_'
    n_components = pca.n_components_
    print(f"Reduced to {n_components} dimensions to retain {variance_ratio*100}% of variance.")
    return pca, reduced_embeddings, n_components
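# Hedged example on synthetic data (purely illustrative): with a float n_components,
# scikit-learn keeps however many components are needed to reach that explained-variance ratio.
_pca_demo, _reduced_demo, _k_demo = reduce_dimensions_with_optimal_pca(
    np.random.default_rng(2).normal(size=(50, 20)), variance_ratio=0.90)
assert _reduced_demo.shape == (50, _k_demo)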
def calculate_sentence_statistics(model, tokenizer, texts):
    # Dictionary mapping each text to its pooled embedding
    sentence_embeddings_dict = {}
    # Parameters for sequence handling
    sequence_length = 256
    stride_length = 128
    # EMA denominator (alpha = 2 / denominator); here based on the number of texts
    alpha_denominator = len(texts)
    # Process each text
    for text in tqdm(texts, position=0, leave=True):
        # Tokenize the text
        tokens = tokenizer.encode(text)
        # Embeddings for the current text's sequences
        current_text_embeddings = []
        # Break the text into overlapping sequences and embed each one
        for i in range(0, len(tokens), stride_length):
            sequence_tokens = tokens[i:i + sequence_length]
            # Pad the last sequence if it is shorter than the desired length
            if len(sequence_tokens) < sequence_length:
                sequence_tokens = sequence_tokens + [tokenizer.pad_token_id] * (sequence_length - len(sequence_tokens))
            # Convert the token sequence back to text (the SentenceTransformer expects text input)
            sequence_text = tokenizer.decode(sequence_tokens)
            # Generate the embedding for this sequence (batch_token_limit is not used here)
            sequence_embedding = model.encode(sequence_text, convert_to_tensor=False, show_progress_bar=False)
            current_text_embeddings.append(np.array(sequence_embedding))
        # Exponential moving average over the sequence embeddings
        if current_text_embeddings:
            ema_embedding = current_text_embeddings[0]
            alpha = 2.0 / alpha_denominator
            for embedding in current_text_embeddings[1:]:
                ema_embedding = alpha * embedding + (1 - alpha) * ema_embedding
            # Store the final pooled embedding for the text
            sentence_embeddings_dict[text] = ema_embedding
    return sentence_embeddings_dict
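# Hedged usage sketch (assumption: two throwaway strings; this calls the real model
# twice, so drop it if startup speed matters). Each text maps to one pooled vector.
_demo_stats = calculate_sentence_statistics(model, tokenizer, ["a short quote", "another short quote"])
assert len(_demo_stats) == 2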
def save_embeddings_to_db(embeddings, db_path):
    # Connect to the database
    conn = sqlite3.connect(db_path)
    cursor = conn.cursor()
    # Create the table, with a single TEXT column for each serialized embedding
    cursor.execute('''
        CREATE TABLE IF NOT EXISTS embeddings (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            embedding TEXT
        )
    ''')
    # Insert the embeddings into the database
    for embedding in embeddings:
        # Serialize the embedding (a numpy array) as a JSON string
        embedding_str = json.dumps(embedding.tolist())
        cursor.execute('''
            INSERT INTO embeddings (embedding) VALUES (?)
        ''', (embedding_str,))
    # Commit the changes and close the connection
    conn.commit()
    conn.close()
def load_embeddings_from_db(db_path="sentence_embeddings.sqlite"):
    # Connect to the database
    with sqlite3.connect(db_path) as conn:
        cursor = conn.cursor()
        # Check that the embeddings table exists
        cursor.execute("PRAGMA table_info(embeddings)")
        info = cursor.fetchall()
        if not info:
            raise ValueError("The embeddings table does not exist")
        # The table structure is known (a single 'embedding' column), so there is no
        # need to fetch column names dynamically
        cursor.execute("SELECT embedding FROM embeddings")
        rows = cursor.fetchall()
        # Deserialize each embedding back to a list of floats
        embeddings = [json.loads(row[0]) for row in rows]  # row[0]: the only selected column
    return embeddings
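# Hedged round-trip check using a throwaway temp file (the real pipeline uses db_path).
import tempfile
with tempfile.TemporaryDirectory() as _tmpdir:
    _tmp_db = os.path.join(_tmpdir, "roundtrip.sqlite")
    save_embeddings_to_db([np.array([0.5, 1.5]), np.array([2.5, 3.5])], _tmp_db)
    assert load_embeddings_from_db(_tmp_db) == [[0.5, 1.5], [2.5, 3.5]]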
def create_hnsw_index(sentence_embeddings, space='cosine'):
    dim = len(sentence_embeddings[0])  # Dimensionality of the embeddings
    num_elements = len(sentence_embeddings)
    # Initialize an HNSW index for the given number of dimensions
    p = hnswlib.Index(space=space, dim=dim)
    p.init_index(max_elements=num_elements, ef_construction=100, M=16)
    # Add embeddings to the index
    p.add_items(sentence_embeddings)
    # Set the query-time ef parameter (accuracy/speed trade-off)
    p.set_ef(50)
    return p
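# Hedged sketch of querying the index (random vectors, illustration only): knn_query
# returns (labels, distances) for the k nearest stored vectors.
_vecs = np.random.default_rng(3).normal(size=(100, 16)).astype(np.float32)
_index = create_hnsw_index(_vecs, space='cosine')
_nn_labels, _nn_dists = _index.knn_query(_vecs[:5], k=3)
assert _nn_labels.shape == (5, 3)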
def calculate_wss_bss(data, labels):
    """
    Calculates the within-cluster sum of squares (WSS) and between-cluster sum of squares (BSS)
    for a given clustering result.
    Parameters:
    - data: numpy.ndarray
        The data points that were clustered.
    - labels: numpy.ndarray
        The cluster label assigned to each data point.
    Returns:
    - float, float, float:
        The TSS, WSS and BSS values for the clustering result (TSS = WSS + BSS).
    Raises:
    - ValueError:
        If the number of data points and labels do not match.
    """
    if len(data) != len(labels):
        raise ValueError("The number of data points and labels must match.")
    # Map each label to its centroid; only clusters that contain points are included.
    label_to_centroid = {}
    unique_labels = np.unique(labels)
    for label in unique_labels:
        cluster_points = data[labels == label]
        if len(cluster_points) > 0:
            centroid = np.mean(cluster_points, axis=0)
            label_to_centroid[label] = centroid
    # WSS: squared distance of each point to its own cluster centroid
    wss = 0
    for i in range(len(data)):
        point = data[i]
        label = labels[i]
        centroid = label_to_centroid[label]
        wss += np.linalg.norm(point - centroid) ** 2
    # Overall centroid (mean) of all data points
    overall_centroid = np.mean(data, axis=0)
    # BSS: size-weighted squared distance of each cluster centroid to the overall centroid
    bss = 0
    for label, centroid in label_to_centroid.items():
        cluster_points = data[labels == label]
        num_points = len(cluster_points)
        if num_points > 0:
            squared_distance = np.linalg.norm(centroid - overall_centroid) ** 2
            bss += num_points * squared_distance
    tss = wss + bss
    return tss, wss, bss
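# Hedged sanity check: for two well-separated blobs labelled correctly, most of the
# total sum of squares should be between-cluster (bss/tss close to 1).
_blobs = np.vstack([np.random.default_rng(4).normal(0, 0.1, (30, 3)),
                    np.random.default_rng(5).normal(5, 0.1, (30, 3))])
_blob_labels = np.array([0] * 30 + [1] * 30)
_tss, _wss, _bss = calculate_wss_bss(_blobs, _blob_labels)
assert _bss / _tss > 0.9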
def get_representative_sentences(embeddings, predictions, original_sentences):
    """
    Find the most representative sentence for each cluster.
    Parameters:
    embeddings (np.array): The embeddings of the sentences.
    predictions (array-like): The predicted cluster for each sentence.
    original_sentences (list): The original sentences corresponding to the embeddings.
    Returns:
    dict: A dictionary of clusters with their representative sentences.
    """
    predictions = np.asarray(predictions)  # allow plain lists as well as numpy arrays
    # Number of clusters
    num_clusters = len(set(predictions))
    # Calculate the center of each cluster
    cluster_centers = []
    for i in range(num_clusters):
        # Embeddings belonging to the current cluster
        cluster_embeddings = embeddings[predictions == i]
        # Mean/center of the cluster
        center = np.mean(cluster_embeddings, axis=0)
        cluster_centers.append(center)
    cluster_centers = np.array(cluster_centers)
    # For each cluster, find the sentence whose embedding is closest to the cluster center
    representative_sentences = {}
    for i in range(num_clusters):
        cluster_embeddings = embeddings[predictions == i]
        # Distances from the cluster center
        distances = pairwise_distances(cluster_embeddings, [cluster_centers[i]], metric='euclidean')
        # Index of the minimum distance within this cluster
        min_distance_idx = np.argmin(distances)
        # Fetch the corresponding most representative sentence
        representative_sentence = original_sentences[np.where(predictions == i)[0][min_distance_idx]]
        representative_sentences[i] = {
            "sentence": representative_sentence,
            "distance": distances[min_distance_idx][0],  # the actual distance value
            "cluster_size": len(cluster_embeddings)
        }
    return representative_sentences
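# Hedged illustration on toy 1-D data (hypothetical sentences): the representative item
# of each cluster is the point closest to that cluster's centroid.
_toy_emb = np.array([[0.0], [0.1], [0.2], [5.0], [5.1]])
_toy_pred = np.array([0, 0, 0, 1, 1])
_toy_sents = ["a", "b", "c", "d", "e"]
_reps = get_representative_sentences(_toy_emb, _toy_pred, _toy_sents)
assert _reps[0]["sentence"] == "b" and _reps[1]["cluster_size"] == 2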
def generate_custom_fibonacci(start, sample_n):
    # Initial Fibonacci sequence numbers
    a, b = 0, 1
    sequence = []
    while True:
        # Generate the next Fibonacci number
        next_num = a + b
        # Stop before adding anything above the sample_n limit
        if next_num > sample_n:
            break
        # Only collect numbers once they reach the requested starting value
        if next_num >= start:
            sequence.append(next_num)
        # Move forward in the sequence
        a, b = b, next_num
    return sequence
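# Worked example (values checked by hand): starting at 3 with an upper bound of 50,
# the generator yields the Fibonacci numbers in [3, 50].
assert generate_custom_fibonacci(3, 50) == [3, 5, 8, 13, 21, 34]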
import matplotlib.pyplot as plt
sample_n = 500
k_splits = 10
# Candidate k values: Fibonacci numbers between 3 and sample_n / k_splits (the test-fold size)
k_values = generate_custom_fibonacci(3, sample_n / k_splits)
# If the database does not exist yet, compute embeddings, find an optimal k, and populate it
if not os.path.exists(db_path):
    sentence_statistics = calculate_sentence_statistics(model, tokenizer, quotes)
    embeddings = [sentence_statistics[k] for k in list(sentence_statistics.keys())]
    # Draw sample_n embeddings without replacement for the k search
    sample_indices = np.random.choice(len(embeddings), size=sample_n, replace=False)
    sampled_embeddings = np.array(embeddings)[sample_indices]
    pca, pca_embeddings_sample, n_components = reduce_dimensions_with_optimal_pca(sampled_embeddings, variance_ratio=.99)
    # Find the optimal number of clusters via cross-validated BSS/TSS
    average_ratios = []
    kf = KFold(n_splits=k_splits)  # k_splits-fold cross-validation
    for k in k_values:
        ratios = []
        for train_index, test_index in kf.split(pca_embeddings_sample):
            X_train, X_test = pca_embeddings_sample[train_index], pca_embeddings_sample[test_index]
            kmeans = KMeans(n_clusters=k, random_state=0, init='k-means++', n_init=5).fit(X_train)
            cluster_labels = kmeans.predict(X_test)
            tss, wss, bss = calculate_wss_bss(X_test, cluster_labels)
            ratios.append(bss / tss)
        average_ratios.append(np.mean(ratios))
    plt.plot(k_values, average_ratios)
    plt.show()
    optimal_k = k_values[average_ratios.index(max(average_ratios))]
    # Apply the chosen k to the whole dataset, reusing the fitted PCA transformation
    full_pca_embeddings = pca.transform(embeddings)
    final_kmeans = KMeans(n_clusters=optimal_k, random_state=0).fit(full_pca_embeddings)
    # Predict cluster labels for the full dataset
    full_predictions = final_kmeans.predict(full_pca_embeddings)
    # Note: the cluster labels (not the embeddings) are what gets cached in the database
    save_embeddings_to_db(full_predictions, db_path)
    reduced_metrics = load_embeddings_from_db(db_path)
else:
    reduced_metrics = load_embeddings_from_db(db_path)
# Optional: convert the loaded rows to a numpy array before further clustering (currently unused)
if False:
    sentence_statistics_np = np.array(reduced_metrics, dtype=np.float32)
# Note: full_pca_embeddings and full_predictions only exist on a first run, when the
# database had to be built above
representative_sentences = get_representative_sentences(full_pca_embeddings, full_predictions, quotes)
# Convert the dictionary to a list of (cluster_id, cluster_info) tuples
clusters_list = [(cluster_id, info) for cluster_id, info in representative_sentences.items()]
# Sort the list by cluster size in descending order
clusters_list.sort(key=lambda x: x[1]['cluster_size'], reverse=True)
# Print the sorted results
for cluster in clusters_list:
    cluster_id = cluster[0]
    info = cluster[1]
    print(f"Cluster {cluster_id} (Size: {info['cluster_size']}): Most representative sentence: '{info['sentence']}' (Distance: {info['distance']})")