跳到主要内容
LangGraph 可以改变您构建代理的思维方式。当您使用 LangGraph 构建代理时,您将首先将其分解为称为节点的离散步骤。然后,您将描述每个节点的不同决策和转换。最后,您将通过每个节点都可以读取和写入的共享状态将您的节点连接起来。在本教程中,我们将引导您完成使用 LangGraph 构建客户支持电子邮件代理的思考过程。

从您想要自动化的流程开始

想象一下,您需要构建一个处理客户支持电子邮件的 AI 代理。您的产品团队为您提供了以下要求: 该代理应:
  • 阅读传入的客户电子邮件
  • 按紧急程度和主题进行分类
  • 搜索相关文档以回答问题
  • 起草适当的回复
  • 将复杂问题上报给人工代理
  • 必要时安排后续工作
要处理的示例场景
  1. 简单产品问题:“我如何重置密码?”
  2. Bug 报告:“当我选择 PDF 格式时,导出功能崩溃”
  3. 紧急账单问题:“我的订阅被重复扣款了!”
  4. 功能请求:“您能为移动应用程序添加深色模式吗?”
  5. 复杂技术问题:“我们的 API 集成间歇性地出现 504 错误”
要在 LangGraph 中实现代理,您通常会遵循相同的五个步骤。

步骤 1:将您的工作流程映射为离散的步骤

首先确定流程中的不同步骤。每个步骤都将成为一个节点(一个执行特定功能的函数)。然后勾勒出这些步骤如何相互连接。 箭头表示可能的路径,但实际选择哪条路径的决定发生在每个节点内部。 现在您已经确定了工作流程中的组件,让我们了解每个节点需要做什么:
  • 读取电子邮件:提取和解析电子邮件内容
  • 分类意图:使用 LLM 对紧急程度和主题进行分类,然后路由到适当的操作
  • 文档搜索:查询您的知识库以获取相关信息
  • Bug 追踪:在追踪系统中创建或更新问题
  • 起草回复:生成适当的回复
  • 人工审查:上报给人工代理以进行批准或处理
  • 发送回复:发送电子邮件回复
请注意,有些节点会决定下一步去哪里(分类意图、起草回复、人工审查),而另一些节点总是会进入相同的下一步(读取电子邮件总是进入分类意图,文档搜索总是进入起草回复)。

步骤 2:确定每个步骤需要做什么

对于图中的每个节点,确定它代表的操作类型以及它正常工作所需的上下文。

LLM 步骤

当一个步骤需要理解、分析、生成文本或做出推理决策时
  • 静态上下文(提示):分类类别、紧急程度定义、响应格式
  • 动态上下文(来自状态):电子邮件内容、发件人信息
  • 预期结果:确定路由的结构化分类
  • 静态上下文(提示):语气指南、公司政策、回复模板
  • 动态上下文(来自状态):分类结果、搜索结果、客户历史记录
  • 预期结果:可供审查的专业电子邮件回复

数据步骤

当一个步骤需要从外部来源检索信息时
  • 参数:根据意图和主题构建的查询
  • 重试策略:是,对于瞬时故障采用指数退避
  • 缓存:可以缓存常见查询以减少 API 调用
  • 参数:来自状态的客户电子邮件或 ID
  • 重试策略:是,但如果不可用则回退到基本信息
  • 缓存:是,具有生存时间以平衡新鲜度和性能

行动步骤

当一个步骤需要执行外部操作时
  • 何时执行:经批准后(人工或自动化)
  • 重试策略:是,对于网络问题采用指数退避
  • 不应缓存:每次发送都是一个独特的动作
  • 何时执行:当意图是“bug”时始终执行
  • 重试策略:是,关键是不要丢失 bug 报告
  • 返回:要包含在回复中的工单 ID

用户输入步骤

当一个步骤需要人工干预时
  • 决策上下文:原始电子邮件、回复草稿、紧急程度、分类
  • 预期输入格式:批准布尔值和可选的编辑回复
  • 何时触发:紧急程度高、复杂问题或质量问题

步骤 3:设计您的状态

状态是代理中所有节点都可以访问的共享内存。将其视为您的代理用来跟踪其在整个过程中学习和决定的一切的笔记本。

什么属于状态?

向自己提出有关每条数据的问题

包含在状态中

它需要跨步骤持久化吗?如果是,则将其存储在状态中。

不存储

您可以从其他数据中推导出来吗?如果是,则在需要时计算它,而不是将其存储在状态中。
对于我们的电子邮件代理,我们需要跟踪
  • 原始电子邮件和发件人信息(无法重建)
  • 分类结果(多个下游节点需要)
  • 搜索结果和客户数据(重新获取成本高)
  • 回复草稿(需要通过审查持久化)
  • 执行元数据(用于调试和恢复)

保持状态原始,按需格式化提示

一个关键原则:您的状态应该存储原始数据,而不是格式化文本。在需要时在节点内部格式化提示。
这种分离意味着
  • 不同的节点可以根据自己的需求以不同的方式格式化相同的数据
  • 您可以更改提示模板而无需修改状态模式
  • 调试更清晰——您可以看到每个节点收到了哪些数据
  • 您的代理可以在不破坏现有状态的情况下发展
让我们定义我们的状态
from typing import TypedDict, Literal

# Define the structure for email classification
class EmailClassification(TypedDict):
    intent: Literal["question", "bug", "billing", "feature", "complex"]
    urgency: Literal["low", "medium", "high", "critical"]
    topic: str
    summary: str

class EmailAgentState(TypedDict):
    # Raw email data
    email_content: str
    sender_email: str
    email_id: str

    # Classification result
    classification: EmailClassification | None

    # Raw search/API results
    search_results: list[str] | None  # List of raw document chunks
    customer_history: dict | None  # Raw customer data from CRM

    # Generated content
    draft_response: str | None
    messages: list[str] | None
请注意,状态仅包含原始数据——没有提示模板、没有格式化字符串、没有指令。分类输出直接从 LLM 存储为单个字典。

步骤 4:构建您的节点

现在我们将每个步骤实现为一个函数。LangGraph 中的节点只是一个 Python 函数,它接受当前状态并返回对其的更新。

适当地处理错误

不同的错误需要不同的处理策略
错误类型谁来修复策略何时使用
瞬态错误(网络问题、速率限制)系统(自动)重试策略通常在重试后解决的临时故障
LLM 可恢复错误(工具故障、解析问题)LLM将错误存储在状态中并循环返回LLM 可以看到错误并调整其方法
用户可修复错误(信息缺失、指令不明确)人工使用 interrupt() 暂停需要用户输入才能继续
意外错误开发者让它们冒出来需要调试的未知问题
  • 瞬态错误
  • LLM 可恢复
  • 用户可修复
  • 意外
添加重试策略以自动重试网络问题和速率限制
from langgraph.types import RetryPolicy

workflow.add_node(
    "search_documentation",
    search_documentation,
    retry_policy=RetryPolicy(max_attempts=3, initial_interval=1.0)
)

实现我们的电子邮件代理节点

我们将每个节点实现为一个简单函数。请记住:节点接受状态,执行工作,并返回更新。
from typing import Literal
from langgraph.graph import StateGraph, START, END
from langgraph.types import interrupt, Command, RetryPolicy
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage

llm = ChatOpenAI(model="gpt-4")

def read_email(state: EmailAgentState) -> dict:
    """Extract and parse email content"""
    # In production, this would connect to your email service
    return {
        "messages": [HumanMessage(content=f"Processing email: {state['email_content']}")]
    }

def classify_intent(state: EmailAgentState) -> Command[Literal["search_documentation", "human_review", "draft_response", "bug_tracking"]]:
    """Use LLM to classify email intent and urgency, then route accordingly"""

    # Create structured LLM that returns EmailClassification dict
    structured_llm = llm.with_structured_output(EmailClassification)

    # Format the prompt on-demand, not stored in state
    classification_prompt = f"""
    Analyze this customer email and classify it:

    Email: {state['email_content']}
    From: {state['sender_email']}

    Provide classification including intent, urgency, topic, and summary.
    """

    # Get structured response directly as dict
    classification = structured_llm.invoke(classification_prompt)

    # Determine next node based on classification
    if classification['intent'] == 'billing' or classification['urgency'] == 'critical':
        goto = "human_review"
    elif classification['intent'] in ['question', 'feature']:
        goto = "search_documentation"
    elif classification['intent'] == 'bug':
        goto = "bug_tracking"
    else:
        goto = "draft_response"

    # Store classification as a single dict in state
    return Command(
        update={"classification": classification},
        goto=goto
    )
def search_documentation(state: EmailAgentState) -> Command[Literal["draft_response"]]:
    """Search knowledge base for relevant information"""

    # Build search query from classification
    classification = state.get('classification', {})
    query = f"{classification.get('intent', '')} {classification.get('topic', '')}"

    try:
        # Implement your search logic here
        # Store raw search results, not formatted text
        search_results = [
            "Reset password via Settings > Security > Change Password",
            "Password must be at least 12 characters",
            "Include uppercase, lowercase, numbers, and symbols"
        ]
    except SearchAPIError as e:
        # For recoverable search errors, store error and continue
        search_results = [f"Search temporarily unavailable: {str(e)}"]

    return Command(
        update={"search_results": search_results},  # Store raw results or error
        goto="draft_response"
    )

def bug_tracking(state: EmailAgentState) -> Command[Literal["draft_response"]]:
    """Create or update bug tracking ticket"""

    # Create ticket in your bug tracking system
    ticket_id = "BUG-12345"  # Would be created via API

    return Command(
        update={
            "search_results": [f"Bug ticket {ticket_id} created"],
            "current_step": "bug_tracked"
        },
        goto="draft_response"
    )
def draft_response(state: EmailAgentState) -> Command[Literal["human_review", "send_reply"]]:
    """Generate response using context and route based on quality"""

    classification = state.get('classification', {})

    # Format context from raw state data on-demand
    context_sections = []

    if state.get('search_results'):
        # Format search results for the prompt
        formatted_docs = "\n".join([f"- {doc}" for doc in state['search_results']])
        context_sections.append(f"Relevant documentation:\n{formatted_docs}")

    if state.get('customer_history'):
        # Format customer data for the prompt
        context_sections.append(f"Customer tier: {state['customer_history'].get('tier', 'standard')}")

    # Build the prompt with formatted context
    draft_prompt = f"""
    Draft a response to this customer email:
    {state['email_content']}

    Email intent: {classification.get('intent', 'unknown')}
    Urgency level: {classification.get('urgency', 'medium')}

    {chr(10).join(context_sections)}

    Guidelines:
    - Be professional and helpful
    - Address their specific concern
    - Use the provided documentation when relevant
    """

    response = llm.invoke(draft_prompt)

    # Determine if human review needed based on urgency and intent
    needs_review = (
        classification.get('urgency') in ['high', 'critical'] or
        classification.get('intent') == 'complex'
    )

    # Route to appropriate next node
    goto = "human_review" if needs_review else "send_reply"

    return Command(
        update={"draft_response": response.content},  # Store only the raw response
        goto=goto
    )

def human_review(state: EmailAgentState) -> Command[Literal["send_reply", END]]:
    """Pause for human review using interrupt and route based on decision"""

    classification = state.get('classification', {})

    # interrupt() must come first - any code before it will re-run on resume
    human_decision = interrupt({
        "email_id": state.get('email_id',''),
        "original_email": state.get('email_content',''),
        "draft_response": state.get('draft_response',''),
        "urgency": classification.get('urgency'),
        "intent": classification.get('intent'),
        "action": "Please review and approve/edit this response"
    })

    # Now process the human's decision
    if human_decision.get("approved"):
        return Command(
            update={"draft_response": human_decision.get("edited_response", state.get('draft_response',''))},
            goto="send_reply"
        )
    else:
        # Rejection means human will handle directly
        return Command(update={}, goto=END)

def send_reply(state: EmailAgentState) -> dict:
    """Send the email response"""
    # Integrate with email service
    print(f"Sending reply: {state['draft_response'][:100]}...")
    return {}

步骤 5:将其连接起来

现在我们将节点连接成一个可工作的图。由于我们的节点处理自己的路由决策,我们只需要几个必要的边缘。 为了通过 interrupt() 实现人工干预,我们需要使用检查点进行编译,以便在运行之间保存状态:

图编译代码

from langgraph.checkpoint.memory import MemorySaver
from langgraph.types import RetryPolicy

# Create the graph
workflow = StateGraph(EmailAgentState)

# Add nodes with appropriate error handling
workflow.add_node("read_email", read_email)
workflow.add_node("classify_intent", classify_intent)

# Add retry policy for nodes that might have transient failures
workflow.add_node(
    "search_documentation",
    search_documentation,
    retry_policy=RetryPolicy(max_attempts=3)
)
workflow.add_node("bug_tracking", bug_tracking)
workflow.add_node("draft_response", draft_response)
workflow.add_node("human_review", human_review)
workflow.add_node("send_reply", send_reply)

# Add only the essential edges
workflow.add_edge(START, "read_email")
workflow.add_edge("read_email", "classify_intent")
workflow.add_edge("send_reply", END)

# Compile with checkpointer for persistence, in case run graph with Local_Server --> Please compile without checkpointer
memory = MemorySaver()
app = workflow.compile(checkpointer=memory)
图结构是最小的,因为路由是通过 Command 对象在节点内部发生的。每个节点都使用类型提示(如 Command[Literal["node1", "node2"]])声明它可以去哪里,使流程明确且可追溯。

尝试您的代理

让我们运行我们的代理,处理一个需要人工审查的紧急账单问题
# Test with an urgent billing issue
initial_state = {
    "email_content": "I was charged twice for my subscription! This is urgent!",
    "sender_email": "customer@example.com",
    "email_id": "email_123",
    "messages": []
}

# Run with a thread_id for persistence
config = {"configurable": {"thread_id": "customer_123"}}
result = app.invoke(initial_state, config)
# The graph will pause at human_review
print(f"Draft ready for review: {result['draft_response'][:100]}...")

# When ready, provide human input to resume
from langgraph.types import Command

human_response = Command(
    resume={
        "approved": True,
        "edited_response": "We sincerely apologize for the double charge. I've initiated an immediate refund..."
    }
)

# Resume execution
final_result = app.invoke(human_response, config)
print(f"Email sent successfully!")
图在达到 interrupt() 时暂停,将所有内容保存到检查点,然后等待。它可以在几天后恢复,从上次中断的地方精确地继续。thread_id 确保此对话的所有状态都一起保留。

总结和后续步骤

主要见解

构建此电子邮件代理向我们展示了 LangGraph 的思维方式

高级考量

本节探讨了节点粒度设计的权衡。大多数应用程序可以跳过此部分,并使用上面显示的模式。
您可能会问:为什么不将 Read EmailClassify Intent 合并到一个节点中?或者为什么将文档搜索与草稿回复分开?答案涉及韧性和可观测性之间的权衡。韧性考量: LangGraph 的持久执行在节点边界创建检查点。当工作流在中断或失败后恢复时,它会从执行停止的节点的开头开始。更小的节点意味着更频繁的检查点,这意味着如果出现问题,重复的工作量会更少。如果您将多个操作组合成一个大节点,则在接近尾声时发生的故障意味着要从该节点的开头重新执行所有操作。我们为电子邮件代理选择这种分解的原因:
  • 外部服务的隔离:文档搜索和 Bug 跟踪是独立的节点,因为它们调用外部 API。如果搜索服务很慢或失败,我们希望将其与 LLM 调用隔离开来。我们可以将重试策略添加到这些特定节点,而不会影响其他节点。
  • 中间可见性:Classify Intent 作为自己的节点,可以让我们在采取行动之前检查 LLM 的决策。这对于调试和监控很有价值——您可以准确地看到代理何时以及为什么路由到人工审查。
  • 不同的故障模式:LLM 调用、数据库查找和电子邮件发送具有不同的重试策略。独立的节点允许您独立配置这些策略。
  • 可重用性和测试:更小的节点更容易单独测试,并在其他工作流中重用。
另一种有效的方法:您可以将 Read EmailClassify Intent 合并为一个节点。您将失去在分类之前检查原始电子邮件的能力,并且在该节点中发生任何故障时将重复这两项操作。对于大多数应用程序,独立节点的可观测性和调试优势是值得权衡的。应用程序级别的关注:步骤 2 中关于缓存的讨论(是否缓存搜索结果)是应用程序级别的决策,而不是 LangGraph 框架功能。您根据您的特定要求在节点函数中实现缓存——LangGraph 不会对此进行规定。性能考量:更多的节点并不意味着执行速度更慢。LangGraph 默认在后台写入检查点(异步持久性模式),因此您的图会继续运行,而无需等待检查点完成。这意味着您将获得频繁的检查点,且对性能影响最小。如果需要,您可以调整此行为——使用 "exit" 模式仅在完成时检查点,或使用 "sync" 模式阻止执行直到每个检查点写入完成。

接下来的方向

这是关于如何使用 LangGraph 构建代理的思维介绍。您可以通过以下方式扩展此基础:
以编程方式连接这些文档到 Claude、VSCode 等,通过 MCP 获取实时答案。
© . This site is unofficial and not affiliated with LangChain, Inc.