fibonacci clustered embeddings
#from gptcache import cache
#from transformers import BertModel, BertTokenizer
from datasets import load_dataset, concatenate_datasets
from sentence_transformers import SentenceTransformer
from sklearn.cluster import HDBSCAN, DBSCAN, KMeans
from sklearn.decomposition import PCA
from sklearn.metrics import pairwise_distances_argmin_min, silhouette_score, pairwise_distances
from sklearn.model_selection import ParameterGrid, KFold
from sklearn.neighbors import NearestNeighbors
from scipy.stats import kurtosis, skew
from statsmodels.robust.scale import mad
from tqdm import tqdm
from transformers import AutoTokenizer  #, BertModel, BertTokenizer, BertConfig, DistilBertModel, DistilBertTokenizer
import datasets
import hnswlib
import json
import matplotlib.pyplot as plt
import numpy as np
#import openai
import os
import pandas as pd
import requests
import sqlite3
import torch
batch_token_limit = 16384  # for example; not actually used below

# Variables to store the best HDBSCAN parameters found so far
best_params = None
best_silhouette = -1  # silhouette scores lie in [-1, 1], so any valid score beats this

# HDBSCAN: hyperparameters to try (see the sketch below for one way to search this grid)
param_grid = {
    'min_cluster_size': [3, 5, 8, 13, 21, 34, 55],  # example values; adjust as needed
    'min_samples': [5, 8, 13, 21],  # example values; adjust to your dataset characteristics
    # add other parameters here if you want
}
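# --- Added sketch (not in the original gist) ----------------------------------------------
# One way the grid above could be searched: fit HDBSCAN for each parameter combination and
# keep the combination with the best silhouette over non-noise points. `X` is assumed to be
# an (n_samples, n_dims) embedding array; the function is defined here but never called.
def grid_search_hdbscan(X, grid=param_grid):
    best, best_score = None, -1
    for params in ParameterGrid(grid):
        labels = HDBSCAN(**params).fit_predict(X)
        mask = labels != -1  # drop noise points; silhouette needs at least 2 real clusters
        if mask.sum() > 1 and len(set(labels[mask])) > 1:
            score = silhouette_score(X[mask], labels[mask])
            if score > best_score:
                best_score, best = score, params
    return best, best_score
# -------------------------------------------------------------------------------------------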
# Persisted embeddings / cluster labels live in this SQLite file
db_path = "sentence_embeddings.sqlite"

torch.cuda.is_available()  # quick sanity check that CUDA is visible

# Load dataset and model
dataset = load_dataset("Abirate/english_quotes")
quotes = [item['quote'] for item in dataset['train']]

# Set the device to GPU if available, otherwise use CPU
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

tokenizer = AutoTokenizer.from_pretrained("sentence-transformers/all-MiniLM-L6-v2")

# Initialize the model
model = SentenceTransformer('all-MiniLM-L6-v2')  # or choose another model suited to your needs
#from llmlingua import PromptCompressor
#llm_lingua = PromptCompressor()
#compressed_prompt = llm_lingua.compress_prompt(prompt, instruction="", question="", target_token=200)
model.to(device)
# Function to calculate summary statistics for each embedding
def calculate_embedding_metrics(embeddings):
    # Calculate the vertical mean (mean of each feature across all embeddings)
    vertical_mean = np.mean(embeddings, axis=0)

    # Storage for the final metrics of each embedding
    embedding_metrics = []
    for embedding in embeddings:
        # Calculate standard summary statistics over the embedding's components
        mean = np.mean(embedding)
        std_dev = np.std(embedding)
        skewness = skew(embedding)
        median = np.median(embedding)
        mad_val = mad(embedding)  # median absolute deviation
        kurtosis_val = kurtosis(embedding)

        # Compile the metrics for this embedding
        metrics = [mean, std_dev, skewness, median, mad_val, kurtosis_val]
        embedding_metrics.append(metrics)

    return np.array(embedding_metrics)
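# Note (added): calculate_embedding_metrics is defined but never called in the flow below;
# the pipeline clusters PCA-reduced embeddings directly rather than these per-embedding
# summary statistics (and vertical_mean above is computed but not returned).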
def reduce_dimensions_with_optimal_pca(embeddings, variance_ratio=0.99):
    """
    Reduce the dimensionality of the embeddings using PCA, selecting the number of components
    that retains a specified variance ratio (default is 99%).
    """
    # Create a PCA instance and fit it to the embeddings
    pca = PCA(n_components=variance_ratio)  # here, n_components represents the desired variance
    reduced_embeddings = pca.fit_transform(embeddings)

    # The actual number of components kept is available as 'n_components_'
    n_components = pca.n_components_
    print(f"Reduced to {n_components} dimensions to retain {variance_ratio*100}% of variance.")

    return pca, reduced_embeddings, n_components
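# Note (added): with 0 < n_components < 1, scikit-learn's PCA keeps the smallest number of
# components whose cumulative explained variance reaches that fraction; the fitted `pca`
# object returned here is reused later to project the full dataset into the same space.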
def calculate_sentence_statistics(model, tokenizer, texts):
    # Initialize the dictionary to store the embeddings
    sentence_embeddings_dict = {}

    # Parameters for sequence handling
    sequence_length = 256
    stride_length = 128
    alpha_denominator = len(texts)  # smoothing denominator for the EMA below (the number of texts, not the number of chunks per text)

    # Process each text
    for text in tqdm(texts, position=0, leave=True):
        # Tokenize the text
        tokens = tokenizer.encode(text)

        # Initialize the list to store the embeddings for the current text
        current_text_embeddings = []

        # Break the text into overlapping sequences and generate embeddings
        for i in range(0, len(tokens), stride_length):
            # Get the sequence of tokens
            sequence_tokens = tokens[i:i + sequence_length]

            # If the sequence is shorter than the desired length (likely the last chunk), pad it
            if len(sequence_tokens) < sequence_length:
                sequence_tokens = sequence_tokens + [tokenizer.pad_token_id] * (sequence_length - len(sequence_tokens))

            # Convert the sequence of tokens back to text (the SentenceTransformer expects text input)
            sequence_text = tokenizer.decode(sequence_tokens)

            # Generate the embedding for the sequence (batch_token_limit is not used here)
            sequence_embedding = model.encode(sequence_text, convert_to_tensor=False, show_progress_bar=False)
            current_text_embeddings.append(np.array(sequence_embedding))

        # Pool the chunk embeddings with an exponential moving average
        if current_text_embeddings:
            ema_embedding = current_text_embeddings[0]
            alpha = 2.0 / alpha_denominator
            for embedding in current_text_embeddings[1:]:
                ema_embedding = alpha * embedding + (1 - alpha) * ema_embedding

            # Store the final embedding for the text
            sentence_embeddings_dict[text] = ema_embedding

    return sentence_embeddings_dict
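# Clarifying note (added, no behaviour change): each text's final vector is an exponential
# moving average over its chunk embeddings,
#     ema_t = alpha * e_t + (1 - alpha) * ema_(t-1),   with alpha = 2 / len(texts).
# For the quotes dataset alpha is very small, so the first chunk dominates the pooled vector;
# most quotes fit in a single 256-token chunk anyway, making the EMA a no-op for them.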
def save_embeddings_to_db(embeddings, db_path):
    # Connect to the database
    conn = sqlite3.connect(db_path)
    cursor = conn.cursor()

    # Create the table, with a single TEXT column holding each JSON-serialized embedding
    cursor.execute('''
        CREATE TABLE IF NOT EXISTS embeddings (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            embedding TEXT
        )
    ''')

    # Insert the embeddings into the database
    for embedding in embeddings:
        # Serialize the embedding as a JSON string (assuming 'embedding' is a numpy array)
        embedding_str = json.dumps(embedding.tolist())
        # Insert the serialized embedding into the database
        cursor.execute('INSERT INTO embeddings (embedding) VALUES (?)', (embedding_str,))

    # Commit the changes and close the connection
    conn.commit()
    conn.close()
def load_embeddings_from_db(db_path="sentence_embeddings.sqlite"):
    # Connect to the database
    with sqlite3.connect(db_path) as conn:
        cursor = conn.cursor()

        # Check that the embeddings table exists
        cursor.execute("PRAGMA table_info(embeddings)")
        info = cursor.fetchall()
        if not info:
            raise ValueError("The embeddings table does not exist")

        # The table structure is known (a single 'embedding' column),
        # so there is no need to fetch column names dynamically.
        cursor.execute("SELECT embedding FROM embeddings")
        rows = cursor.fetchall()

    # Deserialize the embeddings back into their original format (e.g., lists of floats)
    embeddings = [json.loads(row[0]) for row in rows]  # row[0]: the single stored column
    return embeddings
def create_hnsw_index(sentence_embeddings, space='cosine'):
    dim = len(sentence_embeddings[0])   # dimensionality of the embeddings
    num_elements = len(sentence_embeddings)

    # Initialize an HNSW index for the given number of dimensions
    p = hnswlib.Index(space=space, dim=dim)
    p.init_index(max_elements=num_elements, ef_construction=100, M=16)

    # Add embeddings to the index
    p.add_items(sentence_embeddings)

    # Set the query-time ef parameter (accuracy/speed trade-off)
    p.set_ef(50)
    return p
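# --- Added sketch (not in the original gist) ----------------------------------------------
# create_hnsw_index is never called in this script; this uncalled helper shows the intended
# hnswlib query pattern for an index built by it.
def query_hnsw_index(index, query_embeddings, k=5):
    # Returns, for each query row, the ids and distances of its k nearest stored embeddings.
    labels, distances = index.knn_query(query_embeddings, k=k)
    return labels, distances
# -------------------------------------------------------------------------------------------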
def calculate_wss_bss(data, labels):
    """
    Calculate the within-cluster sum of squares (WSS) and between-cluster sum of squares (BSS)
    for a given clustering result.

    Parameters:
    - data: numpy.ndarray
        The data points that were clustered.
    - labels: numpy.ndarray
        The cluster label assigned to each data point.

    Returns:
    - float, float, float:
        The TSS (= WSS + BSS), WSS, and BSS values for the clustering result.

    Raises:
    - ValueError:
        If the number of data points and labels do not match.
    """
    if len(data) != len(labels):
        raise ValueError("The number of data points and labels must match.")

    # Map each label to its centroid, considering that some clusters might be empty.
    label_to_centroid = {}
    unique_labels = np.unique(labels)
    for label in unique_labels:
        cluster_points = data[labels == label]
        if len(cluster_points) > 0:  # only if the cluster has points
            centroid = np.mean(cluster_points, axis=0)
            label_to_centroid[label] = centroid  # map the label to the calculated centroid

    # WSS: squared distance of each point to its own cluster centroid.
    wss = 0
    for i in range(len(data)):
        point = data[i]
        label = labels[i]
        centroid = label_to_centroid[label]  # every observed label has a centroid by construction
        wss += np.linalg.norm(point - centroid) ** 2

    # Overall centroid (mean) of all data points
    overall_centroid = np.mean(data, axis=0)

    # BSS: squared distance of each cluster centroid to the overall centroid,
    # weighted by the number of points in that cluster.
    bss = 0
    for label, centroid in label_to_centroid.items():
        cluster_points = data[labels == label]
        num_points = len(cluster_points)
        if num_points > 0:  # safety check for an empty cluster
            squared_distance = np.linalg.norm(centroid - overall_centroid) ** 2
            bss += num_points * squared_distance

    tss = wss + bss
    return tss, wss, bss
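# --- Added check (not in the original gist) ------------------------------------------------
# For mean-centroid partitions, WSS + BSS equals the total sum of squares around the grand
# mean (the standard ANOVA decomposition). This uncalled demo illustrates that identity.
def _demo_wss_bss():
    rng = np.random.default_rng(0)
    X = rng.normal(size=(100, 5))
    labels = KMeans(n_clusters=3, n_init=5, random_state=0).fit_predict(X)
    tss, wss, bss = calculate_wss_bss(X, labels)
    direct_tss = np.sum(np.linalg.norm(X - X.mean(axis=0), axis=1) ** 2)
    assert np.isclose(tss, direct_tss)
# -------------------------------------------------------------------------------------------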
def get_representative_sentences(embeddings, predictions, original_sentences):
    """
    Find the most representative sentence for each cluster.

    Parameters:
    embeddings (np.array): The embeddings of the sentences.
    predictions (array-like): The predicted cluster for each sentence.
    original_sentences (list): The original sentences corresponding to the embeddings.

    Returns:
    dict: A dictionary of clusters with their representative sentences.
    """
    # Ensure boolean masking works even if predictions is a plain list
    predictions = np.asarray(predictions)

    # Number of clusters (assumes labels are 0 .. k-1)
    num_clusters = len(set(predictions))

    # Calculate the center of each cluster
    cluster_centers = []
    for i in range(num_clusters):
        # Extract all embeddings that belong to the current cluster
        cluster_embeddings = embeddings[predictions == i]
        # Calculate the mean/center
        center = np.mean(cluster_embeddings, axis=0)
        cluster_centers.append(center)
    cluster_centers = np.array(cluster_centers)

    # For each cluster, find the sentence whose embedding is closest to the cluster center
    representative_sentences = {}
    for i in range(num_clusters):
        # All sentence embeddings in this cluster
        cluster_embeddings = embeddings[predictions == i]
        # Distances from the cluster center
        distances = pairwise_distances(cluster_embeddings, [cluster_centers[i]], metric='euclidean')
        # Index of the minimum distance within this cluster
        min_distance_idx = np.argmin(distances)
        # Fetch the corresponding most representative sentence
        representative_sentence = original_sentences[np.where(predictions == i)[0][min_distance_idx]]
        # Add to the result dictionary
        representative_sentences[i] = {
            "sentence": representative_sentence,
            "distance": distances[min_distance_idx][0],  # the actual distance value
            "cluster_size": len(cluster_embeddings)
        }
    return representative_sentences
def generate_custom_fibonacci(start, sample_n):
    # Initial Fibonacci sequence numbers
    a, b = 0, 1
    sequence = []
    while True:
        # Generate the next Fibonacci number
        next_num = a + b
        # Stop before adding anything that exceeds the sample_n limit
        if next_num > sample_n:
            break
        # Only keep numbers at or above the requested starting value
        if next_num >= start:
            sequence.append(next_num)
        # Move forward in the sequence
        a, b = b, next_num
    return sequence
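# Worked example (added): with the arguments used below (start=3, limit=sample_n/k_splits=50),
# the candidate cluster counts are the Fibonacci numbers between 3 and 50 inclusive.
assert generate_custom_fibonacci(3, 50) == [3, 5, 8, 13, 21, 34]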
sample_n = 500
k_splits = 10
k_values = generate_custom_fibonacci(3, sample_n/k_splits)  # candidate k values: Fibonacci numbers from 3 up to sample_n/k_splits

# Check if the database exists; if not, create it and populate it
if not os.path.exists(db_path):
    sentence_statistics = calculate_sentence_statistics(model, tokenizer, quotes)
    embeddings = [sentence_statistics[k] for k in list(sentence_statistics.keys())]

    # Sample 'sample_n' embeddings without replacement for the cross-validated k search
    sample_indices = np.random.choice(len(embeddings), size=sample_n, replace=False)
    sampled_embeddings = np.array(embeddings)[sample_indices]

    pca, pca_embeddings_sample, n_components = reduce_dimensions_with_optimal_pca(sampled_embeddings, variance_ratio=0.99)
    # Step 3: Find the optimal number of clusters using cross-validation
    average_ratios = []
    kf = KFold(n_splits=k_splits)  # k_splits-fold cross-validation (10 folds here)
    for k in k_values:
        ratios = []
        for train_index, test_index in kf.split(pca_embeddings_sample):
            X_train, X_test = pca_embeddings_sample[train_index], pca_embeddings_sample[test_index]
            kmeans = KMeans(n_clusters=k, random_state=0, init='k-means++', n_init=5).fit(X_train)
            cluster_labels = kmeans.predict(X_test)
            tss, wss, bss = calculate_wss_bss(X_test, cluster_labels)
            ratios.append(bss / tss)
        average_ratios.append(np.mean(ratios))

    plt.plot(k_values, average_ratios)
    plt.show()

    optimal_k = k_values[average_ratios.index(max(average_ratios))]

    # Step 4: Apply to the whole dataset using the same PCA transformation
    full_pca_embeddings = pca.transform(embeddings)
    final_kmeans = KMeans(n_clusters=optimal_k, random_state=0).fit(full_pca_embeddings)

    # Step 5: Predict on the full dataset and persist the result
    full_predictions = final_kmeans.predict(full_pca_embeddings)
    save_embeddings_to_db(full_predictions, db_path)  # note: this stores the cluster labels, not raw embeddings
    reduced_metrics = load_embeddings_from_db(db_path)
else:
    reduced_metrics = load_embeddings_from_db(db_path)

# Convert the loaded list to a numpy array before further use (currently disabled)
if False:
    sentence_statistics_np = np.array(reduced_metrics, dtype=np.float32)

# Note: the reporting below relies on full_pca_embeddings / full_predictions, which are only
# defined on the first (cold-start) run above; the database stores labels but not embeddings.
representative_sentences = get_representative_sentences(full_pca_embeddings, full_predictions, quotes)

# Convert the dictionary to a list of tuples (cluster_id, cluster_info)
clusters_list = [(cluster_id, info) for cluster_id, info in representative_sentences.items()]

# Sort the list by cluster size in descending order
clusters_list.sort(key=lambda x: x[1]['cluster_size'], reverse=True)

# Print the sorted results
for cluster in clusters_list:
    cluster_id = cluster[0]
    info = cluster[1]
    print(f"Cluster {cluster_id} (Size: {info['cluster_size']}): Most representative sentence: '{info['sentence']}' (Distance: {info['distance']})")