Deep Agents 建立在 LangGraph 的流式基础设施之上,并提供对子代理流的一等支持。当 Deep Agent 将工作委托给子代理时,您可以独立流式传输来自每个子代理的更新——实时追踪进度、LLM token 和工具调用。 Deep Agent 流式传输的功能:文档索引
在以下地址获取完整的文档索引:https://docs.langchain.org.cn/llms.txt
在进一步探索之前,请使用此文件发现所有可用页面。
- 流式传输子代理进度——追踪每个子代理并行执行时的进度。
- 流式传输 LLM token——从主代理和每个子代理流式传输 token。
- 流式传输工具调用——查看子代理执行过程中的工具调用及其结果。
- 流式传输自定义更新——从子代理节点内部发出用户定义的信号。
启用子图流式传输
Deep Agents 使用 LangGraph 的子图流式传输来展示子代理执行过程中的事件。要接收子代理事件,请在流式传输时启用stream_subgraphs。
from deepagents import create_deep_agent
agent = create_deep_agent(
model="google_genai:gemini-3.1-pro-preview",
system_prompt="You are a helpful research assistant",
subagents=[
{
"name": "researcher",
"description": "Researches a topic in depth",
"system_prompt": "You are a thorough researcher.",
},
],
)
for chunk in agent.stream(
{"messages": [{"role": "user", "content": "Research quantum computing advances"}]},
stream_mode="updates",
subgraphs=True,
version="v2",
):
if chunk["type"] == "updates":
if chunk["ns"]:
# Subagent event - namespace identifies the source
print(f"[subagent: {chunk['ns']}]")
else:
# Main agent event
print("[main agent]")
print(chunk["data"])
命名空间 (Namespaces)
当启用subgraphs 时,每个流式事件都会包含一个命名空间,用于标识是哪个代理产生了该事件。命名空间是一条由节点名称和任务 ID 组成的路径,代表了代理的层级结构。
| 命名空间 | 来源 |
|---|---|
() (空) | 主智能体 |
("tools:abc123",) | 由主代理的 task 工具调用 abc123 所生成的子代理 |
("tools:abc123", "model_request:def456") | 子代理内部的模型请求节点 |
for chunk in agent.stream(
{"messages": [{"role": "user", "content": "Plan my vacation"}]},
stream_mode="updates",
subgraphs=True,
version="v2",
):
if chunk["type"] == "updates":
# Check if this event came from a subagent
is_subagent = any(
segment.startswith("tools:") for segment in chunk["ns"]
)
if is_subagent:
# Extract the tool call ID from the namespace
tool_call_id = next(
s.split(":")[1] for s in chunk["ns"] if s.startswith("tools:")
)
print(f"Subagent {tool_call_id}: {chunk['data']}")
else:
print(f"Main agent: {chunk['data']}")
子代理进度
使用stream_mode="updates" 在每个步骤完成时追踪子代理进度。这对于显示哪些子代理处于活动状态以及它们已完成的工作非常有用。
from deepagents import create_deep_agent
agent = create_deep_agent(
model="google_genai:gemini-3.1-pro-preview",
system_prompt=(
"You are a project coordinator. Always delegate research tasks "
"to your researcher subagent using the task tool. Keep your final response to one sentence."
),
subagents=[
{
"name": "researcher",
"description": "Researches topics thoroughly",
"system_prompt": (
"You are a thorough researcher. Research the given topic "
"and provide a concise summary in 2-3 sentences."
),
},
],
)
for chunk in agent.stream(
{"messages": [{"role": "user", "content": "Write a short summary about AI safety"}]},
stream_mode="updates",
subgraphs=True,
version="v2",
):
if chunk["type"] == "updates":
# Main agent updates (empty namespace)
if not chunk["ns"]:
for node_name, data in chunk["data"].items():
if node_name == "tools":
# Subagent results returned to main agent
for msg in data.get("messages", []):
if msg.type == "tool":
print(f"\nSubagent complete: {msg.name}")
print(f" Result: {str(msg.content)[:200]}...")
else:
print(f"[main agent] step: {node_name}")
# Subagent updates (non-empty namespace)
else:
for node_name, data in chunk["data"].items():
print(f" [{chunk['ns'][0]}] step: {node_name}")
输出
[main agent] step: model_request
[tools:call_abc123] step: model_request
[tools:call_abc123] step: tools
[tools:call_abc123] step: model_request
Subagent complete: task
Result: ## AI Safety Report...
[main agent] step: model_request
LLM Tokens
使用stream_mode="messages" 从主代理和子代理流式传输单个 token。每个消息事件都包含标识源代理的元数据。
current_source = ""
for chunk in agent.stream(
{"messages": [{"role": "user", "content": "Research quantum computing advances"}]},
stream_mode="messages",
subgraphs=True,
version="v2",
):
if chunk["type"] == "messages":
token, metadata = chunk["data"]
# Check if this event came from a subagent (namespace contains "tools:")
is_subagent = any(s.startswith("tools:") for s in chunk["ns"])
if is_subagent:
# Token from a subagent
subagent_ns = next(s for s in chunk["ns"] if s.startswith("tools:"))
if subagent_ns != current_source:
print(f"\n\n--- [subagent: {subagent_ns}] ---")
current_source = subagent_ns
if token.content:
print(token.content, end="", flush=True)
else:
# Token from the main agent
if "main" != current_source:
print("\n\n--- [main agent] ---")
current_source = "main"
if token.content:
print(token.content, end="", flush=True)
print()
工具调用
当子代理使用工具时,您可以流式传输工具调用事件以显示每个子代理的操作。工具调用数据块会出现在messages 流模式中。
for chunk in agent.stream(
{"messages": [{"role": "user", "content": "Research recent quantum computing advances"}]},
stream_mode="messages",
subgraphs=True,
version="v2",
):
if chunk["type"] == "messages":
token, metadata = chunk["data"]
# Identify source: "main" or the subagent namespace segment
is_subagent = any(s.startswith("tools:") for s in chunk["ns"])
source = next((s for s in chunk["ns"] if s.startswith("tools:")), "main") if is_subagent else "main"
# Tool call chunks (streaming tool invocations)
if token.tool_call_chunks:
for tc in token.tool_call_chunks:
if tc.get("name"):
print(f"\n[{source}] Tool call: {tc['name']}")
# Args stream in chunks - write them incrementally
if tc.get("args"):
print(tc["args"], end="", flush=True)
# Tool results
if token.type == "tool":
print(f"\n[{source}] Tool result [{token.name}]: {str(token.content)[:150]}")
# Regular AI content (skip tool call messages)
if token.type == "ai" and token.content and not token.tool_call_chunks:
print(token.content, end="", flush=True)
print()
自定义更新
在您的子代理工具中使用get_stream_writer 来发出自定义进度事件
import time
from langchain.tools import tool
from langgraph.config import get_stream_writer
from deepagents import create_deep_agent
@tool
def analyze_data(topic: str) -> str:
"""Run a data analysis on a given topic.
This tool performs the actual analysis and emits progress updates.
You MUST call this tool for any analysis request.
"""
writer = get_stream_writer()
writer({"status": "starting", "topic": topic, "progress": 0})
time.sleep(0.5)
writer({"status": "analyzing", "progress": 50})
time.sleep(0.5)
writer({"status": "complete", "progress": 100})
return (
f'Analysis of "{topic}": Customer sentiment is 85% positive, '
"driven by product quality and support response times."
)
agent = create_deep_agent(
model="google_genai:gemini-3.1-pro-preview",
system_prompt=(
"You are a coordinator. For any analysis request, you MUST delegate "
"to the analyst subagent using the task tool. Never try to answer directly. "
"After receiving the result, summarize it in one sentence."
),
subagents=[
{
"name": "analyst",
"description": "Performs data analysis with real-time progress tracking",
"system_prompt": (
"You are a data analyst. You MUST call the analyze_data tool "
"for every analysis request. Do not use any other tools. "
"After the analysis completes, report the result."
),
"tools": [analyze_data],
},
],
)
for chunk in agent.stream(
{"messages": [{"role": "user", "content": "Analyze customer satisfaction trends"}]},
stream_mode="custom",
subgraphs=True,
version="v2",
):
if chunk["type"] == "custom":
is_subagent = any(s.startswith("tools:") for s in chunk["ns"])
if is_subagent:
subagent_ns = next(s for s in chunk["ns"] if s.startswith("tools:"))
print(f"[{subagent_ns}]", chunk["data"])
else:
print("[main]", chunk["data"])
输出
[tools:call_abc123] {'status': 'starting', 'topic': 'customer satisfaction trends', 'progress': 0}
[tools:call_abc123] {'status': 'analyzing', 'progress': 50}
[tools:call_abc123] {'status': 'complete', 'progress': 100}
流式传输多种模式
结合多种流模式以获得代理执行的完整图景# Skip internal middleware steps - only show meaningful node names
INTERESTING_NODES = {"model_request", "tools"}
last_source = ""
mid_line = False # True when we've written tokens without a trailing newline
for chunk in agent.stream(
{"messages": [{"role": "user", "content": "Analyze the impact of remote work on team productivity"}]},
stream_mode=["updates", "messages", "custom"],
subgraphs=True,
version="v2",
):
is_subagent = any(s.startswith("tools:") for s in chunk["ns"])
source = "subagent" if is_subagent else "main"
if chunk["type"] == "updates":
for node_name in chunk["data"]:
if node_name not in INTERESTING_NODES:
continue
if mid_line:
print()
mid_line = False
print(f"[{source}] step: {node_name}")
elif chunk["type"] == "messages":
token, metadata = chunk["data"]
if token.content:
# Print a header when the source changes
if source != last_source:
if mid_line:
print()
mid_line = False
print(f"\n[{source}] ", end="")
last_source = source
print(token.content, end="", flush=True)
mid_line = True
elif chunk["type"] == "custom":
if mid_line:
print()
mid_line = False
print(f"[{source}] custom event:", chunk["data"])
print()
常见模式
追踪子代理生命周期
监控子代理何时启动、运行和完成active_subagents = {}
for chunk in agent.stream(
{"messages": [{"role": "user", "content": "Research the latest AI safety developments"}]},
stream_mode="updates",
subgraphs=True,
version="v2",
):
if chunk["type"] == "updates":
for node_name, data in chunk["data"].items():
# ─── Phase 1: Detect subagent starting ────────────────────────
# When the main agent's model_request contains task tool calls,
# a subagent has been spawned.
if not chunk["ns"] and node_name == "model_request":
for msg in data.get("messages", []):
for tc in getattr(msg, "tool_calls", []):
if tc["name"] == "task":
active_subagents[tc["id"]] = {
"type": tc["args"].get("subagent_type"),
"description": tc["args"].get("description", "")[:80],
"status": "pending",
}
print(
f'[lifecycle] PENDING → subagent "{tc["args"].get("subagent_type")}" '
f'({tc["id"]})'
)
# ─── Phase 2: Detect subagent running ─────────────────────────
# When we receive events from a tools:UUID namespace, that
# subagent is actively executing.
if chunk["ns"] and chunk["ns"][0].startswith("tools:"):
pregel_id = chunk["ns"][0].split(":")[1]
# Check if any pending subagent needs to be marked running.
# Note: the pregel task ID differs from the tool_call_id,
# so we mark any pending subagent as running on first subagent event.
for sub_id, sub in active_subagents.items():
if sub["status"] == "pending":
sub["status"] = "running"
print(
f'[lifecycle] RUNNING → subagent "{sub["type"]}" '
f"(pregel: {pregel_id})"
)
break
# ─── Phase 3: Detect subagent completing ──────────────────────
# When the main agent's tools node returns a tool message,
# the subagent has completed and returned its result.
if not chunk["ns"] and node_name == "tools":
for msg in data.get("messages", []):
if msg.type == "tool":
sub = active_subagents.get(msg.tool_call_id)
if sub:
sub["status"] = "complete"
print(
f'[lifecycle] COMPLETE → subagent "{sub["type"]}" '
f"({msg.tool_call_id})"
)
print(f" Result preview: {str(msg.content)[:120]}...")
# Print final state
print("\n--- Final subagent states ---")
for sub_id, sub in active_subagents.items():
print(f" {sub['type']}: {sub['status']}")
v2 流式格式
需要 LangGraph >= 1.1。
version="v2"),这是推荐的方法。每个数据块都是一个包含 type、ns 和 data 键的 StreamPart 字典——无论流模式、模式数量或子图设置如何,其结构均保持一致。 v2 格式消除了嵌套元组解包的需求,使得在 Deep Agents 中处理子图流式传输变得简单直接。两种格式对比:# Unified format — no nested tuple unpacking
for chunk in agent.stream(
{"messages": [{"role": "user", "content": "Research quantum computing"}]},
stream_mode=["updates", "messages", "custom"],
subgraphs=True,
version="v2",
):
print(chunk["type"]) # "updates", "messages", or "custom"
print(chunk["ns"]) # () for main agent, ("tools:<id>",) for subagent
print(chunk["data"]) # payload
相关
- 子代理 (Subagents)—配置并在 Deep Agents 中使用子代理
- 前端流式传输—为 Deep Agents 构建使用
useStream的 React UI - LangChain 流式传输概览—使用 LangChain 代理的通用流式概念
将这些文档连接到 Claude、VSCode 等,以获得实时答案。

