from langgraph.types import interrupt


def approval_node(state: State):
    """Pause the graph to ask for approval and store the answer in state.

    Returns:
        A state update whose ``"approved"`` key holds the value supplied
        on resume via ``Command(resume=...)``.
    """
    # Pause and ask for approval
    approved = interrupt("Do you approve this action?")

    # When you resume, Command(resume=...) returns that value here
    return {"approved": approved}
from langgraph.types import Command

# Initial run - hits the interrupt and pauses
# thread_id is the persistent pointer (stores a stable ID in production)
config = {"configurable": {"thread_id": "thread-1"}}
result = graph.invoke({"input": "data"}, config=config)

# Check what was interrupted
# __interrupt__ contains the payload that was passed to interrupt()
print(result["__interrupt__"])
# > [Interrupt(value='Do you approve this action?')]

# Resume with the human's response
# The resume payload becomes the return value of interrupt() inside the node
graph.invoke(Command(resume=True), config=config)
中断最常见的用途之一是在关键操作之前暂停并请求批准。例如,您可能希望让人工批准 API 调用、数据库更改或任何其他重要决策。
复制
from typing import Literal

from langgraph.types import interrupt, Command


def approval_node(state: State) -> Command[Literal["proceed", "cancel"]]:
    """Ask for approval, then route to ``proceed`` or ``cancel``.

    The interrupt payload carries the question together with
    ``state["action_details"]``; the truthiness of the resume value
    selects the next node.
    """
    # Pause execution; payload shows up under result["__interrupt__"]
    is_approved = interrupt({
        "question": "Do you want to proceed with this action?",
        "details": state["action_details"],
    })

    # Route based on the response
    if is_approved:
        return Command(goto="proceed")  # Runs after the resume payload is provided
    else:
        return Command(goto="cancel")
恢复图时,传入 True 表示批准,传入 False 表示拒绝:
复制
# To approve
graph.invoke(Command(resume=True), config=config)

# To reject
graph.invoke(Command(resume=False), config=config)
完整示例
复制
from typing import Literal, Optional, TypedDict

from langgraph.checkpoint.memory import MemorySaver
from langgraph.graph import StateGraph, START, END
from langgraph.types import Command, interrupt


class ApprovalState(TypedDict):
    """State shared by the approval graph."""

    # Description of the action awaiting approval
    action_details: str
    # Outcome written by the proceed/cancel nodes
    status: Optional[Literal["pending", "approved", "rejected"]]


def approval_node(state: ApprovalState) -> Command[Literal["proceed", "cancel"]]:
    """Pause for a decision and route to ``proceed`` or ``cancel``."""
    # Expose details so the caller can render them in a UI
    decision = interrupt({
        "question": "Approve this action?",
        "details": state["action_details"],
    })

    # Route to the appropriate node after resume
    return Command(goto="proceed" if decision else "cancel")


def proceed_node(state: ApprovalState):
    """Mark the action as approved."""
    return {"status": "approved"}


def cancel_node(state: ApprovalState):
    """Mark the action as rejected."""
    return {"status": "rejected"}


builder = StateGraph(ApprovalState)
builder.add_node("approval", approval_node)
builder.add_node("proceed", proceed_node)
builder.add_node("cancel", cancel_node)
builder.add_edge(START, "approval")
builder.add_edge("proceed", END)
builder.add_edge("cancel", END)

# Use a more durable checkpointer in production
checkpointer = MemorySaver()
graph = builder.compile(checkpointer=checkpointer)

config = {"configurable": {"thread_id": "approval-123"}}
initial = graph.invoke(
    {"action_details": "Transfer $500", "status": "pending"},
    config=config,
)
print(initial["__interrupt__"])  # -> [Interrupt(value={'question': ..., 'details': ...})]

# Resume with the decision; True routes to proceed, False to cancel
resumed = graph.invoke(Command(resume=True), config=config)
print(resumed["status"])  # -> "approved"
from langgraph.types import interrupt


def review_node(state: State):
    """Pause so a reviewer can edit ``state["generated_text"]``.

    Returns:
        A state update replacing ``"generated_text"`` with the value
        supplied on resume.
    """
    # Pause and show the current content for review
    # (surfaces in result["__interrupt__"])
    edited_content = interrupt({
        "instruction": "Review and edit this content",
        "content": state["generated_text"],
    })

    # Update the state with the edited version
    return {"generated_text": edited_content}
恢复时,提供编辑后的内容
复制
graph.invoke(
    Command(resume="The edited and improved text"),  # Value becomes the return from interrupt()
    config=config,
)
完整示例
复制
import sqlite3
from typing import TypedDict

from langgraph.checkpoint.memory import MemorySaver
from langgraph.graph import StateGraph, START, END
from langgraph.types import Command, interrupt


class ReviewState(TypedDict):
    """State shared by the review graph."""

    # Text presented to the reviewer and replaced by their edit
    generated_text: str


def review_node(state: ReviewState):
    """Pause for a reviewer's edit and store it as ``generated_text``."""
    # Ask a reviewer to edit the generated content
    updated = interrupt({
        "instruction": "Review and edit this content",
        "content": state["generated_text"],
    })
    return {"generated_text": updated}


builder = StateGraph(ReviewState)
builder.add_node("review", review_node)
builder.add_edge(START, "review")
builder.add_edge("review", END)

checkpointer = MemorySaver()
graph = builder.compile(checkpointer=checkpointer)

config = {"configurable": {"thread_id": "review-42"}}
initial = graph.invoke({"generated_text": "Initial draft"}, config=config)
print(initial["__interrupt__"])  # -> [Interrupt(value={'instruction': ..., 'content': ...})]

# Resume with the edited text from the reviewer
final_state = graph.invoke(
    Command(resume="Improved draft after review"),
    config=config,
)
print(final_state["generated_text"])  # -> "Improved draft after review"
from langgraph.types import interrupt


def get_age_node(state: State):
    """Keep interrupting until the resume value is a positive integer.

    Returns:
        A state update whose ``"age"`` key holds the validated answer.
    """
    prompt = "What is your age?"

    while True:
        answer = interrupt(prompt)  # payload surfaces in result["__interrupt__"]

        # Validate the input
        if isinstance(answer, int) and answer > 0:
            # Valid input - continue
            break
        else:
            # Invalid input - ask again with a more specific prompt
            prompt = f"'{answer}' is not a valid age. Please enter a positive number."

    return {"age": answer}
def node_a(state: State):
    """Interrupt first, then handle fallible work in its own try/except."""
    # ✅ Good: interrupting first, then handling
    # error conditions separately
    interrupt("What's your name?")
    try:
        fetch_data()  # This can fail
    except Exception as e:
        print(e)
    return state
🔴 不要将 interrupt 调用包装在裸 try/except 块中
复制
def node_a(state: State):
    """Anti-pattern: the broad ``except`` also catches the interrupt."""
    # ❌ Bad: wrapping interrupt in bare try/except
    # will catch the interrupt exception
    try:
        interrupt("What's your name?")
    except Exception as e:
        print(e)
    return state
def node_a(state: State):
    """Collect name, age and city through three interrupts in a fixed order."""
    # ✅ Good: interrupt calls happen in the same order every time
    name = interrupt("What's your name?")
    age = interrupt("What's your age?")
    city = interrupt("What's your city?")

    return {
        "name": name,
        "age": age,
        "city": city,
    }
🔴 不要有条件地跳过节点内的 interrupt 调用
🔴 不要使用在多次执行之间结果不确定的逻辑来循环调用 interrupt
复制
def node_a(state: State):
    """Anti-pattern: an interrupt that may be skipped changes the call order."""
    # ❌ Bad: conditionally skipping interrupts changes the order
    name = interrupt("What's your name?")

    # On first run, this might skip the interrupt
    # On resume, it might not skip it - causing index mismatch
    if state.get("needs_age"):
        age = interrupt("What's your age?")

    city = interrupt("What's your city?")

    return {"name": name, "city": city}
def node_a(state: State):
    """Pass only simple payloads (str, int, bool) to ``interrupt``."""
    # ✅ Good: passing simple types that are serializable
    name = interrupt("What's your name?")
    count = interrupt(42)
    approved = interrupt(True)

    return {"name": name, "count": count, "approved": approved}
🔴 不要将函数、类实例或其他复杂对象传递给 interrupt
复制
def validate_input(value):
    """Return True when ``value`` is non-empty."""
    return len(value) > 0


def node_a(state: State):
    """Anti-pattern: the interrupt payload contains a function object."""
    # ❌ Bad: passing a function to interrupt
    # The function cannot be serialized
    response = interrupt({
        "question": "What's your name?",
        "validator": validate_input,  # This will fail
    })
    return {"name": response}
由于中断的工作方式是重新运行调用它的节点,因此在 interrupt 之前执行的副作用(理想情况下)应该是幂等的。作为背景说明,幂等性是指同一操作可以执行多次,而结果与首次执行后的结果相同。例如,您可能在节点内有一个用于更新记录的 API 调用。如果在该调用之后调用了 interrupt,那么节点恢复时这个 API 调用会被再次执行(可能多次),从而可能覆盖初始更新或创建重复记录。
✅ 在 interrupt 之前使用幂等操作
✅ 将副作用放在 interrupt 调用之后
✅ 尽可能将副作用分离到单独的节点中
复制
def node_a(state: State):
    """Upsert the pending status, then pause for approval."""
    # ✅ Good: using upsert operation which is idempotent
    # Running this multiple times will have the same result
    db.upsert_user(
        user_id=state["user_id"],
        status="pending_approval",
    )

    approved = interrupt("Approve this change?")

    return {"approved": approved}
🔴 不要在 interrupt 之前执行非幂等操作
🔴 不要在未检查记录是否已存在的情况下创建新记录
复制
def node_a(state: State):
    """Anti-pattern: a record is created before the interrupt."""
    # ❌ Bad: creating a new record before interrupt
    # This will create duplicate records on each resume
    audit_id = db.create_audit_log({
        "user_id": state["user_id"],
        "action": "pending_approval",
        "timestamp": datetime.now(),
    })

    approved = interrupt("Approve this change?")

    return {"approved": approved, "audit_id": audit_id}
def node_in_parent_graph(state: State):
    """Parent-graph node that invokes a subgraph containing an interrupt."""
    some_code()  # <-- This will re-execute when resumed
    # Invoke a subgraph as a function.
    # The subgraph contains an `interrupt` call.
    subgraph_result = subgraph.invoke(some_input)
    # ...


def node_in_subgraph(state: State):
    """Subgraph node that runs some code and then interrupts."""
    some_other_code()  # <-- This will also re-execute when resumed
    result = interrupt("What's your name?")
    # ...
graph = builder.compile(
    interrupt_before=["node_a"],
    interrupt_after=["node_b", "node_c"],
    checkpointer=checkpointer,
)

# Pass a thread ID to the graph
config = {
    "configurable": {
        "thread_id": "some_thread"
    }
}

# Run the graph until the breakpoint
graph.invoke(inputs, config=config)

# Resume the graph
graph.invoke(None, config=config)
断点在 compile 时设置。
interrupt_before 指定在节点执行之前应暂停的节点。
interrupt_after 指定在节点执行之后应暂停的节点。
需要检查点器才能启用断点。
图将运行直到遇到第一个断点。
通过传递 None 作为输入来恢复图。这将运行图,直到命中下一个断点。
复制
config = {
    "configurable": {
        "thread_id": "some_thread"
    }
}

# Run the graph until the breakpoint
graph.invoke(
    inputs,
    interrupt_before=["node_a"],
    interrupt_after=["node_b", "node_c"],
    config=config,
)

# Resume the graph
graph.invoke(None, config=config)