跳到主要内容

文档索引

在以下地址获取完整的文档索引:https://docs.langchain.org.cn/llms.txt

在进一步探索之前,请使用此文件发现所有可用页面。

Pregel 实现了 LangGraph 的运行时,负责管理 LangGraph 应用的执行。 编译 StateGraph 或创建 @entrypoint 会生成一个可接收输入的 Pregel 实例。 本指南将从高层次介绍该运行时,并提供直接使用 Pregel 实现应用的说明。
注意:Pregel 运行时以 Google 的 Pregel 算法命名,该算法描述了一种利用图进行大规模并行计算的高效方法。

概览

在 LangGraph 中,Pregel 将 参与者 (actors)通道 (channels) 整合到一个应用中。参与者从通道读取数据并向通道写入数据。Pregel 遵循 Pregel 算法/批量同步并行 (Bulk Synchronous Parallel) 模型,将应用执行组织成多个步骤。 每个步骤包含三个阶段:
  • 计划 (Plan):确定在此步骤中执行哪些参与者。例如,在第一步中,选择订阅特殊输入通道的参与者;在后续步骤中,选择订阅在前一步中被更新的通道的参与者。
  • 执行 (Execution):并行执行所有选定的参与者,直到全部完成、其中一个失败或达到超时限制。在此阶段,通道更新对参与者不可见,直到下一步。
  • 更新 (Update):根据此步骤中参与者写入的值更新通道。
重复上述过程,直到没有可执行的参与者,或达到最大步数。

参与者 (Actors)

参与者 (Actor)PregelNode。它订阅通道,从中读取数据并写入数据。可以将其视为 Pregel 算法中的一个参与者PregelNodes 实现了 LangChain 的 Runnable 接口。

渠道

通道用于参与者(PregelNodes)之间的通信。每个通道都有一个值类型、更新类型和一个更新函数——该函数接收一系列更新并修改存储的值。通道可用于将数据从一个链传输到另一个链,或在后续步骤中将数据从链传回自身。

LastValue

LastValue 是默认的通道类型。它存储写入其中的最后一个值,覆盖任何之前的值。适用于输入和输出值,或用于将数据从一步传递到下一步。
from langgraph.channels import LastValue

channel: LastValue[int] = LastValue(int)

Topic

Topic 是一种可配置的 PubSub 通道,适用于在参与者之间发送多个值或跨步骤累加输出。它可以配置为对值进行去重或累加运行期间写入的所有值。
from langgraph.channels import Topic

# Accumulate all values written across steps
channel: Topic[str] = Topic(str, accumulate=True)

BinaryOperatorAggregate

BinaryOperatorAggregate 存储一个持久值,通过对当前值和每个新更新应用二进制运算符来进行更新。适用于计算跨步骤的运行聚合。
import operator
from langgraph.channels import BinaryOperatorAggregate

# Running total: each write adds to the current value
total = BinaryOperatorAggregate(int, operator.add)

DeltaChannel (测试版)

DeltaChannel 需要 langgraph>=1.2 且目前处于测试阶段。API 在未来版本中可能会有变动。
DeltaChannel 在每一步仅存储增量,而不是完整的累加值。这对于频繁写入且随时间积累大量值的通道最为有用——例如长期运行线程中的对话消息列表。若不使用增量存储,完整的列表会在每个检查点被重新序列化;使用 DeltaChannel,则仅存储每一步写入的新消息。
当通道写入频繁且随时间增长较大时,请考虑使用 DeltaChannel。一个好的判断信号:如果您注意到特定通道的检查点大小随线程长度线性增长,那么 DeltaChannel 很可能是一个合适的选择。
Annotated 类型注解中使用 DeltaChannel 的方式与使用普通归约器相同。
from typing import Annotated, Sequence
from typing_extensions import TypedDict
from langgraph.channels import DeltaChannel


def my_reducer(state: list[str], writes: Sequence[list[str]]) -> list[str]:
    result = list(state)
    for write in writes:
        result.extend(write)
    return result


class State(TypedDict):
    messages: Annotated[list[str], DeltaChannel(my_reducer)]

批量归约器 (Bulk reducer) 要求

传递给 DeltaChannelreducer 是一个批量归约器 (bulk reducer):它在单次调用中接收当前状态和当前步骤中所有写入的序列,而不是像标准归约器那样按对接收。这与 StateGraphAnnotated 使用的按键归约器不同,后者每个更新调用一次。
批量归约器必须满足结合律 (associative)(批处理不变性)
reducer(reducer(state, [xs]), [ys]) == reducer(state, [xs, ys])
如果您的归约器不满足结合律,重建后的状态可能会因 LangGraph 跨步骤分批写入的方式不同而产生差异,从而导致不一致的行为。
以下是两种最常见情况下的批量归约器
from typing import Any, Sequence


# List: append all writes in order
def list_reducer(state: list[Any], writes: Sequence[list[Any]]) -> list[Any]:
    result = list(state)
    for write in writes:
        result.extend(write)
    return result


# Dict: merge all writes, last write wins on key conflicts
def dict_reducer(
    state: dict[str, Any], writes: Sequence[dict[str, Any]]
) -> dict[str, Any]:
    result = dict(state)
    for write in writes:
        result.update(write)
    return result
两者都满足结合律:逐个应用批次与一起应用批次产生的结果相同。

使用 snapshot_frequency 实现有界读取延迟

如果没有快照,读取 DeltaChannel 的值需要重放完整的写入历史——对于一个有 N 步的线程,复杂度为 O(N)。设置 snapshot_frequency=K 会每 K 个 Pregel 步骤写入一个完整快照,将读取深度限制在最多 K 个步骤。
class State(TypedDict):
    messages: Annotated[
        list[str],
        DeltaChannel(my_reducer, snapshot_frequency=5),
    ]
较高的 snapshot_frequency 值会降低存储开销但增加读取延迟。较低的值能更严格地限制延迟,但代价是检查点变大。None(默认值)完全跳过快照——适用于读取频率较低或线程较短的情况。

示例

虽然大多数用户会通过 StateGraph API 或 @entrypoint 装饰器与 Pregel 交互,但也可以直接与 Pregel 进行交互。 以下是一些示例,以帮助您了解 Pregel API。
from langgraph.channels import EphemeralValue
from langgraph.pregel import Pregel, NodeBuilder

node1 = (
    NodeBuilder().subscribe_only("a")
    .do(lambda x: x + x)
    .write_to("b")
)

app = Pregel(
    nodes={"node1": node1},
    channels={
        "a": EphemeralValue(str),
        "b": EphemeralValue(str),
    },
    input_channels=["a"],
    output_channels=["b"],
)

app.invoke({"a": "foo"})
{'b': 'foofoo'}

高级 API

LangGraph 提供了两个用于创建 Pregel 应用的高级 API:StateGraph (Graph API)函数式 API (Functional API)
StateGraph (Graph API) 是一种更高级的抽象,简化了 Pregel 应用的创建。它允许您定义节点和边的图。当您编译图时,StateGraph API 会自动为您创建 Pregel 应用。
from typing import TypedDict

from langgraph.constants import START
from langgraph.graph import StateGraph

class Essay(TypedDict):
    topic: str
    content: str | None
    score: float | None

def write_essay(essay: Essay):
    return {
        "content": f"Essay about {essay['topic']}",
    }

def score_essay(essay: Essay):
    return {
        "score": 10
    }

builder = StateGraph(Essay)
builder.add_node(write_essay)
builder.add_node(score_essay)
builder.add_edge(START, "write_essay")
builder.add_edge("write_essay", "score_essay")

# Compile the graph.
# This will return a Pregel instance.
graph = builder.compile()
编译后的 Pregel 实例将与一组节点和通道相关联。您可以通过打印它们来查看这些节点和通道。
print(graph.nodes)
您将看到类似这样的内容
{'__start__': <langgraph.pregel.read.PregelNode at 0x7d05e3ba1810>,
 'write_essay': <langgraph.pregel.read.PregelNode at 0x7d05e3ba14d0>,
 'score_essay': <langgraph.pregel.read.PregelNode at 0x7d05e3ba1710>}
print(graph.channels)
您应该会看到类似这样的内容
{'topic': <langgraph.channels.last_value.LastValue at 0x7d05e3294d80>,
 'content': <langgraph.channels.last_value.LastValue at 0x7d05e3295040>,
 'score': <langgraph.channels.last_value.LastValue at 0x7d05e3295980>,
 '__start__': <langgraph.channels.ephemeral_value.EphemeralValue at 0x7d05e3297e00>,
 'write_essay': <langgraph.channels.ephemeral_value.EphemeralValue at 0x7d05e32960c0>,
 'score_essay': <langgraph.channels.ephemeral_value.EphemeralValue at 0x7d05e2d8ab80>,
 'branch:__start__:__self__:write_essay': <langgraph.channels.ephemeral_value.EphemeralValue at 0x7d05e32941c0>,
 'branch:__start__:__self__:score_essay': <langgraph.channels.ephemeral_value.EphemeralValue at 0x7d05e2d88800>,
 'branch:write_essay:__self__:write_essay': <langgraph.channels.ephemeral_value.EphemeralValue at 0x7d05e3295ec0>,
 'branch:write_essay:__self__:score_essay': <langgraph.channels.ephemeral_value.EphemeralValue at 0x7d05e2d8ac00>,
 'branch:score_essay:__self__:write_essay': <langgraph.channels.ephemeral_value.EphemeralValue at 0x7d05e2d89700>,
 'branch:score_essay:__self__:score_essay': <langgraph.channels.ephemeral_value.EphemeralValue at 0x7d05e2d8b400>,
 'start:write_essay': <langgraph.channels.ephemeral_value.EphemeralValue at 0x7d05e2d8b280>}

© . This site is unofficial and not affiliated with LangChain, Inc.