for chunk in graph.stream( {"topic": "ice cream"}, stream_mode=["updates", "custom"], version="v2",): if chunk["type"] == "updates": for node_name, state in chunk["data"].items(): print(f"Node {node_name} updated: {state}") elif chunk["type"] == "custom": print(f"Status: {chunk['data']['status']}")
输出
Status: thinking of a joke...Node generate_joke updated: {'joke': 'Why did the ice cream go to school? To get a sundae education!'}
完整示例
from typing import TypedDictfrom langgraph.graph import StateGraph, START, ENDfrom langgraph.config import get_stream_writerclass State(TypedDict): topic: str joke: strdef generate_joke(state: State): writer = get_stream_writer() writer({"status": "thinking of a joke..."}) return {"joke": f"Why did the {state['topic']} go to school? To get a sundae education!"}graph = ( StateGraph(State) .add_node(generate_joke) .add_edge(START, "generate_joke") .add_edge("generate_joke", END) .compile())for chunk in graph.stream( {"topic": "ice cream"}, stream_mode=["updates", "custom"], version="v2",): if chunk["type"] == "updates": for node_name, state in chunk["data"].items(): print(f"Node {node_name} updated: {state}") elif chunk["type"] == "custom": print(f"Status: {chunk['data']['status']}")
输出
Status: thinking of a joke...Node generate_joke updated: {'joke': 'Why did the ice cream go to school? To get a sundae education!'}
for part in graph.stream( {"topic": "ice cream"}, stream_mode=["values", "updates", "messages", "custom"], version="v2",): if part["type"] == "values": # ValuesStreamPart — full state snapshot after each step print(f"State: topic={part['data']['topic']}") elif part["type"] == "updates": # UpdatesStreamPart — only the changed keys from each node for node_name, state in part["data"].items(): print(f"Node `{node_name}` updated: {state}") elif part["type"] == "messages": # MessagesStreamPart — (message_chunk, metadata) from LLM calls msg, metadata = part["data"] print(msg.content, end="", flush=True) elif part["type"] == "custom": # CustomStreamPart — arbitrary data from get_stream_writer() print(f"Progress: {part['data']['progress']}%")
from typing import TypedDictfrom langgraph.graph import StateGraph, START, ENDclass State(TypedDict): topic: str joke: strdef refine_topic(state: State): return {"topic": state["topic"] + " and cats"}def generate_joke(state: State): return {"joke": f"This is a joke about {state['topic']}"}graph = ( StateGraph(State) .add_node(refine_topic) .add_node(generate_joke) .add_edge(START, "refine_topic") .add_edge("refine_topic", "generate_joke") .add_edge("generate_joke", END) .compile())
更新
值
使用此功能可仅流式传输每个步骤后由节点返回的状态更新。流式输出包括节点的名称以及更新。
for chunk in graph.stream( {"topic": "ice cream"}, stream_mode="updates", version="v2",): if chunk["type"] == "updates": for node_name, state in chunk["data"].items(): print(f"Node `{node_name}` updated: {state}")
输出
Node `refine_topic` updated: {'topic': 'ice cream and cats'}Node `generate_joke` updated: {'joke': 'This is a joke about ice cream and cats'}
使用此方法在每个步骤之后流式传输图的完整状态。
for chunk in graph.stream( {"topic": "ice cream"}, stream_mode="values", version="v2",): if chunk["type"] == "values": print(f"topic: {chunk['data']['topic']}, joke: {chunk['data']['joke']}")
输出
topic: ice cream, joke:topic: ice cream and cats, joke:topic: ice cream and cats, joke: This is a joke about ice cream and cats
from dataclasses import dataclassfrom langchain.chat_models import init_chat_modelfrom langgraph.graph import StateGraph, START@dataclassclass MyState: topic: str joke: str = ""model = init_chat_model(model="gpt-5.4-mini")def call_model(state: MyState): """Call the LLM to generate a joke about a topic""" # Note that message events are emitted even when the LLM is run using .invoke rather than .stream model_response = model.invoke( [ {"role": "user", "content": f"Generate a joke about {state.topic}"} ] ) return {"joke": model_response.content}graph = ( StateGraph(MyState) .add_node(call_model) .add_edge(START, "call_model") .compile())# The "messages" stream mode streams LLM tokens with metadata# Use version="v2" for a unified StreamPart formatfor chunk in graph.stream( {"topic": "ice cream"}, stream_mode="messages", version="v2",): if chunk["type"] == "messages": message_chunk, metadata = chunk["data"] if message_chunk.content: print(message_chunk.content, end="|", flush=True)
from langchain.chat_models import init_chat_model# model_1 is tagged with "joke"model_1 = init_chat_model(model="gpt-5.4-mini", tags=['joke'])# model_2 is tagged with "poem"model_2 = init_chat_model(model="gpt-5.4-mini", tags=['poem'])graph = ... # define a graph that uses these LLMs# The stream_mode is set to "messages" to stream LLM tokens# The metadata contains information about the LLM invocation, including the tagsasync for chunk in graph.astream( {"topic": "cats"}, stream_mode="messages", version="v2",): if chunk["type"] == "messages": msg, metadata = chunk["data"] # Filter the streamed tokens by the tags field in the metadata to only include # the tokens from the LLM invocation with the "joke" tag if metadata["tags"] == ["joke"]: print(msg.content, end="|", flush=True)
扩展示例:按标签筛选
from typing import TypedDictfrom langchain.chat_models import init_chat_modelfrom langgraph.graph import START, StateGraph# The joke_model is tagged with "joke"joke_model = init_chat_model(model="gpt-5.4-mini", tags=["joke"])# The poem_model is tagged with "poem"poem_model = init_chat_model(model="gpt-5.4-mini", tags=["poem"])class State(TypedDict): topic: str joke: str poem: strasync def call_model(state, config): topic = state["topic"] print("Writing joke...") # Note: Passing the config through explicitly is required for python < 3.11 # Since context var support wasn't added before then: https://docs.pythonlang.cn/3/library/asyncio-task.html#creating-tasks # The config is passed through explicitly to ensure the context vars are propagated correctly # This is required for Python < 3.11 when using async code. Please see the async section for more details joke_response = await joke_model.ainvoke( [{"role": "user", "content": f"Write a joke about {topic}"}], config, ) print("\n\nWriting poem...") poem_response = await poem_model.ainvoke( [{"role": "user", "content": f"Write a short poem about {topic}"}], config, ) return {"joke": joke_response.content, "poem": poem_response.content}graph = ( StateGraph(State) .add_node(call_model) .add_edge(START, "call_model") .compile())# The stream_mode is set to "messages" to stream LLM tokens# The metadata contains information about the LLM invocation, including the tagsasync for chunk in graph.astream( {"topic": "cats"}, stream_mode="messages", version="v2",): if chunk["type"] == "messages": msg, metadata = chunk["data"] if metadata["tags"] == ["joke"]: print(msg.content, end="|", flush=True)
# The "messages" stream mode streams LLM tokens with metadata# Use version="v2" for a unified StreamPart formatfor chunk in graph.stream( inputs, stream_mode="messages", version="v2",): if chunk["type"] == "messages": msg, metadata = chunk["data"] # Filter the streamed tokens by the langgraph_node field in the metadata # to only include the tokens from the specified node if msg.content and metadata["langgraph_node"] == "some_node_name": ...
扩展示例:从特定节点流式传输 LLM 令牌
from typing import TypedDictfrom langgraph.graph import START, StateGraphfrom langchain_openai import ChatOpenAImodel = ChatOpenAI(model="gpt-5.4-mini")class State(TypedDict): topic: str joke: str poem: strdef write_joke(state: State): topic = state["topic"] joke_response = model.invoke( [{"role": "user", "content": f"Write a joke about {topic}"}] ) return {"joke": joke_response.content}def write_poem(state: State): topic = state["topic"] poem_response = model.invoke( [{"role": "user", "content": f"Write a short poem about {topic}"}] ) return {"poem": poem_response.content}graph = ( StateGraph(State) .add_node(write_joke) .add_node(write_poem) # write both the joke and the poem concurrently .add_edge(START, "write_joke") .add_edge(START, "write_poem") .compile())# The "messages" stream mode streams LLM tokens with metadata# Use version="v2" for a unified StreamPart formatfor chunk in graph.stream( {"topic": "cats"}, stream_mode="messages", version="v2",): if chunk["type"] == "messages": msg, metadata = chunk["data"] # Filter the streamed tokens by the langgraph_node field in the metadata # to only include the tokens from the write_poem node if msg.content and metadata["langgraph_node"] == "write_poem": print(msg.content, end="|", flush=True)
from typing import TypedDictfrom langgraph.config import get_stream_writerfrom langgraph.graph import StateGraph, STARTclass State(TypedDict): query: str answer: strdef node(state: State): # Get the stream writer to send custom data writer = get_stream_writer() # Emit a custom key-value pair (e.g., progress update) writer({"custom_key": "Generating custom data inside node"}) return {"answer": "some data"}graph = ( StateGraph(State) .add_node(node) .add_edge(START, "node") .compile())inputs = {"query": "example"}# Set stream_mode="custom" to receive the custom data in the streamfor chunk in graph.stream(inputs, stream_mode="custom", version="v2"): if chunk["type"] == "custom": print(f"Custom event: {chunk['data']['custom_key']}")
from langchain.tools import toolfrom langgraph.config import get_stream_writer@tooldef query_database(query: str) -> str: """Query the database.""" # Access the stream writer to send custom data writer = get_stream_writer() # Emit a custom key-value pair (e.g., progress update) writer({"data": "Retrieved 0/100 records", "type": "progress"}) # perform query # Emit another custom key-value pair writer({"data": "Retrieved 100/100 records", "type": "progress"}) return "some-answer"graph = ... # define a graph that uses this tool# Set stream_mode="custom" to receive the custom data in the streamfor chunk in graph.stream(inputs, stream_mode="custom", version="v2"): if chunk["type"] == "custom": print(f"{chunk['data']['type']}: {chunk['data']['data']}")
for chunk in graph.stream(inputs, stream_mode=["updates", "custom"], version="v2"): if chunk["type"] == "updates": for node_name, state in chunk["data"].items(): print(f"Node `{node_name}` updated: {state}") elif chunk["type"] == "custom": print(f"Custom event: {chunk['data']}")
您可以使用 stream_mode="custom" 从任何 LLM API 流式传输数据——即使该 API 不实现 LangChain 聊天模型接口。这使您可以集成提供自己的流式接口的原始 LLM 客户端或外部服务,使 LangGraph 在自定义设置方面具有高度灵活性。
from langgraph.config import get_stream_writerdef call_arbitrary_model(state): """Example node that calls an arbitrary model and streams the output""" # Get the stream writer to send custom data writer = get_stream_writer() # Assume you have a streaming client that yields chunks # Generate LLM tokens using your custom streaming client for chunk in your_custom_streaming_client(state["topic"]): # Use the writer to send custom data to the stream writer({"custom_llm_chunk": chunk}) return {"result": "completed"}graph = ( StateGraph(State) .add_node(call_arbitrary_model) # Add other nodes and edges as needed .compile())# Set stream_mode="custom" to receive the custom data in the streamfor chunk in graph.stream( {"topic": "cats"}, stream_mode="custom", version="v2",): if chunk["type"] == "custom": # The chunk data will contain the custom data streamed from the llm print(chunk["data"])
扩展示例:流式传输任意聊天模型
import operatorimport jsonfrom typing import TypedDictfrom typing_extensions import Annotatedfrom langgraph.graph import StateGraph, STARTfrom openai import AsyncOpenAIopenai_client = AsyncOpenAI()model_name = "gpt-5.4-mini"async def stream_tokens(model_name: str, messages: list[dict]): response = await openai_client.chat.completions.create( messages=messages, model=model_name, stream=True ) role = None async for chunk in response: delta = chunk.choices[0].delta if delta.role is not None: role = delta.role if delta.content: yield {"role": role, "content": delta.content}# this is our toolasync def get_items(place: str) -> str: """Use this tool to list items one might find in a place you're asked about.""" writer = get_stream_writer() response = "" async for msg_chunk in stream_tokens( model_name, [ { "role": "user", "content": ( "Can you tell me what kind of items " f"i might find in the following place: '{place}'. " "List at least 3 such items separating them by a comma. " "And include a brief description of each item." ), } ], ): response += msg_chunk["content"] writer(msg_chunk) return responseclass State(TypedDict): messages: Annotated[list[dict], operator.add]# this is the tool-calling graph nodeasync def call_tool(state: State): ai_message = state["messages"][-1] tool_call = ai_message["tool_calls"][-1] function_name = tool_call["function"]["name"] if function_name != "get_items": raise ValueError(f"Tool {function_name} not supported") function_arguments = tool_call["function"]["arguments"] arguments = json.loads(function_arguments) function_response = await get_items(**arguments) tool_message = { "tool_call_id": tool_call["id"], "role": "tool", "name": function_name, "content": function_response, } return {"messages": [tool_message]}graph = ( StateGraph(State) .add_node(call_tool) .add_edge(START, "call_tool") .compile())
from langchain.chat_models import init_chat_modelmodel = init_chat_model( "claude-sonnet-4-6", # Set streaming=False to disable streaming for the chat model streaming=False)
from langchain_openai import ChatOpenAI# Set streaming=False to disable streaming for the chat modelmodel = ChatOpenAI(model="o1-preview", streaming=False)
from typing import TypedDictfrom langgraph.graph import START, StateGraphfrom langchain.chat_models import init_chat_modelmodel = init_chat_model(model="gpt-5.4-mini")class State(TypedDict): topic: str joke: str# Accept config as an argument in the async node functionasync def call_model(state, config): topic = state["topic"] print("Generating joke...") # Pass config to model.ainvoke() to ensure proper context propagation joke_response = await model.ainvoke( [{"role": "user", "content": f"Write a joke about {topic}"}], config, ) return {"joke": joke_response.content}graph = ( StateGraph(State) .add_node(call_model) .add_edge(START, "call_model") .compile())# Set stream_mode="messages" to stream LLM tokensasync for chunk in graph.astream( {"topic": "ice cream"}, stream_mode="messages", version="v2",): if chunk["type"] == "messages": message_chunk, metadata = chunk["data"] if message_chunk.content: print(message_chunk.content, end="|", flush=True)
扩展示例:带有流写入器的异步自定义流式传输
from typing import TypedDictfrom langgraph.types import StreamWriterclass State(TypedDict): topic: str joke: str# Add writer as an argument in the function signature of the async node or tool# LangGraph will automatically pass the stream writer to the functionasync def generate_joke(state: State, writer: StreamWriter): writer({"custom_key": "Streaming custom data while generating a joke"}) return {"joke": f"This is a joke about {state['topic']}"}graph = ( StateGraph(State) .add_node(generate_joke) .add_edge(START, "generate_joke") .compile())# Set stream_mode="custom" to receive the custom data in the stream #async for chunk in graph.astream( {"topic": "ice cream"}, stream_mode="custom", version="v2",): if chunk["type"] == "custom": print(chunk["data"])