Python ML Chunking Algorithm
from sentence_transformers import SentenceTransformer, util
from sklearn.metrics.pairwise import cosine_similarity
from scipy.signal import argrelextrema
import numpy as np
import math
import constants as C
import openai
import os
import json
from dotenv import load_dotenv

load_dotenv()
openai.api_key = os.getenv("OPENAI_API_KEY")
# TODO: this takes a long time... don't load the model more than once
#model = SentenceTransformer('all-mpnet-base-v2')

# 📦 LAMBDA 1: Chunking 📦

# OPTION 1: all-MiniLM-L6-v2
#   dimension: 384
#   size: 90 MB
#   intended for sentence similarity
#   by default, input text longer than 256 word pieces is truncated
#   trained on sentences w/max token length of 128
#sim_model = SentenceTransformer("sentence-transformers/all-MiniLM-L6-v2")

# OPTION 2: gte-small
#   dimension: 384
#   size: 70 MB
#   works for both sentence similarity and semantic search
#   English-only; lengthy texts are truncated to a maximum of 512 tokens
sim_model = SentenceTransformer("thenlper/gte-small")
def get_sentence_embeddings_from_transcript(transcript):
    """
    Grabs the "text" from each sentence in the transcript and
    generates an embedding for each, packing them into a list.
    These embeddings are ephemeral: they are used only to place a
    cutting cursor at the beginning of each new chunk of semantically
    similar sentences (only the resulting indices are actually used).
    """
    _list = transcript["sentences"]
    print(f"generating sentence embeddings from transcript with {len(_list)} sentences")
    embeddings = []
    for payload in _list:
        text = payload["text"]
        embeddings.append(sim_model.encode(text))
    return embeddings
def rev_sigmoid(x: float) -> float:
    """Reversed sigmoid: ~1 for large negative x, ~0 for large positive x."""
    return 1 / (1 + math.exp(0.5 * x))
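# For intuition (values computed from the formula above): across the
# np.linspace(-10, 10, p_size) grid used in activate_breakpoints below,
# rev_sigmoid(-10) ~= 0.993, rev_sigmoid(0) = 0.5, and rev_sigmoid(10) ~= 0.007,
# so nearer neighbors receive much larger weights than farther ones.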
def activate_breakpoints(similarities: np.ndarray, p_size=2, reluctance=2):
    """
    Returns the breakpoints and the weighted sums of activated sentence
    similarities.
    inputs:
        - similarities (np.ndarray): square matrix of sentence cosine similarities
        - p_size (int): number of sentences used to calculate each weighted sum
        - reluctance (int): higher reluctance ~ fewer splits
    outputs:
        - breakpoints: indices of the breakpoints (local minima)
        - activations: weighted sums of activated similarities
    """
    # To create weights for the sigmoid function we first have to create a space.
    # p_size determines the number of sentences used and the size of the weights vector.
    x = np.linspace(-10, 10, p_size)
    # Then we apply the activation function to the created space.
    y = np.vectorize(rev_sigmoid)
    # Because we only apply the activation to p_size sentences, we pad with
    # zeros to cancel the effect of every additional sentence and to match
    # the length of the vector we will multiply.
    activation_weights = np.pad(y(x), (0, similarities.shape[0] - p_size))
    # 🤔 these might as well be Aramaic incantations...
    # 1. Take each diagonal to the right of (and including) the main diagonal.
    diagonals = [similarities.diagonal(each)
                 for each in range(0, similarities.shape[0])]
    # 2. Pad each diagonal with zeros at the end, because each diagonal has
    #    a different length.
    diagonals = [np.pad(each, (0, similarities.shape[0] - len(each)))
                 for each in diagonals]
    # 3. Stack those diagonals into a new matrix.
    diagonals = np.stack(diagonals)
    # 4. Apply the activation weights to each row (multiply similarities by the activation).
    diagonals = diagonals * activation_weights.reshape(-1, 1)
    # 5. Calculate the weighted sum of activated similarities.
    activations = np.sum(diagonals, axis=0)
    # 6. Find the relative minima of the activation vector with argrelextrema;
    #    these are the breakpoints. The reluctance parameter controls how
    #    frequent the splits should be; the original author recommends
    #    leaving it at 2.
    breakpoints = argrelextrema(activations, np.less, order=reluctance)
    return breakpoints, activations
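
# A minimal sketch (not part of the original gist) of what the diagonal
# trick computes: row k of the stacked diagonals holds similarity(i, i+k)
# at column i, so each activation is a decaying weighted sum of a
# sentence's similarity to its next p_size neighbors, and local minima
# mark natural cut points. The sentences below are hypothetical.
def _demo_activate_breakpoints():
    texts = ["Cats purr.", "Kittens meow.", "GDP rose 3%.", "Inflation fell."]
    sims = cosine_similarity([sim_model.encode(t) for t in texts])
    breakpoints, activations = activate_breakpoints(sims, p_size=2)
    # expect a cut near index 1, where the topic shifts from cats to economics
    print(breakpoints[0], activations)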
def chunk_splitter(transcript, p_size=2, reluctance=2):
    """
    Pipeline that takes a transcript and returns the breakpoint indices
    that partition neighboring sentences with similar semantic meaning
    into chunks.
    """
    embeddings = get_sentence_embeddings_from_transcript(transcript)
    similarities = cosine_similarity(embeddings)
    breakpoints, activations = activate_breakpoints(similarities, p_size, reluctance)
    return breakpoints[0]
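
# Hypothetical usage, assuming the transcript shape the functions below
# rely on ("text" plus "start"/"end" timestamps in milliseconds):
def _demo_chunk_splitter():
    transcript = {"sentences": [
        {"text": "Cats purr.", "start": 0, "end": 900},
        {"text": "Kittens meow.", "start": 900, "end": 1800},
        {"text": "GDP rose 3%.", "start": 1800, "end": 2700},
        {"text": "Inflation fell.", "start": 2700, "end": 3600},
    ]}
    breakpoints = chunk_splitter(transcript)
    # chunk_checker (defined below) pretty-prints the resulting chunks
    print(chunk_checker(transcript["sentences"], breakpoints))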
def chunk_checker(sentences, breakpoints):
    """
    Debugging helper that reports the size and duration of the would-be
    chunks of text from the transcript after the chunking process.
    """
    output = ''
    cursor = 0

    def stats(duration_s, start, end):
        return f"\n\n👆 duration (seconds): {duration_s} | start: {start} | end: {end}\n\n"

    for idx, each in enumerate(sentences):
        # keep track of the last timestamp to determine the min and max
        # lengths of time to form a clip from (timestamps are in ms)
        text = each['text']
        end = each['end']
        start = each['start']
        if idx == 0:
            cursor = start
        duration_ms = end - cursor
        duration_s = duration_ms / 1000
        if idx in breakpoints:
            output += f'{text}{stats(duration_s, cursor/1000, end/1000)}\n\n'
            cursor = end
        else:
            output += f'{text} '
        if idx == len(sentences) - 1:
            duration_ms = end - cursor
            duration_s = duration_ms / 1000
            output += f'{stats(duration_s, cursor/1000, end/1000)} '
    return output
def openai_summary(
    text,
    model="gpt-3.5-turbo",
    max_tokens=250,
    best_of=True
):
    """
    Takes a chunk of text and sends it to OpenAI for summarization.
    Each summary takes about 6 seconds to return from OpenAI's API.
    Prompt and settings are tuned for low hallucination (i.e., high accuracy).
    inputs:
        - text: string of text to be summarized
        - model: OpenAI model to use for summarization
        - max_tokens: max number of tokens (constrained by the search embeddings model)
        - best_of: if True, return only the first choice's content; otherwise return all choices
    outputs:
        - string of summarized text
    response sig:
        {
            "id": "chatcmpl-123",
            "object": "chat.completion",
            "created": 1677652288,
            "choices": [{
                "index": 0,
                "message": {
                    "role": "assistant",
                    "content": "\n\nHello there, how may I assist you today?",
                },
                "finish_reason": "stop"
            }],
            "usage": {
                "prompt_tokens": 9,
                "completion_tokens": 12,
                "total_tokens": 21
            }
        }
    """
    response = openai.ChatCompletion.create(
        model=model,
        temperature=0,
        max_tokens=max_tokens,
        top_p=1,
        frequency_penalty=0.5,
        presence_penalty=0,
        messages=[
            {
                "role": "system",
                "content": "Pull out comprehensive summaries of topics covered within the provided content. Focus on explaining the concepts. Leave out third-person references (e.g. 'the speaker') from your summary. Be sure to include all proper nouns."
            },
            {
                "role": "assistant",
                "content": text
            }
        ],
    )
    return response["choices"][0]["message"]["content"] if best_of else response["choices"]
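
# Hypothetical usage (makes a real, billable OpenAI API call; relies on the
# OPENAI_API_KEY loaded from .env at import time):
def _demo_openai_summary(chunk_text):
    # with best_of=True (the default) this returns just the first choice's text
    print(openai_summary(chunk_text, max_tokens=250))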
# 📦 📦 📦 LAMBDA 2: Search Embeddings 📦 📦 📦

# multi-qa-MiniLM-L6-cos-v1
#   dimension: 384
#   size: 90 MB
#   intended for semantic search: it encodes queries/questions and text
#   paragraphs in a dense vector space and finds relevant documents for
#   the given passages.
#   note that there is a limit of 512 word pieces;
#   trained on input text up to 250 word pieces
#qry_model = SentenceTransformer("sentence-transformers/multi-qa-MiniLM-L6-cos-v1")
qry_model = SentenceTransformer("thenlper/gte-small")
def summarize2embed(text, max_words):
    """
    If text is longer than max_words, send it to OpenAI for summarization
    before embedding. Each summary from OpenAI takes approximately 8
    seconds to return.
    inputs:
        - text: string of text to be embedded (and possibly summarized)
        - max_words: word count above which summarization (OpenAI) is triggered
    outputs:
        - embeddings: the embedding vector as a list of floats
        - text: the (possibly summarized) text that was embedded
    """
    # split on spaces to estimate the number of words
    word_count = len(text.split(" "))
    # if the word count exceeds max_words, send to OpenAI for summarization
    if word_count > max_words:
        print("summarizing long text")
        text = openai_summary(text)
    embeddings = qry_model.encode(text).tolist()
    return embeddings, text
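
# Hypothetical usage: text under max_words skips the OpenAI call entirely
# and is embedded as-is (gte-small produces 384-dim vectors, per the notes above).
def _demo_summarize2embed():
    embeddings, text = summarize2embed("A short passage.", max_words=300)
    print(len(embeddings), text)  # -> 384, "A short passage."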
# TODO: THIS IS A LONG RUNNING PROCESS (Dedicated Lambda?)
def chunk_packer(
    sentences,
    breakpoints,
    hash,
    precision=2,
    margin=1,
    embed=True,
    max_words=300,
    directory=C.ASSETS_DIR,
):
    """
    Long-running process (calls 2 OpenAI APIs per chunk) that packs text
    from sentences into chunks of text with metadata (summary, embeddings, etc.)
    inputs:
        - sentences: list of sentences from the transcript w/sig:
            [{ text: "...", start: 0, end: 1000 }, ...]
        - breakpoints: list of indices of breakpoints
        - hash: unique id for the transcript (used as the chunk id and filename)
        - precision: decimal precision of seconds for timestamps
        - margin: number of seconds to add around the clip
        - embed: whether or not to embed the text (longer text is also summarized)
        - max_words: max number of words before sending to OpenAI for summarization
        - directory: directory in which to store chunks
    outputs:
        - list of chunks of text from the transcript (stored to disk and
          returned, or read back from the store if already present)
    TODOs:
        - store/read chunk files from S3
    """
    # == todo S3 ==
    # check if the chunks have already been generated
    existing_chunks_path = os.path.join(directory, "chunks")
    for file in os.listdir(existing_chunks_path):
        if hash in file:
            print(f"chunks exist: returning {file}")
            with open(os.path.join(existing_chunks_path, file), "r") as f:
                output = json.load(f)
            return output
    print(f"packing chunks for {hash}...")
    output = []
    # placeholders
    buffer = ""
    cursor = 0
    current_sentences = []
    for idx, each in enumerate(sentences):
        start = each['start'] / 1000
        end = each['end'] / 1000
        text = each['text']
        if idx == 0:
            cursor = start + margin  # account for margin on start
        duration = round((end - cursor), precision) + (2 * margin)
        start_seconds = round(cursor, precision) - margin
        stub = {
            "id": hash,
            "metadata": {
                "start": start_seconds,
                "end": round(end, precision) + margin,
                "duration": duration,
            }
        }
        if idx in breakpoints:
            # last sentence in chunk
            _text = buffer + f"{text}"
            stub["metadata"]["sentences"] = current_sentences
            stub["metadata"]["full_text"] = _text
            if embed:
                embeddings, _text = summarize2embed(_text, max_words)
            stub["metadata"]["text"] = _text
            output.append({
                **stub,
                "values": embeddings if embed else []
            })
            # reset placeholders
            current_sentences = []
            buffer = ""
            cursor = end
        else:
            # not the last sentence in the chunk
            buffer += f" {text}"
            current_sentences.append({
                "start": start,
                "end": end,
                "text": text
            })
        # if last sentence in transcript
        if idx == len(sentences) - 1:
            stub["metadata"]["sentences"] = current_sentences
            stub["metadata"]["full_text"] = buffer
            if embed:
                embeddings, buffer = summarize2embed(buffer, max_words)
            stub["metadata"]["text"] = buffer
            output.append({
                **stub,
                "values": embeddings if embed else []
            })
    # == todo S3 ==
    # store as json
    with open(os.path.join(existing_chunks_path, f"{hash}.json"), "w") as f:
        print(f"writing chunks to {hash}.json")
        json.dump(output, f)
    return output
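
# Hypothetical end-to-end usage, assuming a transcript dict shaped like the
# one in _demo_chunk_splitter above and an existing {C.ASSETS_DIR}/chunks
# directory; embed=False skips both summarization and embedding:
def _demo_chunk_packer(transcript, hash="abc123"):
    breakpoints = chunk_splitter(transcript)
    chunks = chunk_packer(transcript["sentences"], breakpoints, hash, embed=False)
    print(f"packed {len(chunks)} chunks")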
def convert_pc_json_to_py(pc_json):
    """
    Pinecone vector-insert format converter.
    inputs:
        - pc_json: list of dicts (can be read from a json file) w/sig:
            [{"id": "id", "values": [...embds...], "metadata": {...meta...}}, ...]
    outputs:
        - list of python tuples w/sig: [("id", [...embds...], {"meta": "data"}), ...]
    """
    return [(each["id"], each["values"], each["metadata"]) for each in pc_json]
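
# Hypothetical usage, e.g. feeding the tuples to a Pinecone upsert
# (`index` here is a hypothetical pinecone.Index handle):
#   vectors = convert_pc_json_to_py(chunks)
#   index.upsert(vectors=vectors)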
def search_db(chunks):
    """
    Takes the embeddings from the chunks and stores them in a local
    "database" of embeddings that can be queried against.
    For local testing of search performance across various embedding models.
    """
    docs = []
    for chunk in chunks:
        text = chunk["metadata"]["text"]
        docs.append(text)
    # encode the documents (queries are encoded at query time in search_qry)
    doc_emb = qry_model.encode(docs)
    return doc_emb, docs
def search_qry(query, docs, doc_emb):
    """
    Tests the search performance of a query against a local "database"
    of embeddings.
    """
    query_emb = qry_model.encode(query)
    # compute the dot score between the query and all document embeddings
    scores = util.dot_score(query_emb, doc_emb)[0].cpu().tolist()
    # combine docs & scores
    doc_score_pairs = list(zip(docs, scores))
    # sort by decreasing score
    doc_score_pairs = sorted(doc_score_pairs, key=lambda x: x[1], reverse=True)
    # output passages & scores
    output = []
    for doc, score in doc_score_pairs:
        output.append({
            "score": score,
            "text": doc
        })
    return output
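
# Hypothetical local search round-trip over already-packed chunks:
def _demo_search(chunks, query="what happened to inflation?"):
    doc_emb, docs = search_db(chunks)
    for hit in search_qry(query, docs, doc_emb)[:3]:
        print(f"{hit['score']:.3f}  {hit['text'][:80]}")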