跳到主要内容
了解如何使用 LangSmith API 直接追踪您的 LLM 应用程序。 强烈建议使用我们的 Python 或 TypeScript SDK 将追踪发送到 LangSmith。我们设计这些 SDK 时考虑了批量处理和后台运行等优化,以确保您的应用程序性能不会因发送追踪到 LangSmith 而受到影响。但是,如果您无法使用我们的 SDK,可以使用 LangSmith REST API 发送追踪。如果您在应用程序中同步发送追踪,性能可能会受到影响。本指南将向您展示如何使用 LangSmith REST API 追踪请求。请查看我们的 API 文档,了解完整的端点列表和请求/响应模式。

基本追踪

记录运行的最简单方法是通过 POST 和 PATCH /runs 端点。这些路由需要关于树结构的最小上下文信息。
使用 LangSmith REST API 时,您需要在请求头中以 "x-api-key" 的形式提供您的 API 密钥。如果您的 API 密钥链接到多个工作区,您需要通过 "x-tenant-id" 在请求头中指定正在使用的工作区。在简单示例中,您不需要在请求正文中设置 dotted_ordertrace_id 字段。这些字段将由系统自动生成。虽然这更简单,但它在 LangSmith 中速度较慢且速率限制较低。
以下示例展示了您如何在 Python 中直接利用我们的 API。同样的原则也适用于其他语言。
import openai
import os
import requests
from datetime import datetime, timezone
from uuid import uuid4

# Send your API Key in the request headers
headers = {
    "x-api-key": os.environ["LANGSMITH_API_KEY"],
    "x-tenant-id": os.environ["LANGSMITH_WORKSPACE_ID"]
}

def post_run(run_id, name, run_type, inputs, parent_id=None):
    """Function to post a new run to the API."""
    data = {
        "id": run_id.hex,
        "name": name,
        "run_type": run_type,
        "inputs": inputs,
        "start_time": datetime.utcnow().isoformat(),
        # "session_name": "project-name",  # the name of the project to trace to
        # "session_id": "project-id",  # the ID of the project to trace to. specify one of session_name or session_id
    }
    if parent_id:
        data["parent_run_id"] = parent_id.hex

    requests.post(
        "https://api.smith.langchain.com/runs",  # Update appropriately for self-hosted installations or the EU region
        json=data,
        headers=headers
    )

def patch_run(run_id, outputs):
    """Function to patch a run with outputs."""
    requests.patch(
        f"https://api.smith.langchain.com/runs/{run_id}",
        json={
            "outputs": outputs,
            "end_time": datetime.now(timezone.utc).isoformat(),
        },
        headers=headers,
    )

# This can be a user input to your app
question = "Can you summarize this morning's meetings?"

# This can be retrieved in a retrieval step
context = "During this morning's meeting, we solved all world conflict."

messages = [
    {"role": "system", "content": "You are a helpful assistant. Please respond to the user's request only based on the given context."},
    {"role": "user", "content": f"Question: {question}\nContext: {context}"}
]

# Create parent run
parent_run_id = uuid4()
post_run(parent_run_id, "Chat Pipeline", "chain", {"question": question})

# Create child run
child_run_id = uuid4()
post_run(child_run_id, "OpenAI Call", "llm", {"messages": messages}, parent_run_id)

# Generate a completion
client = openai.Client()
chat_completion = client.chat.completions.create(
    model="gpt-4o-mini",
    messages=messages
)

# End runs
patch_run(child_run_id, chat_completion.dict())
patch_run(parent_run_id, {"answer": chat_completion.choices[0].message.content})
有关更多信息,请参阅 运行(跨度)数据格式 文档。

批量摄取

为了更快地摄取运行和获得更高的速率限制,您可以使用 POST /runs/multipart 链接 端点。下面是一个示例。它需要 orjson(用于快速 json)和 requests_toolbelt 才能运行
import json
import os
import uuid
from datetime import datetime, timezone
from typing import Dict, List
import requests
from requests_toolbelt import MultipartEncoder

def create_dotted_order(
    start_time: datetime | None = None,
    run_id: uuid.UUID | None = None
) -> str:
    """Create a dotted order string for run ordering and hierarchy.

    The dotted order is used to establish the sequence and relationships between runs.
    It combines a timestamp with a unique identifier to ensure proper ordering and tracing.
    """
    st = start_time or datetime.now(timezone.utc)
    id_ = run_id or uuid.uuid4()
    return f"{st.strftime('%Y%m%dT%H%M%S%fZ')}{id_}"

def create_run_base(
    name: str,
    run_type: str,
    inputs: dict,
    start_time: datetime
) -> dict:
    """Create the base structure for a run."""
    run_id = uuid.uuid4()
    return {
        "id": str(run_id),
        "trace_id": str(run_id),
        "name": name,
        "start_time": start_time.isoformat(),
        "inputs": inputs,
        "run_type": run_type,
    }

def construct_run(
    name: str,
    run_type: str,
    inputs: dict,
    parent_dotted_order: str | None = None,
) -> dict:
    """Construct a run dictionary with the given parameters.

    This function creates a run with a unique ID and dotted order, establishing its place
    in the trace hierarchy if it's a child run.
    """
    start_time = datetime.now(timezone.utc)
    run = create_run_base(name, run_type, inputs, start_time)
    current_dotted_order = create_dotted_order(start_time, uuid.UUID(run["id"]))

    if parent_dotted_order:
        current_dotted_order = f"{parent_dotted_order}.{current_dotted_order}"
        run["trace_id"] = parent_dotted_order.split(".")[0].split("Z")[1]
        run["parent_run_id"] = parent_dotted_order.split(".")[-1].split("Z")[1]

    run["dotted_order"] = current_dotted_order
    return run

def serialize_run(operation: str, run_data: dict) -> List[tuple]:
    """Serialize a run for the multipart request.

    This function separates the run data into parts for efficient transmission and storage.
    The main run data and optional fields (inputs, outputs, events) are serialized separately.
    """
    run_id = run_data.get("id", str(uuid.uuid4()))

    # Separate optional fields
    inputs = run_data.pop("inputs", None)
    outputs = run_data.pop("outputs", None)
    events = run_data.pop("events", None)

    parts = []

    # Serialize main run data
    run_data_json = json.dumps(run_data).encode("utf-8")
    parts.append(
        (
            f"{operation}.{run_id}",
            (
                None,
                run_data_json,
                "application/json",
                {"Content-Length": str(len(run_data_json))},
            ),
        )
    )

    # Serialize optional fields
    for key, value in [("inputs", inputs), ("outputs", outputs), ("events", events)]:
        if value:
            serialized_value = json.dumps(value).encode("utf-8")
            parts.append(
                (
                    f"{operation}.{run_id}.{key}",
                    (
                        None,
                        serialized_value,
                        "application/json",
                        {"Content-Length": str(len(serialized_value))},
                    ),
                )
            )

    return parts

def batch_ingest_runs(
    api_url: str,
    api_key: str,
    posts: list[dict] | None = None,
    patches: list[dict] | None = None,
) -> None:
    """Ingest multiple runs in a single batch request.

    This function handles both creating new runs (posts) and updating existing runs (patches).
    It's more efficient for ingesting multiple runs compared to individual API calls.
    """
    boundary = uuid.uuid4().hex
    all_parts = []

    for operation, runs in zip(("post", "patch"), (posts, patches)):
        if runs:
            all_parts.extend(
                [part for run in runs for part in serialize_run(operation, run)]
            )

    encoder = MultipartEncoder(fields=all_parts, boundary=boundary)
    headers = {"Content-Type": encoder.content_type, "x-api-key": api_key}

    try:
        response = requests.post(
            f"{api_url}/runs/multipart",
            data=encoder,
            headers=headers
        )
        response.raise_for_status()
        print("Successfully ingested runs.")
    except requests.RequestException as e:
        print(f"Error ingesting runs: {e}")
        # In a production environment, you might want to log this error or handle it more robustly

# Configure API URL and key
# For production use, consider using a configuration file or environment variables
api_url = "https://api.smith.langchain.com"
api_key = os.environ.get("LANGSMITH_API_KEY")

if not api_key:
    raise ValueError("LANGSMITH_API_KEY environment variable is not set")

# Create a parent run
parent_run = construct_run(
    name="Parent Run",
    run_type="chain",
    inputs={"main_question": "Tell me about France"},
)

# Create a child run, linked to the parent
child_run = construct_run(
    name="Child Run",
    run_type="llm",
    inputs={"question": "What is the capital of France?"},
    parent_dotted_order=parent_run["dotted_order"],
)

# First, post the runs to create them
posts = [parent_run, child_run]
batch_ingest_runs(api_url, api_key, posts=posts)

# Then, update the runs with their end times and any outputs
child_run_update = {
    **child_run,
    "end_time": datetime.now(timezone.utc).isoformat(),
    "outputs": {"answer": "Paris is the capital of France."},
}

parent_run_update = {
    **parent_run,
    "end_time": datetime.now(timezone.utc).isoformat(),
    "outputs": {"summary": "Discussion about France, including its capital."},
}

patches = [parent_run_update, child_run_update]
batch_ingest_runs(api_url, api_key, patches=patches)

# Note: This example requires the `requests` and `requests_toolbelt` libraries.
# You can install them using pip:
# pip install requests requests_toolbelt

以编程方式连接这些文档到 Claude、VSCode 等,通过 MCP 获取实时答案。
© . This site is unofficial and not affiliated with LangChain, Inc.