跳到主要内容
LangGraph 具有内置的持久层,通过检查点实现。当您使用检查点编译图时,检查点会在每个超步骤保存图状态的 `checkpoint`。这些检查点会保存到 `thread`,可以在图执行后访问。由于 `thread` 允许在执行后访问图的状态,因此诸如人机协作、内存、时间旅行和容错等多种强大功能都得以实现。下面,我们将更详细地讨论这些概念。 Checkpoints
LangGraph API 自动处理检查点 使用 LangGraph API 时,您无需手动实现或配置检查点。API 在后台为您处理所有持久化基础设施。

线程

线程是检查点保存的每个检查点唯一的 ID 或线程标识符。它包含一系列 运行 的累积状态。当运行执行时,助手底层图的 状态 将被持久化到线程中。 当使用检查点调用图时,您必须在配置的 `configurable` 部分中指定 `thread_id`。
{
  configurable: {
    thread_id: "1";
  }
}
可以检索线程的当前状态和历史状态。为了持久化状态,必须在执行运行之前创建线程。LangSmith API 提供了几个用于创建和管理线程和线程状态的端点。有关更多详细信息,请参阅 API 参考

检查点

线程在特定时间点的状态称为检查点。检查点是在每个超步骤保存的图状态快照,由具有以下关键属性的 `StateSnapshot` 对象表示
  • config: 与此检查点关联的配置。
  • metadata: 与此检查点关联的元数据。
  • values: 此时刻状态通道的值。
  • next 将在图中执行的节点名称元组。
  • tasks: 包含要执行的后续任务信息的 `PregelTask` 对象元组。如果之前尝试过该步骤,它将包含错误信息。如果图在节点内 动态 中断,则任务将包含与中断相关的额外数据。
检查点是持久化的,可用于在以后恢复线程的状态。 让我们看看当一个简单图按如下方式调用时会保存哪些检查点:
import { StateGraph, START, END, MemoryServer } from "@langchain/langgraph";
import { registry } from "@langchain/langgraph/zod";
import * as z from "zod";

const State = z.object({
  foo: z.string(),
  bar: z.array(z.string()).register(registry, {
    reducer: {
      fn: (x, y) => x.concat(y),
    },
    default: () => [] as string[],
  }),
});

const workflow = new StateGraph(State)
  .addNode("nodeA", (state) => {
    return { foo: "a", bar: ["a"] };
  })
  .addNode("nodeB", (state) => {
    return { foo: "b", bar: ["b"] };
  })
  .addEdge(START, "nodeA")
  .addEdge("nodeA", "nodeB")
  .addEdge("nodeB", END);

const checkpointer = new MemorySaver();
const graph = workflow.compile({ checkpointer });

const config = { configurable: { thread_id: "1" } };
await graph.invoke({ foo: "" }, config);
import { StateGraph, START, END, MemoryServer } from "@langchain/langgraph";
import { registry } from "@langchain/langgraph/zod";
import * as z from "zod";

const State = z.object({
  foo: z.string(),
  bar: z.array(z.string()).register(registry, {
    reducer: {
      fn: (x, y) => x.concat(y),
    },
    default: () => [] as string[],
  }),
});

const workflow = new StateGraph(State)
  .addNode("nodeA", (state) => {
    return { foo: "a", bar: ["a"] };
  })
  .addNode("nodeB", (state) => {
    return { foo: "b", bar: ["b"] };
  })
  .addEdge(START, "nodeA")
  .addEdge("nodeA", "nodeB")
  .addEdge("nodeB", END);

const checkpointer = new MemorySaver();
const graph = workflow.compile({ checkpointer });

const config = { configurable: { thread_id: "1" } };
await graph.invoke({ foo: "" }, config);
运行图后,我们预计会看到正好 4 个检查点
  • 空检查点,其中 START 作为下一个要执行的节点
  • 检查点,包含用户输入 {'foo': '', 'bar': []}nodeA 作为下一个要执行的节点
  • 检查点,包含 nodeA 的输出 {'foo': 'a', 'bar': ['a']}nodeB 作为下一个要执行的节点
  • 检查点,包含 nodeB 的输出 {'foo': 'b', 'bar': ['a', 'b']},并且没有下一个要执行的节点
请注意,`bar` 通道值包含来自两个节点的输出,因为我们为 `bar` 通道设置了reducer。

获取状态

与保存的图状态交互时,您必须指定一个 线程标识符。您可以通过调用 graph.getState(config) 查看图的最新状态。这将返回一个 StateSnapshot 对象,该对象对应于配置中提供的线程 ID 关联的最新检查点,或者(如果提供了)线程的检查点 ID 关联的检查点。
// get the latest state snapshot
const config = { configurable: { thread_id: "1" } };
await graph.getState(config);

// get a state snapshot for a specific checkpoint_id
const config = {
  configurable: {
    thread_id: "1",
    checkpoint_id: "1ef663ba-28fe-6528-8002-5a559208592c",
  },
};
await graph.getState(config);
在我们的示例中,getState 的输出将如下所示
StateSnapshot {
  values: { foo: 'b', bar: ['a', 'b'] },
  next: [],
  config: {
    configurable: {
      thread_id: '1',
      checkpoint_ns: '',
      checkpoint_id: '1ef663ba-28fe-6528-8002-5a559208592c'
    }
  },
  metadata: {
    source: 'loop',
    writes: { nodeB: { foo: 'b', bar: ['b'] } },
    step: 2
  },
  createdAt: '2024-08-29T19:19:38.821749+00:00',
  parentConfig: {
    configurable: {
      thread_id: '1',
      checkpoint_ns: '',
      checkpoint_id: '1ef663ba-28f9-6ec4-8001-31981c2c39f8'
    }
  },
  tasks: []
}

获取状态历史

您可以通过调用 graph.getStateHistory(config) 获取给定线程的完整图执行历史记录。这将返回一个与配置中提供的线程 ID 相关联的 StateSnapshot 对象列表。重要的是,检查点将按时间顺序排列,最新的检查点/StateSnapshot 排在列表的第一位。
const config = { configurable: { thread_id: "1" } };
for await (const state of graph.getStateHistory(config)) {
  console.log(state);
}
在我们的示例中,getStateHistory 的输出将如下所示
[
  StateSnapshot {
    values: { foo: 'b', bar: ['a', 'b'] },
    next: [],
    config: {
      configurable: {
        thread_id: '1',
        checkpoint_ns: '',
        checkpoint_id: '1ef663ba-28fe-6528-8002-5a559208592c'
      }
    },
    metadata: {
      source: 'loop',
      writes: { nodeB: { foo: 'b', bar: ['b'] } },
      step: 2
    },
    createdAt: '2024-08-29T19:19:38.821749+00:00',
    parentConfig: {
      configurable: {
        thread_id: '1',
        checkpoint_ns: '',
        checkpoint_id: '1ef663ba-28f9-6ec4-8001-31981c2c39f8'
      }
    },
    tasks: []
  },
  StateSnapshot {
    values: { foo: 'a', bar: ['a'] },
    next: ['nodeB'],
    config: {
      configurable: {
        thread_id: '1',
        checkpoint_ns: '',
        checkpoint_id: '1ef663ba-28f9-6ec4-8001-31981c2c39f8'
      }
    },
    metadata: {
      source: 'loop',
      writes: { nodeA: { foo: 'a', bar: ['a'] } },
      step: 1
    },
    createdAt: '2024-08-29T19:19:38.819946+00:00',
    parentConfig: {
      configurable: {
        thread_id: '1',
        checkpoint_ns: '',
        checkpoint_id: '1ef663ba-28f4-6b4a-8000-ca575a13d36a'
      }
    },
    tasks: [
      PregelTask {
        id: '6fb7314f-f114-5413-a1f3-d37dfe98ff44',
        name: 'nodeB',
        error: null,
        interrupts: []
      }
    ]
  },
  StateSnapshot {
    values: { foo: '', bar: [] },
    next: ['node_a'],
    config: {
      configurable: {
        thread_id: '1',
        checkpoint_ns: '',
        checkpoint_id: '1ef663ba-28f4-6b4a-8000-ca575a13d36a'
      }
    },
    metadata: {
      source: 'loop',
      writes: null,
      step: 0
    },
    createdAt: '2024-08-29T19:19:38.817813+00:00',
    parentConfig: {
      configurable: {
        thread_id: '1',
        checkpoint_ns: '',
        checkpoint_id: '1ef663ba-28f0-6c66-bfff-6723431e8481'
      }
    },
    tasks: [
      PregelTask {
        id: 'f1b14528-5ee5-579c-949b-23ef9bfbed58',
        name: 'node_a',
        error: null,
        interrupts: []
      }
    ]
  },
  StateSnapshot {
    values: { bar: [] },
    next: ['__start__'],
    config: {
      configurable: {
        thread_id: '1',
        checkpoint_ns: '',
        checkpoint_id: '1ef663ba-28f0-6c66-bfff-6723431e8481'
      }
    },
    metadata: {
      source: 'input',
      writes: { foo: '' },
      step: -1
    },
    createdAt: '2024-08-29T19:19:38.816205+00:00',
    parentConfig: null,
    tasks: [
      PregelTask {
        id: '6d27aa2e-d72b-5504-a36f-8620e54a76dd',
        name: '__start__',
        error: null,
        interrupts: []
      }
    ]
  }
]
State

重播

还可以回放之前的图执行。如果我们使用 `thread_id` 和 `checkpoint_id` 调用图的 `invoke` 方法,那么我们将重播与 `checkpoint_id` 对应的检查点之前已执行的步骤,并且只执行检查点之后的步骤。
  • thread_id 是线程的 ID。
  • checkpoint_id 是指线程内特定检查点的标识符。
您必须在调用图时将其作为配置的 `configurable` 部分传递
const config = {
  configurable: {
    thread_id: "1",
    checkpoint_id: "0c62ca34-ac19-445d-bbb0-5b4984975b2a",
  },
};
await graph.invoke(null, config);
重要的是,LangGraph 知道某个特定步骤是否已执行过。如果已执行,LangGraph 会简单地重播图中该特定步骤,而不会重新执行该步骤,但仅限于提供 `checkpoint_id` 之前的步骤。 `checkpoint_id` 之后的所有步骤都将执行(即,一个新的分支),即使它们以前已执行过。请参阅此 关于时间旅行的如何操作指南,了解有关重播的更多信息 Replay

更新状态

除了从特定 `checkpoints` 重播图之外,我们还可以编辑图状态。我们使用 graph.updateState() 来实现这一点。此方法接受三个不同的参数

配置

配置应包含 `thread_id`,指定要更新的线程。当只传递 `thread_id` 时,我们更新(或分支)当前状态。或者,如果包含 `checkpoint_id` 字段,则分支该选定的检查点。

这些值将用于更新状态。请注意,此更新的处理方式与来自节点的任何更新完全相同。这意味着这些值将被传递给 reducer 函数(如果为图状态中的某些通道定义了这些函数)。这意味着 update_state 不会自动覆盖每个通道的通道值,而仅覆盖没有 reducer 的通道。让我们来看一个示例。 假设您已使用以下模式定义了图的状态(请参阅上面的完整示例):
import { registry } from "@langchain/langgraph/zod";
import * as z from "zod";

const State = z.object({
  foo: z.number(),
  bar: z.array(z.string()).register(registry, {
    reducer: {
      fn: (x, y) => x.concat(y),
    },
    default: () => [] as string[],
  }),
});
现在假设图的当前状态是
{ foo: 1, bar: ["a"] }
如果您按以下方式更新状态
await graph.updateState(config, { foo: 2, bar: ["b"] });
那么图的新状态将是
{ foo: 2, bar: ["a", "b"] }
`foo` 键(通道)被完全更改(因为没有为该通道指定 reducer,所以 `updateState` 会覆盖它)。但是,为 `bar` 键指定了 reducer,因此它将 `"b"` 附加到 `bar` 的状态。

as_node

调用 updateState 时,您可以选择指定最后一项是 asNode。如果提供它,更新将像来自节点 asNode 一样应用。如果未提供 asNode,则它将设置为更新状态的最后一个节点(如果不明确)。这很重要,因为要执行的下一步取决于最后一个提供更新的节点,因此这可用于控制下一个执行的节点。请参阅此 时间旅行操作指南,了解有关分支状态的更多信息 Update

内存存储

Model of shared state 一个 状态模式 指定了一组键,这些键在图执行时被填充。如上所述,状态可以通过检查点在每个图步骤写入线程,从而实现状态持久化。 但是,如果我们要保留某些信息跨线程呢?考虑聊天机器人中,我们希望在与用户进行所有聊天对话(即线程)时保留有关该用户的特定信息的情况! 仅凭检查点,我们无法跨线程共享信息。这促使需要 Store 接口。作为示例,我们可以定义一个 InMemoryStore 来存储跨线程的用户信息。我们像以前一样,简单地使用检查点和我们的新 in_memory_store 变量编译图。
LangGraph API 自动处理存储 使用 LangGraph API 时,您无需手动实现或配置存储。API 在后台为您处理所有存储基础设施。

基本用法

首先,让我们在不使用 LangGraph 的情况下单独展示这一点。
import { MemoryStore } from "@langchain/langgraph";

const memoryStore = new MemoryStore();
内存由 `tuple` 命名空间管理,在此特定示例中将为 `(<user_id>, "memories")`。命名空间可以是任何长度,表示任何内容,不必特定于用户。
const userId = "1";
const namespaceForMemory = [userId, "memories"];
我们使用 store.put 方法将记忆保存到存储中的命名空间。当我们这样做时,我们指定了如上定义的命名空间,以及一个记忆的键值对:键只是记忆的唯一标识符(`memory_id`),值(一个字典)是记忆本身。
import { v4 as uuidv4 } from "uuid";

const memoryId = uuidv4();
const memory = { food_preference: "I like pizza" };
await memoryStore.put(namespaceForMemory, memoryId, memory);
我们可以使用 `store.search` 方法读取命名空间中的记忆,该方法将返回给定用户的所有记忆作为列表。最新记忆在列表的最后。
const memories = await memoryStore.search(namespaceForMemory);
memories[memories.length - 1];

// {
//   value: { food_preference: 'I like pizza' },
//   key: '07e0caf4-1631-47b7-b15f-65515d4c1843',
//   namespace: ['1', 'memories'],
//   createdAt: '2024-10-02T17:22:31.590602+00:00',
//   updatedAt: '2024-10-02T17:22:31.590605+00:00'
// }
它具有以下属性
  • value: 此记忆的值
  • key: 此命名空间中此记忆的唯一键
  • namespace: 字符串列表,此记忆类型的命名空间
  • createdAt: 此记忆创建的时间戳
  • updatedAt: 此记忆更新的时间戳
除了简单的检索之外,存储还支持语义搜索,允许您根据含义而不是精确匹配来查找记忆。要启用此功能,请使用嵌入模型配置存储
import { OpenAIEmbeddings } from "@langchain/openai";

const store = new InMemoryStore({
  index: {
    embeddings: new OpenAIEmbeddings({ model: "text-embedding-3-small" }),
    dims: 1536,
    fields: ["food_preference", "$"], // Fields to embed
  },
});
现在搜索时,您可以使用自然语言查询来查找相关记忆
// Find memories about food preferences
// (This can be done after putting memories into the store)
const memories = await store.search(namespaceForMemory, {
  query: "What does the user like to eat?",
  limit: 3, // Return top 3 matches
});
您可以通过配置 `fields` 参数或在存储记忆时指定 `index` 参数来控制记忆的哪些部分被嵌入
// Store with specific fields to embed
await store.put(
  namespaceForMemory,
  uuidv4(),
  {
    food_preference: "I love Italian cuisine",
    context: "Discussing dinner plans",
  },
  { index: ["food_preference"] } // Only embed "food_preferences" field
);

// Store without embedding (still retrievable, but not searchable)
await store.put(
  namespaceForMemory,
  uuidv4(),
  { system_info: "Last updated: 2024-01-01" },
  { index: false }
);

在 LangGraph 中使用

完成所有这些后,我们在 LangGraph 中使用 `memoryStore`。 `memoryStore` 与检查点器协同工作:检查点器将状态保存到线程中,如上所述,而 `memoryStore` 允许我们存储任意信息以供线程访问。我们按照如下方式,使用检查点器和 `memoryStore` 编译图。
import { MemorySaver } from "@langchain/langgraph";

// We need this because we want to enable threads (conversations)
const checkpointer = new MemorySaver();

// ... Define the graph ...

// Compile the graph with the checkpointer and store
const graph = workflow.compile({ checkpointer, store: memoryStore });
我们像以前一样,使用 `thread_id` 调用图,并且还使用 `user_id`,我们将用它来将我们的记忆命名空间到此特定用户,如上所示。
// Invoke the graph
const userId = "1";
const config = { configurable: { thread_id: "1", user_id: userId } };

// First let's just say hi to the AI
for await (const update of await graph.stream(
  { messages: [{ role: "user", content: "hi" }] },
  { ...config, streamMode: "updates" }
)) {
  console.log(update);
}
我们可以通过将 `config` 和 `store` 作为节点参数来访问任何节点中的 `memoryStore` 和 `user_id`。以下是我们在节点中如何使用语义搜索来查找相关记忆的示例
import { MessagesZodMeta, Runtime } from "@langchain/langgraph";
import { BaseMessage } from "@langchain/core/messages";
import { registry } from "@langchain/langgraph/zod";
import * as z from "zod";

const MessagesZodState = z.object({
  messages: z
    .array(z.custom<BaseMessage>())
    .register(registry, MessagesZodMeta),
});

const updateMemory = async (
  state: z.infer<typeof MessagesZodState>,
  runtime: Runtime<{ user_id: string }>,
) => {
  // Get the user id from the config
  const userId = runtime.context?.user_id;
  if (!userId) throw new Error("User ID is required");

  // Namespace the memory
  const namespace = [userId, "memories"];

  // ... Analyze conversation and create a new memory

  // Create a new memory ID
  const memoryId = uuidv4();

  // We create a new memory
  await runtime.store?.put(namespace, memoryId, { memory });
};
如上所示,我们还可以访问任何节点中的存储并使用 store.search 方法获取记忆。请记住,记忆以可转换为字典的对象列表形式返回。
memories[memories.length - 1];
// {
//   value: { food_preference: 'I like pizza' },
//   key: '07e0caf4-1631-47b7-b15f-65515d4c1843',
//   namespace: ['1', 'memories'],
//   createdAt: '2024-10-02T17:22:31.590602+00:00',
//   updatedAt: '2024-10-02T17:22:31.590605+00:00'
// }
我们可以访问这些记忆并在模型调用中使用它们。
const callModel = async (
  state: z.infer<typeof MessagesZodState>,
  config: LangGraphRunnableConfig,
  store: BaseStore
) => {
  // Get the user id from the config
  const userId = config.configurable?.user_id;

  // Namespace the memory
  const namespace = [userId, "memories"];

  // Search based on the most recent message
  const memories = await store.search(namespace, {
    query: state.messages[state.messages.length - 1].content,
    limit: 3,
  });
  const info = memories.map((d) => d.value.memory).join("\n");

  // ... Use memories in the model call
};
如果我们创建一个新线程,只要 `user_id` 相同,我们仍然可以访问相同的记忆。
// Invoke the graph
const config = { configurable: { thread_id: "2", user_id: "1" } };

// Let's say hi again
for await (const update of await graph.stream(
  { messages: [{ role: "user", content: "hi, tell me about my memories" }] },
  { ...config, streamMode: "updates" }
)) {
  console.log(update);
}
当我们使用 LangSmith 时,无论是在本地(例如,在 Studio 中)还是 在 LangSmith 中托管,基础存储默认可用,无需在图编译期间指定。但是,要启用语义搜索,您确实需要配置 langgraph.json 文件中的索引设置。例如
{
    ...
    "store": {
        "index": {
            "embed": "openai:text-embeddings-3-small",
            "dims": 1536,
            "fields": ["$"]
        }
    }
}
有关更多详细信息和配置选项,请参阅部署指南

检查点库

在底层,检查点功能由符合 BaseCheckpointSaver 接口的检查点对象提供支持。LangGraph 提供了多种检查点实现,所有这些都通过独立的、可安装的库实现
  • @langchain/langgraph-checkpoint:检查点保存器的基本接口(BaseCheckpointSaver)和序列化/反序列化接口(SerializerProtocol)。包括用于实验的内存中检查点实现(MemorySaver)。LangGraph 随附 @langchain/langgraph-checkpoint
  • @langchain/langgraph-checkpoint-sqlite: LangGraph 检查点的实现,使用 SQLite 数据库 (SqliteSaver)。非常适合实验和本地工作流。需要单独安装。
  • @langchain/langgraph-checkpoint-postgres:一个高级检查点,使用 Postgres 数据库 (PostgresSaver),在 LangSmith 中使用。非常适合在生产环境中使用。需要单独安装。

检查点接口

每个检查点都符合 BaseCheckpointSaver 接口并实现以下方法
  • .put - 存储带其配置和元数据的检查点。
  • .putWrites - 存储与检查点链接的中间写入(即 待定写入)。
  • .getTuple - 使用给定配置(`thread_id` 和 `checkpoint_id`)获取检查点元组。这用于在 graph.getState() 中填充 `StateSnapshot`。
  • .list - 列出符合给定配置和过滤条件的检查点。这用于在 graph.getStateHistory() 中填充状态历史记录

序列化器

当检查点保存图状态时,它们需要序列化状态中的通道值。这是通过序列化器对象完成的。 @langchain/langgraph-checkpoint 定义了实现序列化器的协议,并提供了一个默认实现,可以处理各种类型,包括 LangChain 和 LangGraph 原语、日期时间、枚举等。

能力

人工干预

首先,检查点通过允许人工检查、中断和批准图步骤来促进人机协作工作流。这些工作流需要检查点,因为人必须能够随时查看图的状态,并且图必须能够在人对状态进行任何更新后恢复执行。有关示例,请参阅操作指南

内存

其次,检查点允许在交互之间建立“记忆”。在重复的人机交互(如对话)中,任何后续消息都可以发送到该线程,该线程将保留其对先前消息的记忆。有关如何使用检查点添加和管理对话记忆的信息,请参阅添加记忆

时间旅行

第三,检查点允许“时间旅行”,让用户重播先前的图执行,以审查和/或调试特定图步骤。此外,检查点使得可以在任意检查点处分支图状态,以探索替代轨迹。

容错

最后,检查点还提供了容错和错误恢复:如果一个或多个节点在给定超步骤失败,您可以从上一个成功步骤重新启动图。此外,当图节点在给定超步骤中执行失败时,LangGraph 会存储在该超步骤中成功完成的任何其他节点的待处理检查点写入,以便每当我们从该超步骤恢复图执行时,我们都不会重新运行成功的节点。

待定写入

此外,当图节点在给定超步骤中执行失败时,LangGraph 会存储在该超步骤中成功完成的任何其他节点的待处理检查点写入,以便每当我们从该超步骤恢复图执行时,我们都不会重新运行成功的节点。
以编程方式连接这些文档到 Claude、VSCode 等,通过 MCP 获取实时答案。
© . This site is unofficial and not affiliated with LangChain, Inc.