@loganpowell
Created August 22, 2023 13:44
Python ML Chunking Algorithm
from sentence_transformers import SentenceTransformer, util
from torch import Tensor
from sklearn.metrics.pairwise import cosine_similarity
from scipy.signal import argrelextrema
import numpy as np
import math
import constants as C
import openai
import os
import json
from dotenv import load_dotenv
load_dotenv()
openai.api_key = os.getenv("OPENAI_API_KEY")
# TODO: this takes a long time... don't load more than once
#model = SentenceTransformer('all-mpnet-base-v2')
# 📦 LAMBDA 1: Chunking 📦
# OPTION 1: all-MiniLM-L6-v2
# dimension: 384
# size: 90 MB
# intended for sentence similarity
# By default, input text longer than 256 word pieces is truncated
# trained on sentences w/max token length of 128
#sim_model = SentenceTransformer("sentence-transformers/all-MiniLM-L6-v2")
# OPTION 2: gte-small
# dimension: 384
# size: 70 MB
# works for both sentence similarity and semantic search
#This model exclusively caters to English texts, and
# any lengthy texts will be truncated to a maximum of 512 tokens.
sim_model = SentenceTransformer("thenlper/gte-small")
def get_sentence_embeddings_from_transcript(transcript):
"""
Grabs the "text" from each sentence in the transcript and
generates embeddings for each, packing them into a list
These are ephemeral data -> only used to assign a cutting cursor
to the beginning of a new chunk of semantically similar sentences
(the index will be the only thing actually used)
"""
_list = transcript["sentences"]
print(f"generating sentence embeddings from transcript with {len(_list)} sentences")
embeddings = []
for payload in _list:
text = payload["text"]
embeddings.append(sim_model.encode(text))
return embeddings
def rev_sigmoid(x: float) -> float:
    return 1 / (1 + math.exp(0.5 * x))
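# The sketch below is illustrative only (not part of the original gist): it
# prints the window of weights that activate_breakpoints builds from
# rev_sigmoid. The weights start near 1 for the nearest neighboring sentence
# and decay toward 0 across the p_size window, so close neighbors dominate
# the weighted sum.
def _demo_rev_sigmoid_weights(p_size=2):
    """
    Illustrative sketch: show the activation weights for a given p_size.
    """
    x = np.linspace(-10, 10, p_size)
    weights = np.vectorize(rev_sigmoid)(x)
    print(f"activation weights for p_size={p_size}: {weights}")
    return weights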
def activate_breakpoints(similarities: np.ndarray, p_size=2, reluctance=2):
    """
    Returns the weighted sums of activated sentence similarities and the
    breakpoints (relative minima) found in them
    inputs:
        - similarities (np array): square matrix of sentence cosine similarity
        - p_size (int): number of sentences used to calculate each weighted sum
        - reluctance (int): higher reluctance ~ fewer splits
    outputs:
        - breakpoints: indices of the breakpoints
        - activations: list of weighted sums
    """
    # To create weights for the sigmoid function we first have to create space.
    # p_size determines the number of sentences used and the size of the weights vector.
    x = np.linspace(-10, 10, p_size)
    # Then we need to apply the activation function to the created space
    y = np.vectorize(rev_sigmoid)
    # Because we only apply the activation to p_size sentences we have
    # to add zeros to neglect the effect of every additional sentence and to
    # match the length of the vector we will multiply
    activation_weights = np.pad(y(x), (0, similarities.shape[0] - p_size))
    # 🤔 these might as well be Aramaic incantations...
    # 1. Take each diagonal to the right of the main diagonal
    diagonals = [similarities.diagonal(each)
                 for each in range(0, similarities.shape[0])]
    # 2. Pad each diagonal with zeros at the end. Because each diagonal has a
    #    different length we should pad it with zeros at the end
    diagonals = [np.pad(each, (0, similarities.shape[0] - len(each)))
                 for each in diagonals]
    # 3. Stack those diagonals into a new matrix
    diagonals = np.stack(diagonals)
    # 4. Apply activation weights to each row. Multiply similarities with our activation.
    diagonals = diagonals * activation_weights.reshape(-1, 1)
    # 5. Calculate the weighted sum of activated similarities
    activations = np.sum(diagonals, axis=0)
    # 6. Find relative minima of our vector for all breakpoints and save
    #    them with the argrelextrema function.
    #    The reluctance parameter controls how frequent the splits should be.
    #    The author doesn't recommend changing this parameter (set by author to 2).
    breakpoints = argrelextrema(activations, np.less, order=reluctance)
    return breakpoints, activations
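# The sketch below is illustrative only (not part of the original gist): it
# runs activate_breakpoints on a synthetic block-diagonal similarity matrix
# in which sentences 0-2 are mutually similar, as are sentences 3-5, so the
# detected breakpoint lands at index 2 (the last sentence of the first topic).
def _demo_activate_breakpoints():
    """
    Illustrative sketch: find breakpoints in a toy similarity matrix.
    """
    similarities = np.array([
        [1.0, 0.9, 0.8, 0.1, 0.1, 0.1],
        [0.9, 1.0, 0.9, 0.1, 0.1, 0.1],
        [0.8, 0.9, 1.0, 0.2, 0.1, 0.1],
        [0.1, 0.1, 0.2, 1.0, 0.9, 0.8],
        [0.1, 0.1, 0.1, 0.9, 1.0, 0.9],
        [0.1, 0.1, 0.1, 0.8, 0.9, 1.0],
    ])
    breakpoints, activations = activate_breakpoints(similarities)
    print(f"activations: {activations}")
    print(f"breakpoints: {breakpoints[0]}")
    return breakpoints[0]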
def chunk_splitter(transcript, p_size=2, reluctance=2):
"""
Pipeline that takes a transcript and returns chunks compartmentalized by
neighboring sentences with similar semantic meaning.
"""
embeddings = get_sentence_embeddings_from_transcript(transcript)
similarities = cosine_similarity(embeddings)
breakpoints, activations = activate_breakpoints(similarities, p_size, reluctance)
return breakpoints[0]
def chunk_checker(sentences, breakpoints):
"""
debugger to test the size and duration of to be chunks of text from
transcript after chunking process
"""
output = ''
cursor = 0
def stats(duration_s, start, end):
return f"\n\n👆 duration (seconds): {duration_s} | start: {start} | end {end}\n\n"
for idx, each in enumerate(sentences):
# keep track of the last timestamp and determine
# min and max lengths of time to form a clip from
text = each['text']
end = each['end']
start = each['start']
if idx == 0:
cursor = start
duration_ms = (end - cursor)
duration_s = duration_ms / 1000
if idx in breakpoints:
output += f'{text}{stats(duration_s, cursor/1000, end/1000)}\n\n'
cursor = end
else:
output += f'{text} '
if idx == len(sentences) - 1:
duration_ms = (end - cursor)
duration_s = duration_ms / 1000
output += f'{stats(duration_s, cursor/1000, end/1000)} '
return output
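# The sketch below is illustrative only (not part of the original gist): it
# wires chunk_splitter and chunk_checker together on a tiny hypothetical
# transcript. The {"sentences": [{"text", "start", "end"}, ...]} shape with
# millisecond timestamps is assumed from the functions above.
def _demo_chunk_pipeline():
    """
    Illustrative sketch: split a toy transcript and print the chunk stats.
    """
    transcript = {
        "sentences": [
            {"text": "Welcome to the show.", "start": 0, "end": 1500},
            {"text": "Today we talk about embeddings.", "start": 1500, "end": 4000},
            {"text": "Embeddings map text to vectors.", "start": 4000, "end": 7000},
            {"text": "Now for the weather.", "start": 7000, "end": 9000},
            {"text": "It will rain all week.", "start": 9000, "end": 11500},
        ]
    }
    breakpoints = chunk_splitter(transcript)
    print(chunk_checker(transcript["sentences"], breakpoints))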
def openai_summary(
    text,
    model="gpt-3.5-turbo",
    max_tokens=250,
    best_of=True
):
    """
    Each summary takes about 6 seconds to return from openai's API.
    Takes a chunk of text and sends it to openai for summarization.
    Prompt and settings are tuned for little to no hallucination (accuracy)
    inputs:
        - text: string of text to be summarized
        - model: openai model to use for summarization
        - max_tokens: max number of tokens (constrained by search embeddings model)
        - best_of: whether or not to return only the best of the choices from openai
    outputs:
        - string of summarized text
    response sig:
    {
        "id": "chatcmpl-123",
        "object": "chat.completion",
        "created": 1677652288,
        "choices": [{
            "index": 0,
            "message": {
                "role": "assistant",
                "content": "\n\nHello there, how may I assist you today?",
            },
            "finish_reason": "stop"
        }],
        "usage": {
            "prompt_tokens": 9,
            "completion_tokens": 12,
            "total_tokens": 21
        }
    }
    """
    response = openai.ChatCompletion.create(
        model=model,
        temperature=0,
        max_tokens=max_tokens,
        top_p=1,
        frequency_penalty=0.5,
        presence_penalty=0,
        messages=[
            {
                "role": "system",
                "content": "Pull out comprehensive summaries of topics covered within the provided content. Focus on explaining the concepts. Leave out third-person references (e.g. 'the speaker') from your summary. Be sure to include all proper nouns."
            },
            {
                "role": "assistant",
                "content": text
            }
        ],
    )
    return response["choices"][0]["message"]["content"] if best_of else response["choices"]
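# Illustrative usage sketch (not part of the original gist): requires
# OPENAI_API_KEY in the environment; the paragraph passed in is made up.
def _demo_openai_summary():
    """
    Illustrative sketch: summarize a short block of text with the defaults.
    """
    text = (
        "Vector databases store embeddings alongside metadata so that "
        "semantically similar passages can be retrieved with a nearest "
        "neighbor search instead of keyword matching."
    )
    summary = openai_summary(text)
    print(summary)
    return summary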
# 📦 📦 📦 LAMBDA 2: Search Embeddings 📦 📦 📦
# multi-qa-MiniLM-L6-cos-v1
# dimension: 384
# size: 90 MB
# intended to be used for semantic search:
# It encodes queries / questions and text paragraphs in a dense vector space.
# It finds relevant documents for the given passages.
# Note that there is a limit of 512 word pieces
# trained on input text up to 250 word pieces
#qry_model = SentenceTransformer("sentence-transformers/multi-qa-MiniLM-L6-cos-v1")
qry_model = SentenceTransformer("thenlper/gte-small")
def summarize2embed(text, max_words):
"""
If text is longer than max_words, send to openai for summarization
Each summary from openai takes approximately 8 seconds to return
inputs:
- text: string of text to be summarized
- max_words: max number of words that triggers summary (openai)
"""
# count spaces to determine number of words
word_count = len(text.split(" "))
# if the word_count is greater than max wordpeices,
# send to openai for summarization
if word_count > max_words:
print("summarizing long text")
text = openai_summary(text)
embeddings = qry_model.encode(text).tolist()
return embeddings, text
# TODO: THIS IS A LONG RUNNING PROCESS (Dedicated Lambda?)
def chunk_packer(
    sentences,
    breakpoints,
    hash,
    precision=2,
    margin=1,
    embed=True,
    max_words=300,
    directory=C.ASSETS_DIR,
):
    """
    Long-running process (calls 2 OpenAI APIs per chunk) that packs text from
    sentences into chunks of text with metadata (summary, embeddings, etc.)
    inputs:
        - sentences: list of sentences from transcript w/sig:
            [{ text: "...", start: 0, end: 1000 }, ...]
        - breakpoints: list of indices of breakpoints
        - precision: decimal precision of seconds for timestamps
        - margin: number of seconds to add around the clip
        - embed: whether or not to embed the text. Will also summarize longer text
        - max_words: max number of words before the text is summarized by openai
        - directory: directory to store chunks
    outputs:
        - list of chunks of text from transcript (stored and[or] returned [from store])
    TODOs:
        - store/read chunk files from S3
    """
    # == todo S3 ==
    # check if the chunks have already been generated
    existing_chunks_path = os.path.join(directory, "chunks")
    for file in os.listdir(existing_chunks_path):
        if hash in file:
            print(f"chunks exist: returning {file}")
            with open(os.path.join(existing_chunks_path, file), "r") as f:
                output = json.load(f)
            return output

    print(f"packing chunks for {hash}...")
    output = []
    # placeholders
    buffer = ""
    cursor = 0
    current_sentences = []
    for idx, each in enumerate(sentences):
        start = each['start'] / 1000
        end = each['end'] / 1000
        text = each['text']
        if idx == 0: cursor = start + margin  # account for margin on start
        duration = round((end - cursor), precision) + (2 * margin)
        start_seconds = round(cursor, precision) - margin
        stub = {
            "id": hash,
            "metadata": {
                "start": start_seconds,
                "end": round(end, precision) + margin,
                "duration": duration,
            }
        }
        if idx in breakpoints:
            # last sentence in chunk
            _text = buffer + f"{text}"
            stub["metadata"]["sentences"] = current_sentences
            stub["metadata"]["full_text"] = _text
            if embed: embeddings, _text = summarize2embed(_text, max_words)
            stub["metadata"]["text"] = _text
            output.append({
                **stub,
                "values": embeddings if embed else []
            })
            # reset placeholders
            current_sentences = []
            buffer = ""
            cursor = end
        else:
            # not the last sentence in chunk
            buffer += f" {text}"
            current_sentences.append({
                "start": start,
                "end": end,
                "text": text
            })
        # if last sentence in transcript
        if idx == len(sentences) - 1:
            stub["metadata"]["sentences"] = current_sentences
            stub["metadata"]["full_text"] = buffer
            if embed: embeddings, buffer = summarize2embed(buffer, max_words)
            stub["metadata"]["text"] = buffer
            output.append({
                **stub,
                "values": embeddings if embed else []
            })

    # == todo S3 ==
    # store as json
    with open(os.path.join(existing_chunks_path, f"{hash}.json"), "w") as f:
        print(f"writing chunks to {hash}.json")
        json.dump(output, f)
    return output
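# Illustrative usage sketch (not part of the original gist): wires the output
# of chunk_splitter into chunk_packer. The transcript argument can be the toy
# transcript from _demo_chunk_pipeline; the "abc123" hash is made up,
# embed=False keeps the sketch offline (no OpenAI or embedding calls), and
# "." is used in place of C.ASSETS_DIR, assuming a ./chunks directory exists.
def _demo_chunk_packer(transcript, hash="abc123"):
    """
    Illustrative sketch: pack a transcript into chunk records.
    """
    sentences = transcript["sentences"]
    breakpoints = chunk_splitter(transcript)
    chunks = chunk_packer(
        sentences,
        breakpoints,
        hash,
        embed=False,    # skip OpenAI summarization + embedding for a dry run
        directory=".",  # hypothetical local directory containing ./chunks
    )
    print(f"packed {len(chunks)} chunks")
    return chunks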
def convert_pc_json_to_py(pc_json):
    """
    Pinecone vector inserts format converter
    inputs:
        - pc_json: list of dicts (can be read from a json file) w/sig:
            [{"id": "id", "values": [...embds...], "metadata": {...meta...}}, ...]
    outputs:
        - list of python tuples w/sig: [("id", [...embds...], {"meta": "data"}), ...]
    """
    return [(each["id"], each["values"], each["metadata"]) for each in pc_json]
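# Illustrative usage sketch (not part of the original gist): reads a packed
# chunks file and upserts it into Pinecone. The index name, environment, and
# PINECONE_API_KEY env var are hypothetical, and the pinecone-client v2 style
# init/Index/upsert calls are an assumption about the client version in use.
def _demo_pinecone_upsert(hash, directory=C.ASSETS_DIR):
    """
    Illustrative sketch: convert packed chunks to tuples and upsert them.
    """
    import pinecone  # assumed dependency; not imported by the original gist
    pinecone.init(
        api_key=os.getenv("PINECONE_API_KEY"),  # hypothetical env var
        environment="us-west1-gcp",             # hypothetical environment
    )
    index = pinecone.Index("transcript-chunks")  # hypothetical index name
    with open(os.path.join(directory, "chunks", f"{hash}.json"), "r") as f:
        pc_json = json.load(f)
    vectors = convert_pc_json_to_py(pc_json)
    index.upsert(vectors=vectors)
    print(f"upserted {len(vectors)} vectors")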
def search_db(chunks):
"""
takes the embeddings from the chunk and stores them in a local "database"
of embeddings that can be queried against.
For local testing purposes of search performance for various embeddings models
"""
docs = []
for chunk in chunks:
text = chunk["metadata"]["text"]
docs.append(text)
#Encode query and documents
doc_emb = qry_model.encode(docs)
return doc_emb, docs
def search_qry(query, docs, doc_emb):
"""
tests the search performance of a query against a local "database" of embeddings
"""
query_emb = qry_model.encode(query)
#Compute dot score between query and all document embeddings
scores = util.dot_score(query_emb, doc_emb)[0].cpu().tolist()
#Combine docs & scores
doc_score_pairs = list(zip(docs, scores))
#Sort by decreasing score
doc_score_pairs = sorted(doc_score_pairs, key=lambda x: x[1], reverse=True)
#Output passages & scores
output = []
for doc, score in doc_score_pairs:
output.append({
"score": score,
"text": doc
})
return output
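# Illustrative usage sketch (not part of the original gist): builds the local
# "database" from a few fake chunks and runs a query against it. The chunk
# shape mirrors chunk_packer's output; the texts and query are made up.
def _demo_local_search():
    """
    Illustrative sketch: end-to-end local search over a few fake chunks.
    """
    chunks = [
        {"metadata": {"text": "Embeddings map sentences into a dense vector space."}},
        {"metadata": {"text": "The forecast calls for rain all week."}},
        {"metadata": {"text": "Cosine similarity compares the angle between two vectors."}},
    ]
    doc_emb, docs = search_db(chunks)
    results = search_qry("how do sentence embeddings work?", docs, doc_emb)
    for each in results:
        print(f"{each['score']:.3f} | {each['text']}")
    return results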