Interrupts let you pause graph execution at specific points and wait for external input before continuing. This enables human-in-the-loop patterns, where you need outside input before proceeding. When an interrupt is triggered, LangGraph saves the graph state using its persistence layer and waits indefinitely until you resume execution.

Interrupts work by calling the interrupt() function inside any node of your graph. The function accepts any JSON-serializable value, which is surfaced to the caller. When you are ready to continue, you resume execution by re-invoking the graph with a Command; that Command then becomes the return value of the interrupt() call inside the node.

Unlike static breakpoints, which pause before or after specific nodes, interrupts are dynamic: they can be placed anywhere in your code and made conditional on your application logic.
  • A checkpoint saves your place: the checkpointer writes the exact graph state so you can resume later, even if an error occurs.
  • The thread_id is your pointer: set config={"configurable": {"thread_id": ...}} to tell the checkpointer which state to load.
  • The interrupt payload surfaces as __interrupt__: the value you pass to interrupt() is returned to the caller under the __interrupt__ field, so you know what the graph is waiting for.
The thread_id you choose is effectively your persistence cursor. Reusing it resumes the same checkpoint; a new value starts a fresh thread from empty state.

Pausing with interrupt

The interrupt function pauses graph execution and surfaces a value to the caller. When you call interrupt inside a node, LangGraph saves the current graph state and waits for you to resume execution with input.

To use interrupt, you need:
  1. A checkpointer to persist the graph state (use a durable checkpointer in production)
  2. A thread ID in the config so the runtime knows which state to resume
  3. A call to interrupt() at the point where you want to pause (the payload must be JSON-serializable)
from langgraph.types import interrupt

def approval_node(state: State):
    # Pause and ask for approval
    approved = interrupt("Do you approve this action?")

    # When you resume, Command(resume=...) returns that value here
    return {"approved": approved}
When you call interrupt, the following happens:
  1. Graph execution pauses at the exact point where interrupt was called
  2. The state is saved with the checkpointer so execution can resume later. In production this should be a durable checkpointer (for example, one backed by a database).
  3. The value is returned to the caller under __interrupt__; it can be any JSON-serializable value (a string, object, array, and so on)
  4. The graph waits indefinitely until you resume it with a response
  5. The response is passed back to the node, becoming the return value of the interrupt() call when you resume

Resuming after an interrupt

After an interrupt pauses execution, you resume the graph by invoking it again with a resume value. The resume value is passed back to the interrupt call, allowing the node to continue with the external input.
from langgraph.types import Command

# Initial run - hits the interrupt and pauses
# thread_id is the persistent pointer (use a stable ID in production)
config = {"configurable": {"thread_id": "thread-1"}}
result = graph.invoke({"input": "data"}, config=config)

# Check what was interrupted
# __interrupt__ contains the payload that was passed to interrupt()
print(result["__interrupt__"])
# > [Interrupt(value='Do you approve this action?')]

# Resume with the human's response
# The resume payload becomes the return value of interrupt() inside the node
graph.invoke(Command(resume=True), config=config)
Key points about resuming:
  • You must resume with the same thread ID that was used when the interrupt occurred
  • The value passed to Command(resume=...) becomes the return value of the interrupt call
  • On resume, the node restarts from the beginning, so any code before the interrupt runs again
  • You can pass any JSON-serializable value as the resume value

Common patterns

The key capability interrupts unlock is pausing execution to wait for external input. This is useful for a variety of use cases, including:
  • Approval workflows: pause before executing critical operations (API calls, database changes, financial transactions)
  • Review and editing: let a human review and modify LLM output or tool calls before continuing
  • Interrupting tool calls: pause before a tool call executes so it can be reviewed and edited first
  • Validating human input: pause to validate human input before continuing to the next step

Streaming with human-in-the-loop (HITL) interrupts

When building interactive agents with human-in-the-loop workflows, you can stream message chunks and node updates at the same time to provide real-time feedback while handling interrupts. Use multiple stream modes ("messages" and "updates"), with subgraphs=True if subgraphs are present, to:
  • Stream AI responses in real time as they are generated
  • Detect when the graph hits an interrupt
  • Handle user input and resume execution seamlessly
async for namespace, mode, chunk in graph.astream(
    initial_input,
    stream_mode=["messages", "updates"],
    subgraphs=True,
    config=config
):
    if mode == "messages":
        # Handle streaming message content
        msg, _ = chunk
        if isinstance(msg, AIMessageChunk) and msg.content:
            # Display content in real-time
            display_streaming_content(msg.content)
    
    elif mode == "updates":
        # Check for interrupts
        if "__interrupt__" in chunk:
            # Stop streaming display
            interrupt_info = chunk["__interrupt__"][0].value
            
            # Handle user input
            user_response = get_user_input(interrupt_info)
            
            # Resume graph with updated input
            initial_input = Command(resume=user_response)
            break
        
        else:
            # Track node transitions
            current_node = list(chunk.keys())[0]
  • stream_mode=["messages", "updates"]: streams message chunks and graph state updates at the same time
  • subgraphs=True: required to detect interrupts raised inside nested graphs
  • "__interrupt__" detection: signals that human input is needed
  • Command(resume=...): resumes graph execution with the user-provided data

Approve or reject

One of the most common uses of interrupts is to pause before a critical operation and ask for approval. For example, you might want a human to approve an API call, a database change, or any other consequential decision.
from typing import Literal
from langgraph.types import interrupt, Command

def approval_node(state: State) -> Command[Literal["proceed", "cancel"]]:
    # Pause execution; payload shows up under result["__interrupt__"]
    is_approved = interrupt({
        "question": "Do you want to proceed with this action?",
        "details": state["action_details"]
    })

    # Route based on the response
    if is_approved:
        return Command(goto="proceed")  # Runs after the resume payload is provided
    else:
        return Command(goto="cancel")
When resuming the graph, pass True to approve or False to reject:
# To approve
graph.invoke(Command(resume=True), config=config)

# To reject
graph.invoke(Command(resume=False), config=config)
from typing import Literal, Optional, TypedDict

from langgraph.checkpoint.memory import MemorySaver
from langgraph.graph import StateGraph, START, END
from langgraph.types import Command, interrupt


class ApprovalState(TypedDict):
    action_details: str
    status: Optional[Literal["pending", "approved", "rejected"]]


def approval_node(state: ApprovalState) -> Command[Literal["proceed", "cancel"]]:
    # Expose details so the caller can render them in a UI
    decision = interrupt({
        "question": "Approve this action?",
        "details": state["action_details"],
    })

    # Route to the appropriate node after resume
    return Command(goto="proceed" if decision else "cancel")


def proceed_node(state: ApprovalState):
    return {"status": "approved"}


def cancel_node(state: ApprovalState):
    return {"status": "rejected"}


builder = StateGraph(ApprovalState)
builder.add_node("approval", approval_node)
builder.add_node("proceed", proceed_node)
builder.add_node("cancel", cancel_node)
builder.add_edge(START, "approval")
builder.add_edge("proceed", END)
builder.add_edge("cancel", END)

# Use a more durable checkpointer in production
checkpointer = MemorySaver()
graph = builder.compile(checkpointer=checkpointer)

config = {"configurable": {"thread_id": "approval-123"}}
initial = graph.invoke(
    {"action_details": "Transfer $500", "status": "pending"},
    config=config,
)
print(initial["__interrupt__"])  # -> [Interrupt(value={'question': ..., 'details': ...})]

# Resume with the decision; True routes to proceed, False to cancel
resumed = graph.invoke(Command(resume=True), config=config)
print(resumed["status"])  # -> "approved"

Review and edit state

Sometimes you want a human to review and edit part of the graph state before continuing. This is useful for correcting LLM output, adding missing information, or making adjustments.
from langgraph.types import interrupt

def review_node(state: State):
    # Pause and show the current content for review (surfaces in result["__interrupt__"])
    edited_content = interrupt({
        "instruction": "Review and edit this content",
        "content": state["generated_text"]
    })

    # Update the state with the edited version
    return {"generated_text": edited_content}
When resuming, provide the edited content:
graph.invoke(
    Command(resume="The edited and improved text"),  # Value becomes the return from interrupt()
    config=config
)
from typing import TypedDict

from langgraph.checkpoint.memory import MemorySaver
from langgraph.graph import StateGraph, START, END
from langgraph.types import Command, interrupt


class ReviewState(TypedDict):
    generated_text: str


def review_node(state: ReviewState):
    # Ask a reviewer to edit the generated content
    updated = interrupt({
        "instruction": "Review and edit this content",
        "content": state["generated_text"],
    })
    return {"generated_text": updated}


builder = StateGraph(ReviewState)
builder.add_node("review", review_node)
builder.add_edge(START, "review")
builder.add_edge("review", END)

checkpointer = MemorySaver()
graph = builder.compile(checkpointer=checkpointer)

config = {"configurable": {"thread_id": "review-42"}}
initial = graph.invoke({"generated_text": "Initial draft"}, config=config)
print(initial["__interrupt__"])  # -> [Interrupt(value={'instruction': ..., 'content': ...})]

# Resume with the edited text from the reviewer
final_state = graph.invoke(
    Command(resume="Improved draft after review"),
    config=config,
)
print(final_state["generated_text"])  # -> "Improved draft after review"

Interrupts in tools

You can also place interrupts directly inside tool functions. That way, the tool itself pauses for approval whenever it is called, allowing a human to review and edit the tool call before it executes. First, define a tool that uses interrupt:
from langchain.tools import tool
from langgraph.types import interrupt

@tool
def send_email(to: str, subject: str, body: str):
    """Send an email to a recipient."""

    # Pause before sending; payload surfaces in result["__interrupt__"]
    response = interrupt({
        "action": "send_email",
        "to": to,
        "subject": subject,
        "body": body,
        "message": "Approve sending this email?"
    })

    if response.get("action") == "approve":
        # Resume value can override inputs before executing
        final_to = response.get("to", to)
        final_subject = response.get("subject", subject)
        final_body = response.get("body", body)
        return f"Email sent to {final_to} with subject '{final_subject}'"
    return "Email cancelled by user"
This approach is useful when you want the approval logic to live with the tool itself, making it reusable across different parts of your graph. The LLM can call the tool naturally, and the interrupt pauses execution every time the tool is invoked, letting you approve, edit, or cancel the action.
import sqlite3
from typing import TypedDict

from langchain.tools import tool
from langchain_anthropic import ChatAnthropic
from langgraph.checkpoint.sqlite import SqliteSaver
from langgraph.graph import StateGraph, START, END
from langgraph.types import Command, interrupt


class AgentState(TypedDict):
    messages: list[dict]


@tool
def send_email(to: str, subject: str, body: str):
    """Send an email to a recipient."""

    # Pause before sending; payload surfaces in result["__interrupt__"]
    response = interrupt({
        "action": "send_email",
        "to": to,
        "subject": subject,
        "body": body,
        "message": "Approve sending this email?",
    })

    if response.get("action") == "approve":
        final_to = response.get("to", to)
        final_subject = response.get("subject", subject)
        final_body = response.get("body", body)

        # Actually send the email (your implementation here)
        print(f"[send_email] to={final_to} subject={final_subject} body={final_body}")
        return f"Email sent to {final_to}"

    return "Email cancelled by user"


model = ChatAnthropic(model="claude-sonnet-4-5-20250929").bind_tools([send_email])


def agent_node(state: AgentState):
    # LLM may decide to call the tool; interrupt pauses before sending
    result = model.invoke(state["messages"])
    return {"messages": state["messages"] + [result]}


builder = StateGraph(AgentState)
builder.add_node("agent", agent_node)
builder.add_edge(START, "agent")
builder.add_edge("agent", END)

checkpointer = SqliteSaver(sqlite3.connect("tool-approval.db"))
graph = builder.compile(checkpointer=checkpointer)

config = {"configurable": {"thread_id": "email-workflow"}}
initial = graph.invoke(
    {
        "messages": [
            {"role": "user", "content": "Send an email to [email protected] about the meeting"}
        ]
    },
    config=config,
)
print(initial["__interrupt__"])  # -> [Interrupt(value={'action': 'send_email', ...})]

# Resume with approval and optionally edited arguments
resumed = graph.invoke(
    Command(resume={"action": "approve", "subject": "Updated subject"}),
    config=config,
)
print(resumed["messages"][-1])  # -> Tool result returned by send_email

Validating human input

Sometimes you need to validate input from a human and ask again if it is invalid. You can do this by calling interrupt multiple times inside a loop.
from langgraph.types import interrupt

def get_age_node(state: State):
    prompt = "What is your age?"

    while True:
        answer = interrupt(prompt)  # payload surfaces in result["__interrupt__"]

        # Validate the input
        if isinstance(answer, int) and answer > 0:
            # Valid input - continue
            break
        else:
            # Invalid input - ask again with a more specific prompt
            prompt = f"'{answer}' is not a valid age. Please enter a positive number."

    return {"age": answer}
Each time you resume the graph with invalid input, it asks again with a clearer message. Once valid input is provided, the node completes and the graph continues.
import sqlite3
from typing import TypedDict

from langgraph.checkpoint.sqlite import SqliteSaver
from langgraph.graph import StateGraph, START, END
from langgraph.types import Command, interrupt


class FormState(TypedDict):
    age: int | None


def get_age_node(state: FormState):
    prompt = "What is your age?"

    while True:
        answer = interrupt(prompt)  # payload surfaces in result["__interrupt__"]

        if isinstance(answer, int) and answer > 0:
            return {"age": answer}

        prompt = f"'{answer}' is not a valid age. Please enter a positive number."


builder = StateGraph(FormState)
builder.add_node("collect_age", get_age_node)
builder.add_edge(START, "collect_age")
builder.add_edge("collect_age", END)

checkpointer = SqliteSaver(sqlite3.connect("forms.db"))
graph = builder.compile(checkpointer=checkpointer)

config = {"configurable": {"thread_id": "form-1"}}
first = graph.invoke({"age": None}, config=config)
print(first["__interrupt__"])  # -> [Interrupt(value='What is your age?', ...)]

# Provide invalid data; the node re-prompts
retry = graph.invoke(Command(resume="thirty"), config=config)
print(retry["__interrupt__"])  # -> [Interrupt(value="'thirty' is not a valid age...", ...)]

# Provide valid data; loop exits and state updates
final = graph.invoke(Command(resume=30), config=config)
print(final["age"])  # -> 30

Interrupt rules

When you call interrupt inside a node, LangGraph pauses execution by raising a special exception that signals the runtime to pause. The exception propagates up the call stack and is caught by the runtime, which tells the graph to save its current state and wait for external input.

When execution resumes (after the requested input is provided), the runtime restarts the entire node from the beginning; it does not resume from the exact line where interrupt was called. This means any code that ran before the interrupt will execute again. As a result, there are a few important rules to follow so that interrupts behave as expected.

Don't wrap interrupt calls in try/except blocks

interrupt pauses execution by raising a special exception. If you wrap the interrupt call in a try/except block, you will catch this exception, and the interrupt will never propagate back to the graph.
  • ✅ Keep interrupt calls separate from error-prone code
  • ✅ Use specific exception types in try/except blocks
def node_a(state: State):
    # ✅ Good: interrupting first, then handling
    # error conditions separately
    interrupt("What's your name?")
    try:
        fetch_data()  # This can fail
    except Exception as e:
        print(e)
    return state
  • 🔴 Don't wrap interrupt calls in a bare try/except block
def node_a(state: State):
    # ❌ Bad: wrapping interrupt in bare try/except
    # will catch the interrupt exception
    try:
        interrupt("What's your name?")
    except Exception as e:
        print(e)
    return state

Don't reorder interrupt calls within a node

Using multiple interrupts in a single node is common, but can lead to unexpected behavior if handled incorrectly.

When a node contains multiple interrupt calls, LangGraph keeps a list of resume values specific to the task executing that node. Whenever execution resumes, it starts from the beginning of the node. For each interrupt encountered, LangGraph checks whether a matching value exists in the task's resume list. Matching is strictly index-based, so the order of interrupt calls within the node matters.
  • ✅ Keep interrupt calls consistent across executions of the node
def node_a(state: State):
    # ✅ Good: interrupt calls happen in the same order every time
    name = interrupt("What's your name?")
    age = interrupt("What's your age?")
    city = interrupt("What's your city?")

    return {
        "name": name,
        "age": age,
        "city": city
    }
  • 🔴 Don't conditionally skip interrupt calls within a node
  • 🔴 Don't loop over interrupt calls with non-deterministic logic
def node_a(state: State):
    # ❌ Bad: conditionally skipping interrupts changes the order
    name = interrupt("What's your name?")

    # On first run, this might skip the interrupt
    # On resume, it might not skip it - causing index mismatch
    if state.get("needs_age"):
        age = interrupt("What's your age?")

    city = interrupt("What's your city?")

    return {"name": name, "city": city}

Don't pass complex values to interrupt

Depending on the checkpointer in use, complex values may not be serializable (for example, you can't serialize a function). To keep your graph portable across deployments, it's best to use only values that can be reliably serialized.
  • ✅ Pass simple, JSON-serializable types to interrupt
  • ✅ Pass dicts/objects containing simple values
def node_a(state: State):
    # ✅ Good: passing simple types that are serializable
    name = interrupt("What's your name?")
    count = interrupt(42)
    approved = interrupt(True)

    return {"name": name, "count": count, "approved": approved}
  • 🔴 Don't pass functions, class instances, or other complex objects to interrupt
def validate_input(value):
    return len(value) > 0

def node_a(state: State):
    # ❌ Bad: passing a function to interrupt
    # The function cannot be serialized
    response = interrupt({
        "question": "What's your name?",
        "validator": validate_input  # This will fail
    })
    return {"name": response}

Side effects before interrupt must be idempotent

Because interrupts work by re-running the node they were called in, side effects that run before interrupt should ideally be idempotent. For context, idempotency means the same operation can be applied multiple times without changing the result beyond the initial execution.

For example, you might have an API call that updates a record inside a node. If interrupt is called after that update, the call re-runs every time the node resumes, potentially overwriting the initial update or creating duplicate records.
  • ✅ Use idempotent operations before interrupt
  • ✅ Put side effects after the interrupt call
  • ✅ Separate side effects into their own nodes where possible
def node_a(state: State):
    # ✅ Good: using upsert operation which is idempotent
    # Running this multiple times will have the same result
    db.upsert_user(
        user_id=state["user_id"],
        status="pending_approval"
    )

    approved = interrupt("Approve this change?")

    return {"approved": approved}
  • 🔴 Don't perform non-idempotent operations before interrupt
  • 🔴 Don't create new records without checking whether they already exist
def node_a(state: State):
    # ❌ Bad: creating a new record before interrupt
    # This will create duplicate records on each resume
    audit_id = db.create_audit_log({
        "user_id": state["user_id"],
        "action": "pending_approval",
        "timestamp": datetime.now()
    })

    approved = interrupt("Approve this change?")

    return {"approved": approved, "audit_id": audit_id}

Using with subgraphs called as functions

When a subgraph invoked inside a node triggers an interrupt, the parent graph resumes execution from the beginning of the node that called the subgraph. Likewise, the subgraph resumes from the beginning of the node where interrupt was called.
def node_in_parent_graph(state: State):
    some_code()  # <-- This will re-execute when resumed
    # Invoke a subgraph as a function.
    # The subgraph contains an `interrupt` call.
    subgraph_result = subgraph.invoke(some_input)
    # ...

def node_in_subgraph(state: State):
    some_other_code()  # <-- This will also re-execute when resumed
    result = interrupt("What's your name?")
    # ...

Debugging with interrupts

To debug and test a graph, you can use static interrupts as breakpoints to step through execution one node at a time. Static interrupts trigger at defined points, before or after a node executes. You set them at compile time by specifying interrupt_before and interrupt_after.
Static interrupts are not recommended for human-in-the-loop workflows. Use the interrupt function instead.
graph = builder.compile(
    interrupt_before=["node_a"],  
    interrupt_after=["node_b", "node_c"],  
    checkpointer=checkpointer,
)

# Pass a thread ID to the graph
config = {
    "configurable": {
        "thread_id": "some_thread"
    }
}

# Run the graph until the breakpoint
graph.invoke(inputs, config=config)  

# Resume the graph
graph.invoke(None, config=config)  
  1. Breakpoints are set at compile time.
  2. interrupt_before specifies nodes where execution should pause before the node runs.
  3. interrupt_after specifies nodes where execution should pause after the node runs.
  4. A checkpointer is required to enable breakpoints.
  5. The graph runs until it hits the first breakpoint.
  6. Resume the graph by passing None as the input. This runs the graph until the next breakpoint is hit.

Using LangSmith Studio

You can use LangSmith Studio to set static interrupts in the UI before running a graph. You can also use the UI to inspect the graph state at any point during execution.