跳到主要内容
持久化执行是一种技术,其中进程或工作流在关键点保存其进度,使其能够暂停并在以后精确地从上次中断的地方恢复。这在需要人工干预的场景中特别有用,用户可以在继续之前检查、验证或修改进程;以及在可能遇到中断或错误(例如,对 LLM 的调用超时)的长时间运行任务中。通过保留已完成的工作,持久化执行允许进程在不重新处理先前步骤的情况下恢复——即使在显著延迟(例如,一周后)之后。 LangGraph 内置的持久化层为工作流提供持久化执行,确保每个执行步骤的状态都保存到持久化存储中。此功能保证,如果工作流中断——无论是由于系统故障还是人工干预——它都可以从其最后记录的状态恢复。
如果您正在使用带有检查点的 LangGraph,您已经启用了持久化执行。您可以在任何时候暂停和恢复工作流,即使在中断或失败之后。为了最大限度地利用持久化执行,请确保您的工作流被设计为确定性幂等的,并将任何副作用或非确定性操作封装在任务中。您可以同时使用StateGraph(图 API)函数式 API中的任务

要求

要在 LangGraph 中利用持久化执行,您需要
  1. 通过指定一个将保存工作流进度的检查点来在工作流中启用持久化
  2. 在执行工作流时指定一个线程标识符。这将跟踪工作流特定实例的执行历史。
  3. 将任何非确定性操作(例如,随机数生成)或具有副作用的操作(例如,文件写入、API 调用)封装在 @[task] 中,以确保当工作流恢复时,这些操作不会在该特定运行中重复,而是从持久化层中检索其结果。有关更多信息,请参阅确定性和一致性回放

确定性和一致性回放

当您恢复工作流运行时,代码不会从执行停止的同一行代码恢复;相反,它将识别一个适当的起点,从该起点继续执行。这意味着工作流将从起点回放所有步骤,直到达到停止点。 因此,当您编写用于持久化执行的工作流时,必须将任何非确定性操作(例如,随机数生成)和任何具有副作用的操作(例如,文件写入、API 调用)封装在任务节点中。 为了确保您的工作流是确定性的并且可以一致地回放,请遵循以下准则:
  • 避免重复工作:如果一个节点包含多个具有副作用的操作(例如,日志记录、文件写入或网络调用),请将每个操作封装在一个单独的任务中。这确保了当工作流恢复时,这些操作不会重复,并且它们的结果会从持久化层中检索。
  • 封装非确定性操作: 将任何可能产生非确定性结果的代码(例如,随机数生成)封装在任务节点中。这确保了在恢复时,工作流按照精确记录的步骤序列和相同的结果进行。
  • 使用幂等操作:尽可能确保副作用(例如,API 调用、文件写入)是幂等的。这意味着如果操作在工作流中失败后重试,它将与首次执行时具有相同的效果。这对于导致数据写入的操作尤为重要。如果一个任务开始但未能成功完成,工作流的恢复将重新运行该任务,依靠记录的结果来保持一致性。使用幂等键或验证现有结果以避免意外重复,确保平稳且可预测的工作流执行。
有关要避免的一些陷阱示例,请参阅函数式 API 中的常见陷阱部分,其中展示了如何使用任务来构造代码以避免这些问题。相同的原则适用于StateGraph(图 API)

持久化模式

LangGraph 支持三种持久化模式,允许您根据应用程序的要求平衡性能和数据一致性。持久化模式从低到高如下所示: 较高的持久化模式会增加工作流执行的开销。
v0.6.0 新增 使用 durability 参数代替 checkpoint_during(v0.6.0 中已弃用)进行持久化策略管理
  • durability="async" 替代 checkpoint_during=True
  • durability="exit" 替代 checkpoint_during=False
用于持久化策略管理,具有以下映射关系:
  • checkpoint_during=True -> durability="async"
  • checkpoint_during=False -> durability="exit"

"exit"

只有在图执行完成(无论是成功还是出错)时,更改才会被持久化。这为长时间运行的图提供了最佳性能,但意味着不会保存中间状态,因此您无法从执行中的故障中恢复或中断图执行。

"async"

当下一步执行时,更改会异步持久化。这提供了良好的性能和持久性,但如果进程在执行过程中崩溃,检查点可能不会被写入的风险很小。

"sync"

在下一步开始之前,更改会同步持久化。这确保了在继续执行之前写入每个检查点,以牺牲一定的性能开销为代价提供了高持久性。 您可以在调用任何图执行方法时指定持久性模式:
graph.stream(
    {"input": "test"},
    durability="sync"
)

在节点中使用任务

如果一个节点包含多个操作,您可能会发现将每个操作转换为一个任务而不是将操作重构为单独的节点更容易。
  • 原版
  • 带任务
from typing import NotRequired
from typing_extensions import TypedDict
import uuid

from langgraph.checkpoint.memory import InMemorySaver
from langgraph.graph import StateGraph, START, END
import requests

# Define a TypedDict to represent the state
class State(TypedDict):
    url: str
    result: NotRequired[str]

def call_api(state: State):
    """Example node that makes an API request."""
    result = requests.get(state['url']).text[:100]  # Side-effect  #
    return {
        "result": result
    }

# Create a StateGraph builder and add a node for the call_api function
builder = StateGraph(State)
builder.add_node("call_api", call_api)

# Connect the start and end nodes to the call_api node
builder.add_edge(START, "call_api")
builder.add_edge("call_api", END)

# Specify a checkpointer
checkpointer = InMemorySaver()

# Compile the graph with the checkpointer
graph = builder.compile(checkpointer=checkpointer)

# Define a config with a thread ID.
thread_id = uuid.uuid4()
config = {"configurable": {"thread_id": thread_id}}

# Invoke the graph
graph.invoke({"url": "https://www.example.com"}, config)

恢复工作流

在工作流中启用持久化执行后,您可以为以下场景恢复执行
  • 暂停和恢复工作流: 使用 interrupt 函数在特定点暂停工作流,并使用 Command 原语以更新状态恢复它。有关更多详细信息,请参阅中断
  • 从故障中恢复: 在发生异常(例如 LLM 提供程序中断)后,自动从上次成功的检查点恢复工作流。这涉及通过提供 None 作为输入值来使用相同的线程标识符执行工作流(请参阅此函数式 API 的示例)。

恢复工作流的起点

  • 如果您正在使用StateGraph(图 API),起点是执行停止的节点的开头。
  • 如果您正在节点内进行子图调用,起点将是调用被停止的子图的节点。在子图内部,起点将是执行停止的特定节点
  • 如果您正在使用函数式 API,起点是执行停止的入口点的开头。

以编程方式连接这些文档到 Claude、VSCode 等,通过 MCP 获取实时答案。
© . This site is unofficial and not affiliated with LangChain, Inc.