函数式 API 允许您将 LangGraph 的主要功能——持久化、内存、人工干预和流式传输——添加到您的应用程序中,并且对现有代码进行最少的更改。它旨在将这些功能集成到可能使用标准语言原语进行分支和控制流(例如 if 语句、for 循环和函数调用)的现有代码中。与许多需要将代码重构为显式管道或 DAG 的数据编排框架不同,函数式 API 允许您在不强制执行严格执行模型的情况下整合这些功能。函数式 API 使用两个关键构建块:
entrypoint – 入口点封装了工作流逻辑并管理执行流,包括处理长时间运行的任务和中断。
task – 代表一个离散的工作单元,例如 API 调用或数据处理步骤,可以在入口点内异步执行。任务返回一个类似 Future 的对象,可以等待或同步解析。
import { MemorySaver, entrypoint, task, interrupt } from "@langchain/langgraph";const writeEssay = task("writeEssay", async (topic: string) => { // A placeholder for a long-running task. await new Promise((resolve) => setTimeout(resolve, 1000)); return `An essay about topic: ${topic}`;});const workflow = entrypoint( { checkpointer: new MemorySaver(), name: "workflow" }, async (topic: string) => { const essay = await writeEssay(topic); const isApproved = interrupt({ // Any json-serializable payload provided to interrupt as argument. // It will be surfaced on the client side as an Interrupt when streaming data // from the workflow. essay, // The essay we want reviewed. // We can add any additional information that we need. // For example, introduce a key called "action" with some instructions. action: "Please approve/reject the essay", }); return { essay, // The essay that was generated isApproved, // Response from HIL }; });
import { v4 as uuidv4 } from "uuid";import { MemorySaver, entrypoint, task, interrupt } from "@langchain/langgraph";const writeEssay = task("writeEssay", async (topic: string) => { // This is a placeholder for a long-running task. await new Promise(resolve => setTimeout(resolve, 1000)); return `An essay about topic: ${topic}`;});const workflow = entrypoint( { checkpointer: new MemorySaver(), name: "workflow" }, async (topic: string) => { const essay = await writeEssay(topic); const isApproved = interrupt({ // Any json-serializable payload provided to interrupt as argument. // It will be surfaced on the client side as an Interrupt when streaming data // from the workflow. essay, // The essay we want reviewed. // We can add any additional information that we need. // For example, introduce a key called "action" with some instructions. action: "Please approve/reject the essay", }); return { essay, // The essay that was generated isApproved, // Response from HIL }; });const threadId = uuidv4();const config = { configurable: { thread_id: threadId }};for await (const item of workflow.stream("cat", config)) { console.log(item);}
import { Command } from "@langchain/langgraph";// Get review from a user (e.g., via a UI)// In this case, we're using a bool, but this can be any json-serializable value.const humanReview = true;for await (const item of workflow.stream(new Command({ resume: humanReview }), config)) { console.log(item);}
import { entrypoint } from "@langchain/langgraph";const myWorkflow = entrypoint( { checkpointer, name: "workflow" }, async (someInput: Record<string, any>): Promise<number> => { // some logic that may involve long-running tasks like API calls, // and may be interrupted for human-in-the-loop return result; });
import { entrypoint, getPreviousState } from "@langchain/langgraph";const myWorkflow = entrypoint( { checkpointer, name: "workflow" }, async (number: number) => { const previous = getPreviousState<number>() ?? 0; // This will return the previous value to the caller, saving // 2 * number to the checkpoint, which will be used in the next invocation // for the `previous` parameter. return entrypoint.final({ value: previous, save: 2 * number, }); });const config = { configurable: { thread_id: "1", },};await myWorkflow.invoke(3, config); // 0 (previous was undefined)await myWorkflow.invoke(1, config); // 6 (previous was 3 * 2 from the previous invocation)
幂等性确保多次运行相同的操作会产生相同的结果。这有助于防止在由于失败而重新运行某个步骤时出现重复的 API 调用和冗余处理。始终将 API 调用放在任务函数中以进行检查点,并将其设计为幂等,以防重新执行。如果任务启动但未成功完成,则可能会发生重新执行。然后,如果工作流恢复,任务将再次运行。使用幂等键或验证现有结果以避免重复。
import { entrypoint, interrupt } from "@langchain/langgraph";import fs from "fs";const myWorkflow = entrypoint( { checkpointer, name: "workflow }, async (inputs: Record<string, any>) => { // This code will be executed a second time when resuming the workflow. // Which is likely not what you want. fs.writeFileSync("output.txt", "Side effect executed"); const value = interrupt("question"); return value; });