跳到主要内容
LangGraph 实现了一个流式传输系统来显示实时更新。流式传输对于增强基于 LLM 的应用程序的响应能力至关重要。通过逐步显示输出,甚至在完整响应准备好之前,流式传输显著改善了用户体验 (UX),尤其是在处理 LLM 的延迟时。 LangGraph 流式传输可以实现以下功能:

支持的流模式

将以下一个或多个流模式作为列表传递给 stream 方法
模式描述
在图的每个步骤之后流式传输状态的完整值。
更新在图的每个步骤后流式传输状态的更新。如果同一步骤中进行了多次更新(例如,运行了多个节点),则这些更新会单独流式传输。
自定义从图节点内部流式传输自定义数据。
消息从调用 LLM 的任何图节点流式传输 2 元组(LLM 令牌,元数据)。
调试在图执行过程中尽可能多地流式传输信息。

基本用法示例

LangGraph 图公开了 stream 方法以将流式输出作为迭代器生成。
for await (const chunk of await graph.stream(inputs, {
  streamMode: "updates",
})) {
  console.log(chunk);
}
import { StateGraph, START, END } from "@langchain/langgraph";
import * as z from "zod";

const State = z.object({
  topic: z.string(),
  joke: z.string(),
});

const graph = new StateGraph(State)
  .addNode("refineTopic", (state) => {
    return { topic: state.topic + " and cats" };
  })
  .addNode("generateJoke", (state) => {
    return { joke: `This is a joke about ${state.topic}` };
  })
  .addEdge(START, "refineTopic")
  .addEdge("refineTopic", "generateJoke")
  .addEdge("generateJoke", END)
  .compile();

for await (const chunk of await graph.stream(
  { topic: "ice cream" },
  // Set streamMode: "updates" to stream only the updates to the graph state after each node
  // Other stream modes are also available. See supported stream modes for details
  { streamMode: "updates" }
)) {
  console.log(chunk);
}
{'refineTopic': {'topic': 'ice cream and cats'}}
{'generateJoke': {'joke': 'This is a joke about ice cream and cats'}}

流式传输多种模式

您可以将一个数组作为 streamMode 参数传递,以同时流式传输多种模式。 流式传输的输出将是 [mode, chunk] 的元组,其中 mode 是流模式的名称,chunk 是该模式流式传输的数据。
for await (const [mode, chunk] of await graph.stream(inputs, {
  streamMode: ["updates", "custom"],
})) {
  console.log(chunk);
}

流式图状态

使用流模式 updatesvalues 在图执行时流式传输图的状态。
  • updates 在图的每个步骤后流式传输状态的更新
  • values 在图的每个步骤后流式传输状态的完整值
import { StateGraph, START, END } from "@langchain/langgraph";
import * as z from "zod";

const State = z.object({
  topic: z.string(),
  joke: z.string(),
});

const graph = new StateGraph(State)
  .addNode("refineTopic", (state) => {
    return { topic: state.topic + " and cats" };
  })
  .addNode("generateJoke", (state) => {
    return { joke: `This is a joke about ${state.topic}` };
  })
  .addEdge(START, "refineTopic")
  .addEdge("refineTopic", "generateJoke")
  .addEdge("generateJoke", END)
  .compile();
  • 更新
使用此功能可仅流式传输每个步骤后由节点返回的状态更新。流式输出包括节点的名称以及更新。
for await (const chunk of await graph.stream(
  { topic: "ice cream" },
  { streamMode: "updates" }
)) {
  console.log(chunk);
}

流式子图输出

要在流式输出中包含来自子图的输出,您可以在父图的 .stream() 方法中设置 subgraphs: true。这将同时流式传输来自父图和任何子图的输出。 输出将作为元组 [namespace, data] 流式传输,其中 namespace 是一个元组,其中包含调用子图的节点的路径,例如 ["parent_node:", "child_node:"]
for await (const chunk of await graph.stream(
  { foo: "foo" },
  {
    // Set subgraphs: true to stream outputs from subgraphs
    subgraphs: true,
    streamMode: "updates",
  }
)) {
  console.log(chunk);
}
import { StateGraph, START } from "@langchain/langgraph";
import * as z from "zod";

// Define subgraph
const SubgraphState = z.object({
  foo: z.string(), // note that this key is shared with the parent graph state
  bar: z.string(),
});

const subgraphBuilder = new StateGraph(SubgraphState)
  .addNode("subgraphNode1", (state) => {
    return { bar: "bar" };
  })
  .addNode("subgraphNode2", (state) => {
    return { foo: state.foo + state.bar };
  })
  .addEdge(START, "subgraphNode1")
  .addEdge("subgraphNode1", "subgraphNode2");
const subgraph = subgraphBuilder.compile();

// Define parent graph
const ParentState = z.object({
  foo: z.string(),
});

const builder = new StateGraph(ParentState)
  .addNode("node1", (state) => {
    return { foo: "hi! " + state.foo };
  })
  .addNode("node2", subgraph)
  .addEdge(START, "node1")
  .addEdge("node1", "node2");
const graph = builder.compile();

for await (const chunk of await graph.stream(
  { foo: "foo" },
  {
    streamMode: "updates",
    // Set subgraphs: true to stream outputs from subgraphs
    subgraphs: true,
  }
)) {
  console.log(chunk);
}
[[], {'node1': {'foo': 'hi! foo'}}]
[['node2:dfddc4ba-c3c5-6887-5012-a243b5b377c2'], {'subgraphNode1': {'bar': 'bar'}}]
[['node2:dfddc4ba-c3c5-6887-5012-a243b5b377c2'], {'subgraphNode2': {'foo': 'hi! foobar'}}]
[[], {'node2': {'foo': 'hi! foobar'}}]
注意,我们不仅接收节点更新,还接收命名空间,它告诉我们正在从哪个图(或子图)进行流式传输。

调试

使用 debug 流模式在图的整个执行过程中尽可能多地流式传输信息。流式输出包括节点的名称和完整状态。
for await (const chunk of await graph.stream(
  { topic: "ice cream" },
  { streamMode: "debug" }
)) {
  console.log(chunk);
}

LLM Tokens

使用 messages 流式传输模式,逐令牌地从图的任何部分(包括节点、工具、子图或任务)流式传输大型语言模型 (LLM) 输出。 messages 模式的流式输出是一个元组 [message_chunk, metadata],其中:
  • message_chunk:来自 LLM 的令牌或消息段。
  • metadata:一个包含图节点和 LLM 调用详细信息的字典。
如果您的 LLM 不作为 LangChain 集成提供,您可以使用 custom 模式流式传输其输出。有关详细信息,请参阅与任何 LLM 配合使用
import { ChatOpenAI } from "@langchain/openai";
import { StateGraph, START } from "@langchain/langgraph";
import * as z from "zod";

const MyState = z.object({
  topic: z.string(),
  joke: z.string().default(""),
});

const model = new ChatOpenAI({ model: "gpt-4o-mini" });

const callModel = async (state: z.infer<typeof MyState>) => {
  // Call the LLM to generate a joke about a topic
  // Note that message events are emitted even when the LLM is run using .invoke rather than .stream
  const modelResponse = await model.invoke([
    { role: "user", content: `Generate a joke about ${state.topic}` },
  ]);
  return { joke: modelResponse.content };
};

const graph = new StateGraph(MyState)
  .addNode("callModel", callModel)
  .addEdge(START, "callModel")
  .compile();

// The "messages" stream mode returns an iterator of tuples [messageChunk, metadata]
// where messageChunk is the token streamed by the LLM and metadata is a dictionary
// with information about the graph node where the LLM was called and other information
for await (const [messageChunk, metadata] of await graph.stream(
  { topic: "ice cream" },
  { streamMode: "messages" }
)) {
  if (messageChunk.content) {
    console.log(messageChunk.content + "|");
  }
}

按 LLM 调用过滤

您可以将 tags 与 LLM 调用关联,以按 LLM 调用过滤流式传输的令牌。
import { ChatOpenAI } from "@langchain/openai";

// model1 is tagged with "joke"
const model1 = new ChatOpenAI({
  model: "gpt-4o-mini",
  tags: ['joke']
});
// model2 is tagged with "poem"
const model2 = new ChatOpenAI({
  model: "gpt-4o-mini",
  tags: ['poem']
});

const graph = // ... define a graph that uses these LLMs

// The streamMode is set to "messages" to stream LLM tokens
// The metadata contains information about the LLM invocation, including the tags
for await (const [msg, metadata] of await graph.stream(
  { topic: "cats" },
  { streamMode: "messages" }
)) {
  // Filter the streamed tokens by the tags field in the metadata to only include
  // the tokens from the LLM invocation with the "joke" tag
  if (metadata.tags?.includes("joke")) {
    console.log(msg.content + "|");
  }
}
import { ChatOpenAI } from "@langchain/openai";
import { StateGraph, START } from "@langchain/langgraph";
import * as z from "zod";

// The jokeModel is tagged with "joke"
const jokeModel = new ChatOpenAI({
  model: "gpt-4o-mini",
  tags: ["joke"]
});
// The poemModel is tagged with "poem"
const poemModel = new ChatOpenAI({
  model: "gpt-4o-mini",
  tags: ["poem"]
});

const State = z.object({
  topic: z.string(),
  joke: z.string(),
  poem: z.string(),
});

const graph = new StateGraph(State)
  .addNode("callModel", (state) => {
    const topic = state.topic;
    console.log("Writing joke...");

    const jokeResponse = await jokeModel.invoke([
      { role: "user", content: `Write a joke about ${topic}` }
    ]);

    console.log("\n\nWriting poem...");
    const poemResponse = await poemModel.invoke([
      { role: "user", content: `Write a short poem about ${topic}` }
    ]);

    return {
      joke: jokeResponse.content,
      poem: poemResponse.content
    };
  })
  .addEdge(START, "callModel")
  .compile();

// The streamMode is set to "messages" to stream LLM tokens
// The metadata contains information about the LLM invocation, including the tags
for await (const [msg, metadata] of await graph.stream(
  { topic: "cats" },
  { streamMode: "messages" }
)) {
  // Filter the streamed tokens by the tags field in the metadata to only include
  // the tokens from the LLM invocation with the "joke" tag
  if (metadata.tags?.includes("joke")) {
    console.log(msg.content + "|");
  }
}

按节点过滤

要仅从特定节点流式传输令牌,请使用 stream_mode="messages" 并通过流式元数据中的 langgraph_node 字段过滤输出
// The "messages" stream mode returns a tuple of [messageChunk, metadata]
// where messageChunk is the token streamed by the LLM and metadata is a dictionary
// with information about the graph node where the LLM was called and other information
for await (const [msg, metadata] of await graph.stream(
  inputs,
  { streamMode: "messages" }
)) {
  // Filter the streamed tokens by the langgraph_node field in the metadata
  // to only include the tokens from the specified node
  if (msg.content && metadata.langgraph_node === "some_node_name") {
    // ...
  }
}
import { ChatOpenAI } from "@langchain/openai";
import { StateGraph, START } from "@langchain/langgraph";
import * as z from "zod";

const model = new ChatOpenAI({ model: "gpt-4o-mini" });

const State = z.object({
  topic: z.string(),
  joke: z.string(),
  poem: z.string(),
});

const graph = new StateGraph(State)
  .addNode("writeJoke", async (state) => {
    const topic = state.topic;
    const jokeResponse = await model.invoke([
      { role: "user", content: `Write a joke about ${topic}` }
    ]);
    return { joke: jokeResponse.content };
  })
  .addNode("writePoem", async (state) => {
    const topic = state.topic;
    const poemResponse = await model.invoke([
      { role: "user", content: `Write a short poem about ${topic}` }
    ]);
    return { poem: poemResponse.content };
  })
  // write both the joke and the poem concurrently
  .addEdge(START, "writeJoke")
  .addEdge(START, "writePoem")
  .compile();

// The "messages" stream mode returns a tuple of [messageChunk, metadata]
// where messageChunk is the token streamed by the LLM and metadata is a dictionary
// with information about the graph node where the LLM was called and other information
for await (const [msg, metadata] of await graph.stream(
  { topic: "cats" },
  { streamMode: "messages" }
)) {
  // Filter the streamed tokens by the langgraph_node field in the metadata
  // to only include the tokens from the writePoem node
  if (msg.content && metadata.langgraph_node === "writePoem") {
    console.log(msg.content + "|");
  }
}

流式自定义数据

要从 LangGraph 节点或工具内部发送自定义用户定义数据,请按照以下步骤操作
  1. 使用 LangGraphRunnableConfig 中的 writer 参数发出自定义数据。
  2. 调用 .stream() 时设置 streamMode: "custom" 以在流中获取自定义数据。您可以组合多种模式(例如 ["updates", "custom"]),但至少一个必须是 "custom"
  • 节点
  • 工具
import { StateGraph, START, LangGraphRunnableConfig } from "@langchain/langgraph";
import * as z from "zod";

const State = z.object({
  query: z.string(),
  answer: z.string(),
});

const graph = new StateGraph(State)
  .addNode("node", async (state, config) => {
    // Use the writer to emit a custom key-value pair (e.g., progress update)
    config.writer({ custom_key: "Generating custom data inside node" });
    return { answer: "some data" };
  })
  .addEdge(START, "node")
  .compile();

const inputs = { query: "example" };

// Set streamMode: "custom" to receive the custom data in the stream
for await (const chunk of await graph.stream(inputs, { streamMode: "custom" })) {
  console.log(chunk);
}

与任何 LLM 配合使用

您可以使用 streamMode: "custom"任何 LLM API 流式传输数据 — 即使该 API 实现 LangChain 聊天模型接口。 这使您可以集成提供自己的流式传输接口的原始 LLM 客户端或外部服务,使 LangGraph 在自定义设置中具有高度灵活性。
import { LangGraphRunnableConfig } from "@langchain/langgraph";

const callArbitraryModel = async (
  state: any,
  config: LangGraphRunnableConfig
) => {
  // Example node that calls an arbitrary model and streams the output
  // Assume you have a streaming client that yields chunks
  // Generate LLM tokens using your custom streaming client
  for await (const chunk of yourCustomStreamingClient(state.topic)) {
    // Use the writer to send custom data to the stream
    config.writer({ custom_llm_chunk: chunk });
  }
  return { result: "completed" };
};

const graph = new StateGraph(State)
  .addNode("callArbitraryModel", callArbitraryModel)
  // Add other nodes and edges as needed
  .compile();

// Set streamMode: "custom" to receive the custom data in the stream
for await (const chunk of await graph.stream(
  { topic: "cats" },
  { streamMode: "custom" }
)) {
  // The chunk will contain the custom data streamed from the llm
  console.log(chunk);
}
import { StateGraph, START, MessagesZodMeta, LangGraphRunnableConfig } from "@langchain/langgraph";
import { BaseMessage } from "@langchain/core/messages";
import { registry } from "@langchain/langgraph/zod";
import * as z from "zod";
import OpenAI from "openai";

const openaiClient = new OpenAI();
const modelName = "gpt-4o-mini";

async function* streamTokens(modelName: string, messages: any[]) {
  const response = await openaiClient.chat.completions.create({
    messages,
    model: modelName,
    stream: true,
  });

  let role: string | null = null;
  for await (const chunk of response) {
    const delta = chunk.choices[0]?.delta;

    if (delta?.role) {
      role = delta.role;
    }

    if (delta?.content) {
      yield { role, content: delta.content };
    }
  }
}

// this is our tool
const getItems = tool(
  async (input, config: LangGraphRunnableConfig) => {
    let response = "";
    for await (const msgChunk of streamTokens(
      modelName,
      [
        {
          role: "user",
          content: `Can you tell me what kind of items i might find in the following place: '${input.place}'. List at least 3 such items separating them by a comma. And include a brief description of each item.`,
        },
      ]
    )) {
      response += msgChunk.content;
      config.writer?.(msgChunk);
    }
    return response;
  },
  {
    name: "get_items",
    description: "Use this tool to list items one might find in a place you're asked about.",
    schema: z.object({
      place: z.string().describe("The place to look up items for."),
    }),
  }
);

const State = z.object({
  messages: z
    .array(z.custom<BaseMessage>())
    .register(registry, MessagesZodMeta),
});

const graph = new StateGraph(State)
  // this is the tool-calling graph node
  .addNode("callTool", async (state) => {
    const aiMessage = state.messages.at(-1);
    const toolCall = aiMessage.tool_calls?.at(-1);

    const functionName = toolCall?.function?.name;
    if (functionName !== "get_items") {
      throw new Error(`Tool ${functionName} not supported`);
    }

    const functionArguments = toolCall?.function?.arguments;
    const args = JSON.parse(functionArguments);

    const functionResponse = await getItems.invoke(args);
    const toolMessage = {
      tool_call_id: toolCall.id,
      role: "tool",
      name: functionName,
      content: functionResponse,
    };
    return { messages: [toolMessage] };
  })
  .addEdge(START, "callTool")
  .compile();
让我们用包含工具调用的 AIMessage 来调用图
const inputs = {
  messages: [
    {
      content: null,
      role: "assistant",
      tool_calls: [
        {
          id: "1",
          function: {
            arguments: '{"place":"bedroom"}',
            name: "get_items",
          },
          type: "function",
        }
      ],
    }
  ]
};

for await (const chunk of await graph.stream(
  inputs,
  { streamMode: "custom" }
)) {
  console.log(chunk.content + "|");
}

禁用特定聊天模型的流式传输

如果您的应用程序混合了支持流式传输和不支持流式传输的模型,您可能需要明确禁用不支持流式传输的模型的流式传输。 初始化模型时设置 streaming: false
import { ChatOpenAI } from "@langchain/openai";

const model = new ChatOpenAI({
  model: "o1-preview",
  // Set streaming: false to disable streaming for the chat model
  streaming: false,
});

以编程方式连接这些文档到 Claude、VSCode 等,通过 MCP 获取实时答案。
© . This site is unofficial and not affiliated with LangChain, Inc.