import { BaseMessage } from "@langchain/core/messages";import { MessagesZodMeta } from "@langchain/langgraph";import { registry } from "@langchain/langgraph/zod";import * as z from "zod";const State = z.object({ messages: z.array(z.custom<BaseMessage>()).register(registry, MessagesZodMeta), extraField: z.number(),});
import { MessagesZodMeta } from "@langchain/langgraph";import { registry } from "@langchain/langgraph/zod";import * as z from "zod";const State = z.object({ messages: z .array(z.custom<BaseMessage>()) .register(registry, MessagesZodMeta), extraField: z.number(),});
import { StateGraph, START, END } from "@langchain/langgraph";import * as z from "zod";// Define the schema for the inputconst InputState = z.object({ question: z.string(),});// Define the schema for the outputconst OutputState = z.object({ answer: z.string(),});// Define the overall schema, combining both input and outputconst OverallState = InputState.merge(OutputState);// Build the graph with input and output schemas specifiedconst graph = new StateGraph({ input: InputState, output: OutputState, state: OverallState,}) .addNode("answerNode", (state) => { // Example answer and an extra key return { answer: "bye", question: state.question }; }) .addEdge(START, "answerNode") .addEdge("answerNode", END) .compile();// Invoke the graph with an input and print the resultconsole.log(await graph.invoke({ question: "hi" }));
import { StateGraph, START, END } from "@langchain/langgraph";import * as z from "zod";// The overall state of the graph (this is the public state shared across nodes)const OverallState = z.object({ a: z.string(),});// Output from node1 contains private data that is not part of the overall stateconst Node1Output = z.object({ privateData: z.string(),});// The private data is only shared between node1 and node2const node1 = (state: z.infer<typeof OverallState>): z.infer<typeof Node1Output> => { const output = { privateData: "set by node1" }; console.log(`Entered node 'node1':\n\tInput: ${JSON.stringify(state)}.\n\tReturned: ${JSON.stringify(output)}`); return output;};// Node 2 input only requests the private data available after node1const Node2Input = z.object({ privateData: z.string(),});const node2 = (state: z.infer<typeof Node2Input>): z.infer<typeof OverallState> => { const output = { a: "set by node2" }; console.log(`Entered node 'node2':\n\tInput: ${JSON.stringify(state)}.\n\tReturned: ${JSON.stringify(output)}`); return output;};// Node 3 only has access to the overall state (no access to private data from node1)const node3 = (state: z.infer<typeof OverallState>): z.infer<typeof OverallState> => { const output = { a: "set by node3" }; console.log(`Entered node 'node3':\n\tInput: ${JSON.stringify(state)}.\n\tReturned: ${JSON.stringify(output)}`); return output;};// Connect nodes in a sequence// node2 accepts private data from node1, whereas// node3 does not see the private data.const graph = new StateGraph({ state: OverallState, nodes: { node1: { action: node1, output: Node1Output }, node2: { action: node2, input: Node2Input }, node3: { action: node3 }, }}) .addEdge(START, "node1") .addEdge("node1", "node2") .addEdge("node2", "node3") .addEdge("node3", END) .compile();// Invoke the graph with the initial stateconst response = await graph.invoke({ a: "set at start" });console.log(`\nOutput of graph invocation: ${JSON.stringify(response)}`);
复制
向 AI 提问
Entered node 'node1': ut: {"a":"set at start"}. urned: {"privateData":"set by node1"}Entered node 'node2': ut: {"privateData":"set by node1"}. urned: {"a":"set by node2"}Entered node 'node3': ut: {"a":"set by node2"}. urned: {"a":"set by node3"}Output of graph invocation: {"a":"set by node3"}
LangGraph 在超级步骤内执行节点,这意味着虽然并行分支并行执行,但整个超级步骤是事务性的。如果其中任何一个分支引发异常,则没有更新应用于状态(整个超级步骤出错)。重要的是,当使用检查点时,超级步骤内成功节点的結果会保存下来,并且在恢复时不会重复。如果您有容易出错的代码(可能希望处理不稳定的 API 调用),LangGraph 提供了两种方法来解决此问题:
LangGraph 支持使用 Send API 进行 Map-Reduce 和其他高级分支模式。以下是如何使用它的示例
复制
向 AI 提问
import { StateGraph, START, END, Send } from "@langchain/langgraph";import { registry } from "@langchain/langgraph/zod";import * as z from "zod";const OverallState = z.object({ topic: z.string(), subjects: z.array(z.string()), jokes: z.array(z.string()).register(registry, { reducer: { fn: (x, y) => x.concat(y), }, }), bestSelectedJoke: z.string(),});const generateTopics = (state: z.infer<typeof OverallState>) => { return { subjects: ["lions", "elephants", "penguins"] };};const generateJoke = (state: { subject: string }) => { const jokeMap: Record<string, string> = { lions: "Why don't lions like fast food? Because they can't catch it!", elephants: "Why don't elephants use computers? They're afraid of the mouse!", penguins: "Why don't penguins like talking to strangers at parties? Because they find it hard to break the ice." }; return { jokes: [jokeMap[state.subject]] };};const continueToJokes = (state: z.infer<typeof OverallState>) => { return state.subjects.map((subject) => new Send("generateJoke", { subject }));};const bestJoke = (state: z.infer<typeof OverallState>) => { return { bestSelectedJoke: "penguins" };};const graph = new StateGraph(OverallState) .addNode("generateTopics", generateTopics) .addNode("generateJoke", generateJoke) .addNode("bestJoke", bestJoke) .addEdge(START, "generateTopics") .addConditionalEdges("generateTopics", continueToJokes) .addEdge("generateJoke", "bestJoke") .addEdge("bestJoke", END) .compile();
复制
向 AI 提问
import * as fs from "node:fs/promises";const drawableGraph = await graph.getGraphAsync();const image = await drawableGraph.drawMermaidPng();const imageBuffer = new Uint8Array(await image.arrayBuffer());await fs.writeFile("graph.png", imageBuffer);
复制
向 AI 提问
// Call the graph: here we call it to generate a list of jokesfor await (const step of await graph.stream({ topic: "animals" })) { console.log(step);}
复制
向 AI 提问
{ generateTopics: { subjects: [ 'lions', 'elephants', 'penguins' ] } }{ generateJoke: { jokes: [ "Why don't lions like fast food? Because they can't catch it!" ] } }{ generateJoke: { jokes: [ "Why don't elephants use computers? They're afraid of the mouse!" ] } }{ generateJoke: { jokes: [ "Why don't penguins like talking to strangers at parties? Because they find it hard to break the ice." ] } }{ bestJoke: { bestSelectedJoke: 'penguins' } }
import { StateGraph, START, END } from "@langchain/langgraph";import { registry } from "@langchain/langgraph/zod";import * as z from "zod";const State = z.object({ // The reducer makes this append-only aggregate: z.array(z.string()).register(registry, { reducer: { fn: (x, y) => x.concat(y), }, default: () => [] as string[], }),});const nodeA = (state: z.infer<typeof State>) => { console.log(`Node A sees ${state.aggregate}`); return { aggregate: ["A"] };};const nodeB = (state: z.infer<typeof State>) => { console.log(`Node B sees ${state.aggregate}`); return { aggregate: ["B"] };};// Define edgesconst route = (state: z.infer<typeof State>): "b" | typeof END => { if (state.aggregate.length < 7) { return "b"; } else { return END; }};const graph = new StateGraph(State) .addNode("a", nodeA) .addNode("b", nodeB) .addEdge(START, "a") .addConditionalEdges("a", route) .addEdge("b", "a") .compile();
复制
向 AI 提问
import * as fs from "node:fs/promises";const drawableGraph = await graph.getGraphAsync();const image = await drawableGraph.drawMermaidPng();const imageBuffer = new Uint8Array(await image.arrayBuffer());await fs.writeFile("graph.png", imageBuffer);
这种架构类似于ReAct 代理,其中节点 "a" 是工具调用模型,节点 "b" 代表工具。在我们的 route 条件边中,我们指定在状态中的 "aggregate" 列表超过阈值长度后应该结束。调用图,我们看到我们在节点 "a" 和 "b" 之间交替,直到达到终止条件后终止。
复制
向 AI 提问
const result = await graph.invoke({ aggregate: [] });console.log(result);
复制
向 AI 提问
Node A sees []Node B sees ['A']Node A sees ['A', 'B']Node B sees ['A', 'B', 'A']Node A sees ['A', 'B', 'A', 'B']Node B sees ['A', 'B', 'A', 'B', 'A']Node A sees ['A', 'B', 'A', 'B', 'A', 'B']{ aggregate: ['A', 'B', 'A', 'B', 'A', 'B', 'A'] }
import { Command } from "@langchain/langgraph";const myNode = (state: State): Command => { return new Command({ // state update update: { foo: "bar" }, // control flow goto: "myOtherNode" });};
下面我们展示一个端到端的例子。让我们创建一个包含 3 个节点的简单图:A、B 和 C。我们将首先执行节点 A,然后根据节点 A 的输出决定下一步是转到节点 B 还是节点 C。
复制
向 AI 提问
import { StateGraph, START, Command } from "@langchain/langgraph";import * as z from "zod";// Define graph stateconst State = z.object({ foo: z.string(),});// Define the nodesconst nodeA = (state: z.infer<typeof State>): Command => { console.log("Called A"); const value = Math.random() > 0.5 ? "b" : "c"; // this is a replacement for a conditional edge function const goto = value === "b" ? "nodeB" : "nodeC"; // note how Command allows you to BOTH update the graph state AND route to the next node return new Command({ // this is the state update update: { foo: value }, // this is a replacement for an edge goto, });};const nodeB = (state: z.infer<typeof State>) => { console.log("Called B"); return { foo: state.foo + "b" };};const nodeC = (state: z.infer<typeof State>) => { console.log("Called C"); return { foo: state.foo + "c" };};
import { StateGraph, START, Command } from "@langchain/langgraph";import { registry } from "@langchain/langgraph/zod";import * as z from "zod";const State = z.object({ // NOTE: we define a reducer here foo: z.string().register(registry, { reducer: { fn: (x, y) => x + y, }, }),});const nodeA = (state: z.infer<typeof State>) => { console.log("Called A"); const value = Math.random() > 0.5 ? "nodeB" : "nodeC"; // note how Command allows you to BOTH update the graph state AND route to the next node return new Command({ update: { foo: "a" }, goto: value, // this tells LangGraph to navigate to nodeB or nodeC in the parent graph // NOTE: this will navigate to the closest parent graph relative to the subgraph graph: Command.PARENT, });};const subgraph = new StateGraph(State) .addNode("nodeA", nodeA, { ends: ["nodeB", "nodeC"] }) .addEdge(START, "nodeA") .compile();const nodeB = (state: z.infer<typeof State>) => { console.log("Called B"); // NOTE: since we've defined a reducer, we don't need to manually append // new characters to existing 'foo' value. instead, reducer will append these // automatically return { foo: "b" };}; const nodeC = (state: z.infer<typeof State>) => { console.log("Called C"); return { foo: "c" };};const graph = new StateGraph(State) .addNode("subgraph", subgraph, { ends: ["nodeB", "nodeC"] }) .addNode("nodeB", nodeB) .addNode("nodeC", nodeC) .addEdge(START, "subgraph") .compile();
复制
向 AI 提问
const result = await graph.invoke({ foo: "" });console.log(result);
一个常见的用例是从工具内部更新图状态。例如,在客户支持应用程序中,您可能希望在对话开始时根据客户的帐号或 ID 查找客户信息。要从工具更新图状态,您可以从工具返回 Command(update={"my_custom_key": "foo", "messages": [...]})
复制
向 AI 提问
import { tool } from "@langchain/core/tools";import { Command } from "@langchain/langgraph";import * as z from "zod";const lookupUserInfo = tool( async (input, config) => { const userId = config.configurable?.userId; const userInfo = getUserInfo(userId); return new Command({ update: { // update the state keys userInfo: userInfo, // update the message history messages: [{ role: "tool", content: "Successfully looked up user information", tool_call_id: config.toolCall.id }] } }); }, { name: "lookupUserInfo", description: "Use this to look up user information to better assist them with their questions.", schema: z.object({}), });