Trigger.dev Discourse clustering job
import { client } from "@/trigger"
import { eventTrigger } from "@trigger.dev/sdk"
import { OpenAI } from "@trigger.dev/openai"
import { Supabase } from "@trigger.dev/supabase"
import { z } from "zod"
import _ from "lodash"
import { write } from "@/lib/neo4j"
// Validate the JSON structure using Zod
const conversationSchema = z.object({
  Informative_Cues: z.string(),
  Extracted_Conversation: z.array(
    z.object({
      message_id: z.number(),
      content: z.string(),
      timestamp: z.string(),
      username: z.string(),
    })
  ),
  Orphan_Flag: z.boolean(),
})
const jsonResponseSchema = z.array(conversationSchema)
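// A hypothetical sample that satisfies conversationSchema, for reference
// (these schemas are declared here but not referenced elsewhere in this job):
//
// const sample = conversationSchema.parse({
//   Informative_Cues: "User asks about webhook retries",
//   Extracted_Conversation: [
//     {
//       message_id: 1,
//       content: "How do retries work?",
//       timestamp: "2023-10-30T16:48:00Z",
//       username: "alice",
//     },
//   ],
//   Orphan_Flag: false,
// })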
async function fetchDiscourseQuestionEmbeddings(io, orgId) {
  const { data: discourseMessages, error: fetchError } =
    await io.supabase.runTask(
      "get-discourse-question-embeddings",
      async (db) => {
        // Fetch up to 20 question-embedding rows for this org from Supabase
        return (
          db
            .from("discourse_question_embeddings")
            .select("*")
            .eq("org_id", orgId)
            // .range(100, 1000)
            .limit(20)
        )
      }
    )
  if (fetchError) {
    await io.logger.error("Failed to fetch discourse messages:", fetchError)
    throw fetchError
  }
  return discourseMessages
}
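// Assumed shape of a discourse_question_embeddings row, inferred from how
// rows are used later in this file (not a verified table schema):
// {
//   org_id: string,
//   question: string,            // the clustering input text
//   embedding_vector: string,    // JSON-encoded number[], parsed before use
//   topic_id: number,
//   topic_title: string,
//   post_number: number,
//   discourse_post_id: number,
//   username: string,
//   cooked: string,              // rendered post HTML
//   base_discourse_url: string,
// }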
async function groupExtractMessagesByTopics(discourseMessages) {
  // Group the messages by topic_id using lodash
  const groupedByTopic = _.groupBy(discourseMessages, "topic_id")
  const extractedData = _.mapValues(groupedByTopic, (messages) => {
    // Sort the messages by post_number in ascending order
    // (lodash sortBy always sorts ascending; it does not take a direction flag)
    const sortedMessages = _.sortBy(messages, "post_number")
    return {
      topic_title: sortedMessages[0].topic_title, // Assuming all messages in a group share the same topic_title
      messages: sortedMessages.map((message) => ({
        discourse_post_id: message.discourse_post_id.toString(),
        username: message.username,
        content: message.cooked,
      })),
    }
  })
  return extractedData
}
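// Example of the returned structure, keyed by topic_id (values hypothetical):
// {
//   "123": {
//     topic_title: "How do I configure webhooks?",
//     messages: [
//       { discourse_post_id: "456", username: "alice", content: "<p>...</p>" },
//       { discourse_post_id: "457", username: "bob", content: "<p>...</p>" },
//     ],
//   },
// }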
async function openaiIntentMessage(io, topicName, messagesForTopicString) {
  const openaiPrompt = `
  Discourse Messages for Topic ${topicName}: ${messagesForTopicString}
  Manual Topic Extraction with Iteration:
  Familiarization & Data Preparation:
  - Read Through: Read the entire dataset to understand the content.
  - Organize: Ensure data is structured (e.g., chronologically or by author).
  - Highlight Key Terms: Mark significant or frequently recurring terms.
  Initial Topic Identification:
  - List Themes: Make a list of recurring themes or subjects from the data.
  - Group Terms: Cluster related terms under broader topics.
  Iterative Refinement (3 Iterations):
  - Iteration 1:
  -- Review the initially identified topics and keywords.
  -- Merge similar topics.
  -- Split overly broad topics.
  -- Revisit the text to check for missed keywords or topics.
  - Iteration 2:
  -- Further condense topics.
  -- Eliminate redundant or overly specific keywords.
  -- Align topics with the majority of associated messages or text passages.
  - Iteration 3:
  -- Finalize topic definitions.
  -- Ensure each keyword is pertinent and not overly repetitive across topics.
  -- Validate topics against the original text to ensure they capture main themes.
  Validation:
  - Peer Review: Have your topics reviewed by another person.
  - Comparison: If possible, compare with topics from automated methods.
  Documentation & Output Formation:
  - Record Topics: Document your topics and related terms.
  - Format Output: For each message or text passage, create an output in the specified format:
  json
  {
    "title": "Title of the Message or Text",
    "extracted_keywords": ["keyword1", "keyword2", "keyword3", ...],
    "topic": "Identified Main Topic",
    "discourse_post_id": "Unique Identifier for the Message or Text"
  }
  The iterative process ensures that topics and keywords are revisited and refined multiple times, leading to a more condensed and accurate representation. Remember, even with iterations, manual extraction remains subjective, so it's essential to maintain transparency and rigor throughout the process.
  `
  // io.logger.info("openaiPrompt:", openaiPrompt);
  // Extract topics from the messages using OpenAI function calling
  // @ts-ignore
  const openaiInput = {
    model: "gpt-3.5-turbo",
    messages: [
      {
        role: "user",
        content: openaiPrompt,
      },
    ],
    functions: [
      {
        name: "TopicExtractionMessages",
        description: "Extract the topic of discourse messages",
        parameters: {
          type: "object",
          properties: {
            topic_extractions: {
              type: "array",
              description: "An array of objects representing topic extractions",
              items: {
                type: "object",
                properties: {
                  title: {
                    type: "string",
                    description:
                      "The title of the topic extraction of the message",
                  },
                  extracted_keywords: {
                    type: "array",
                    items: {
                      type: "string",
                    },
                    description:
                      "List of keywords representing the topic of the message",
                  },
                  topic: {
                    type: "string",
                    description:
                      "One-line description of the topic of the message",
                  },
                  discourse_post_id: {
                    type: "string",
                    description:
                      "A unique identifier for the message within the conversation",
                  },
                },
                required: [
                  "title",
                  "extracted_keywords",
                  "topic",
                  "discourse_post_id",
                ],
              },
            },
          },
          required: ["topic_extractions"],
        },
      },
    ],
    function_call: "auto",
  }
  const response = await io.openai.createChatCompletion(
    `chat-completion-${topicName}`,
    openaiInput
  )
  return response
}
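// This function returns the raw chat-completion response; a caller would
// unpack the function-call arguments roughly like the handling in
// openaiTopicRepresentation below (a sketch, assuming the standard OpenAI
// chat-completion response shape):
//
// const choice = response?.choices?.[0]
// if (choice?.finish_reason === "function_call") {
//   const args = JSON.parse(choice.message.function_call.arguments)
//   // args.topic_extractions: [{ title, extracted_keywords, topic, discourse_post_id }, ...]
// }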
async function openaiTopicHierachicalRepresentation(io, topic_cluster) {
  const updatedTreeTopicCluster = { ...topic_cluster } // Create a shallow copy to avoid direct mutation
  for (const tree_topic of updatedTreeTopicCluster.tree) {
    const topic_list = tree_topic.Topics
    // Look up each topic's name from its id (the callback must return the name)
    const topic_name_list = topic_list.map(
      (topic_id) =>
        updatedTreeTopicCluster.topic.find(
          (topic) => topic.Topic === Number(topic_id)
        )?.Name
    )
    const topic_name_list_string = topic_name_list.join("\n")
    // Simple name merge for now, but representative documents and keywords could be used in the future
    const openaiPrompt = `
    I have to merge the following topics:
    Topics:
    ${topic_name_list_string}
    Based on the information above, extract a short merged topic label in the following format:
    {
      topic: <topic label>
    }
    `
    // @ts-ignore
    const openaiInput = {
      model: "gpt-3.5-turbo",
      messages: [
        {
          role: "user",
          content: openaiPrompt,
        },
      ],
      functions: [
        {
          name: "TopicExtractionMessages",
          description: "Extract the topic of documents",
          parameters: {
            type: "object",
            properties: {
              topic: {
                type: "string",
                description:
                  "One-line description of the topic of the documents",
              },
            },
            required: ["topic"],
          },
        },
      ],
      function_call: "auto",
    }
    try {
      const response = await io.openai.createChatCompletion(
        `chat-completion-${tree_topic.Parent_Name}`,
        openaiInput
      )
      // Assuming the function-call arguments contain the new topic name
      const responseData = response?.choices?.[0]
      if (responseData?.finish_reason === "function_call") {
        if (responseData?.message?.function_call?.arguments) {
          const functionArgs = JSON.parse(
            responseData.message.function_call.arguments
          )
          console.log("functionArgs:", functionArgs)
          const newTopicName = functionArgs.topic
          // Update the parent topic name
          tree_topic.Parent_Name = newTopicName
        }
      } else {
        await io.logger.warn(
          `${tree_topic.Parent_Name} FUNCTION NOT BEING CALLED:`,
          responseData
        )
      }
    } catch (error) {
      await io.logger.error(`Error updating topic ${tree_topic.Parent_Name}:`, error)
      console.error(`Error updating topic ${tree_topic.Parent_Name}:`, error)
    }
  }
  return updatedTreeTopicCluster
}
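// Assumed shape of a tree entry, inferred from the Cypher in
// writeDiscourseQuestionClusteringNeo4j below (values hypothetical):
// {
//   Parent_ID: 15,
//   Parent_Name: "deployment issues",
//   Topics: [2, 7],        // leaf topic ids merged under this parent
//   Child_Left_ID: 2,
//   Child_Right_ID: 7,
//   Distance: 0.42,        // merge distance in the hierarchy
// }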
async function openaiTopicRepresentation(io, topic_cluster) {
  const updatedTopicCluster = { ...topic_cluster } // Create a shallow copy to avoid direct mutation
  for (const topic of updatedTopicCluster.topic) {
    const representativeDocuments = topic.Representative_Docs.join("\n ")
    const representativeKeywords = topic.Representation.join(", ")
    const openaiPrompt = `
    I have a topic that contains the following documents:
    ${representativeDocuments}
    The topic is described by the following keywords:
    ${representativeKeywords}
    Based on the information above, extract a short topic label in the following format:
    {
      topic: <topic label>
    }
    `
    // @ts-ignore
    const openaiInput = {
      model: "gpt-3.5-turbo",
      messages: [
        {
          role: "user",
          content: openaiPrompt,
        },
      ],
      functions: [
        {
          name: "TopicExtractionMessages",
          description: "Extract the topic of documents",
          parameters: {
            type: "object",
            properties: {
              topic: {
                type: "string",
                description:
                  "One-line description of the topic of the documents",
              },
            },
            required: ["topic"],
          },
        },
      ],
      function_call: "auto",
    }
    try {
      const response = await io.openai.createChatCompletion(
        `chat-completion-${topic.Name}`,
        openaiInput
      )
      // Assuming the function-call arguments contain the new topic name
      const responseData = response?.choices?.[0]
      if (responseData?.finish_reason === "function_call") {
        if (responseData?.message?.function_call?.arguments) {
          const functionArgs = JSON.parse(
            responseData.message.function_call.arguments
          )
          console.log("functionArgs:", functionArgs)
          const newTopicName = functionArgs.topic
          // Update the topic name
          topic.Name = newTopicName
        }
      } else {
        await io.logger.warn(`${topic.Name} FUNCTION NOT BEING CALLED:`, responseData)
      }
    } catch (error) {
      await io.logger.error(`Error updating topic ${topic.Name}:`, error)
      console.error(`Error updating topic ${topic.Name}:`, error)
    }
  }
  return updatedTopicCluster
}
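// Assumed shape of a topic entry, inferred from the Cypher below
// (values hypothetical):
// {
//   Topic: 2,
//   Count: 14,
//   Name: "webhook configuration",
//   Representation: ["webhook", "endpoint", "secret"],
//   Representative_Docs: ["How do I set a webhook secret?", ...],
// }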
async function writeIntentToNeo4j(topic, orgId) {
  // Merge the extracted topic into Neo4j as a Message_topic node
  const cypher = `
    MERGE (m:Message_topic {
      title: $title,
      extracted_keywords: $extracted_keywords,
      topic: $topic,
      discourse_post_id: $discourse_post_id
    })
  `
  await write(cypher, { ...topic, orgId })
}
async function openaiEmbed(io, topic_titles) {
  const embeddingResponse = await io.openai.createEmbedding(
    `embedding-topic-titles`,
    {
      model: "text-embedding-ada-002",
      input: topic_titles,
    }
  )
  // console.log("embeddingResponse:", embeddingResponse)
  return embeddingResponse.data
}
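// Usage sketch: the returned array is indexed in the same order as the
// input, each entry carrying an `embedding: number[]` field:
//
// const embeddings = await openaiEmbed(io, ["title one", "title two"])
// const firstVector = embeddings[0].embedding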
async function upsertEmbedding(io, discourse_messages, embeddingVector, orgId) {
  // For each message, upsert its topic-title embedding vector
  const upsertData = discourse_messages.map((post, index) => ({
    org_id: orgId,
    keyword: null,
    embedding_vector: embeddingVector[index].embedding,
    topic_title: post.topic_title,
    topic: post.topic_title,
    discourse_post_id: post.discourse_post_id,
    base_discourse_url: post.base_discourse_url,
  }))
  const { error } = await io.supabase.runTask(
    `upsert-topic-embedding`,
    async (db) => {
      return db.from("discourse_topic_embeddings").upsert(upsertData, {
        onConflict: "discourse_post_id, base_discourse_url",
      })
    }
  )
  if (error) {
    await io.logger.error("Failed to upsert topic extraction:", error)
  }
}
async function topicClusteringFetch(io, questions, embeddingVector) {
  const jsonResponse = await io.runTask(
    `topic-clustering-endpoint`,
    async () => {
      const response = await fetch(
        "https://my-fastapi-app-ueugow36ea-uc.a.run.app/topic_clustering",
        {
          method: "POST",
          headers: {
            "Content-Type": "application/json",
            accept: "application/json",
          },
          body: JSON.stringify({
            topic_messages: questions,
            embeddings: embeddingVector,
          }),
        }
      )
      return response.json()
    }
  )
  return jsonResponse
}
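// Assumed contract for the topic_clustering endpoint, inferred from how the
// result is consumed in this file (not a published API spec):
//
// Request:  { topic_messages: string[], embeddings: number[][] }
// Response: {
//   topic: [...],     // one entry per cluster (see openaiTopicRepresentation)
//   document: [...],  // one entry per input message: Topic, Document, Probability, ...
//   tree: [...],      // hierarchical merge steps (see openaiTopicHierachicalRepresentation)
// }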
async function topicClustering(io, questions, embeddingVector) {
  // console.log("topic_titles:", questions.slice(0, 5))
  // console.log("embeddingVector:", embeddingVector.slice(0, 5))
  const response = await io.backgroundFetch(
    "topic-clustering-endpoint",
    "https://my-fastapi-app-ueugow36ea-uc.a.run.app/topic_clustering",
    {
      method: "POST",
      headers: {
        "Content-Type": "application/json",
        // Authorization: redactString`Bearer ${auth.apiKey}`,
      },
      body: JSON.stringify({
        topic_messages: questions,
        embeddings: embeddingVector,
      }),
    },
    {
      "429": {
        strategy: "backoff",
        limit: 10,
        minTimeoutInMs: 1000,
        maxTimeoutInMs: 60000,
        factor: 2,
        randomize: true,
      },
    }
  )
  return response
}
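// The retry map above asks io.backgroundFetch to retry HTTP 429 responses
// with exponential backoff: up to 10 attempts, starting at 1s and capped at
// 60s, doubling each time with jitter. Other statuses are not retried.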
async function writeDiscourseQuestionClusteringNeo4j(
  topic_clustering_dict,
  orgId
) {
  const cypherCreateTopics = `
    UNWIND $topics AS topic
    WITH topic
    CREATE (t:Topic {
      id: topic.Topic,
      count: topic.Count,
      name: topic.Name,
      representation: topic.Representation,
      representative_docs: topic.Representative_Docs,
      orgId: $orgId
    })
  `
  await write(cypherCreateTopics, {
    topics: topic_clustering_dict.topic,
    orgId,
  })
  const cypherCreateDocuments = `
    UNWIND $documents AS doc
    MATCH (t:Topic {id: doc.Topic, orgId: $orgId})
    CREATE (d:Document {
      name: doc.Name,
      topic_id: doc.Topic,
      document: doc.Document,
      representation: doc.Representation,
      representative_docs: doc.Representative_Docs,
      top_n_words: doc.Top_n_words,
      probability: doc.Probability,
      representative_document: doc.Representative_document,
      relevance_score: doc.relevance_score,
      orgId: $orgId
    })
    CREATE (t)-[:HAS_DOCUMENT]->(d)
  `
  await write(cypherCreateDocuments, {
    documents: topic_clustering_dict.document,
    orgId,
  })
  const cypherCreateHierarchicalTopics = `
    UNWIND $tree AS item
    MERGE (parent:Hierarchical_Topic {id: item.Parent_ID, name: item.Parent_Name, orgId: $orgId})
    WITH item, parent
    UNWIND item.Topics AS topicId
    WITH item, parent, topicId
    MATCH (topic:Topic {id: topicId, orgId: $orgId})
    MERGE (parent)-[:INCLUDES_TOPIC]->(topic)
    WITH item, parent
    MERGE (leftChild:Hierarchical_Topic {id: item.Child_Left_ID, orgId: $orgId})
    MERGE (rightChild:Hierarchical_Topic {id: item.Child_Right_ID, orgId: $orgId})
    MERGE (parent)-[:HAS_CHILD {distance: item.Distance}]->(leftChild)
    MERGE (parent)-[:HAS_CHILD {distance: item.Distance}]->(rightChild)
  `
  await write(cypherCreateHierarchicalTopics, {
    tree: topic_clustering_dict.tree,
    orgId,
  })
}
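// Resulting graph shape, as created by the three statements above:
//
//   (Hierarchical_Topic)-[:HAS_CHILD {distance}]->(Hierarchical_Topic)
//   (Hierarchical_Topic)-[:INCLUDES_TOPIC]->(Topic)
//   (Topic)-[:HAS_DOCUMENT]->(Document)
//
// All nodes carry orgId, so one graph can hold clusters for multiple orgs.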
async function rerankDocuments(io, topic_cluster) {
  const documents = topic_cluster.document
  const groupedByTopic = _.groupBy(documents, "Topic")
  const updatedDocuments: any = []
  for (const [topic, docs] of Object.entries(groupedByTopic)) {
    const topic_name = topic_cluster.topic.find(
      (t) => t.Topic === Number(topic)
    )?.Name
    if (!topic_name) {
      await io.logger.warn(`Topic name not found for topic: ${topic}`)
      await io.logger.warn(`Skipping topic: ${topic}`, topic_cluster)
      continue
    }
    // const cleanedName = docs[0]["Name"].replace(/^\d+_/, "")
    const query = `Does the question relate well to the topic: ${topic_name}`
    const topic_documents = docs.map((doc) => doc.Document)
    await io.logger.info(`Query-${topic_name}:`, query)
    await io.logger.info(`topic_documents-${topic_name}:`, topic_documents)
    try {
      const jsonResponse = await io.runTask(
        `rerank-cohere-${topic_name}`,
        async () => {
          const response = await fetch("https://api.cohere.ai/v1/rerank", {
            method: "POST",
            headers: {
              "Content-Type": "application/json",
              Authorization: `Bearer ${COHERE_API_KEY}`,
              accept: "application/json",
            },
            body: JSON.stringify({
              return_documents: false,
              max_chunks_per_doc: 10,
              model: "rerank-english-v2.0",
              query: query,
              documents: topic_documents,
            }),
          })
          return response.json()
        }
      )
      if (jsonResponse && jsonResponse.results) {
        for (const result of jsonResponse.results) {
          const { index, relevance_score } = result
          if (docs[index]) {
            docs[index].relevance_score = relevance_score
            docs[index].Name = topic_name // Update the document's topic name
          }
        }
      }
    } catch (error) {
      await io.logger.error(
        `Error during rerank operation: ${topic_name}`,
        error
      )
      console.error("Error during rerank operation:", error)
    }
    updatedDocuments.push(...docs)
  }
  return updatedDocuments
}
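// The Cohere rerank response, as consumed above, looks roughly like this
// (index refers back into the input documents array):
// {
//   results: [
//     { index: 0, relevance_score: 0.87 },
//     { index: 1, relevance_score: 0.31 },
//   ],
// }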
const COHERE_API_KEY = process.env.COHERE_API_KEY
const supabase = new Supabase({
  id: "supabase",
  supabaseUrl: process.env.NEXT_PUBLIC_SUPABASE_URL!,
  supabaseKey: process.env.SUPABASE_SERVICE_ROLE_KEY!,
})
const openai = new OpenAI({
  id: "openai",
  apiKey: process.env.OPENAI_API_KEY!,
})
// Define the Trigger.dev job
client.defineJob({
  id: "discourse-question-clustering",
  name: "Discourse Question clustering",
  version: "0.0.1",
  trigger: eventTrigger({
    name: "discourse.question.clustering",
    schema: z.object({
      // message_id: z.string(),
      org_id: z.string(),
    }),
  }),
  integrations: {
    supabase,
    //@ts-ignore
    openai,
  },
  run: async (payload, io) => {
    try {
      // Fetch discourse question embeddings from Supabase
      const orgId = payload.org_id
      //@ts-ignore
      // const discourseMessages = await fetchDiscourseMessages(io, orgId)
      const discourseMessages = await fetchDiscourseQuestionEmbeddings(
        io,
        orgId
      )
      // // Group the messages by topic_id using lodash
      // const extractedData = await groupExtractMessagesByTopics(
      //   discourseMessages
      // )
      await io.logger.info("Extracted Data:", discourseMessages)
      // console.log("extractedData:", JSON.stringify(discourseMessages))
      // io.logger.info("extractedData:", extractedData);
      // extractedData would be an object keyed by topic_id, each value holding the topic_title and an array of extracted messages for that topic.
      // const embedding = await openaiEmbed(
      //   io,
      //   discourseMessages.map((msg) => msg.topic_title)
      // )
      // await upsertEmbedding(io, discourseMessages, embedding, orgId)
      // Fetch the topic clustering for the questions and their embeddings
      // const clustering_result = await topicClustering(
      //   io,
      //   discourseMessages.map((msg) => msg.question),
      //   discourseMessages.map((msg) => JSON.parse(msg.embedding_vector))
      // )
      const clustering_result = await topicClusteringFetch(
        io,
        discourseMessages.map((msg) => msg.question),
        discourseMessages.map((msg) => JSON.parse(msg.embedding_vector))
      )
      await io.logger.info("Question Clustering:", clustering_result)
      // Use OpenAI to rewrite the topic names
      // (the shallow copy shares the topic objects, so clustering_result sees the renamed topics too)
      const updatedTopicCluster = await openaiTopicRepresentation(
        io,
        clustering_result
      )
      // Rerank the documents within each topic
      // console.log("clustering_result:", updatedTopicCluster.topic)
      const updatedDocuments = await rerankDocuments(io, clustering_result)
      await io.logger.info("Updated Documents:", updatedDocuments)
      console.log("updatedDocuments:", updatedDocuments)
      // Update the clustering result with the reranked documents
      clustering_result.document = updatedDocuments
      // Use OpenAI to rewrite the hierarchical (merged) topic names
      const updatedHierarchicalTopicCluster =
        await openaiTopicHierachicalRepresentation(io, clustering_result)
      await io.logger.info(
        "Updated Hierarchical Topic Cluster:",
        updatedHierarchicalTopicCluster
      )
      console.log(
        "updatedHierarchicalTopicCluster:",
        updatedHierarchicalTopicCluster
      )
      await io.logger.info("Writing to Neo4j")
      await writeDiscourseQuestionClusteringNeo4j(
        updatedHierarchicalTopicCluster,
        orgId
      )
      await io.logger.info("Finished writing to Neo4j")
      return {
        success: true as const,
      }
    } catch (error) {
      return {
        success: false as const,
        error,
      }
    }
  },
})
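// A sketch of how this job can be triggered from elsewhere in the app using
// the Trigger.dev v2 client's sendEvent (the org id value is hypothetical):
//
// await client.sendEvent({
//   name: "discourse.question.clustering",
//   payload: { org_id: "org_123" },
// })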