跳到主要内容

文档索引

在以下地址获取完整的文档索引:https://docs.langchain.org.cn/llms.txt

在进一步探索之前,请使用此文件发现所有可用页面。

LangGraph 具有内置的持久化层,可将图状态保存为检查点。当你使用检查点记录器(checkpointer)编译图时,系统会在执行的每一步保存图状态的快照,并按线程(thread)进行组织。这实现了人机回环(human-in-the-loop)工作流、对话记忆、时光旅行调试以及容错执行。 检查点
Agent Server 自动处理检查点 使用 Agent Server 时,你不需要手动实现或配置检查点记录器。服务器会在后台为你处理所有持久化基础设施。

为什么使用持久化

以下功能需要持久化支持
  • 人机回环 (Human-in-the-loop):检查点记录器允许人类检查、中断和批准图的步骤,从而促进 人机回环工作流。这些工作流需要检查点记录器,因为人员必须能够随时查看图的状态,并且图必须能够在人员对状态进行任何更新后恢复执行。请参阅 中断 获取示例。
  • 记忆 (Memory):检查点记录器允许在交互之间保留 “记忆”。在重复的人类交互(如对话)中,任何后续消息都可以发送到该线程,该线程将保留之前交互的记忆。请参阅 添加记忆 了解如何使用检查点记录器添加和管理对话记忆。
  • 时光旅行 (Time travel):检查点记录器允许 “时光旅行”,允许用户回放之前的图执行,以查看和/或调试特定的图步骤。此外,检查点记录器还可以从任意检查点派生(fork)图状态,以探索替代的执行轨迹。
  • 容错性 (Fault-tolerance):检查点提供了容错和错误恢复功能:如果一个或多个节点在给定的超级步中失败,你可以从上一个成功的步骤重新启动图。
  • 待定写入 (Pending writes):当图节点在给定的 超级步 执行中途失败时,LangGraph 会存储该超级步中其他成功完成节点的待定检查点写入。当你从该超级步恢复图执行时,你不需要重新运行已成功的节点。

核心概念

线程

线程(Thread)是分配给由检查点记录器保存的每个检查点的唯一 ID 或线程标识符。它包含了一系列 运行 (runs) 的累积状态。当执行一次运行项目时,助理底层图的 状态 (state) 将持久化到该线程中。 在使用检查点记录器调用图时,你 必须 在配置的 configurable 部分指定 thread_id
{"configurable": {"thread_id": "1"}}
可以检索线程的当前状态和历史状态。为了持久化状态,必须在执行运行之前创建线程。LangSmith API 提供了几个用于创建和管理线程及线程状态的端点。有关更多详细信息,请参阅 API 参考 检查点记录器使用 thread_id 作为存储和检索检查点的主键。如果没有它,检查点记录器将无法保存状态或在 中断 后恢复执行,因为检查点记录器需要通过 thread_id 来加载保存的状态。

检查点 (Checkpoints)

线程在特定时间点的状态称为检查点(Checkpoint)。检查点是在每个 超级步 保存的图状态快照,由 StateSnapshot 对象表示(有关完整的字段参考,请参阅 StateSnapshot 字段)。

超级步 (Super-steps)

LangGraph 在每个 超级步 (super-step) 边界创建一个检查点。超级步是图的一次“心跳”,其中计划在该步骤执行的所有节点都会执行(可能是并行的)。对于像 START -> A -> B -> END 这样的顺序图,输入、节点 A 和节点 B 分别有独立的超级步——在每个步骤之后都会产生一个检查点。理解超级步边界对于 时光旅行 非常重要,因为你只能从检查点(即超级步边界)恢复执行。 除了超级步检查点外,LangGraph 还在 节点(任务)级别 持久化写入。当超级步内的每个节点完成时,其输出将作为链接到正在进行的检查点的任务条目写入检查点记录器的 checkpoint_writes 表中。这些按任务的写入实现了 待定写入 恢复:如果同一超级步中的另一个节点失败,已成功节点的写入已经持久化,无需在恢复时重新运行。超级步完成后,完整的状态快照将被提交。 检查点已被持久化,可用于稍后恢复线程状态。 让我们看看调用一个简单图时会保存哪些检查点:
from langgraph.graph import StateGraph, START, END
from langgraph.checkpoint.memory import InMemorySaver
from langchain_core.runnables import RunnableConfig
from typing import Annotated
from typing_extensions import TypedDict
from operator import add

class State(TypedDict):
    foo: str
    bar: Annotated[list[str], add]

def node_a(state: State):
    return {"foo": "a", "bar": ["a"]}

def node_b(state: State):
    return {"foo": "b", "bar": ["b"]}


workflow = StateGraph(State)
workflow.add_node(node_a)
workflow.add_node(node_b)
workflow.add_edge(START, "node_a")
workflow.add_edge("node_a", "node_b")
workflow.add_edge("node_b", END)

checkpointer = InMemorySaver()
graph = workflow.compile(checkpointer=checkpointer)

config: RunnableConfig = {"configurable": {"thread_id": "1"}}
graph.invoke({"foo": "", "bar":[]}, config)
运行图之后,我们预期会看到恰好 4 个检查点
  • 空的检查点,START 为下一个要执行的节点
  • 包含用户输入 {'foo': '', 'bar': []} 的检查点,node_a 为下一个要执行的节点
  • 包含 node_a 输出 {'foo': 'a', 'bar': ['a']} 的检查点,node_b 为下一个要执行的节点
  • 包含 node_b 输出 {'foo': 'b', 'bar': ['a', 'b']} 的检查点,没有下一个要执行的节点
注意,由于我们为 bar 通道定义了归约器(reducer),bar 通道的值包含了两个节点的输出。

检查点命名空间

每个检查点都有一个 checkpoint_ns(检查点命名空间)字段,用于标识它属于哪个图或子图
  • ""(空字符串):该检查点属于父(根)图。
  • "node_name:uuid":该检查点属于作为给定节点调用的子图。对于嵌套子图,命名空间使用 | 分隔符连接(例如,"outer_node:uuid|inner_node:uuid")。
你可以通过配置从节点内部访问检查点命名空间
from langchain_core.runnables import RunnableConfig

def my_node(state: State, config: RunnableConfig):
    checkpoint_ns = config["configurable"]["checkpoint_ns"]
    # "" for the parent graph, "node_name:uuid" for a subgraph
有关处理子图状态和检查点的更多详细信息,请参阅 子图

获取与更新状态

获取状态

在与保存的图状态交互时,你 必须 指定一个 线程标识符。你可以通过调用 graph.get_state(config) 查看图的 最新 状态。这将返回一个 StateSnapshot 对象,该对象对应于与配置中提供的线程 ID 关联的最新检查点,或者如果提供了检查点 ID,则对应于该线程关联的特定检查点。
# get the latest state snapshot
config = {"configurable": {"thread_id": "1"}}
graph.get_state(config)

# get a state snapshot for a specific checkpoint_id
config = {"configurable": {"thread_id": "1", "checkpoint_id": "1ef663ba-28fe-6528-8002-5a559208592c"}}
graph.get_state(config)
在我们的示例中,get_state 的输出如下所示
StateSnapshot(
    values={'foo': 'b', 'bar': ['a', 'b']},
    next=(),
    config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1ef663ba-28fe-6528-8002-5a559208592c'}},
    metadata={'source': 'loop', 'writes': {'node_b': {'foo': 'b', 'bar': ['b']}}, 'step': 2},
    created_at='2024-08-29T19:19:38.821749+00:00',
    parent_config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1ef663ba-28f9-6ec4-8001-31981c2c39f8'}}, tasks=()
)

StateSnapshot 字段

字段类型描述
字典此检查点处的通道状态值。
下一个tuple[str, ...]下一个要执行的节点名称。空 () 表示图已执行完成。
配置字典包含 thread_idcheckpoint_nscheckpoint_id
metadata字典执行元数据。包含 source"input""loop""update")、writes(节点输出)和 step(超级步计数器)。
created_atstr此检查点创建时的 ISO 8601 时间戳。
parent_configdict | None上一个检查点的配置。第一个检查点为 None
taskstuple[PregelTask, ...]此步骤要执行的任务。每个任务都有 idnameerrorinterrupts,以及可选的 state(使用 subgraphs=True 时的子图快照)。

获取状态历史

你可以通过调用 graph.get_state_history(config) 获取给定线程的完整图执行历史。这将返回一个与配置中提供的线程 ID 关联的 StateSnapshot 对象列表。重要的是,检查点将按时间顺序排列,最近的检查点 / StateSnapshot 排在列表的首位。
config = {"configurable": {"thread_id": "1"}}
list(graph.get_state_history(config))
在我们的示例中,get_state_history 的输出如下所示
[
    StateSnapshot(
        values={'foo': 'b', 'bar': ['a', 'b']},
        next=(),
        config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1ef663ba-28fe-6528-8002-5a559208592c'}},
        metadata={'source': 'loop', 'writes': {'node_b': {'foo': 'b', 'bar': ['b']}}, 'step': 2},
        created_at='2024-08-29T19:19:38.821749+00:00',
        parent_config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1ef663ba-28f9-6ec4-8001-31981c2c39f8'}},
        tasks=(),
    ),
    StateSnapshot(
        values={'foo': 'a', 'bar': ['a']},
        next=('node_b',),
        config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1ef663ba-28f9-6ec4-8001-31981c2c39f8'}},
        metadata={'source': 'loop', 'writes': {'node_a': {'foo': 'a', 'bar': ['a']}}, 'step': 1},
        created_at='2024-08-29T19:19:38.819946+00:00',
        parent_config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1ef663ba-28f4-6b4a-8000-ca575a13d36a'}},
        tasks=(PregelTask(id='6fb7314f-f114-5413-a1f3-d37dfe98ff44', name='node_b', error=None, interrupts=()),),
    ),
    StateSnapshot(
        values={'foo': '', 'bar': []},
        next=('node_a',),
        config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1ef663ba-28f4-6b4a-8000-ca575a13d36a'}},
        metadata={'source': 'loop', 'writes': None, 'step': 0},
        created_at='2024-08-29T19:19:38.817813+00:00',
        parent_config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1ef663ba-28f0-6c66-bfff-6723431e8481'}},
        tasks=(PregelTask(id='f1b14528-5ee5-579c-949b-23ef9bfbed58', name='node_a', error=None, interrupts=()),),
    ),
    StateSnapshot(
        values={'bar': []},
        next=('__start__',),
        config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1ef663ba-28f0-6c66-bfff-6723431e8481'}},
        metadata={'source': 'input', 'writes': {'foo': ''}, 'step': -1},
        created_at='2024-08-29T19:19:38.816205+00:00',
        parent_config=None,
        tasks=(PregelTask(id='6d27aa2e-d72b-5504-a36f-8620e54a76dd', name='__start__', error=None, interrupts=()),),
    )
]
State

查找特定检查点

你可以过滤状态历史以查找符合特定条件的检查点
history = list(graph.get_state_history(config))

# Find the checkpoint before a specific node executed
before_node_b = next(s for s in history if s.next == ("node_b",))

# Find a checkpoint by step number
step_2 = next(s for s in history if s.metadata["step"] == 2)

# Find checkpoints created by update_state
forks = [s for s in history if s.metadata["source"] == "update"]

# Find the checkpoint where an interrupt occurred
interrupted = next(
    s for s in history
    if s.tasks and any(t.interrupts for t in s.tasks)
)

重放

重放 (Replay) 会从之前的检查点重新执行步骤。使用先前的 checkpoint_id 调用图,以在该检查点之后重新运行节点。检查点之前的节点将被跳过(其结果已保存)。检查点之后的节点将重新执行,包括任何 LLM 调用、API 请求或 中断——这些在重放期间总是会重新触发。 有关重放过去执行的完整详细信息和代码示例,请参阅 时光旅行 重放

更新状态

你可以使用 update_state 编辑图状态。这会创建一个带有更新值的新检查点——它不会修改原始检查点。更新被视为与节点更新相同:如果定义了 归约器 (reducer) 函数,值将通过归约器传递,因此具有归约器的通道会 累积 值而不是覆盖它们。 你可以选择指定 as_node 来控制更新被视为来自哪个节点,这会影响接下来执行哪个节点。有关详细信息,请参阅 时光旅行:as_node 更新

内存存储 (Memory store)

共享状态模型 状态架构 (State schema) 指定了在执行图时填充的一组键。如上所述,检查点记录器可以在每个图步骤将状态写入线程,从而实现状态持久化。 如果我们想 跨线程 保留一些信息怎么办?考虑一个聊天机器人的情况,我们希望保留该用户在 所有 对话(即线程)中的特定信息(例如,用户的偏好)! 仅凭检查点记录器,我们无法跨线程共享信息。这就需要 Store 接口。例如,我们可以定义一个 InMemoryStore 来跨线程存储用户信息。我们只需像以前一样使用检查点记录器编译图,并传入存储对象即可。
LangGraph API 自动处理存储 使用 LangGraph API 时,你不需要手动实现或配置存储。API 会在后台为你处理所有存储基础设施。
InMemoryStore 适用于开发和测试。对于生产环境,请使用持久性存储,如 PostgresStoreMongoDBStoreRedisStore。所有实现都扩展了 BaseStore,这是节点函数签名中使用的类型注释。

基本用法

首先,让我们脱离 LangGraph 单独演示这一点。
from langgraph.store.memory import InMemoryStore
store = InMemoryStore()
记忆通过 tuple 进行命名空间划分,在此示例中为 (<user_id>, "memories")。命名空间可以是任何长度,代表任何内容,不一定要针对特定用户。
user_id = "1"
namespace_for_memory = (user_id, "memories")
我们使用 store.put 方法将记忆保存到存储中的命名空间。执行此操作时,我们要指定如上定义的命名空间以及记忆的键值对:键只是记忆的唯一标识符(memory_id),值(字典)则是记忆本身。
memory_id = str(uuid.uuid4())
memory = {"food_preference" : "I like pizza"}
store.put(namespace_for_memory, memory_id, memory)
我们可以使用 store.search 方法读取命名空间中的记忆,该方法将以列表形式返回给定用户的所有记忆。最近的记忆位于列表的末尾。
memories = store.search(namespace_for_memory)
memories[-1].dict()
{'value': {'food_preference': 'I like pizza'},
 'key': '07e0caf4-1631-47b7-b15f-65515d4c1843',
 'namespace': ['1', 'memories'],
 'created_at': '2024-10-02T17:22:31.590602+00:00',
 'updated_at': '2024-10-02T17:22:31.590605+00:00'}
每种记忆类型都是一个带有特定属性的 Python 类(Item)。我们可以通过如上所述的 .dict 转换将其作为字典访问。 它拥有的属性包括:
  • value:此记忆的值(本身是一个字典)
  • key:此记忆在此命名空间中的唯一键
  • namespace:字符串元组,此记忆类型的命名空间
    虽然类型是 tuple[str, ...],但转换为 JSON 时可能会被序列化为列表(例如 ['1', 'memories'])。
  • created_at:此记忆创建时的时间戳
  • updated_at:此记忆更新时的时间戳
除了简单的检索,存储还支持语义搜索,允许你根据含义而不是精确匹配来查找记忆。要启用此功能,请为存储配置嵌入模型 (embedding model)
from langchain.embeddings import init_embeddings

store = InMemoryStore(
    index={
        "embed": init_embeddings("openai:text-embedding-3-small"),  # Embedding provider
        "dims": 1536,                              # Embedding dimensions
        "fields": ["food_preference", "$"]              # Fields to embed
    }
)
现在在搜索时,你可以使用自然语言查询来查找相关的记忆
# Find memories about food preferences
# (This can be done after putting memories into the store)
memories = store.search(
    namespace_for_memory,
    query="What does the user like to eat?",
    limit=3  # Return top 3 matches
)
你可以通过配置 fields 参数或在存储记忆时指定 index 参数来控制记忆的哪些部分被嵌入
# Store with specific fields to embed
store.put(
    namespace_for_memory,
    str(uuid.uuid4()),
    {
        "food_preference": "I love Italian cuisine",
        "context": "Discussing dinner plans"
    },
    index=["food_preference"]  # Only embed "food_preferences" field
)

# Store without embedding (still retrievable, but not searchable)
store.put(
    namespace_for_memory,
    str(uuid.uuid4()),
    {"system_info": "Last updated: 2024-01-01"},
    index=False
)

在 LangGraph 中使用

一切准备就绪后,我们在 LangGraph 中使用存储。存储与检查点记录器协同工作:检查点记录器将状态保存到线程(如上所述),而存储允许我们存储任意信息以便 线程访问。我们按照以下方式同时使用检查点记录器和存储编译图。
from dataclasses import dataclass
from langgraph.checkpoint.memory import InMemorySaver

@dataclass
class Context:
    user_id: str

# We need this because we want to enable threads (conversations)
checkpointer = InMemorySaver()

# ... Define the graph ...

# Compile the graph with the checkpointer and store
builder = StateGraph(MessagesState, context_schema=Context)
# ... add nodes and edges ...
graph = builder.compile(checkpointer=checkpointer, store=store)
我们像以前一样使用 thread_id 调用图,并额外带上 user_id,正如我们上面展示的,我们将使用它来为该特定用户的记忆划分命名空间。
# Invoke the graph
config = {"configurable": {"thread_id": "1"}}

# First let's just say hi to the AI
for update in graph.stream(
    {"messages": [{"role": "user", "content": "hi"}]},
    config,
    stream_mode="updates",
    context=Context(user_id="1"),
):
    print(update)
你可以通过 Runtime 对象在 任何节点 中访问存储和 user_id。当你将 Runtime 作为参数添加到节点函数时,LangGraph 会自动注入它。以下是你可以使用它来保存记忆的方法
from langgraph.runtime import Runtime
from dataclasses import dataclass

@dataclass
class Context:
    user_id: str

async def update_memory(state: MessagesState, runtime: Runtime[Context]):

    # Get the user id from the runtime context
    user_id = runtime.context.user_id

    # Namespace the memory
    namespace = (user_id, "memories")

    # ... Analyze conversation and create a new memory

    # Create a new memory ID
    memory_id = str(uuid.uuid4())

    # We create a new memory
    await runtime.store.aput(namespace, memory_id, {"memory": memory})

如上所示,我们也可以在任何节点中访问存储,并使用 store.search 方法获取记忆。回想一下,记忆是以可以转换为字典的对象列表形式返回的。
memories[-1].dict()
{'value': {'food_preference': 'I like pizza'},
 'key': '07e0caf4-1631-47b7-b15f-65515d4c1843',
 'namespace': ['1', 'memories'],
 'created_at': '2024-10-02T17:22:31.590602+00:00',
 'updated_at': '2024-10-02T17:22:31.590605+00:00'}
我们可以访问记忆并在模型调用中使用它们。
from dataclasses import dataclass
from langgraph.runtime import Runtime

@dataclass
class Context:
    user_id: str

async def call_model(state: MessagesState, runtime: Runtime[Context]):
    # Get the user id from the runtime context
    user_id = runtime.context.user_id

    # Namespace the memory
    namespace = (user_id, "memories")

    # Search based on the most recent message
    memories = await runtime.store.asearch(
        namespace,
        query=state["messages"][-1].content,
        limit=3
    )
    info = "\n".join([d.value["memory"] for d in memories])

    # ... Use memories in the model call
如果我们创建一个新线程,只要 user_id 相同,我们仍然可以访问相同的记忆。
# Invoke the graph on a new thread
config = {"configurable": {"thread_id": "2"}}

# Let's say hi again
for update in graph.stream(
    {"messages": [{"role": "user", "content": "hi, tell me about my memories"}]},
    config,
    stream_mode="updates",
    context=Context(user_id="1"),
):
    print(update)
当我们使用 LangSmith 时,无论是本地(例如在 Studio 中)还是 托管在 LangSmith 上,基础存储默认可用,不需要在图编译期间指定。但是,要启用语义搜索,你 确实 需要在 langgraph.json 文件中配置索引设置。例如
{
    ...
    "store": {
        "index": {
            "embed": "openai:text-embeddings-3-small",
            "dims": 1536,
            "fields": ["$"]
        }
    }
}
有关更多详细信息和配置选项,请参阅 部署指南

优化检查点存储

默认情况下,LangGraph 检查点会在每个超级步写入每个状态通道的完整值。对于长期运行且有大量积累的线程(如多轮对话),这可能会导致存储空间随时间显著增长。 DeltaChannel 仅存储增量(deltas),而不是完整的累积值,从而大幅减少了高频追加通道的检查点大小。有关用法以及存储与延迟之间的权衡,请参阅 DeltaChannel
DeltaChannel 需要 langgraph>=1.2,目前处于测试阶段 (beta)。API 可能会在未来的版本中发生变化。

检查点记录器库

在底层,检查点功能由符合 BaseCheckpointSaver 接口的检查点记录器对象驱动。LangGraph 提供了几种检查点记录器实现,它们都是通过独立的、可安装的库实现的。
请参阅 检查点记录器集成 了解可用的提供商。
  • langgraph-checkpoint:检查点记录器(BaseCheckpointSaver)的基础接口以及序列化/反序列化接口(SerializerProtocol)。包括用于实验的内存中检查点记录器实现(InMemorySaver)。LangGraph 已经包含了 langgraph-checkpoint
  • langgraph-checkpoint-sqlite:使用 SQLite 数据库的 LangGraph 检查点记录器实现(SqliteSaver / AsyncSqliteSaver)。非常适合实验和本地工作流。需要单独安装。
  • langgraph-checkpoint-postgres:使用 Postgres 数据库的高级检查点记录器(PostgresSaver / AsyncPostgresSaver),在 LangSmith 中使用。非常适合生产环境。需要单独安装。
  • langchain-azure-cosmosdb:使用 Azure Cosmos DB for NoSQL 的 LangGraph 检查点记录器实现(CosmosDBSaverSync / CosmosDBSaver)。非常适合在 Azure 生产环境中使用。支持同步和异步操作,并支持 Microsoft Entra ID 身份验证。需要单独安装。

检查点记录器接口

每个检查点记录器都符合 BaseCheckpointSaver 接口,并实现了以下方法
  • .put - 存储带有配置和元数据的检查点。
  • .put_writes - 存储链接到检查点的中间写入(即 待定写入)。
  • .get_tuple - 使用给定的配置(thread_idcheckpoint_id)获取检查点元组。这用于在 graph.get_state() 中填充 StateSnapshot
  • .list - 列出符合给定配置和过滤条件的检查点。这用于在 graph.get_state_history() 中填充状态历史记录。
如果检查点记录器用于异步图执行(即通过 .ainvoke.astream.abatch 执行图),将使用上述方法的异步版本(.aput.aput_writes.aget_tuple.alist)。
对于异步运行图,你可以使用 InMemorySaver,或者 Sqlite/Postgres 检查点记录器的异步版本——AsyncSqliteSaver / AsyncPostgresSaver 检查点记录器。

序列化器

当检查点记录器保存图状态时,它们需要对状态中的通道值进行序列化。这是使用序列化器对象完成的。 langgraph_checkpoint 定义了实现序列化器的 协议 (protocol),并提供了一个默认实现(JsonPlusSerializer),它可以处理多种类型,包括 LangChain 和 LangGraph 原语、日期时间、枚举等。

使用 pickle 进行序列化

默认序列化器 JsonPlusSerializer 在底层使用 ormsgpack 和 JSON,这并不适用于所有类型的对象。 如果你希望对于当前 msgpack 编码器不支持的对象(如 Pandas DataFrames)回退到 pickle,可以使用 JsonPlusSerializerpickle_fallback 参数:
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.checkpoint.serde.jsonplus import JsonPlusSerializer

# ... Define the graph ...
graph.compile(
    checkpointer=InMemorySaver(serde=JsonPlusSerializer(pickle_fallback=True))
)

加密

检查点记录器可以选择对所有持久化状态进行加密。要启用此功能,请将 EncryptedSerializer 实例传递给任何 BaseCheckpointSaver 实现的 serde 参数。创建加密序列化器最简单的方法是通过 from_pycryptodome_aes,它会从 LANGGRAPH_AES_KEY 环境变量中读取 AES 密钥(或接受一个 key 参数)。
import sqlite3

from langgraph.checkpoint.serde.encrypted import EncryptedSerializer
from langgraph.checkpoint.sqlite import SqliteSaver

serde = EncryptedSerializer.from_pycryptodome_aes()  # reads LANGGRAPH_AES_KEY
checkpointer = SqliteSaver(sqlite3.connect("checkpoint.db"), serde=serde)
from langgraph.checkpoint.serde.encrypted import EncryptedSerializer
from langgraph.checkpoint.postgres import PostgresSaver

serde = EncryptedSerializer.from_pycryptodome_aes()
checkpointer = PostgresSaver.from_conn_string("postgresql://...", serde=serde)
checkpointer.setup()
在 LangSmith 上运行时,只要存在 LANGGRAPH_AES_KEY,就会自动启用加密,因此你只需提供该环境变量即可。其他加密方案可以通过实现 CipherProtocol 并将其提供给 EncryptedSerializer 来使用。
© . This site is unofficial and not affiliated with LangChain, Inc.