跳到主要内容

文档索引

在以下地址获取完整的文档索引:https://docs.langchain.org.cn/llms.txt

在进一步探索之前,请使用此文件发现所有可用页面。

LangGraph 具有内置的持久化层,可以将图状态保存为检查点。当您使用检查点记录器编译图时,系统会在执行的每一步保存图状态的快照,并按线程进行组织。这实现了人机交互 (human-in-the-loop) 工作流、对话记忆、时间旅行调试和容错执行。 检查点
Agent Server 自动处理检查点 使用 Agent Server 时,您无需手动实现或配置检查点记录器。服务器会在后台为您处理所有持久化基础设施。

为什么使用持久化

以下功能需要持久化支持
  • 人机交互 (Human-in-the-loop):检查点记录器允许人员检查、中断和批准图步骤,从而促进 人机交互工作流。这些工作流需要检查点记录器,因为人员必须能够随时查看图的状态,并且图必须能够在人员对状态进行任何更新后恢复执行。请参阅 中断 获取示例。
  • 记忆 (Memory):检查点记录器允许在交互之间建立 “记忆”。在重复的人机交互(如对话)情况下,任何后续消息都可以发送到该线程,该线程将保留对之前消息的记忆。请参阅 添加记忆 了解如何使用检查点记录器添加和管理对话记忆。
  • 时间旅行 (Time travel):检查点记录器允许 “时间旅行”,允许用户重放之前的图执行,以查看和/或调试特定的图步骤。此外,检查点记录器使得在任意检查点分叉图状态以探索替代路径成为可能。
  • 容错性 (Fault-tolerance):检查点提供容错和错误恢复:如果一个或多个节点在给定的超级步骤中失败,您可以从上一个成功的步骤重启图。
  • 待处理写入 (Pending writes):当图节点在给定的 超级步骤 执行中途失败时,LangGraph 会存储该超级步骤中成功完成的其他节点的待处理检查点写入。当您从该超级步骤恢复图执行时,无需重新运行已成功的节点。

核心概念

线程

线程 (Thread) 是分配给检查点记录器保存的每个检查点的唯一 ID 或线程标识符。它包含了一系列 运行 (runs) 的累积状态。当执行运行端时,辅助程序基础图的 状态 将持久化到该线程中。 在使用检查点记录器调用图时,您必须在配置的 configurable 部分指定 thread_id
{
  configurable: {
    thread_id: "1";
  }
}
线程的当前状态和历史状态都可以被检索。为了持久化状态,必须在执行运行之前创建一个线程。LangSmith API 提供了几个用于创建和管理线程及线程状态的端点。有关更多详细信息,请参阅 API 参考 检查点记录器使用 thread_id 作为存储和检索检查点的主键。没有它,检查点记录器就无法保存状态,也无法在 中断 后恢复执行,因为检查点记录器使用 thread_id 来加载保存的状态。

检查点 (Checkpoints)

线程在特定时间点的状态称为检查点。检查点是每个 超级步骤 保存的图状态快照,由 StateSnapshot 对象表示(有关完整的字段参考,请参阅 StateSnapshot 字段)。

超级步骤 (Super-steps)

LangGraph 在每个超级步骤边界处创建一个检查点。超级步骤是图的一次“刻度”,在该刻度中,为该步骤调度的所有节点都会执行(可能是并行执行)。对于像 START -> A -> B -> END 这样的顺序图,输入、节点 A 和节点 B 都有各自独立的超级步骤——在每一个步骤之后都会生成一个检查点。理解超级步骤边界对于 时间旅行 非常重要,因为您只能从检查点(即超级步骤边界)恢复执行。 除了超级步骤检查点之外,LangGraph 还会持久化节点(任务)级别的写入。当超级步骤中的每个节点完成时,其输出将作为与正在进行的检查点关联的任务条目写入检查点记录器的 checkpoint_writes 表中。这些按任务的写入实现了 待处理写入 恢复:如果同一超级步骤中的另一个节点失败,已成功节点的写入已经持久化,恢复时不需要重新运行。一旦超级步骤完成,完整的状态快照就会被提交。 检查点是持久化的,以后可以用于恢复线程的状态。 让我们看看在如下调用一个简单的图时会保存哪些检查点:
import { StateGraph, StateSchema, ReducedValue, START, END, MemorySaver } from "@langchain/langgraph";
import { z } from "zod/v4";

const State = new StateSchema({
  foo: z.string(),
  bar: new ReducedValue(
    z.array(z.string()).default(() => []),
    {
      inputSchema: z.array(z.string()),
      reducer: (x, y) => x.concat(y),
    }
  ),
});

const workflow = new StateGraph(State)
  .addNode("nodeA", (state) => {
    return { foo: "a", bar: ["a"] };
  })
  .addNode("nodeB", (state) => {
    return { foo: "b", bar: ["b"] };
  })
  .addEdge(START, "nodeA")
  .addEdge("nodeA", "nodeB")
  .addEdge("nodeB", END);

const checkpointer = new MemorySaver();
const graph = workflow.compile({ checkpointer });

const config = { configurable: { thread_id: "1" } };
await graph.invoke({ foo: "", bar: [] }, config);
运行图之后,我们预期会看到正好 4 个检查点
  • 空检查点,下一个要执行的节点是 START
  • 带有用户输入 {'foo': '', 'bar': []} 的检查点,下一个要执行的节点是 nodeA
  • 带有 nodeA 输出 {'foo': 'a', 'bar': ['a']} 的检查点,下一个要执行的节点是 nodeB
  • 带有 nodeB 输出 {'foo': 'b', 'bar': ['a', 'b']} 的检查点,没有下一个要执行的节点
请注意,由于我们为 bar 通道设置了 reducer,因此 bar 通道的值包含来自两个节点的输出。

检查点命名空间

每个检查点都有一个 checkpoint_ns(检查点命名空间)字段,用于标识它属于哪个图或子图
  • ""(空字符串):该检查点属于父(根)图。
  • "node_name:uuid":该检查点属于作为给定节点调用的子图。对于嵌套子图,命名空间使用 | 分隔符连接(例如 "outer_node:uuid|inner_node:uuid")。
您可以通过配置在节点内访问检查点命名空间
import { RunnableConfig } from "@langchain/core/runnables";

function myNode(state: typeof State.Type, config: RunnableConfig) {
  const checkpointNs = config.configurable?.checkpoint_ns;
  // "" for the parent graph, "node_name:uuid" for a subgraph
}
有关使用子图状态和检查点的更多详细信息,请参阅 子图

获取并更新状态

获取状态

在与保存的图状态交互时,您必须指定一个 线程标识符。您可以通过调用 graph.getState(config) 来查看图的最新状态。这将返回一个 StateSnapshot 对象,该对象对应于与配置中提供的线程 ID 关联的最新检查点,或者如果提供了检查点 ID,则对应于该线程的特定检查点。
// get the latest state snapshot
const config = { configurable: { thread_id: "1" } };
await graph.getState(config);

// get a state snapshot for a specific checkpoint_id
const config = {
  configurable: {
    thread_id: "1",
    checkpoint_id: "1ef663ba-28fe-6528-8002-5a559208592c",
  },
};
await graph.getState(config);
在我们的示例中,getState 的输出如下所示
StateSnapshot {
  values: { foo: 'b', bar: ['a', 'b'] },
  next: [],
  config: {
    configurable: {
      thread_id: '1',
      checkpoint_ns: '',
      checkpoint_id: '1ef663ba-28fe-6528-8002-5a559208592c'
    }
  },
  metadata: {
    source: 'loop',
    writes: { nodeB: { foo: 'b', bar: ['b'] } },
    step: 2
  },
  createdAt: '2024-08-29T19:19:38.821749+00:00',
  parentConfig: {
    configurable: {
      thread_id: '1',
      checkpoint_ns: '',
      checkpoint_id: '1ef663ba-28f9-6ec4-8001-31981c2c39f8'
    }
  },
  tasks: []
}

StateSnapshot 字段

字段类型描述
对象此检查点处的状态通道值。
下一个字符串[]下一步要执行的节点名称。空数组 [] 表示图已执行完成。
配置对象包含 thread_idcheckpoint_nscheckpoint_id
metadata对象执行元数据。包含 source"input""loop""update")、writes(节点输出)和 step(超级步骤计数器)。
createdAt字符串此检查点创建时的 ISO 8601 时间戳。
parentConfig对象 | null前一个检查点的配置。第一个检查点为 null
tasksPregelTask[]在此步骤要执行的任务。每个任务都有 idnameerrorinterrupts,以及可选的 state(使用 subgraphs: true 时的子图快照)。

获取状态历史

您可以通过调用 graph.getStateHistory(config) 获取给定线程的完整图执行历史记录。这将返回一个与配置中提供的线程 ID 关联的 StateSnapshot 对象列表。重要的是,检查点将按时间顺序排列,最近的检查点 / StateSnapshot 排在列表的首位。
const config = { configurable: { thread_id: "1" } };
for await (const state of graph.getStateHistory(config)) {
  console.log(state);
}
在我们的示例中,getStateHistory 的输出如下所示
[
  StateSnapshot {
    values: { foo: 'b', bar: ['a', 'b'] },
    next: [],
    config: {
      configurable: {
        thread_id: '1',
        checkpoint_ns: '',
        checkpoint_id: '1ef663ba-28fe-6528-8002-5a559208592c'
      }
    },
    metadata: {
      source: 'loop',
      writes: { nodeB: { foo: 'b', bar: ['b'] } },
      step: 2
    },
    createdAt: '2024-08-29T19:19:38.821749+00:00',
    parentConfig: {
      configurable: {
        thread_id: '1',
        checkpoint_ns: '',
        checkpoint_id: '1ef663ba-28f9-6ec4-8001-31981c2c39f8'
      }
    },
    tasks: []
  },
  StateSnapshot {
    values: { foo: 'a', bar: ['a'] },
    next: ['nodeB'],
    config: {
      configurable: {
        thread_id: '1',
        checkpoint_ns: '',
        checkpoint_id: '1ef663ba-28f9-6ec4-8001-31981c2c39f8'
      }
    },
    metadata: {
      source: 'loop',
      writes: { nodeA: { foo: 'a', bar: ['a'] } },
      step: 1
    },
    createdAt: '2024-08-29T19:19:38.819946+00:00',
    parentConfig: {
      configurable: {
        thread_id: '1',
        checkpoint_ns: '',
        checkpoint_id: '1ef663ba-28f4-6b4a-8000-ca575a13d36a'
      }
    },
    tasks: [
      PregelTask {
        id: '6fb7314f-f114-5413-a1f3-d37dfe98ff44',
        name: 'nodeB',
        error: null,
        interrupts: []
      }
    ]
  },
  StateSnapshot {
    values: { foo: '', bar: [] },
    next: ['node_a'],
    config: {
      configurable: {
        thread_id: '1',
        checkpoint_ns: '',
        checkpoint_id: '1ef663ba-28f4-6b4a-8000-ca575a13d36a'
      }
    },
    metadata: {
      source: 'loop',
      writes: null,
      step: 0
    },
    createdAt: '2024-08-29T19:19:38.817813+00:00',
    parentConfig: {
      configurable: {
        thread_id: '1',
        checkpoint_ns: '',
        checkpoint_id: '1ef663ba-28f0-6c66-bfff-6723431e8481'
      }
    },
    tasks: [
      PregelTask {
        id: 'f1b14528-5ee5-579c-949b-23ef9bfbed58',
        name: 'node_a',
        error: null,
        interrupts: []
      }
    ]
  },
  StateSnapshot {
    values: { bar: [] },
    next: ['__start__'],
    config: {
      configurable: {
        thread_id: '1',
        checkpoint_ns: '',
        checkpoint_id: '1ef663ba-28f0-6c66-bfff-6723431e8481'
      }
    },
    metadata: {
      source: 'input',
      writes: { foo: '' },
      step: -1
    },
    createdAt: '2024-08-29T19:19:38.816205+00:00',
    parentConfig: null,
    tasks: [
      PregelTask {
        id: '6d27aa2e-d72b-5504-a36f-8620e54a76dd',
        name: '__start__',
        error: null,
        interrupts: []
      }
    ]
  }
]
State

查找特定检查点

您可以过滤状态历史记录以查找符合特定条件的检查点
const history: StateSnapshot[] = [];
for await (const state of graph.getStateHistory(config)) {
  history.push(state);
}

// Find the checkpoint before a specific node executed
const beforeNodeB = history.find((s) => s.next.includes("nodeB"));

// Find a checkpoint by step number
const step2 = history.find((s) => s.metadata.step === 2);

// Find checkpoints created by updateState
const forks = history.filter((s) => s.metadata.source === "update");

// Find the checkpoint where an interrupt occurred
const interrupted = history.find(
  (s) => s.tasks.length > 0 && s.tasks.some((t) => t.interrupts.length > 0)
);

重放

重放会重新执行来自先前检查点的步骤。使用先前的 checkpoint_id 调用图,以重新运行该检查点之后的节点。检查点之前的节点将被跳过(它们的结果已经保存)。检查点之后的节点会重新执行,包括任何 LLM 调用、API 请求或 中断——这些在重放期间总是会再次触发。 有关重放过去执行的完整详细信息和代码示例,请参阅 时间旅行 重放

更新状态

您可以使用 graph.updateState() 编辑图状态。这将使用更新后的值创建一个新检查点——它不会修改原始检查点。更新的处理方式与节点更新相同:值在定义时通过 reducer 函数传递,因此带有 reducer 的通道会累加值而不是覆盖它们。 您可以选择指定 asNode 来控制将更新视为来自哪个节点,这会影响下一步执行哪个节点。有关详细信息,请参阅 时间旅行:asNode 更新

内存存储

共享状态模型 状态模式 (State schema) 指定了在图执行时填充的一组键。如上所述,检查点记录器可以在每个图步骤将状态写入线程,从而实现状态持久化。 如果我们想跨线程保留某些信息呢?考虑一个聊天机器人的案例,我们希望在与该用户的所有聊天对话(即线程)中保留关于该用户的特定信息! 仅靠检查点记录器,我们无法跨线程共享信息。这就产生了对 Store 接口的需求。作为演示,我们可以定义一个 InMemoryStore 来跨线程存储有关用户的信息。我们只需像以前一样使用检查点记录器编译图,并传入该存储即可。
LangGraph API 自动处理存储 使用 LangGraph API 时,您无需手动实现或配置存储。API 会在后台为您处理所有存储基础设施。
InMemoryStore 适用于开发和测试。对于生产环境,请使用 PostgresStoreMongoDBStoreRedisStore 等持久化存储。所有实现都扩展了 BaseStore,这是在节点函数签名中使用的类型注释。

基本用法

首先,让我们在不使用 LangGraph 的情况下单独展示这一点。
import { MemoryStore } from "@langchain/langgraph";

const memoryStore = new MemoryStore();
记忆按 tuple 进行命名空间划分,在此特定示例中为 (<user_id>, "memories")。命名空间可以是任何长度,并代表任何内容,不一定非要特定于用户。
const userId = "1";
const namespaceForMemory = [userId, "memories"];
我们使用 store.put 方法将记忆保存到存储中的命名空间。执行此操作时,我们要指定如上定义的命名空间,以及记忆的键值对:键只是该记忆的唯一标识符 (memory_id),值(一个字典)是记忆本身。
import { v4 as uuidv4 } from "uuid";

const memoryId = uuidv4();
const memory = { food_preference: "I like pizza" };
await memoryStore.put(namespaceForMemory, memoryId, memory);
我们可以使用 store.search 方法读取命名空间中的记忆,该方法将以列表形式返回给定用户的所有记忆。最近的记忆位于列表的最后。
const memories = await memoryStore.search(namespaceForMemory);
memories[memories.length - 1];

// {
//   value: { food_preference: 'I like pizza' },
//   key: '07e0caf4-1631-47b7-b15f-65515d4c1843',
//   namespace: ['1', 'memories'],
//   createdAt: '2024-10-02T17:22:31.590602+00:00',
//   updatedAt: '2024-10-02T17:22:31.590605+00:00'
// }
它具有以下属性
  • value:此记忆的值
  • key:此内存在此命名空间中的唯一键
  • namespace:字符串元组,此记忆类型的命名空间
    虽然类型是 tuple,但在转换为 JSON 时可能会序列化为列表(例如 ['1', 'memories'])。
  • createdAt:此记忆创建时的时间戳
  • updatedAt:此记忆更新时的时间戳
除了简单的检索之外,存储还支持语义搜索,允许您根据含义而不是精确匹配来查找记忆。要启用此功能,请为存储配置嵌入模型 (embedding model)
import { OpenAIEmbeddings } from "@langchain/openai";

const store = new InMemoryStore({
  index: {
    embeddings: new OpenAIEmbeddings({ model: "text-embedding-3-small" }),
    dims: 1536,
    fields: ["food_preference", "$"], // Fields to embed
  },
});
现在搜索时,您可以使用自然语言查询来查找相关的记忆
// Find memories about food preferences
// (This can be done after putting memories into the store)
const memories = await store.search(namespaceForMemory, {
  query: "What does the user like to eat?",
  limit: 3, // Return top 3 matches
});
您可以通过配置 fields 参数或在存储记忆时指定 index 参数来控制记忆的哪些部分被嵌入
// Store with specific fields to embed
await store.put(
  namespaceForMemory,
  uuidv4(),
  {
    food_preference: "I love Italian cuisine",
    context: "Discussing dinner plans",
  },
  { index: ["food_preference"] } // Only embed "food_preferences" field
);

// Store without embedding (still retrievable, but not searchable)
await store.put(
  namespaceForMemory,
  uuidv4(),
  { system_info: "Last updated: 2024-01-01" },
  { index: false }
);

在 LangGraph 中使用

有了这些,我们在 LangGraph 中使用 memoryStorememoryStore 与检查点记录器协同工作:检查点记录器如上所述将状态保存到线程,而 memoryStore 允许我们存储任意信息以便线程访问。我们按如下方式同时使用检查点记录器和 memoryStore 编译图。
import { MemorySaver } from "@langchain/langgraph";

// We need this because we want to enable threads (conversations)
const checkpointer = new MemorySaver();

// ... Define the graph ...

// Compile the graph with the checkpointer and store
const graph = workflow.compile({ checkpointer, store: memoryStore });
我们使用 thread_id 调用图,就像以前一样,同时也使用 user_id,我们将使用它来将我们的记忆命名空间划分到如上所示的特定用户。
// Invoke the graph
const userId = "1";
const config = { configurable: { thread_id: "1" }, context: { userId } };

// First let's just say hi to the AI
for await (const update of await graph.stream(
  { messages: [{ role: "user", content: "hi" }] },
  { ...config, streamMode: "updates" }
)) {
  console.log(update);
}
您可以通过 runtime 参数在任何节点中访问存储和 userId。以下是您如何使用它来保存记忆的方法
import { StateSchema, MessagesValue, Runtime } from "@langchain/langgraph";
import { v4 as uuidv4 } from "uuid";

const MessagesState = new StateSchema({
  messages: MessagesValue,
});

const updateMemory: GraphNode<typeof MessagesState> = async (state, runtime) => {
  // Get the user id from the config
  const userId = runtime.context?.user_id;
  if (!userId) throw new Error("User ID is required");

  // Namespace the memory
  const namespace = [userId, "memories"];

  // ... Analyze conversation and create a new memory
  const memory = "Some memory content";

  // Create a new memory ID
  const memoryId = uuidv4();

  // We create a new memory
  await runtime.store?.put(namespace, memoryId, { memory });
};
如上所示,我们还可以在任何节点中访问存储并使用 store.search 方法来获取记忆。回想一下,记忆以对象列表的形式返回,可以转换为字典。
memories[memories.length - 1];
// {
//   value: { food_preference: 'I like pizza' },
//   key: '07e0caf4-1631-47b7-b15f-65515d4c1843',
//   namespace: ['1', 'memories'],
//   createdAt: '2024-10-02T17:22:31.590602+00:00',
//   updatedAt: '2024-10-02T17:22:31.590605+00:00'
// }
我们可以访问记忆并在我们的模型调用中使用它们。
const callModel: GraphNode<typeof MessagesState> = async (state, runtime) => {
  // Get the user id from the config
  const userId = runtime.context?.user_id;

  // Namespace the memory
  const namespace = [userId, "memories"];

  // Search based on the most recent message
  const memories = await runtime.store?.search(namespace, {
    query: state.messages[state.messages.length - 1].content,
    limit: 3,
  });
  const info = memories.map((d) => d.value.memory).join("\n");

  // ... Use memories in the model call
};
如果我们创建一个新线程,只要 user_id 相同,我们仍然可以访问相同的记忆。
// Invoke the graph
const config = { configurable: { thread_id: "2" }, context: { userId: "1" } };

// Let's say hi again
for await (const update of await graph.stream(
  { messages: [{ role: "user", content: "hi, tell me about my memories" }] },
  { ...config, streamMode: "updates" }
)) {
  console.log(update);
}
当我们使用 LangSmith 时,无论是本地(例如在 Studio 中)还是 托管在 LangSmith 平台,默认情况下都可以使用基础存储,无需在图编译期间指定。但是,要启用语义搜索,您确实需要在 langgraph.json 文件中配置索引设置。例如
{
    ...
    "store": {
        "index": {
            "embed": "openai:text-embeddings-3-small",
            "dims": 1536,
            "fields": ["$"]
        }
    }
}
有关更多详细信息和配置选项,请参阅 部署指南

优化检查点存储

检查点记录器 (Checkpointer) 库

在底层,检查点功能由符合 BaseCheckpointSaver 接口的检查点记录器对象提供支持。LangGraph 提供了几种检查点记录器实现,全部通过独立的、可安装的库实现。
  • @langchain/langgraph-checkpoint:检查点记录器保存器的基础接口 (BaseCheckpointSaver) 和序列化/反序列化接口 (SerializerProtocol)。包含用于实验的内存中检查点记录器实现 (MemorySaver)。LangGraph 已经包含了 @langchain/langgraph-checkpoint
  • @langchain/langgraph-checkpoint-sqlite:使用 SQLite 数据库的 LangGraph 检查点记录器实现 (SqliteSaver)。非常适合实验和本地工作流。需要单独安装。
  • @langchain/langgraph-checkpoint-postgres:一种使用 Postgres 数据库的高级检查点记录器 (PostgresSaver),用于 LangSmith。非常适合在生产环境中使用。需要单独安装。
  • @langchain/langgraph-checkpoint-mongodb:一种由 MongoDB 支持的高级检查点记录器 (MongoDBSaver) 和长期记忆存储 (MongoDBStore)。该存储支持跨线程持久化,并可选集成向量搜索。非常适合生产环境使用。需要单独安装。
  • @langchain/langgraph-checkpoint-redis:一种使用 Redis 数据库的高级检查点记录器 (RedisSaver)。非常适合在生产环境中使用。需要单独安装。

检查点记录器接口

每个检查点记录器都符合 BaseCheckpointSaver 接口,并实现了以下方法
  • .put - 存储包含其配置和元数据的检查点。
  • .putWrites - 存储与检查点关联的中间写入(即 待处理写入)。
  • .getTuple - 使用给定的配置(thread_idcheckpoint_id)获取检查点元组。这用于在 graph.getState() 中填充 StateSnapshot
  • .list - 列出符合给定配置和过滤条件的检查点。这用于在 graph.getStateHistory() 中填充状态历史记录。

© . This site is unofficial and not affiliated with LangChain, Inc.