跳到主要内容

文档索引

在以下地址获取完整的文档索引:https://docs.langchain.org.cn/llms.txt

在进一步探索之前,请使用此文件发现所有可用页面。

Agent2Agent (A2A) 是谷歌提出的一种用于实现对话式 AI 智能体间通信的协议。LangSmith 实现了对 A2A 的支持,允许你的智能体通过标准化协议与其他兼容 A2A 的智能体进行通信。 A2A 端点可在 Agent Server 中的 /a2a/{assistant_id} 使用。

支持的方法

Agent Server 支持以下 A2A RPC 方法
  • message/send:向助手发送消息并接收完整响应
  • message/stream:发送消息并使用服务器发送事件 (SSE) 实时流式传输响应
  • tasks/get:检索先前创建的任务的状态和结果

智能体卡片发现

每个助手都会自动公开一张 A2A 智能体卡片,该卡片描述了其功能并提供了其他智能体连接所需的信息。你可以使用以下方式检索任何助手的智能体卡片
GET /.well-known/agent-card.json?assistant_id={assistant_id}
智能体卡片包括助手的名称、描述、可用技能、支持的输入/输出模式以及用于通信的 A2A 端点 URL。

要求

要使用 A2A,请确保安装了以下依赖项
  • langgraph-api >= 0.4.21
安装命令
pip install "langgraph-api>=0.4.21"

使用概述

启用 A2A
  • 升级至使用 langgraph-api>=0.4.21。
  • 部署具有基于消息的状态结构的智能体。
  • 使用该端点与其他兼容 A2A 的智能体建立连接。

创建 A2A 兼容的智能体

此示例创建了一个兼容 A2A 的智能体,该智能体使用 OpenAI 的 API 处理传入消息并维护对话状态。该智能体定义了基于消息的状态结构,并处理 A2A 协议的消息格式。 为了与 A2A “text” 部分兼容,智能体状态中必须包含 messages 键。 A2A 协议使用两个标识符来保持对话的连续性:
  • contextId:将消息分组到一个对话线程中(类似于会话 ID)
  • taskId:标识该对话中的每个单独请求
在第一条消息中,省略 contextIdtaskId - 智能体会生成并返回它们。对于对话中的所有后续消息,请包含来自之前响应的 contextIdtaskId 以保持线程连续性。 LangSmith 追踪:LangSmith 部署的 A2A 端点会自动将 A2A 的 contextId 转换为 thread_id 以进行 LangSmith 追踪,将对话中的所有消息归类在同一个线程下。 例如:
"""LangGraph A2A conversational agent.

Supports the A2A protocol with messages input for conversational interactions.
"""

from __future__ import annotations

import os
from dataclasses import dataclass
from typing import Any, Dict, List, TypedDict

from langgraph.graph import StateGraph
from langgraph.runtime import Runtime
from openai import AsyncOpenAI


class Context(TypedDict):
    """Context parameters for the agent."""
    my_configurable_param: str


@dataclass
class State:
    """Input state for the agent.

    Defines the initial structure for A2A conversational messages.
    """
    messages: List[Dict[str, Any]]


async def call_model(state: State, runtime: Runtime[Context]) -> Dict[str, Any]:
    """Process conversational messages and returns output using OpenAI."""
    # Initialize OpenAI client
    client = AsyncOpenAI(api_key=os.getenv("OPENAI_API_KEY"))

    # Process the incoming messages
    latest_message = state.messages[-1] if state.messages else {}
    user_content = latest_message.get("content", "No message content")

    # Create messages for OpenAI API
    openai_messages = [
        {
            "role": "system",
            "content": "You are a helpful conversational agent. Keep responses brief and engaging."
        },
        {
            "role": "user",
            "content": user_content
        }
    ]

    try:
        # Make OpenAI API call
        response = await client.chat.completions.create(
            model="gpt-3.5-turbo",
            messages=openai_messages,
            max_tokens=100,
            temperature=0.7
        )

        ai_response = response.choices[0].message.content

    except Exception as e:
        ai_response = f"I received your message but had trouble processing it. Error: {str(e)[:50]}..."

    # Create a response message
    response_message = {
        "role": "assistant",
        "content": ai_response
    }

    return {
        "messages": state.messages + [response_message]
    }


# Define the graph
graph = (
    StateGraph(State, context_schema=Context)
    .add_node(call_model)
    .add_edge("__start__", "call_model")
    .compile()
)

智能体间通信

一旦你的智能体通过 langgraph dev 在本地运行或部署到生产环境,你就可以使用 A2A 协议促进它们之间的通信。 此示例演示了两个智能体如何通过向彼此的 A2A 端点发送 JSON-RPC 消息进行通信。该脚本模拟了一场多轮对话,其中每个智能体都会处理对方的响应并继续对话。
#!/usr/bin/env python3
"""Agent-to-Agent conversation simulation using the LangGraph A2A endpoint."""

import asyncio
import aiohttp
import os
import uuid


def extract_text(result: dict) -> str:
    """Best-effort extraction of response text from an A2A result."""
    for art in result.get("result", {}).get("artifacts", []) or []:
        for part in art.get("parts", []) or []:
            if part.get("kind") == "text" and part.get("text"):
                return part["text"]

    msg = (result.get("result", {}).get("status", {}) or {}).get("message", {}) or {}
    for part in msg.get("parts", []) or []:
        if part.get("kind") == "text" and part.get("text"):
            return part["text"]

    return "(no text found)"


async def send_message(session, port, assistant_id, text, context_id=None, task_id=None):
    """Send an A2A message. Returns (response_text, returned_context_id, returned_task_id)."""
    url = f"http://127.0.0.1:{port}/a2a/{assistant_id}"

    message = {
        "role": "user",
        "parts": [{"kind": "text", "text": text}],
        "messageId": str(uuid.uuid4()),
    }

    # A2A multi-turn continuity: reuse contextId and taskId across turns/agents
    if context_id:
        message["contextId"] = context_id
    if task_id:
        message["taskId"] = task_id

    payload = {
        "jsonrpc": "2.0",
        "id": str(uuid.uuid4()),
        "method": "message/send",
        "params": {"message": message},
    }

    headers = {"Accept": "application/json"}
    async with session.post(url, json=payload, headers=headers) as response:
        result = await response.json()

    returned_context_id = result.get("result", {}).get("contextId") or context_id
    returned_task_id = result.get("result", {}).get("id")
    return extract_text(result), returned_context_id, returned_task_id


async def simulate_conversation():
    """Simulate a conversation between two agents."""

    #Assistant IDs
    agent_a_id = os.getenv("AGENT_A_ID")
    agent_b_id = os.getenv("AGENT_B_ID")

    if not agent_a_id or not agent_b_id:
        print("Set AGENT_A_ID and AGENT_B_ID environment variables")
        return

    message = "Hello! Let's have a conversation."
    context_id = None
    task_id = None

    async with aiohttp.ClientSession() as session:
        for i in range(3):
            print(f"--- Round {i + 1} ---")

            message, context_id, task_id = await send_message(
                session, 2024, agent_a_id, message,
                context_id=context_id,
                task_id=task_id,
            )
            print(f"🔵 Agent A: {message}")

            message, context_id, task_id = await send_message(
                session, 2025, agent_b_id, message,
                context_id=context_id,
                task_id=task_id,
            )
            print(f"🔴 Agent B: {message}\n")


if __name__ == "__main__":
    asyncio.run(simulate_conversation())
如需完整的运行示例,请参阅

分布式追踪

当多个智能体通过 A2A 进行通信时,LangSmith 可以将它们所有的追踪 (traces) 分组到一个线程 (thread) 中,从而为你提供整个多智能体对话的统一视图。

contextId 如何映射到 thread_id

Agent Server 的 A2A 端点会自动将 A2A 的 contextId 转换为 thread_id 以用于 LangSmith 追踪。这意味着对话中所有参与智能体的每条消息都会归类在 LangSmith 的同一个线程下,无需你进行任何额外配置。 工作流程如下:
  1. 在第一条消息中,客户端省略 contextId。服务器生成一个并在响应中返回它。
  2. 客户端在所有后续消息中传递 contextId 以保持对话连续性。
  3. Agent Server 将 contextId 映射到 LangSmith 元数据中的 thread_id,因此所有轮次都会出现在同一个线程中。

跨多个智能体进行追踪

当来自不同框架的智能体通过 A2A 进行通信时,你可以通过在所有智能体之间共享相同的 thread_id 来在 LangSmith 中统一它们的追踪。将第一个智能体返回的 contextId 用作所有后续请求的 thread_id 以下代码片段演示了核心概念。有关包含两个智能体的完整可运行实现,请参考 Google ADK + LangChain 示例
import asyncio
import aiohttp
import uuid


async def send_message(session, url, text, context_id=None, task_id=None, thread_id=None):
    """Send an A2A message and return (response_text, context_id, task_id)."""

    # --- 1. Build the message ---
    # On follow-up turns, include contextId and taskId inside the message object
    # so the server associates them with the ongoing conversation.
    message = {
        "role": "user",
        "parts": [{"kind": "text", "text": text}],
        "messageId": str(uuid.uuid4()),
    }
    if context_id:
        message["contextId"] = context_id
    if task_id:
        message["taskId"] = task_id

    # --- 2. Set thread_id in metadata ---
    # thread_id goes at the top level of the JSON-RPC payload, not inside params.
    payload = {
        "jsonrpc": "2.0",
        "id": str(uuid.uuid4()),
        "method": "message/send",
        "params": {"message": message},
        "metadata": {"thread_id": thread_id},
    }

    async with session.post(url, json=payload, headers={"Accept": "application/json"}) as response:
        if response.status != 200:
            raise RuntimeError(f"HTTP {response.status}: {await response.text()}")
        result = await response.json()

    if "error" in result:
        raise RuntimeError(result["error"].get("message", "Unknown error"))

    result_obj = result.get("result", {})
    returned_context_id = result_obj.get("contextId") or context_id
    returned_task_id = result_obj.get("id")
    text_out = next(
        (
            part.get("text", "")
            for art in result_obj.get("artifacts", []) or []
            for part in art.get("parts", []) or []
            if part.get("kind") == "text"
        ),
        "(no text)",
    )
    return text_out, returned_context_id, returned_task_id


async def run_conversation(agent_a_url, agent_b_url):
    # --- 3. Share thread_id across agents ---
    # Generate a shared thread_id upfront. Once the server returns a contextId,
    # use that instead — this keeps the A2A context and LangSmith thread in sync.
    thread_id = str(uuid.uuid4())
    context_id = None
    task_id = None
    message = "Hello! Let's collaborate."

    async with aiohttp.ClientSession() as session:
        for _ in range(3):
            message, context_id, task_id = await send_message(
                session, agent_a_url, message,
                context_id=context_id, task_id=task_id,
                thread_id=context_id or thread_id,
            )

            # Passing the same thread_id to every agent groups all traces in LangSmith
            message, context_id, task_id = await send_message(
                session, agent_b_url, message,
                context_id=context_id, task_id=task_id,
                thread_id=context_id or thread_id,
            )


asyncio.run(run_conversation(
    "https://:2024/a2a/<agent_a_assistant_id>",
    "https://:2025/a2a/<agent_b_assistant_id>",
))
1. 构建消息:在后续轮次中,在 message 对象内包含 contextIdtaskId,以便服务器将其与正在进行的对话关联起来。在第一条消息中省略它们,因为服务器会生成 contextId 并在响应中返回。 2. 在元数据中设置 thread_id:在 JSON-RPC 有效负载的顶层 metadata 字段中传递 thread_id,而不是在 params 中。 3. 在智能体间共享 thread_id:在第一条消息之前生成一个随机的 thread_id。一旦服务器返回 contextId,将其用作所有后续请求的 thread_id,这样可以使 A2A 对话上下文与 LangSmith 线程保持同步。将相同的 thread_id 传递给每个智能体,以便所有追踪都被分组到一个线程中。

在非 LangGraph 智能体中接收 thread_id

上一节介绍了客户端方面的内容——即在发送消息时传播 thread_id。如果你的某个智能体不是基于 LangGraph 构建的,它还需要在接收端提取并附加 thread_id,以便其追踪能落在相同的 LangSmith 线程中。使用 langsmith.integrations.otel.configure() 来设置自动追踪,并从传入的 A2A 请求元数据中提取 thread_id,从而将追踪归组到同一个线程中。
from fastapi import FastAPI, Request
from langsmith.integrations.otel import configure as configure_otel
from opentelemetry import trace
import json

# --- 1. Configure OTel ---
# Set up automatic tracing to LangSmith for your non-LangGraph agent.
configure_otel(project_name="my-a2a-project")
tracer = trace.get_tracer(__name__)

app = FastAPI()

@app.middleware("http")
async def set_thread_id_middleware(request: Request, call_next):
    thread_id = None
    if request.method == "POST":
        body_bytes = await request.body()
        if body_bytes:
            # --- 2. Extract thread_id from incoming A2A metadata ---
            try:
                body = json.loads(body_bytes)
                thread_id = body.get("metadata", {}).get("thread_id")
            except Exception:
                pass
            # Re-inject the body so downstream handlers can still read it
            async def receive():
                return {"type": "http.request", "body": body_bytes}
            request._receive = receive

    # --- 3. Attach thread_id to the trace ---
    # langsmith.metadata.thread_id groups this trace with others in the same thread.
    with tracer.start_as_current_span("agent") as span:
        if thread_id:
            span.set_attribute("langsmith.metadata.thread_id", thread_id)
        return await call_next(request)
在此中间件之后在 app 上注册你的智能体路由。
在你的环境中设置 LANGSMITH_API_KEY 以及(可选的)LANGSMITH_PROJECT 以启用追踪。对话中的所有智能体应使用同一个项目,以便它们的追踪可以一起显示。

在 LangSmith 中查看跟踪

在运行多智能体对话后,打开 LangSmith UI 并导航至 Threads (线程)。所有来自参与智能体的轮次都将显示在一个由共享的 thread_id 标识的单个线程下。

禁用 A2A

要禁用 A2A 端点,请在你的 langgraph.json 配置文件中将 disable_a2a 设置为 true
{
  "$schema": "https://langgra.ph/schema.json",
  "http": {
    "disable_a2a": true
  }
}

© . This site is unofficial and not affiliated with LangChain, Inc.