Semantic Chunking & Compressing
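The script below defines two utilities. advanced_semantic_chunk_text splits a document into token-bounded chunks (counted with the GPT-4 tiktoken encoding) and then merges adjacent chunks whose sentence embeddings and LDA topic distributions are sufficiently similar (combined similarity above 0.6). semantic_compress_text scores each sentence by its embedding similarity to the whole document, its strongest LDA topic weight, and its lexical diversity, then keeps the highest-scoring sentences up to a target word budget and restores their original order. Both functions detect Portuguese vs. English with fastText (the lid.176.ftz model file must be available locally) and rely on the minivectordb embedding model.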
from sklearn.feature_extraction.text import CountVectorizer
from sklearn.decomposition import LatentDirichletAllocation
from minivectordb.embedding_model import EmbeddingModel
from sklearn.metrics.pairwise import cosine_similarity
import tiktoken, nltk, numpy as np, fasttext
from nltk.tokenize import sent_tokenize
from nltk.corpus import stopwords

nltk.download('punkt')
nltk.download('stopwords')

embedding_model = EmbeddingModel()
gpt_encoding = tiktoken.encoding_for_model("gpt-4")

portuguese_stopwords = list(set(stopwords.words('portuguese')))
english_stopwords = list(set(stopwords.words('english')))

langdetect_model = fasttext.load_model('lid.176.ftz')

def detect_language(text):
    # Map the fastText language-ID prediction to 'pt' or 'en'
    detected_lang = langdetect_model.predict(text.replace('\n', ' '), k=1)[0][0]
    return 'pt' if str(detected_lang) in ('__label__pt', 'portuguese') else 'en'
def advanced_semantic_chunk_text(full_text, tokens_per_chunk=250, num_topics=5):
    def calculate_similarity(embed1, embed2):
        return cosine_similarity([embed1], [embed2])[0][0]

    def create_lda_model(texts, stopwords):
        vectorizer = CountVectorizer(max_df=0.95, min_df=2, stop_words=stopwords)
        doc_term_matrix = vectorizer.fit_transform(texts)
        lda = LatentDirichletAllocation(n_components=num_topics, random_state=42)
        lda.fit(doc_term_matrix)
        return lda, vectorizer

    def get_topic_distribution(text, lda, vectorizer):
        vec = vectorizer.transform([text])
        return lda.transform(vec)[0]

    # Split the text into sentences
    sentences = sent_tokenize(full_text)

    # Create initial chunks based on token count
    chunks = []
    current_chunk = []
    current_chunk_length = 0

    for sentence in sentences:
        sentence_tokens = gpt_encoding.encode(sentence)
        if current_chunk_length + len(sentence_tokens) > tokens_per_chunk and current_chunk:
            chunks.append(' '.join(current_chunk))
            current_chunk = []
            current_chunk_length = 0
        current_chunk.append(sentence)
        current_chunk_length += len(sentence_tokens)

    if current_chunk:
        chunks.append(' '.join(current_chunk))

    # Create LDA model
    text_lang = detect_language(full_text)
    lda_model, vectorizer = create_lda_model(chunks, portuguese_stopwords if text_lang == 'pt' else english_stopwords)

    # Optimize chunks
    optimized_chunks = []
    current_chunk = chunks[0]
    current_embedding = embedding_model.extract_embeddings(current_chunk)
    current_topic_dist = get_topic_distribution(current_chunk, lda_model, vectorizer)

    for next_chunk in chunks[1:]:
        next_embedding = embedding_model.extract_embeddings(next_chunk)
        next_topic_dist = get_topic_distribution(next_chunk, lda_model, vectorizer)

        # Calculate semantic similarity
        similarity = calculate_similarity(current_embedding, next_embedding)

        # Calculate topic similarity
        topic_similarity = cosine_similarity([current_topic_dist], [next_topic_dist])[0][0]

        # Combine semantic and topic similarity
        combined_similarity = (similarity + topic_similarity) / 2

        if combined_similarity > 0.6:
            # Merge chunks
            current_chunk += " " + next_chunk
            current_embedding = embedding_model.extract_embeddings(current_chunk)
            current_topic_dist = get_topic_distribution(current_chunk, lda_model, vectorizer)
        else:
            optimized_chunks.append(current_chunk)
            current_chunk = next_chunk
            current_embedding = next_embedding
            current_topic_dist = next_topic_dist

    optimized_chunks.append(current_chunk)
    return optimized_chunks
def semantic_compress_text(full_text, compression_rate=0.7, num_topics=5):
    def calculate_similarity(embed1, embed2):
        return cosine_similarity([embed1], [embed2])[0][0]

    def create_lda_model(texts, stopwords):
        vectorizer = CountVectorizer(max_df=0.95, min_df=2, stop_words=stopwords)
        doc_term_matrix = vectorizer.fit_transform(texts)
        lda = LatentDirichletAllocation(n_components=num_topics, random_state=42)
        lda.fit(doc_term_matrix)
        return lda, vectorizer

    def get_topic_distribution(text, lda, vectorizer):
        vec = vectorizer.transform([text])
        return lda.transform(vec)[0]

    def sentence_importance(sentence, doc_embedding, lda_model, vectorizer, stopwords):
        sentence_embedding = embedding_model.extract_embeddings(sentence)
        semantic_similarity = calculate_similarity(doc_embedding, sentence_embedding)

        topic_dist = get_topic_distribution(sentence, lda_model, vectorizer)
        topic_importance = np.max(topic_dist)

        # Calculate lexical diversity
        words = sentence.split()
        unique_words = set([word.lower() for word in words if word.lower() not in stopwords])
        lexical_diversity = len(unique_words) / len(words) if words else 0

        # Combine factors (you can adjust weights as needed)
        importance = (0.4 * semantic_similarity) + (0.4 * topic_importance) + (0.2 * lexical_diversity)
        return importance

    # Split the text into sentences
    sentences = sent_tokenize(full_text)
    text_lang = detect_language(full_text)
    lang_stopwords = portuguese_stopwords if text_lang == 'pt' else english_stopwords

    # Create LDA model
    lda_model, vectorizer = create_lda_model(sentences, lang_stopwords)

    # Get document-level embedding
    doc_embedding = embedding_model.extract_embeddings(full_text)

    # Calculate importance for each sentence
    sentence_scores = [(sentence, sentence_importance(sentence, doc_embedding, lda_model, vectorizer, lang_stopwords))
                       for sentence in sentences]

    # Sort sentences by importance
    sorted_sentences = sorted(sentence_scores, key=lambda x: x[1], reverse=True)

    # Determine how many words to keep
    total_words = sum(len(sentence.split()) for sentence in sentences)
    target_words = int(total_words * compression_rate)

    # Reconstruct the compressed text
    compressed_text = []
    current_words = 0
    for sentence, _ in sorted_sentences:
        sentence_words = len(sentence.split())
        if current_words + sentence_words <= target_words:
            compressed_text.append(sentence)
            current_words += sentence_words
        else:
            break

    # Reorder sentences to maintain original flow
    compressed_text.sort(key=lambda x: sentences.index(x))

    return ' '.join(compressed_text)
# Example usage
full_text = "Your long text here..."

chunks = advanced_semantic_chunk_text(full_text, tokens_per_chunk=100)

compression_rate = 0.3  # 30% compression (remove roughly 30% of the words)
# semantic_compress_text expects the fraction of words to KEEP, so invert the rate
compression_rate = 1 - compression_rate
compressed_text = semantic_compress_text(full_text, compression_rate=compression_rate)
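
# Fuller usage sketch (illustrative; 'document.txt' is a placeholder file name):
# chunk a document first, then compress each chunk and report token counts with
# the tiktoken encoder loaded above. Each chunk must contain enough sentences
# for the per-chunk LDA fit (CountVectorizer uses min_df=2), so very short
# chunks may raise a ValueError.
with open('document.txt', encoding='utf-8') as f:
    document = f.read()

for i, chunk in enumerate(advanced_semantic_chunk_text(document, tokens_per_chunk=250)):
    compressed = semantic_compress_text(chunk, compression_rate=0.7)  # keep ~70% of the words
    print(f"chunk {i}: {len(gpt_encoding.encode(chunk))} -> {len(gpt_encoding.encode(compressed))} tokens")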