函数式 API(Functional API)允许您以对现有代码最小的改动,将 LangGraph 的关键功能——持久化、记忆、人机协同(human-in-the-loop)和流式传输——添加到您的应用程序中。它旨在将这些功能集成到可能使用标准语言原语(如 if 语句、for 循环和函数调用)进行分支和控制流的现有代码中。与许多要求将代码重构为显式管道或 DAG 的数据编排框架不同,函数式 API 允许您在不强制采用严格执行模型的情况下整合这些功能。函数式 API 使用两个关键构建块:`@entrypoint`(入口点)和 `@task`(任务)。
import time

from langgraph.checkpoint.memory import InMemorySaver
from langgraph.func import entrypoint, task
from langgraph.types import interrupt


@task
def write_essay(topic: str) -> str:
    """Write an essay about the given topic.

    Args:
        topic: Subject of the essay.

    Returns:
        The generated essay text.
    """
    time.sleep(1)  # A placeholder for a long-running task.
    return f"An essay about topic: {topic}"


@entrypoint(checkpointer=InMemorySaver())
def workflow(topic: str) -> dict:
    """A simple workflow that writes an essay and asks for a review.

    Args:
        topic: Subject passed on to ``write_essay``.

    Returns:
        A dict with the generated ``essay`` and the reviewer's answer under
        ``is_approved``.
    """
    # Use the topic supplied by the caller instead of a hard-coded value.
    essay = write_essay(topic).result()
    is_approved = interrupt(
        {
            # Any json-serializable payload provided to interrupt as argument.
            # It will be surfaced on the client side as an Interrupt when
            # streaming data from the workflow.
            "essay": essay,  # The essay we want reviewed.
            # We can add any additional information that we need.
            # For example, introduce a key called "action" with some instructions.
            "action": "Please approve/reject the essay",
        }
    )

    return {
        "essay": essay,  # The essay that was generated
        "is_approved": is_approved,  # Response from HIL
    }
import time
import uuid

from langgraph.checkpoint.memory import InMemorySaver
from langgraph.func import entrypoint, task
from langgraph.types import interrupt


@task
def write_essay(topic: str) -> str:
    """Write an essay about the given topic.

    Args:
        topic: Subject of the essay.

    Returns:
        The generated essay text.
    """
    time.sleep(1)  # This is a placeholder for a long-running task.
    return f"An essay about topic: {topic}"


@entrypoint(checkpointer=InMemorySaver())
def workflow(topic: str) -> dict:
    """A simple workflow that writes an essay and asks for a review.

    Args:
        topic: Subject passed on to ``write_essay``.

    Returns:
        A dict with the generated ``essay`` and the reviewer's answer under
        ``is_approved``.
    """
    # Use the topic supplied by the caller instead of a hard-coded value.
    essay = write_essay(topic).result()
    is_approved = interrupt(
        {
            # Any json-serializable payload provided to interrupt as argument.
            # It will be surfaced on the client side as an Interrupt when
            # streaming data from the workflow.
            "essay": essay,  # The essay we want reviewed.
            # We can add any additional information that we need.
            # For example, introduce a key called "action" with some instructions.
            "action": "Please approve/reject the essay",
        }
    )

    return {
        "essay": essay,  # The essay that was generated
        "is_approved": is_approved,  # Response from HIL
    }


# A fresh thread id scopes the checkpointed state to this run.
thread_id = str(uuid.uuid4())
config = {"configurable": {"thread_id": thread_id}}

for item in workflow.stream("cat", config):
    print(item)
# > {'write_essay': 'An essay about topic: cat'}
# > {
# >     '__interrupt__': (
# >         Interrupt(
# >             value={
# >                 'essay': 'An essay about topic: cat',
# >                 'action': 'Please approve/reject the essay'
# >             },
# >             id='b9b2b9d788f482663ced6dc755c9e981'
# >         ),
# >     )
# > }
文章已经写好,可以进行审阅了。提供审阅结果后,我们就可以恢复工作流。
复制
向 AI 提问
from langgraph.types import Command

# Get review from a user (e.g., via a UI)
# In this case, we're using a bool, but this can be any json-serializable value.
human_review = True

# Resume the interrupted run: `workflow` and `config` are the objects from the
# earlier example, so the same thread_id is reused.
for item in workflow.stream(Command(resume=human_review), config):
    print(item)
复制
向 AI 提问
{'workflow': {'essay': 'An essay about topic: cat', 'is_approved': True}}
from langgraph.func import entrypoint


# NOTE: `checkpointer` and `result` are placeholders in this skeleton; they are
# not defined in the snippet itself.
@entrypoint(checkpointer=checkpointer)
def my_workflow(some_input: dict) -> int:
    """Skeleton of an entrypoint-decorated workflow function."""
    # some logic that may involve long-running tasks like API calls,
    # and may be interrupted for human-in-the-loop.
    ...
    return result
from typing import Any

from langchain_core.runnables import RunnableConfig
from langgraph.func import entrypoint
from langgraph.store.base import BaseStore
from langgraph.store.memory import InMemoryStore
from langgraph.types import StreamWriter

in_memory_store = InMemoryStore(...)  # An instance of InMemoryStore for long-term memory


# NOTE: `checkpointer` is a placeholder; it is not defined in this snippet.
@entrypoint(
    checkpointer=checkpointer,  # Specify the checkpointer
    store=in_memory_store,  # Specify the store
)
def my_workflow(
    some_input: dict,  # The input (e.g., passed via `invoke`)
    *,
    previous: Any = None,  # For short-term memory
    store: BaseStore,  # For long-term memory
    writer: StreamWriter,  # For streaming custom data
    config: RunnableConfig,  # For accessing the configuration passed to the entrypoint
) -> ...:
    """Signature-only example listing the keyword-only parameters shown above."""
    ...  # Body intentionally omitted; only the signature is being illustrated.
# NOTE: `checkpointer`, `entrypoint` and `Any` are not defined or imported in
# this snippet.
@entrypoint(checkpointer=checkpointer)
def my_workflow(number: int, *, previous: Any = None) -> entrypoint.final[int, int]:
    """Return the previously saved value while saving ``2 * number`` for next time.

    Args:
        number: Input for this invocation.
        previous: Value saved by the previous invocation on the same thread,
            or ``None`` on the first call.
    """
    previous = previous or 0
    # This will return the previous value to the caller, saving
    # 2 * number to the checkpoint, which will be used in the next invocation
    # for the `previous` parameter.
    return entrypoint.final(value=previous, save=2 * number)


config = {
    "configurable": {
        "thread_id": "1"
    }
}

my_workflow.invoke(3, config)  # 0 (previous was None)
my_workflow.invoke(1, config)  # 6 (previous was 3 * 2 from the previous invocation)
幂等性确保多次运行相同的操作会产生相同的结果。这有助于防止因故障而重新运行某个步骤时出现重复的 API 调用和冗余处理。始终将 API 调用放置在用于检查点的**任务**函数内部,并设计它们以在重新执行时保持幂等性。如果**任务**启动但未成功完成,则可能会发生重新执行。然后,如果工作流恢复,**任务**将再次运行。使用幂等性键或验证现有结果以避免重复。
# Anti-pattern example: a side effect placed directly in the entrypoint body.
# NOTE: `checkpointer`, `entrypoint` and `interrupt` are not defined or
# imported in this snippet.
@entrypoint(checkpointer=checkpointer)
def my_workflow(inputs: dict) -> int:
    """Write a file, then pause on an interrupt and return the resumed value."""
    # This code will be executed a second time when resuming the workflow.
    # Which is likely not what you want.
    with open("output.txt", "w") as f:
        f.write("Side effect executed")
    value = interrupt("question")
    return value