跳到主要内容

文档索引

在以下地址获取完整的文档索引:https://docs.langchain.org.cn/llms.txt

在进一步探索之前,请使用此文件发现所有可用页面。

当节点失败时——无论是由于外部 API 缓慢、瞬时网络错误还是未处理的异常——LangGraph 为您提供了三种可组合的响应机制:
  • 重试 — 根据异常类型和退避设置自动重新运行失败的尝试
  • 超时 — 限制单次尝试的运行时间
  • 错误处理 — 在所有重试都耗尽后运行恢复函数
这些按固定顺序组合:当节点尝试引发任何异常(包括因超时产生的 NodeTimeoutError)时,重试策略决定是否重试。只有在重试耗尽后,错误处理程序才会运行。 要实现在超步边界干净地停止运行并稍后恢复,请参阅优雅关机
每个节点的超时和节点级错误处理程序需要 langgraph>=1.2,目前处于 alpha 阶段。

重试

重试策略根据异常类型和退避设置自动重新运行失败的节点尝试。将 retry_policy= 传递给 add_node
from langgraph.types import RetryPolicy

builder.add_node(
    "call_api",
    call_api,
    retry_policy=RetryPolicy(max_attempts=3),
)

默认行为

默认情况下,retry_on 使用 default_retry_on,它会重试除了以下异常(及其子类)之外的任何异常:
  • ValueError
  • TypeError
  • ArithmeticError
  • ImportError
  • LookupError
  • NameError
  • SyntaxError
  • RuntimeError
  • ReferenceError
  • StopIteration
  • StopAsyncIteration
  • OSError
对于来自 requestshttpx 等流行 HTTP 库的异常,它仅在 5xx 状态码时重试。NodeTimeoutError 默认是可重试的。

参数

参数类型默认描述
max_attemptsint3最大尝试次数,包括第一次。
initial_interval浮点数0.5第一次重试前的秒数。
backoff_factor浮点数2.0每次重试后应用于间隔的乘数。
max_interval浮点数128.0重试之间的最大秒数。
抖动 (jitter)boolTrue向间隔添加随机抖动。
retry_ontype[Exception] | Sequence[type[Exception]] | Callable[[Exception], bool]default_retry_on要重试的异常,或一个为可重试异常返回 True 的可调用对象。

自定义重试逻辑

将可调用对象或异常类型传递给 retry_on。导入 default_retry_on 以扩展默认行为
from langgraph.types import RetryPolicy, default_retry_on

def custom_retry_on(exc: BaseException) -> bool:
    if isinstance(exc, MyCustomError):
        return False
    return default_retry_on(exc)

builder.add_node(
    "call_api",
    call_api,
    retry_policy=RetryPolicy(max_attempts=3, retry_on=custom_retry_on),
)

检查重试状态

在节点内部使用 runtime.execution_info 来检查当前的尝试次数。当主要调用持续失败时,这对于切换到备用方案很有用。
from langgraph.graph import StateGraph, START, END
from langgraph.runtime import Runtime
from langgraph.types import RetryPolicy
from typing_extensions import TypedDict

class State(TypedDict):
    result: str

def my_node(state: State, runtime: Runtime) -> State:
    if runtime.execution_info.node_attempt > 1:
        return {"result": call_fallback_api()}
    return {"result": call_primary_api()}

builder = StateGraph(State)
builder.add_node("my_node", my_node, retry_policy=RetryPolicy(max_attempts=3))
builder.add_edge(START, "my_node")
builder.add_edge("my_node", END)
execution_info 暴露以下字段
属性类型描述
node_attemptint当前尝试次数(1-indexed)。第一次尝试为 1,第一次重试为 2,依此类推。
node_first_attempt_timefloat | None第一次尝试开始时的 Unix 时间戳。在所有重试中保持不变。
thread_idstr | None当前执行的线程 ID。如果没有检查点器,则为 None
run_idstr | None当前执行的运行 ID。如果未在配置中提供,则为 None
checkpoint_idstr当前执行的检查点 ID。
task_idstr当前执行的任务 ID。
即使没有重试策略,execution_info 也可用——node_attempt 默认为 1

超时

需要 langgraph>=1.2,目前处于 alpha 阶段。
add_node 上的 timeout= 参数限制了单个节点尝试的运行时间。可以传递一个数字(秒)、一个 timedelta,或者一个 TimeoutPolicy 以分别设置运行和空闲限制。
from datetime import timedelta
from langgraph.types import TimeoutPolicy

# Simple wall-clock cap
builder.add_node("call_model", call_model, timeout=60)
builder.add_node("call_model", call_model, timeout=timedelta(minutes=2))

# Separate run and idle limits
builder.add_node(
    "call_model",
    call_model,
    timeout=TimeoutPolicy(run_timeout=120, idle_timeout=30),
)
节点超时仅适用于异步节点。带有 timeout 的同步节点会在编译时被拒绝。要封装阻塞 I/O,请在异步节点内部使用 asyncio.to_thread

运行超时

run_timeout 是对单次尝试的硬性时钟限制。它从不刷新,无论节点活动如何。
from langgraph.types import TimeoutPolicy

builder.add_node(
    "call_model",
    call_model,
    timeout=TimeoutPolicy(run_timeout=120),
)
当超出限制时,LangGraph 会引发 NodeTimeoutError,清除失败尝试的所有写入,并让重试策略决定是否重试。

空闲超时

idle_timeout 是一个进度重置上限。它仅在节点在指定持续时间内停止产生可观察的进度时触发——与 run_timeout 不同,每当节点产生进度信号时,计时器就会重置。
builder.add_node(
    "call_model",
    call_model,
    timeout=TimeoutPolicy(idle_timeout=30),
)
您可以同时设置 run_timeoutidle_timeout。两者中任何一个首先触发都会取消尝试。

进度信号

在默认的 refresh_on="auto" 下,空闲计时器会在以下任何情况下重置:
  • 通过 CONFIG_KEY_SEND 进行状态写入
  • 流输出(生成的异步流块)
  • 子任务调度
  • 运行时流写入器调用
  • 来自节点或其后代的任何 LangChain 回调事件(LLM 令牌、工具调用、链开始/结束等)

心跳模式

refresh_on="heartbeat" 设置为仅将刷新源限制为显式的 runtime.heartbeat() 调用。当您希望严格定义空闲状态,且不被冗长的子任务重置时,这会很有用。
builder.add_node(
    "call_model",
    call_model,
    timeout=TimeoutPolicy(idle_timeout=30, refresh_on="heartbeat"),
)

手动心跳

对于不会自然发出进度信号的长时间运行的异步工作,调用 runtime.heartbeat() 可以手动重置空闲计时器。
from langgraph.graph import StateGraph, START, END
from langgraph.runtime import Runtime
from langgraph.types import TimeoutPolicy
from typing_extensions import TypedDict

class State(TypedDict):
    result: str

async def long_running_node(state: State, runtime: Runtime) -> State:
    for batch in fetch_batches():
        process(batch)
        runtime.heartbeat()
    return {"result": "done"}

builder = StateGraph(State)
builder.add_node(
    "long_running_node",
    long_running_node,
    timeout=TimeoutPolicy(idle_timeout=30, refresh_on="heartbeat"),
)
builder.add_edge(START, "long_running_node")
builder.add_edge("long_running_node", END)
在空闲计时尝试之外,runtime.heartbeat() 是一个无操作,因此您可以无条件调用它。

NodeTimeoutError

当超时触发时,LangGraph 会引发 NodeTimeoutError,并附带关于哪个限制被触发的结构化上下文。
属性类型描述
nodestr执行超时的节点名称。
elapsed浮点数超时触发前经过的秒数。
kindLiteral["idle", "run"]哪个超时被触发。
idle_timeoutfloat | None配置的空闲超时时间(秒),如果有的话。
run_timeoutfloat | None配置的运行超时时间(秒),如果有的话。
NodeTimeoutError 默认是可重试的。将 timeout=retry_policy= 结合使用可以开箱即用——超时计时器会在每次新尝试时重置,并且在下一次重试之前会清除超时尝试的写入。
from langgraph.types import RetryPolicy, TimeoutPolicy

builder.add_node(
    "call_model",
    call_model,
    timeout=TimeoutPolicy(idle_timeout=30),
    retry_policy=RetryPolicy(max_attempts=3),
)

使用 Send 实现动态超时

当使用 Send 动态分派节点时(例如,在 map-reduce 模式中),您可以直接在 Send 上传递 timeout= 来覆盖目标节点在该特定推送的静态超时。
from langgraph.types import Send, TimeoutPolicy

def fan_out(state: OverallState):
    return [
        Send("process_item", {"item": item}, timeout=TimeoutPolicy(idle_timeout=15))
        for item in state["items"]
    ]
如果在 Send 上省略 timeout=,则应用目标节点的超时(在 add_node 时设置)。这允许您在节点上设置默认超时,并针对单个调用进行收紧。

错误处理

需要 langgraph>=1.2,目前处于 alpha 阶段。
错误处理程序在节点失败且所有重试都耗尽后运行。它接收当前状态并可以更新状态或使用 Command 路由到不同的节点。这对于补偿流(Saga 模式)很有用,在这种情况下,您希望优雅地恢复而不是中止整个图。 error_handler= 传递给 add_node
from langgraph.errors import NodeError
from langgraph.types import Command, RetryPolicy
from langgraph.graph import StateGraph, START
from typing_extensions import TypedDict

class State(TypedDict):
    status: str

def charge_payment(state: State) -> State:
    raise RuntimeError("payment gateway timeout")

def payment_error_handler(state: State, error: NodeError) -> Command:
    return Command(
        update={"status": f"compensated: {error.error}"},
        goto="finalize",
    )

def finalize(state: State) -> State:
    return state

graph = (
    StateGraph(State)
    .add_node(
        "charge_payment",
        charge_payment,
        retry_policy=RetryPolicy(max_attempts=3, retry_on=ConnectionError),
        error_handler=payment_error_handler,
    )
    .add_node("finalize", finalize)
    .add_edge(START, "charge_payment")
    .compile()
)
处理程序仅在 retry_policy 耗尽后触发,或者如果未配置重试策略则立即触发。重试策略和错误处理程序保持解耦:独立配置何时重试和何时补偿。

NodeError

错误处理程序通过类型标注注入的类型化 error: NodeError 参数接收故障上下文(与 runtime: Runtime 模式相同)。
from langgraph.errors import NodeError

def my_handler(state: State, error: NodeError) -> Command:
    print(f"Node {error.node} failed with: {error.error}")
    return Command(update={"status": "recovered"}, goto="next_step")
NodeError 是一个带有两个字段的冻结数据类
属性类型描述
nodestr执行失败的节点名称。
错误BaseException失败节点引发的异常。
error: NodeError 参数是可选的。不需要故障上下文的处理程序可以使用更简单的签名,例如 (state)(state, runtime)

使用 Command 进行路由

错误处理程序可以返回一个 Command 来更新状态并路由到特定节点,从而实现 Saga / 补偿模式。
from langgraph.errors import NodeError
from langgraph.types import Command, RetryPolicy
from langgraph.graph import StateGraph, START
from typing_extensions import TypedDict

class State(TypedDict):
    status: str

def reserve_inventory(state: State) -> State:
    return {"status": "reserved"}

def charge_payment(state: State) -> State:
    raise RuntimeError("payment timeout")

def payment_error_handler(state: State, error: NodeError) -> Command:
    return Command(
        update={"status": f"compensated_after_{error.node}: {error.error}"},
        goto="finalize",
    )

def finalize(state: State) -> State:
    return state

graph = (
    StateGraph(State)
    .add_node("reserve_inventory", reserve_inventory)
    .add_node(
        "charge_payment",
        charge_payment,
        retry_policy=RetryPolicy(max_attempts=3, retry_on=ConnectionError),
        error_handler=payment_error_handler,
    )
    .add_node("finalize", finalize)
    .add_edge(START, "reserve_inventory")
    .add_edge("reserve_inventory", "charge_payment")
    .compile()
)
charge_paymentConnectionError 时最多重试 3 次。如果重试耗尽(或错误不是 ConnectionError),处理程序会通过更新状态并路由到 finalize 来进行补偿,而不是中止图。

恢复安全的故障

故障来源会被检查点化。如果图在节点失败后但在处理程序完成前被中断或进程崩溃,当图从其检查点恢复时,处理程序会看到相同的 NodeError 上下文。

interrupt() 的行为

在节点内部引发的 interrupt() 不会路由到错误处理程序。中断使用 GraphBubbleUp 机制来暂停图执行,以实现人在回路工作流,绕过重试策略和错误处理程序。图会像往常一样暂停。

子图故障

如果一个节点封装了一个子图,并且子图引发了未处理的异常,则该异常会浮出到父节点。如果父节点有 error_handler,则处理程序会使用 error.error 中的子图异常来触发。

函数式 API

在函数式 API 中,@task 和 `@entrypoint` 上也提供了相同的 timeout=retry_policy= 参数。
from langgraph.func import entrypoint, task
from langgraph.types import RetryPolicy, TimeoutPolicy

@task(
    timeout=TimeoutPolicy(idle_timeout=30),
    retry_policy=RetryPolicy(max_attempts=3),
)
async def call_api(url: str) -> str:
    response = await fetch(url)
    return response.text

@entrypoint(timeout=60)
async def my_workflow(inputs: dict) -> str:
    result = await call_api("https://api.example.com/data")
    return result
行为与 add_node 相同:超时时会引发 NodeTimeoutError,清除缓冲写入,并由重试策略决定是否重试。

限制

  • 仅限 Python:JavaScript/TypeScript SDK 中不提供超时和错误处理程序。重试策略在 Python 和 TypeScript 中均可用。
  • 超时仅适用于异步:带有 timeout 的同步节点会在编译时被拒绝。
  • 每个节点一个处理程序:每个节点最多只能有一个 error_handler
  • 处理程序故障会冒泡:如果错误处理程序本身引发异常,则该异常会像节点没有处理程序一样传播。

© . This site is unofficial and not affiliated with LangChain, Inc.