Created
March 19, 2025 12:08
-
-
Save ghandic/020ceb2e48c1378b274e048635e12460 to your computer and use it in GitHub Desktop.
How you can use parallel agents in langgraph JS
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import { BedrockChat } from "@langchain/community/chat_models/bedrock"; | |
import { StateGraph, END, START } from "@langchain/langgraph"; | |
import { | |
ChatPromptTemplate, | |
MessagesPlaceholder, | |
} from "@langchain/core/prompts"; | |
import { HumanMessage, AIMessage, ToolMessage } from "@langchain/core/messages"; | |
import { z } from "zod"; | |
import { MessagesAnnotation, Annotation } from "@langchain/langgraph"; | |
import { tool } from "@langchain/core/tools"; | |
// Define tools | |
const weatherTool = tool( | |
async (input) => { | |
const { city } = weatherTool.schema.parse(input); | |
try { | |
const res = await fetch( | |
`https://api.open-meteo.com/v1/forecast?latitude=${city.latitude}&longitude=${city.longitude}¤t_weather=true`, | |
); | |
if (!res.ok) { | |
throw new Error(`Failed to fetch weather data: ${res.statusText}`); | |
} | |
// eslint-disable-next-line @typescript-eslint/no-unsafe-assignment | |
const response = await res.json(); | |
// eslint-disable-next-line @typescript-eslint/no-unsafe-assignment, @typescript-eslint/no-unsafe-member-access | |
const tempCelsius = response.current_weather.temperature; | |
const tempFahrenheit = (tempCelsius * 9) / 5 + 32; | |
return Math.round(tempFahrenheit).toString(); | |
} catch (error) { | |
console.error("Error fetching weather:", error); | |
return "Error fetching weather data"; | |
} | |
}, | |
{ | |
name: "Get_Weather", | |
description: "Get the current temperature from a city, in Fahrenheit", | |
schema: z.object({ | |
city: z.object({ | |
latitude: z.number().describe("Latitude of the city"), | |
longitude: z.number().describe("Longitude of the city"), | |
}), | |
}), | |
}, | |
); | |
const differenceTool = tool( | |
async (input) => { | |
const { minuend, subtrahend } = differenceTool.schema.parse(input); | |
return (minuend - subtrahend).toString(); | |
}, | |
{ | |
name: "Difference", | |
description: "Get the difference between two numbers", | |
schema: z.object({ | |
minuend: z | |
.number() | |
.describe("The number from which another number is to be subtracted"), | |
subtrahend: z.number().describe("The number to be subtracted"), | |
}), | |
}, | |
); | |
const responseTool = tool( | |
async (input) => { | |
return JSON.stringify(responseTool.schema.parse(input)); | |
}, | |
{ | |
name: "Response", | |
description: "Final answer to the user", | |
schema: z.object({ | |
warmest_city: z | |
.string() | |
.describe("The warmest city and its current temperature"), | |
explanation: z | |
.string() | |
.describe( | |
"How much warmer it is in the warmest city than the other cities", | |
), | |
}), | |
}, | |
); | |
const tools = [weatherTool, differenceTool, responseTool]; | |
// Set up the model | |
const model = new BedrockChat({ | |
model: "anthropic.claude-3-sonnet-20240229-v1:0", | |
region: "us-east-1", | |
maxTokens: 1000, | |
}).bindTools(tools); | |
const AgentStateAnnotation = Annotation.Root({ | |
// Spread `MessagesAnnotation` into the state to add the `messages` field. | |
...MessagesAnnotation.spec, | |
}); | |
type AgentState = typeof AgentStateAnnotation.State; | |
// Create the system prompt - sequential | |
// const prompt = ChatPromptTemplate.fromMessages([ | |
// ["system", `You are a helpful assistant.`], | |
// new MessagesPlaceholder("messages"), | |
// ]); | |
// Create the system prompt - parallel | |
const prompt = ChatPromptTemplate.fromMessages([ | |
[ | |
"system", | |
`You are a helpful assistant. When appropriate, you can call multiple tools in a single response to be more efficient. For example, in tasks comparing data from multiple sources, try to request all the data in parallel rather than one at a time.`, | |
], | |
new MessagesPlaceholder("messages"), | |
]); | |
const callModel = async (state: AgentState): Promise<AgentState> => { | |
const response = await prompt.pipe(model).invoke({ | |
messages: state.messages, | |
}); | |
return { | |
messages: [...state.messages, response], | |
}; | |
}; | |
const callTool = async (state: AgentState): Promise<AgentState> => { | |
const messages = state.messages; | |
const lastMessage = messages[messages.length - 1]; | |
const newMessages = [...messages]; | |
// Check if the last message has tool calls | |
if (lastMessage instanceof AIMessage && lastMessage.tool_calls) { | |
// Process all tool calls in parallel | |
const toolCallPromises = lastMessage.tool_calls.map(async (toolCall) => { | |
const toolName = toolCall.name; | |
let toolInput; | |
try { | |
toolInput = toolCall.args; | |
} catch (e) { | |
console.error("Failed to parse tool arguments:", e); | |
return null; | |
} | |
try { | |
// Invoke the tool directly | |
const tool = tools.find((t) => t.name === toolName); | |
if (!tool) { | |
console.error(`Tool ${toolName} not found`); | |
return null; | |
} | |
// @ts-expect-error Allow | |
// eslint-disable-next-line @typescript-eslint/no-unsafe-assignment | |
const response = await tool.invoke(toolInput); | |
// Create a ToolMessage with the result | |
return new ToolMessage({ | |
content: String(response), | |
name: toolName, | |
tool_call_id: toolCall.id!, | |
}); | |
} catch (error) { | |
console.error(`Error invoking tool ${toolName}:`, error); | |
return null; | |
} | |
}); | |
// Wait for all tool executions to complete | |
const toolResults = await Promise.all(toolCallPromises); | |
// Add all tool messages to the conversation | |
toolResults.filter(Boolean).forEach((msg) => { | |
if (msg) newMessages.push(msg); | |
}); | |
} | |
return { | |
messages: newMessages, | |
}; | |
}; | |
// Function to determine whether to continue or finish | |
const shouldContinue = (state: AgentState): "tools" | typeof END => { | |
const lastMessage = state.messages[state.messages.length - 1]; | |
// If there are no tool calls, we finish | |
if (!(lastMessage instanceof AIMessage) || !lastMessage.tool_calls) { | |
return END; | |
} | |
// If there is a Response tool call, we finish | |
if ( | |
lastMessage.tool_calls?.some((toolCall) => toolCall.name === "Response") | |
) { | |
console.log("Response tool call found, finishing"); | |
return END; | |
} | |
// Otherwise, we continue | |
return "tools"; | |
}; | |
// Define and build the graph | |
const createWeatherComparisonAgent = () => { | |
const graph = new StateGraph({ | |
stateSchema: AgentStateAnnotation, | |
}) | |
.addNode("agent", callModel) | |
.addNode("tools", callTool) | |
.addEdge(START, "agent") | |
.addConditionalEdges("agent", shouldContinue, ["tools", END]) | |
.addEdge("tools", "agent"); | |
// Compile the graph | |
return graph.compile(); | |
}; | |
// Example usage | |
const runWeatherComparison = async () => { | |
const agent = createWeatherComparisonAgent(); | |
const initialState: AgentState = { | |
messages: [ | |
new HumanMessage({ | |
content: | |
"Where is it warmest: Austin, Texas; Tokyo; or Seattle? And by how much is it warmer than the other cities?", | |
}), | |
], | |
}; | |
console.log("Starting weather comparison..."); | |
console.log("Initial query:", initialState.messages[0]!.content); | |
try { | |
// For streaming results | |
for await (const output of await agent.stream(initialState)) { | |
// eslint-disable-next-line @typescript-eslint/no-unsafe-argument | |
for (const [nodeName, nodeOutput] of Object.entries(output)) { | |
console.log(`\nOutput from node '${nodeName}':`); | |
console.log("---"); | |
const state = nodeOutput as AgentState; | |
console.log(`Messages: ${state.messages.length}`); | |
// Print the last message for context | |
const lastMsg = state.messages[state.messages.length - 1]; | |
if (lastMsg instanceof AIMessage) { | |
console.log( | |
"Last message (AI):", | |
lastMsg.tool_calls | |
? `Using tools: ${lastMsg.tool_calls.map((t) => t.name).join(", ")}` | |
: lastMsg.content, | |
); | |
// Check if the last message is a Response tool call | |
if (lastMsg.tool_calls?.some((tc) => tc.name === "Response")) { | |
const responseCall = lastMsg.tool_calls.find( | |
(call) => call.name === "Response", | |
); | |
if (responseCall) { | |
console.log("\nFINAL ANSWER:"); | |
console.log( | |
`The warmest city is: ${responseCall.args.warmest_city}`, | |
); | |
console.log(`Explanation: ${responseCall.args.explanation}`); | |
} | |
} | |
} else if (lastMsg instanceof ToolMessage) { | |
console.log(`Last message (Tool ${lastMsg.name}):`, lastMsg.content); | |
} | |
console.log("---"); | |
} | |
} | |
} catch (error) { | |
console.error("Error executing agent:", error); | |
} | |
}; | |
const startTime = Date.now(); | |
// Run the example | |
await runWeatherComparison(); | |
await runWeatherComparison(); | |
await runWeatherComparison(); | |
await runWeatherComparison(); | |
await runWeatherComparison(); | |
const endTime = Date.now(); | |
console.log(`Time taken: ${(endTime - startTime)/5}ms`); |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment