Skip to content

Instantly share code, notes, and snippets.

@ghandic
Created March 19, 2025 12:08
Show Gist options
  • Save ghandic/020ceb2e48c1378b274e048635e12460 to your computer and use it in GitHub Desktop.
How you can use parallel agents in langgraph JS
import { BedrockChat } from "@langchain/community/chat_models/bedrock";
import { StateGraph, END, START } from "@langchain/langgraph";
import {
ChatPromptTemplate,
MessagesPlaceholder,
} from "@langchain/core/prompts";
import { HumanMessage, AIMessage, ToolMessage } from "@langchain/core/messages";
import { z } from "zod";
import { MessagesAnnotation, Annotation } from "@langchain/langgraph";
import { tool } from "@langchain/core/tools";
// Define tools
// Tool: fetch a city's current temperature from the Open-Meteo API, in Fahrenheit.
const weatherTool = tool(
  async (input) => {
    const { city } = weatherTool.schema.parse(input);
    const url = `https://api.open-meteo.com/v1/forecast?latitude=${city.latitude}&longitude=${city.longitude}&current_weather=true`;
    try {
      const res = await fetch(url);
      if (!res.ok) {
        throw new Error(`Failed to fetch weather data: ${res.statusText}`);
      }
      // eslint-disable-next-line @typescript-eslint/no-unsafe-assignment
      const payload = await res.json();
      // eslint-disable-next-line @typescript-eslint/no-unsafe-assignment, @typescript-eslint/no-unsafe-member-access
      const celsius = payload.current_weather.temperature;
      const fahrenheit = (celsius * 9) / 5 + 32;
      return Math.round(fahrenheit).toString();
    } catch (error) {
      // Best-effort: surface a plain string the model can read rather than throwing.
      console.error("Error fetching weather:", error);
      return "Error fetching weather data";
    }
  },
  {
    name: "Get_Weather",
    description: "Get the current temperature from a city, in Fahrenheit",
    schema: z.object({
      city: z.object({
        latitude: z.number().describe("Latitude of the city"),
        longitude: z.number().describe("Longitude of the city"),
      }),
    }),
  },
);
// Tool: simple subtraction so the model can compare temperatures numerically.
const differenceTool = tool(
  async (input) => {
    const args = differenceTool.schema.parse(input);
    const difference = args.minuend - args.subtrahend;
    return String(difference);
  },
  {
    name: "Difference",
    description: "Get the difference between two numbers",
    schema: z.object({
      minuend: z
        .number()
        .describe("The number from which another number is to be subtracted"),
      subtrahend: z.number().describe("The number to be subtracted"),
    }),
  },
);
// Tool: the model's structured final answer; invoking it just echoes the
// validated payload back as JSON (the router treats this call as "done").
const responseTool = tool(
  async (input) => {
    const validated = responseTool.schema.parse(input);
    return JSON.stringify(validated);
  },
  {
    name: "Response",
    description: "Final answer to the user",
    schema: z.object({
      warmest_city: z
        .string()
        .describe("The warmest city and its current temperature"),
      explanation: z
        .string()
        .describe(
          "How much warmer it is in the warmest city than the other cities",
        ),
    }),
  },
);
// All tools the agent can call; "Response" is also the stop signal (see shouldContinue).
const tools = [weatherTool, differenceTool, responseTool];
// Set up the model
// Claude 3 Sonnet on Bedrock with the tools bound, so responses may carry tool_calls.
const model = new BedrockChat({
model: "anthropic.claude-3-sonnet-20240229-v1:0",
region: "us-east-1",
maxTokens: 1000,
}).bindTools(tools);
// Graph state: just the `messages` channel, whose reducer comes from MessagesAnnotation.
const AgentStateAnnotation = Annotation.Root({
// Spread `MessagesAnnotation` into the state to add the `messages` field.
...MessagesAnnotation.spec,
});
type AgentState = typeof AgentStateAnnotation.State;
// Create the system prompt - sequential
// (kept for comparison: with a plain system prompt the model tends to call
// one tool per turn, so the three weather lookups happen serially)
// const prompt = ChatPromptTemplate.fromMessages([
// ["system", `You are a helpful assistant.`],
// new MessagesPlaceholder("messages"),
// ]);
// Create the system prompt - parallel
// Explicitly nudges the model to batch independent tool calls into one turn.
const prompt = ChatPromptTemplate.fromMessages([
[
"system",
`You are a helpful assistant. When appropriate, you can call multiple tools in a single response to be more efficient. For example, in tasks comparing data from multiple sources, try to request all the data in parallel rather than one at a time.`,
],
new MessagesPlaceholder("messages"),
]);
/**
 * Agent node: run the conversation through the prompt + model.
 *
 * Fix: the original returned `[...state.messages, response]`. The `messages`
 * channel from `MessagesAnnotation` uses an appending reducer (merging by
 * message id), so a node should return only the *new* message; re-returning
 * the whole history every step was redundant O(n) state churn per turn.
 */
const callModel = async (state: AgentState): Promise<AgentState> => {
  const response = await prompt.pipe(model).invoke({
    messages: state.messages,
  });
  // The reducer appends this to the existing history.
  return {
    messages: [response],
  };
};
const callTool = async (state: AgentState): Promise<AgentState> => {
const messages = state.messages;
const lastMessage = messages[messages.length - 1];
const newMessages = [...messages];
// Check if the last message has tool calls
if (lastMessage instanceof AIMessage && lastMessage.tool_calls) {
// Process all tool calls in parallel
const toolCallPromises = lastMessage.tool_calls.map(async (toolCall) => {
const toolName = toolCall.name;
let toolInput;
try {
toolInput = toolCall.args;
} catch (e) {
console.error("Failed to parse tool arguments:", e);
return null;
}
try {
// Invoke the tool directly
const tool = tools.find((t) => t.name === toolName);
if (!tool) {
console.error(`Tool ${toolName} not found`);
return null;
}
// @ts-expect-error Allow
// eslint-disable-next-line @typescript-eslint/no-unsafe-assignment
const response = await tool.invoke(toolInput);
// Create a ToolMessage with the result
return new ToolMessage({
content: String(response),
name: toolName,
tool_call_id: toolCall.id!,
});
} catch (error) {
console.error(`Error invoking tool ${toolName}:`, error);
return null;
}
});
// Wait for all tool executions to complete
const toolResults = await Promise.all(toolCallPromises);
// Add all tool messages to the conversation
toolResults.filter(Boolean).forEach((msg) => {
if (msg) newMessages.push(msg);
});
}
return {
messages: newMessages,
};
};
// Function to determine whether to continue or finish
/**
 * Router: decide whether the agent loops into the tool node or finishes.
 *
 * Fix: an AIMessage with an *empty* `tool_calls` array is truthy, so the
 * original routed it to "tools", which would do nothing and bounce back to
 * the agent — a potential infinite loop. We now require at least one call.
 */
const shouldContinue = (state: AgentState): "tools" | typeof END => {
  const lastMessage = state.messages[state.messages.length - 1];
  // No AI message, or no tool calls at all -> the model gave a final answer.
  if (
    !(lastMessage instanceof AIMessage) ||
    !lastMessage.tool_calls ||
    lastMessage.tool_calls.length === 0
  ) {
    return END;
  }
  // A "Response" call is the structured final answer -> stop.
  if (lastMessage.tool_calls.some((toolCall) => toolCall.name === "Response")) {
    console.log("Response tool call found, finishing");
    return END;
  }
  // Otherwise, run the requested tools.
  return "tools";
};
// Define and build the graph
// Wire up the agent <-> tools loop and compile it into a runnable graph:
// START -> agent -> (tools -> agent)* -> END, routed by shouldContinue.
const createWeatherComparisonAgent = () => {
  return new StateGraph({ stateSchema: AgentStateAnnotation })
    .addNode("agent", callModel)
    .addNode("tools", callTool)
    .addEdge(START, "agent")
    .addConditionalEdges("agent", shouldContinue, ["tools", END])
    .addEdge("tools", "agent")
    .compile();
};
// Example usage
/**
 * Demo driver: stream the graph over a three-city weather question and
 * pretty-print each node's output, surfacing the structured final answer
 * from the "Response" tool call when it appears.
 *
 * Fix: the original checked `.some(tc => tc.name === "Response")` and then
 * `.find`-ed the very same call again; a single `.find` does both jobs.
 */
const runWeatherComparison = async () => {
  const agent = createWeatherComparisonAgent();
  const initialState: AgentState = {
    messages: [
      new HumanMessage({
        content:
          "Where is it warmest: Austin, Texas; Tokyo; or Seattle? And by how much is it warmer than the other cities?",
      }),
    ],
  };
  console.log("Starting weather comparison...");
  console.log("Initial query:", initialState.messages[0]!.content);
  try {
    // Stream node-by-node updates as the graph runs.
    for await (const output of await agent.stream(initialState)) {
      // eslint-disable-next-line @typescript-eslint/no-unsafe-argument
      for (const [nodeName, nodeOutput] of Object.entries(output)) {
        console.log(`\nOutput from node '${nodeName}':`);
        console.log("---");
        const state = nodeOutput as AgentState;
        console.log(`Messages: ${state.messages.length}`);
        // Print the last message for context
        const lastMsg = state.messages[state.messages.length - 1];
        if (lastMsg instanceof AIMessage) {
          console.log(
            "Last message (AI):",
            lastMsg.tool_calls
              ? `Using tools: ${lastMsg.tool_calls.map((t) => t.name).join(", ")}`
              : lastMsg.content,
          );
          // A "Response" tool call carries the structured final answer.
          const responseCall = lastMsg.tool_calls?.find(
            (call) => call.name === "Response",
          );
          if (responseCall) {
            console.log("\nFINAL ANSWER:");
            console.log(
              `The warmest city is: ${responseCall.args.warmest_city}`,
            );
            console.log(`Explanation: ${responseCall.args.explanation}`);
          }
        } else if (lastMsg instanceof ToolMessage) {
          console.log(`Last message (Tool ${lastMsg.name}):`, lastMsg.content);
        }
        console.log("---");
      }
    }
  } catch (error) {
    console.error("Error executing agent:", error);
  }
};
// Benchmark: run the comparison several times and report the mean wall-clock
// time per run. Fix: five copy-pasted `await runWeatherComparison()` lines
// replaced by a loop with a named run count that the average divides by.
const RUNS = 5;
const startTime = Date.now();
for (let i = 0; i < RUNS; i++) {
  // Sequential on purpose: keeps each run's streamed output readable.
  await runWeatherComparison();
}
const endTime = Date.now();
console.log(`Time taken: ${(endTime - startTime) / RUNS}ms`);
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment