- Stream graph state — use the `updates` and `values` modes to get state updates/values.
- Stream subgraph outputs — include outputs from both the parent graph and any nested subgraphs.
- Stream LLM tokens — capture token streams from anywhere: inside nodes, subgraphs, or tools.
- Stream custom data — send custom updates or progress signals directly from tool functions.
- Use multiple streaming modes — choose from `values` (full state), `updates` (state deltas), `messages` (LLM tokens + metadata), `custom` (arbitrary user data), or `debug` (detailed traces).
## Supported stream modes
Pass one or more of the following stream modes as a list to the `stream` method:
| Mode | Description |
|---|---|
| `values` | Streams the full value of the state after each step of the graph. |
| `updates` | Streams the updates to the state after each step of the graph. If multiple updates are made in the same step (e.g., multiple nodes are run), those updates are streamed separately. |
| `custom` | Streams custom data from inside your graph nodes. |
| `messages` | Streams 2-tuples (LLM token, metadata) from any graph nodes where an LLM is invoked. |
| `debug` | Streams as much information as possible throughout the execution of the graph. |
## Basic usage example
LangGraph graphs expose the `stream` method, which yields streamed outputs as an iterator.
```typescript
for await (const chunk of await graph.stream(inputs, {
streamMode: "updates",
})) {
console.log(chunk);
}
```

**Extended example: streaming updates**
```typescript
import { StateGraph, START, END } from "@langchain/langgraph";
import * as z from "zod";
const State = z.object({
topic: z.string(),
joke: z.string(),
});
const graph = new StateGraph(State)
.addNode("refineTopic", (state) => {
return { topic: state.topic + " and cats" };
})
.addNode("generateJoke", (state) => {
return { joke: `This is a joke about ${state.topic}` };
})
.addEdge(START, "refineTopic")
.addEdge("refineTopic", "generateJoke")
.addEdge("generateJoke", END)
.compile();
for await (const chunk of await graph.stream(
{ topic: "ice cream" },
// Set streamMode: "updates" to stream only the updates to the graph state after each node
// Other stream modes are also available. See supported stream modes for details
{ streamMode: "updates" }
)) {
console.log(chunk);
}
```

Output:

```
{ refineTopic: { topic: 'ice cream and cats' } }
{ generateJoke: { joke: 'This is a joke about ice cream and cats' } }
```

## Stream multiple modes
You can pass an array as the `streamMode` parameter to stream multiple modes at once. The streamed outputs will be tuples `[mode, chunk]`, where `mode` is the name of the stream mode and `chunk` is the data streamed by that mode.

```typescript
for await (const [mode, chunk] of await graph.stream(inputs, {
streamMode: ["updates", "custom"],
})) {
console.log(chunk);
}
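```

The mode name lets you route each chunk to a different handler. A minimal sketch, assuming the same `inputs` as above; the branch bodies are placeholders:

```typescript
for await (const [mode, chunk] of await graph.stream(inputs, {
  streamMode: ["updates", "custom"],
})) {
  if (mode === "updates") {
    // chunk is a state delta keyed by the node that produced it
    console.log("update:", chunk);
  } else if (mode === "custom") {
    // chunk is whatever a node or tool passed to the custom-data writer
    console.log("custom:", chunk);
  }
}
```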
## Stream graph state
Use the stream modes `updates` and `values` to stream the state of the graph as it executes.
- `updates` streams the updates to the state after each step of the graph.
- `values` streams the full value of the state after each step of the graph.
```typescript
import { StateGraph, START, END } from "@langchain/langgraph";
import * as z from "zod";
const State = z.object({
topic: z.string(),
joke: z.string(),
});
const graph = new StateGraph(State)
.addNode("refineTopic", (state) => {
return { topic: state.topic + " and cats" };
})
.addNode("generateJoke", (state) => {
return { joke: `This is a joke about ${state.topic}` };
})
.addEdge(START, "refineTopic")
.addEdge("refineTopic", "generateJoke")
.addEdge("generateJoke", END)
.compile();
```

**Updates**

Use this to stream only the state updates returned by the nodes after each step. The streamed outputs include the name of the node as well as the update.
```typescript
for await (const chunk of await graph.stream(
{ topic: "ice cream" },
{ streamMode: "updates" }
)) {
console.log(chunk);
}
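```

**Values**

Use this to stream the full value of the state after each step. A minimal sketch, mirroring the updates example above with only the mode changed:

```typescript
for await (const chunk of await graph.stream(
  { topic: "ice cream" },
  // streamMode: "values" emits the complete state after each step,
  // rather than only the delta produced by the node that just ran
  { streamMode: "values" }
)) {
  console.log(chunk);
}
```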
## Stream subgraph outputs
To include outputs from subgraphs in the streamed output, set `subgraphs: true` in the `.stream()` method of the parent graph. This will stream outputs from both the parent graph and any subgraphs. The outputs are streamed as tuples `[namespace, data]`, where `namespace` is a tuple with the path to the node where a subgraph is invoked, e.g. `["parent_node:<task_id>", "child_node:<task_id>"]`.

```typescript
for await (const chunk of await graph.stream(
{ foo: "foo" },
{
// Set subgraphs: true to stream outputs from subgraphs
subgraphs: true,
streamMode: "updates",
}
)) {
console.log(chunk);
}
```

**Extended example: streaming from subgraphs**
```typescript
import { StateGraph, START } from "@langchain/langgraph";
import * as z from "zod";
// Define subgraph
const SubgraphState = z.object({
foo: z.string(), // note that this key is shared with the parent graph state
bar: z.string(),
});
const subgraphBuilder = new StateGraph(SubgraphState)
.addNode("subgraphNode1", (state) => {
return { bar: "bar" };
})
.addNode("subgraphNode2", (state) => {
return { foo: state.foo + state.bar };
})
.addEdge(START, "subgraphNode1")
.addEdge("subgraphNode1", "subgraphNode2");
const subgraph = subgraphBuilder.compile();
// Define parent graph
const ParentState = z.object({
foo: z.string(),
});
const builder = new StateGraph(ParentState)
.addNode("node1", (state) => {
return { foo: "hi! " + state.foo };
})
.addNode("node2", subgraph)
.addEdge(START, "node1")
.addEdge("node1", "node2");
const graph = builder.compile();
for await (const chunk of await graph.stream(
{ foo: "foo" },
{
streamMode: "updates",
// Set subgraphs: true to stream outputs from subgraphs
subgraphs: true,
}
)) {
console.log(chunk);
}
```

Output:

```
[ [], { node1: { foo: 'hi! foo' } } ]
[ [ 'node2:dfddc4ba-c3c5-6887-5012-a243b5b377c2' ], { subgraphNode1: { bar: 'bar' } } ]
[ [ 'node2:dfddc4ba-c3c5-6887-5012-a243b5b377c2' ], { subgraphNode2: { foo: 'hi! foobar' } } ]
[ [], { node2: { foo: 'hi! foobar' } } ]
```

## Debugging
Use the `debug` stream mode to stream as much information as possible throughout the execution of the graph. The streamed outputs include the name of the node as well as the full state.
```typescript
for await (const chunk of await graph.stream(
{ topic: "ice cream" },
{ streamMode: "debug" }
)) {
console.log(chunk);
}
```

## LLM tokens
Use the `messages` stream mode to stream LLM output token by token from any part of your graph, including nodes, tools, subgraphs, or tasks. The streamed output from `messages` mode is a tuple `[message_chunk, metadata]`, where:
- `message_chunk`: the token or message segment from the LLM.
- `metadata`: a dictionary with details about the graph node and the LLM invocation.
If your LLM is not available as a LangChain integration, you can stream its outputs using the `custom` mode instead. See Use with any LLM below for details.
```typescript
import { ChatOpenAI } from "@langchain/openai";
import { StateGraph, START } from "@langchain/langgraph";
import * as z from "zod";
const MyState = z.object({
topic: z.string(),
joke: z.string().default(""),
});
const model = new ChatOpenAI({ model: "gpt-4o-mini" });
const callModel = async (state: z.infer<typeof MyState>) => {
// Call the LLM to generate a joke about a topic
// Note that message events are emitted even when the LLM is run using .invoke rather than .stream
const modelResponse = await model.invoke([
{ role: "user", content: `Generate a joke about ${state.topic}` },
]);
return { joke: modelResponse.content };
};
const graph = new StateGraph(MyState)
.addNode("callModel", callModel)
.addEdge(START, "callModel")
.compile();
// The "messages" stream mode returns an iterator of tuples [messageChunk, metadata]
// where messageChunk is the token streamed by the LLM and metadata is a dictionary
// with information about the graph node where the LLM was called and other information
for await (const [messageChunk, metadata] of await graph.stream(
{ topic: "ice cream" },
{ streamMode: "messages" }
)) {
if (messageChunk.content) {
console.log(messageChunk.content + "|");
}
}
```

### Filter by LLM invocation
You can associate `tags` with LLM invocations to filter the streamed tokens by invocation.
```typescript
import { ChatOpenAI } from "@langchain/openai";
// model1 is tagged with "joke"
const model1 = new ChatOpenAI({
model: "gpt-4o-mini",
tags: ['joke']
});
// model2 is tagged with "poem"
const model2 = new ChatOpenAI({
model: "gpt-4o-mini",
tags: ['poem']
});
const graph = // ... define a graph that uses these LLMs
// The streamMode is set to "messages" to stream LLM tokens
// The metadata contains information about the LLM invocation, including the tags
for await (const [msg, metadata] of await graph.stream(
{ topic: "cats" },
{ streamMode: "messages" }
)) {
// Filter the streamed tokens by the tags field in the metadata to only include
// the tokens from the LLM invocation with the "joke" tag
if (metadata.tags?.includes("joke")) {
console.log(msg.content + "|");
}
}
```

**Extended example: filtering by tags**
```typescript
import { ChatOpenAI } from "@langchain/openai";
import { StateGraph, START } from "@langchain/langgraph";
import * as z from "zod";
// The jokeModel is tagged with "joke"
const jokeModel = new ChatOpenAI({
model: "gpt-4o-mini",
tags: ["joke"]
});
// The poemModel is tagged with "poem"
const poemModel = new ChatOpenAI({
model: "gpt-4o-mini",
tags: ["poem"]
});
const State = z.object({
topic: z.string(),
joke: z.string(),
poem: z.string(),
});
const graph = new StateGraph(State)
.addNode("callModel", (state) => {
const topic = state.topic;
console.log("Writing joke...");
const jokeResponse = await jokeModel.invoke([
{ role: "user", content: `Write a joke about ${topic}` }
]);
console.log("\n\nWriting poem...");
const poemResponse = await poemModel.invoke([
{ role: "user", content: `Write a short poem about ${topic}` }
]);
return {
joke: jokeResponse.content,
poem: poemResponse.content
};
})
.addEdge(START, "callModel")
.compile();
// The streamMode is set to "messages" to stream LLM tokens
// The metadata contains information about the LLM invocation, including the tags
for await (const [msg, metadata] of await graph.stream(
{ topic: "cats" },
{ streamMode: "messages" }
)) {
// Filter the streamed tokens by the tags field in the metadata to only include
// the tokens from the LLM invocation with the "joke" tag
if (metadata.tags?.includes("joke")) {
console.log(msg.content + "|");
}
}
```

### Filter by node
To stream tokens only from specific nodes, use `streamMode: "messages"` and filter the outputs by the `langgraph_node` field in the streamed metadata:
```typescript
// The "messages" stream mode returns a tuple of [messageChunk, metadata]
// where messageChunk is the token streamed by the LLM and metadata is a dictionary
// with information about the graph node where the LLM was called and other information
for await (const [msg, metadata] of await graph.stream(
inputs,
{ streamMode: "messages" }
)) {
// Filter the streamed tokens by the langgraph_node field in the metadata
// to only include the tokens from the specified node
if (msg.content && metadata.langgraph_node === "some_node_name") {
// ...
}
}
```

**Extended example: streaming LLM tokens from specific nodes**
```typescript
import { ChatOpenAI } from "@langchain/openai";
import { StateGraph, START } from "@langchain/langgraph";
import * as z from "zod";
const model = new ChatOpenAI({ model: "gpt-4o-mini" });
const State = z.object({
topic: z.string(),
joke: z.string(),
poem: z.string(),
});
const graph = new StateGraph(State)
.addNode("writeJoke", async (state) => {
const topic = state.topic;
const jokeResponse = await model.invoke([
{ role: "user", content: `Write a joke about ${topic}` }
]);
return { joke: jokeResponse.content };
})
.addNode("writePoem", async (state) => {
const topic = state.topic;
const poemResponse = await model.invoke([
{ role: "user", content: `Write a short poem about ${topic}` }
]);
return { poem: poemResponse.content };
})
// write both the joke and the poem concurrently
.addEdge(START, "writeJoke")
.addEdge(START, "writePoem")
.compile();
// The "messages" stream mode returns a tuple of [messageChunk, metadata]
// where messageChunk is the token streamed by the LLM and metadata is a dictionary
// with information about the graph node where the LLM was called and other information
for await (const [msg, metadata] of await graph.stream(
{ topic: "cats" },
{ streamMode: "messages" }
)) {
// Filter the streamed tokens by the langgraph_node field in the metadata
// to only include the tokens from the writePoem node
if (msg.content && metadata.langgraph_node === "writePoem") {
console.log(msg.content + "|");
}
}
```

## Stream custom data
To send custom user-defined data from inside a LangGraph node or tool:

- Use the `writer` parameter from `LangGraphRunnableConfig` to emit custom data.
- Set `streamMode: "custom"` when calling `.stream()` to receive the custom data in the stream. You can combine multiple modes (e.g., `["updates", "custom"]`), but at least one must be `"custom"`.
**Node**
```typescript
import { StateGraph, START, LangGraphRunnableConfig } from "@langchain/langgraph";
import * as z from "zod";
const State = z.object({
query: z.string(),
answer: z.string(),
});
const graph = new StateGraph(State)
.addNode("node", async (state, config) => {
// Use the writer to emit a custom key-value pair (e.g., progress update)
config.writer?.({ custom_key: "Generating custom data inside node" });
return { answer: "some data" };
})
.addEdge(START, "node")
.compile();
const inputs = { query: "example" };
// Set streamMode: "custom" to receive the custom data in the stream
for await (const chunk of await graph.stream(inputs, { streamMode: "custom" })) {
console.log(chunk);
}
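```

**Tool**

The same `writer` is available inside tools through the tool's config argument, so tools can emit progress updates too. A minimal sketch; the `query_database` tool and its progress payloads are illustrative:

```typescript
import { tool } from "@langchain/core/tools";
import { LangGraphRunnableConfig } from "@langchain/langgraph";
import * as z from "zod";

const queryDatabase = tool(
  async (input, config: LangGraphRunnableConfig) => {
    // Emit a custom progress update before starting the work
    config.writer?.({ type: "progress", data: "Starting query..." });
    // ... run the actual query here ...
    // Emit another custom update once the work is done
    config.writer?.({ type: "progress", data: "Query complete" });
    return `Results for: ${input.query}`;
  },
  {
    name: "query_database",
    description: "Query the database.",
    schema: z.object({
      query: z.string().describe("The query to execute."),
    }),
  }
);
```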
## Use with any LLM
You can use `streamMode: "custom"` to stream data from any LLM API — even one that does not implement the LangChain chat model interface. This lets you integrate raw LLM clients or external services that provide their own streaming interfaces, making LangGraph highly flexible for custom setups.

```typescript
import { StateGraph, LangGraphRunnableConfig } from "@langchain/langgraph";
const callArbitraryModel = async (
state: any,
config: LangGraphRunnableConfig
) => {
// Example node that calls an arbitrary model and streams the output
// Assume you have a streaming client that yields chunks
// Generate LLM tokens using your custom streaming client
for await (const chunk of yourCustomStreamingClient(state.topic)) {
// Use the writer to send custom data to the stream
config.writer?.({ custom_llm_chunk: chunk });
}
return { result: "completed" };
};
const graph = new StateGraph(State)
.addNode("callArbitraryModel", callArbitraryModel)
// Add other nodes and edges as needed
.compile();
// Set streamMode: "custom" to receive the custom data in the stream
for await (const chunk of await graph.stream(
{ topic: "cats" },
{ streamMode: "custom" }
)) {
// The chunk will contain the custom data streamed from the llm
console.log(chunk);
}
```

**Extended example: streaming arbitrary chat models**
```typescript
import { StateGraph, START, MessagesZodMeta, LangGraphRunnableConfig } from "@langchain/langgraph";
import { BaseMessage } from "@langchain/core/messages";
import { tool } from "@langchain/core/tools";
import { registry } from "@langchain/langgraph/zod";
import * as z from "zod";
import OpenAI from "openai";
const openaiClient = new OpenAI();
const modelName = "gpt-4o-mini";
async function* streamTokens(modelName: string, messages: any[]) {
const response = await openaiClient.chat.completions.create({
messages,
model: modelName,
stream: true,
});
let role: string | null = null;
for await (const chunk of response) {
const delta = chunk.choices[0]?.delta;
if (delta?.role) {
role = delta.role;
}
if (delta?.content) {
yield { role, content: delta.content };
}
}
}
// this is our tool
const getItems = tool(
async (input, config: LangGraphRunnableConfig) => {
let response = "";
for await (const msgChunk of streamTokens(
modelName,
[
{
role: "user",
content: `Can you tell me what kind of items i might find in the following place: '${input.place}'. List at least 3 such items separating them by a comma. And include a brief description of each item.`,
},
]
)) {
response += msgChunk.content;
config.writer?.(msgChunk);
}
return response;
},
{
name: "get_items",
description: "Use this tool to list items one might find in a place you're asked about.",
schema: z.object({
place: z.string().describe("The place to look up items for."),
}),
}
);
const State = z.object({
messages: z
.array(z.custom<BaseMessage>())
.register(registry, MessagesZodMeta),
});
const graph = new StateGraph(State)
// this is the tool-calling graph node
.addNode("callTool", async (state) => {
const aiMessage = state.messages.at(-1);
const toolCall = aiMessage.tool_calls?.at(-1);
const functionName = toolCall?.function?.name;
if (functionName !== "get_items") {
throw new Error(`Tool ${functionName} not supported`);
}
const functionArguments = toolCall?.function?.arguments;
const args = JSON.parse(functionArguments);
const functionResponse = await getItems.invoke(args);
const toolMessage = {
tool_call_id: toolCall.id,
role: "tool",
name: functionName,
content: functionResponse,
};
return { messages: [toolMessage] };
})
.addEdge(START, "callTool")
.compile();
```

Invoke the graph with an `AIMessage` that includes tool calls:

```typescript
const inputs = {
messages: [
{
content: null,
role: "assistant",
tool_calls: [
{
id: "1",
function: {
arguments: '{"place":"bedroom"}',
name: "get_items",
},
type: "function",
}
],
}
]
};
for await (const chunk of await graph.stream(
inputs,
{ streamMode: "custom" }
)) {
console.log(chunk.content + "|");
}
```

## Disable streaming for specific chat models
If your application mixes models that support streaming with models that do not, you may need to explicitly disable streaming for the models that do not support it. Set `disableStreaming: true` when initializing the model.

```typescript
import { ChatOpenAI } from "@langchain/openai";
const model = new ChatOpenAI({
  model: "o1-preview",
  // Set disableStreaming: true to disable streaming for the chat model
  disableStreaming: true,
});
```