跳到主要内容
LangGraph 拥有一个内置的持久化层,通过检查点实现。当您使用检查点编译图时,检查点会在每个超步骤保存图状态的检查点。这些检查点会保存到一个线程中,可以在图执行后访问。由于线程允许在执行后访问图的状态,因此诸如人机协作、内存、时间旅行和容错等多种强大功能都成为可能。下面,我们将更详细地讨论这些概念。 检查点
LangGraph API 自动处理检查点 使用 LangGraph API 时,您无需手动实现或配置检查点。API 在后台为您处理所有持久化基础设施。

线程

线程是分配给检查点保存的每个检查点的唯一 ID 或线程标识符。它包含一系列运行的累积状态。当执行运行后,助手的底层图的状态将持久化到线程中。 当使用检查点调用图时,您必须在配置的configurable部分指定一个thread_id
{"configurable": {"thread_id": "1"}}
可以检索线程的当前和历史状态。为了持久化状态,必须在执行运行之前创建线程。LangSmith API 提供了几个用于创建和管理线程和线程状态的端点。有关更多详细信息,请参阅API 参考

检查点

线程在特定时间点的状态称为检查点。检查点是每个超步骤保存的图状态的快照,由具有以下关键属性的StateSnapshot对象表示
  • config:与此检查点关联的配置。
  • metadata:与此检查点关联的元数据。
  • values:此时状态通道的值。
  • next:图中要执行的下一个节点名称的元组。
  • tasks:包含要执行的下一个任务信息的PregelTask对象元组。如果该步骤之前已尝试过,它将包含错误信息。如果图在节点内被动态中断,则任务将包含与中断相关的附加数据。
检查点是持久化的,可用于在以后恢复线程的状态。 让我们看看当如下调用一个简单图时会保存哪些检查点:
from langgraph.graph import StateGraph, START, END
from langgraph.checkpoint.memory import InMemorySaver
from langchain_core.runnables import RunnableConfig
from typing import Annotated
from typing_extensions import TypedDict
from operator import add

class State(TypedDict):
    foo: str
    bar: Annotated[list[str], add]

def node_a(state: State):
    return {"foo": "a", "bar": ["a"]}

def node_b(state: State):
    return {"foo": "b", "bar": ["b"]}


workflow = StateGraph(State)
workflow.add_node(node_a)
workflow.add_node(node_b)
workflow.add_edge(START, "node_a")
workflow.add_edge("node_a", "node_b")
workflow.add_edge("node_b", END)

checkpointer = InMemorySaver()
graph = workflow.compile(checkpointer=checkpointer)

config: RunnableConfig = {"configurable": {"thread_id": "1"}}
graph.invoke({"foo": ""}, config)
运行图后,我们期望看到正好 4 个检查点
  • 空检查点,将START作为下一个要执行的节点
  • 检查点,包含用户输入{'foo': '', 'bar': []},将node_a作为下一个要执行的节点
  • 检查点,包含node_a的输出{'foo': 'a', 'bar': ['a']},将node_b作为下一个要执行的节点
  • 检查点,包含node_b的输出{'foo': 'b', 'bar': ['a', 'b']},没有下一个要执行的节点
请注意,由于我们对bar通道有 reducer,因此bar通道值包含来自两个节点的输出。

获取状态

与保存的图状态交互时,您必须指定一个线程标识符。您可以通过调用graph.get_state(config)来查看图的最新状态。这将返回一个StateSnapshot对象,该对象对应于配置中提供的线程 ID 的最新检查点,或者如果提供,则对应于该线程的检查点 ID 的检查点。
# get the latest state snapshot
config = {"configurable": {"thread_id": "1"}}
graph.get_state(config)

# get a state snapshot for a specific checkpoint_id
config = {"configurable": {"thread_id": "1", "checkpoint_id": "1ef663ba-28fe-6528-8002-5a559208592c"}}
graph.get_state(config)
在我们的示例中,get_state的输出将如下所示
StateSnapshot(
    values={'foo': 'b', 'bar': ['a', 'b']},
    next=(),
    config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1ef663ba-28fe-6528-8002-5a559208592c'}},
    metadata={'source': 'loop', 'writes': {'node_b': {'foo': 'b', 'bar': ['b']}}, 'step': 2},
    created_at='2024-08-29T19:19:38.821749+00:00',
    parent_config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1ef663ba-28f9-6ec4-8001-31981c2c39f8'}}, tasks=()
)

获取状态历史

您可以通过调用graph.get_state_history(config)获取给定线程的图执行的完整历史记录。这将返回与配置中提供的线程 ID 关联的StateSnapshot对象列表。重要的是,检查点将按时间顺序排列,最近的检查点/StateSnapshot位于列表的首位。
config = {"configurable": {"thread_id": "1"}}
list(graph.get_state_history(config))
在我们的示例中,get_state_history的输出将如下所示
[
    StateSnapshot(
        values={'foo': 'b', 'bar': ['a', 'b']},
        next=(),
        config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1ef663ba-28fe-6528-8002-5a559208592c'}},
        metadata={'source': 'loop', 'writes': {'node_b': {'foo': 'b', 'bar': ['b']}}, 'step': 2},
        created_at='2024-08-29T19:19:38.821749+00:00',
        parent_config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1ef663ba-28f9-6ec4-8001-31981c2c39f8'}},
        tasks=(),
    ),
    StateSnapshot(
        values={'foo': 'a', 'bar': ['a']},
        next=('node_b',),
        config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1ef663ba-28f9-6ec4-8001-31981c2c39f8'}},
        metadata={'source': 'loop', 'writes': {'node_a': {'foo': 'a', 'bar': ['a']}}, 'step': 1},
        created_at='2024-08-29T19:19:38.819946+00:00',
        parent_config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1ef663ba-28f4-6b4a-8000-ca575a13d36a'}},
        tasks=(PregelTask(id='6fb7314f-f114-5413-a1f3-d37dfe98ff44', name='node_b', error=None, interrupts=()),),
    ),
    StateSnapshot(
        values={'foo': '', 'bar': []},
        next=('node_a',),
        config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1ef663ba-28f4-6b4a-8000-ca575a13d36a'}},
        metadata={'source': 'loop', 'writes': None, 'step': 0},
        created_at='2024-08-29T19:19:38.817813+00:00',
        parent_config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1ef663ba-28f0-6c66-bfff-6723431e8481'}},
        tasks=(PregelTask(id='f1b14528-5ee5-579c-949b-23ef9bfbed58', name='node_a', error=None, interrupts=()),),
    ),
    StateSnapshot(
        values={'bar': []},
        next=('__start__',),
        config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1ef663ba-28f0-6c66-bfff-6723431e8481'}},
        metadata={'source': 'input', 'writes': {'foo': ''}, 'step': -1},
        created_at='2024-08-29T19:19:38.816205+00:00',
        parent_config=None,
        tasks=(PregelTask(id='6d27aa2e-d72b-5504-a36f-8620e54a76dd', name='__start__', error=None, interrupts=()),),
    )
]
State

重播

也可以回放之前的图执行。如果我们使用`thread_id`和`checkpoint_id`调用一个图,那么我们将重放与`checkpoint_id`对应的检查点之前已执行的步骤,并且只执行检查点之后的步骤。
  • thread_id是线程的 ID。
  • checkpoint_id是引用线程中特定检查点的标识符。
在调用图时,您必须将它们作为配置的configurable部分的一部分传递。
config = {"configurable": {"thread_id": "1", "checkpoint_id": "0c62ca34-ac19-445d-bbb0-5b4984975b2a"}}
graph.invoke(None, config=config)
重要的是,LangGraph 知道特定步骤是否以前执行过。如果执行过,LangGraph 只会重播图中的特定步骤,而不会重新执行该步骤,但仅限于提供的checkpoint_id之前的步骤。checkpoint_id之后的所有步骤都将被执行(即新的分支),即使它们以前执行过。请参阅此关于时间旅行的指南,了解有关重播的更多信息 重播

更新状态

除了从特定的检查点重放图之外,我们还可以编辑图状态。我们使用update_state来完成此操作。此方法接受三个不同的参数

配置

配置应包含thread_id,指定要更新哪个线程。当只传递thread_id时,我们更新(或分叉)当前状态。可选地,如果包含checkpoint_id字段,则分叉该选定的检查点。

这些值将用于更新状态。请注意,此更新的处理方式与来自节点的任何更新完全相同。这意味着这些值将传递给reducer函数(如果它们定义了图中某些通道的 reducer)。这意味着update_state不会自动覆盖每个通道的通道值,而只会覆盖没有 reducer 的通道。让我们看一个例子。 假设您已经使用以下 schema 定义了图的状态(请参阅上面的完整示例):
from typing import Annotated
from typing_extensions import TypedDict
from operator import add

class State(TypedDict):
    foo: int
    bar: Annotated[list[str], add]
现在假设图的当前状态是
{"foo": 1, "bar": ["a"]}
如果您如下更新状态
graph.update_state(config, {"foo": 2, "bar": ["b"]})
那么图的新状态将是
{"foo": 2, "bar": ["a", "b"]}
`foo` 键(通道)被完全更改(因为该通道未指定 reducer,因此`update_state`会覆盖它)。但是,`bar` 键指定了一个 reducer,因此它将 `"b"` 附加到 `bar` 的状态中。

作为节点

调用update_state时,您可以选择指定的最后一件事是as_node。如果您提供了它,更新将像来自节点as_node一样应用。如果未提供as_node,它将被设置为最后更新状态的节点(如果不模糊)。这很重要,因为要执行的下一步取决于最后一个进行更新的节点,因此这可用于控制下一个执行的节点。请参阅此关于时间旅行的指南,了解有关分叉状态的更多信息 更新

内存存储

共享状态模型 状态 Schema 指定了一组键,这些键在图执行时被填充。如上所述,状态可以通过检查点在每个图步骤写入线程,从而实现状态持久化。 但是,如果我们想在不同线程之间保留某些信息怎么办?考虑一个聊天机器人,我们希望在与用户的所有聊天对话(例如,线程)中保留关于用户的特定信息! 仅凭检查点,我们无法在线程之间共享信息。这激发了对Store接口的需求。为了说明这一点,我们可以定义一个InMemoryStore来存储关于用户在不同线程之间的信息。我们只需像以前一样,使用检查点和我们的新in_memory_store变量编译我们的图。
LangGraph API 自动处理存储 使用 LangGraph API 时,您无需手动实现或配置存储。API 在后台为您处理所有存储基础设施。

基本用法

首先,让我们在不使用 LangGraph 的情况下单独展示这一点。
from langgraph.store.memory import InMemoryStore
in_memory_store = InMemoryStore()
内存通过`tuple`进行命名空间划分,在此特定示例中为`(<user_id>, "memories")`。命名空间可以是任意长度,并表示任何内容,不一定与用户相关。
user_id = "1"
namespace_for_memory = (user_id, "memories")
我们使用store.put方法将内存保存到存储中的命名空间。执行此操作时,我们指定命名空间(如上定义)以及内存的键值对:键只是内存的唯一标识符(memory_id),值(字典)是内存本身。
memory_id = str(uuid.uuid4())
memory = {"food_preference" : "I like pizza"}
in_memory_store.put(namespace_for_memory, memory_id, memory)
我们可以使用`store.search`方法读取命名空间中的内存,该方法将返回给定用户的所有内存作为列表。最新的内存位于列表末尾。
memories = in_memory_store.search(namespace_for_memory)
memories[-1].dict()
{'value': {'food_preference': 'I like pizza'},
 'key': '07e0caf4-1631-47b7-b15f-65515d4c1843',
 'namespace': ['1', 'memories'],
 'created_at': '2024-10-02T17:22:31.590602+00:00',
 'updated_at': '2024-10-02T17:22:31.590605+00:00'}
每种记忆类型都是一个带有特定属性的 Python 类(Item)。我们可以像上面那样通过.dict将其转换为字典来访问它。 它具有的属性是:
  • value:此记忆的值(本身是一个字典)
  • key:此记忆在此命名空间中的唯一键
  • namespace:一个字符串列表,此记忆类型的命名空间
  • created_at:此记忆创建的时间戳
  • updated_at:此记忆更新的时间戳
除了简单的检索,该存储还支持语义搜索,允许您根据含义而不是精确匹配来查找记忆。要启用此功能,请使用嵌入模型配置存储
from langchain.embeddings import init_embeddings

store = InMemoryStore(
    index={
        "embed": init_embeddings("openai:text-embedding-3-small"),  # Embedding provider
        "dims": 1536,                              # Embedding dimensions
        "fields": ["food_preference", "$"]              # Fields to embed
    }
)
现在搜索时,您可以使用自然语言查询来查找相关记忆。
# Find memories about food preferences
# (This can be done after putting memories into the store)
memories = store.search(
    namespace_for_memory,
    query="What does the user like to eat?",
    limit=3  # Return top 3 matches
)
您可以通过配置`fields`参数或在存储记忆时指定`index`参数来控制记忆的哪些部分被嵌入
# Store with specific fields to embed
store.put(
    namespace_for_memory,
    str(uuid.uuid4()),
    {
        "food_preference": "I love Italian cuisine",
        "context": "Discussing dinner plans"
    },
    index=["food_preference"]  # Only embed "food_preferences" field
)

# Store without embedding (still retrievable, but not searchable)
store.put(
    namespace_for_memory,
    str(uuid.uuid4()),
    {"system_info": "Last updated: 2024-01-01"},
    index=False
)

在 LangGraph 中使用

一切就绪后,我们在 LangGraph 中使用in_memory_storein_memory_store与检查点器协同工作:检查点器将状态保存到线程中,如上所述,而in_memory_store允许我们存储任意信息以供线程访问。我们如下所示编译图,同时使用检查点器和in_memory_store
from langgraph.checkpoint.memory import InMemorySaver

# We need this because we want to enable threads (conversations)
checkpointer = InMemorySaver()

# ... Define the graph ...

# Compile the graph with the checkpointer and store
graph = graph.compile(checkpointer=checkpointer, store=in_memory_store)
我们像以前一样,使用`thread_id`调用图,也使用`user_id`,我们将用它来将我们的记忆命名到这个特定的用户,如我们上面所示。
# Invoke the graph
user_id = "1"
config = {"configurable": {"thread_id": "1", "user_id": user_id}}

# First let's just say hi to the AI
for update in graph.stream(
    {"messages": [{"role": "user", "content": "hi"}]}, config, stream_mode="updates"
):
    print(update)
我们可以通过将store: BaseStoreconfig: RunnableConfig作为节点参数传递,在任何节点中访问in_memory_storeuser_id。以下是我们可能在节点中使用语义搜索来查找相关记忆的方法
def update_memory(state: MessagesState, config: RunnableConfig, *, store: BaseStore):

    # Get the user id from the config
    user_id = config["configurable"]["user_id"]

    # Namespace the memory
    namespace = (user_id, "memories")

    # ... Analyze conversation and create a new memory

    # Create a new memory ID
    memory_id = str(uuid.uuid4())

    # We create a new memory
    store.put(namespace, memory_id, {"memory": memory})

如上所示,我们还可以在任何节点中访问存储并使用store.search方法获取记忆。回想一下,记忆以对象列表的形式返回,可以转换为字典。
memories[-1].dict()
{'value': {'food_preference': 'I like pizza'},
 'key': '07e0caf4-1631-47b7-b15f-65515d4c1843',
 'namespace': ['1', 'memories'],
 'created_at': '2024-10-02T17:22:31.590602+00:00',
 'updated_at': '2024-10-02T17:22:31.590605+00:00'}
我们可以访问内存并在模型调用中使用它们。
def call_model(state: MessagesState, config: RunnableConfig, *, store: BaseStore):
    # Get the user id from the config
    user_id = config["configurable"]["user_id"]

    # Namespace the memory
    namespace = (user_id, "memories")

    # Search based on the most recent message
    memories = store.search(
        namespace,
        query=state["messages"][-1].content,
        limit=3
    )
    info = "\n".join([d.value["memory"] for d in memories])

    # ... Use memories in the model call
如果我们创建一个新线程,只要`user_id`相同,我们仍然可以访问相同的内存。
# Invoke the graph
config = {"configurable": {"thread_id": "2", "user_id": "1"}}

# Let's say hi again
for update in graph.stream(
    {"messages": [{"role": "user", "content": "hi, tell me about my memories"}]}, config, stream_mode="updates"
):
    print(update)
当我们在 LangSmith 上运行(例如,在Studio中)或在 LangSmith 托管时,基础存储默认可用,无需在图编译期间指定。但是,要启用语义搜索,您确实需要在langgraph.json文件中配置索引设置。例如
{
    ...
    "store": {
        "index": {
            "embed": "openai:text-embeddings-3-small",
            "dims": 1536,
            "fields": ["$"]
        }
    }
}
有关更多详细信息和配置选项,请参阅部署指南

检查点库

在幕后,检查点功能由符合BaseCheckpointSaver接口的检查点对象提供。LangGraph 提供了多种检查点实现,所有这些都通过独立的、可安装的库实现
  • langgraph-checkpoint:检查点保存器(BaseCheckpointSaver)和序列化/反序列化接口(SerializerProtocol)的基本接口。包括用于实验的内存中检查点实现(InMemorySaver)。LangGraph 包含langgraph-checkpoint
  • langgraph-checkpoint-sqlite:使用 SQLite 数据库实现 LangGraph 检查点(SqliteSaver / AsyncSqliteSaver)。适用于实验和本地工作流。需要单独安装。
  • langgraph-checkpoint-postgres:使用 Postgres 数据库(PostgresSaver / AsyncPostgresSaver)的高级检查点,用于 LangSmith。适用于生产环境。需要单独安装。

检查点接口

每个检查点都符合BaseCheckpointSaver接口并实现了以下方法
  • .put - 存储带配置和元数据的检查点。
  • .put_writes - 存储与检查点关联的中间写入(即待定写入)。
  • .get_tuple - 使用给定配置(thread_idcheckpoint_id)获取检查点元组。这用于在graph.get_state()中填充StateSnapshot
  • .list - 列出符合给定配置和过滤条件的检查点。这用于在graph.get_state_history()中填充状态历史。
如果检查点器与异步图执行一起使用(即通过.ainvoke.astream.abatch执行图),则将使用上述方法的异步版本(.aput.aput_writes.aget_tuple.alist)。
为了异步运行您的图,您可以使用InMemorySaver,或者 SQLite/Postgres 检查点器的异步版本 — AsyncSqliteSaver / AsyncPostgresSaver 检查点器。

序列化器

当检查点保存图状态时,它们需要序列化状态中的通道值。这是通过序列化器对象完成的。 langgraph_checkpoint定义了实现序列化器的协议,并提供了默认实现(JsonPlusSerializer),该实现处理各种类型,包括 LangChain 和 LangGraph 原语、日期时间、枚举等。

使用pickle进行序列化

默认的序列化器JsonPlusSerializer在底层使用 ormsgpack 和 JSON,这不适用于所有类型的对象。 如果您想对我们当前的 msgpack 编码器不支持的对象(例如 Pandas 数据帧)回退到 pickle,您可以使用JsonPlusSerializerpickle_fallback参数:
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.checkpoint.serde.jsonplus import JsonPlusSerializer

# ... Define the graph ...
graph.compile(
    checkpointer=InMemorySaver(serde=JsonPlusSerializer(pickle_fallback=True))
)

加密

检查点器可以选择加密所有持久化状态。要启用此功能,请将EncryptedSerializer的实例传递给任何BaseCheckpointSaver实现的serde参数。创建加密序列化器最简单的方法是通过from_pycryptodome_aes,它从LANGGRAPH_AES_KEY环境变量读取 AES 密钥(或接受key参数)。
import sqlite3

from langgraph.checkpoint.serde.encrypted import EncryptedSerializer
from langgraph.checkpoint.sqlite import SqliteSaver

serde = EncryptedSerializer.from_pycryptodome_aes()  # reads LANGGRAPH_AES_KEY
checkpointer = SqliteSaver(sqlite3.connect("checkpoint.db"), serde=serde)
from langgraph.checkpoint.serde.encrypted import EncryptedSerializer
from langgraph.checkpoint.postgres import PostgresSaver

serde = EncryptedSerializer.from_pycryptodome_aes()
checkpointer = PostgresSaver.from_conn_string("postgresql://...", serde=serde)
checkpointer.setup()
在 LangSmith 上运行时,只要LANGGRAPH_AES_KEY存在,加密就会自动启用,因此您只需提供环境变量即可。可以通过实现CipherProtocol并将其提供给EncryptedSerializer来使用其他加密方案。

能力

人工干预

首先,检查点通过允许人类检查、中断和批准图步骤来促进人机协作工作流。这些工作流需要检查点,因为人类必须能够随时查看图的状态,并且图在人类对状态进行任何更新后必须能够恢复执行。有关示例,请参阅操作指南

内存

其次,检查点允许在交互之间拥有“记忆”。在重复的人机交互(如对话)的情况下,任何后续消息都可以发送到该线程,该线程将保留其对以前消息的记忆。请参阅添加记忆以了解如何使用检查点添加和管理对话记忆。

时间旅行

第三,检查点允许“时间旅行”,允许用户重放先前的图执行,以审查和/或调试特定的图步骤。此外,检查点使得在任意检查点分叉图状态以探索替代轨迹成为可能。

容错

最后,检查点还提供容错和错误恢复功能:如果一个或多个节点在给定超步骤失败,您可以从上次成功步骤重新启动图。此外,当图节点在给定超步骤执行中途失败时,LangGraph 会存储该超步骤中任何其他成功完成的节点的待定检查点写入,以便在从该超步骤恢复图执行时,我们不会重新运行成功的节点。

待定写入

此外,当图节点在给定超步骤执行中途失败时,LangGraph 会存储该超步骤中任何其他成功完成的节点的待定检查点写入,以便在从该超步骤恢复图执行时,我们不会重新运行成功的节点。
以编程方式连接这些文档到 Claude、VSCode 等,通过 MCP 获取实时答案。
© . This site is unofficial and not affiliated with LangChain, Inc.