跳到主要内容
推荐阅读在深入了解此内容之前,阅读以下内容可能会有所帮助:
如果您正在导出大量追踪,我们建议您使用批量数据导出功能,因为它能更好地处理大量数据,并支持跨分区的自动重试和并行化。
查询运行(LangSmith 追踪中的 span 数据)的推荐方法是使用 SDK 中的 list_runs 方法或 API 中的 /runs/query 端点。 LangSmith 以运行(span)数据格式中指定的简单格式存储追踪。

使用过滤参数

对于简单查询,您无需依赖我们的查询语法。您可以使用过滤参数参考中指定的过滤参数。
先决条件在运行以下代码片段之前,请初始化客户端。
from langsmith import Client

client = Client()
以下是使用关键字参数列出运行的一些示例

列出项目中的所有运行

project_runs = client.list_runs(project_name="<your_project>")

列出过去 24 小时内的 LLM 和 Chat 运行

todays_llm_runs = client.list_runs(
    project_name="<your_project>",
    start_time=datetime.now() - timedelta(days=1),
    run_type="llm",
)

列出项目中的根运行

根运行是没有父级的运行。这些运行的 is_root 值为 True。您可以使用此参数来筛选根运行。
root_runs = client.list_runs(
    project_name="<your_project>",
    is_root=True
)

列出没有错误的运行

correct_runs = client.list_runs(project_name="<your_project>", error=False)

按运行 ID 列出运行

忽略其他参数如果您按上述方式提供运行 ID 列表,它将忽略所有其他过滤参数,如 project_namerun_type 等,并直接返回与给定 ID 匹配的运行。
如果您有一个运行 ID 列表,可以直接列出它们
run_ids = ['a36092d2-4ad5-4fb4-9c0d-0dba9a2ed836','9398e6be-964f-4aa4-8ae9-ad78cd4b7074']
selected_runs = client.list_runs(id=run_ids)

使用筛选查询语言

对于更复杂的查询,您可以使用筛选查询语言参考中描述的查询语言。

列出对话线程中的所有根运行

这是在对话线程中获取运行的方法。有关设置线程的更多信息,请参阅我们的线程设置操作指南。线程通过设置共享的线程 ID 进行分组。LangSmith UI 允许您使用以下三个元数据键中的任何一个:session_idconversation_idthread_id。会话 ID 也称为追踪项目 ID。以下查询匹配其中任何一个。
group_key = "<your_thread_id>"
filter_string = f'and(in(metadata_key, ["session_id","conversation_id","thread_id"]), eq(metadata_value, "{group_key}"))'
thread_runs = client.list_runs(
    project_name="<your_project>",
    filter=filter_string,
    is_root=True
)

列出所有名为“extractor”的运行,其追踪根的反馈“user_score”评分为 1

client.list_runs(
    project_name="<your_project>",
    filter='eq(name, "extractor")',
    trace_filter='and(eq(feedback_key, "user_score"), eq(feedback_score, 1))'
)

列出“star_rating”键的值大于 4 的运行

client.list_runs(
    project_name="<your_project>",
    filter='and(eq(feedback_key, "star_rating"), gt(feedback_score, 4))'
)

列出完成时间超过 5 秒的运行

client.list_runs(project_name="<your_project>", filter='gt(latency, "5s")')

列出所有“error”不为空的运行

client.list_runs(project_name="<your_project>", filter='neq(error, null)')

列出 start_time 大于特定时间戳的所有运行

client.list_runs(project_name="<your_project>", filter='gt(start_time, "2023-07-15T12:34:56Z")')

列出所有包含字符串“substring”的运行

client.list_runs(project_name="<your_project>", filter='search("substring")')

列出所有使用 git hash “2aa1cf4” 标记的运行

client.list_runs(project_name="<your_project>", filter='has(tags, "2aa1cf4")')

列出所有在特定时间戳之后开始且“error”不为空或“Correctness”反馈评分为 0 的运行

client.list_runs(
  project_name="<your_project>",
  filter='and(gt(start_time, "2023-07-15T12:34:56Z"), or(neq(error, null), and(eq(feedback_key, "Correctness"), eq(feedback_score, 0.0))))'
)

复杂查询:列出所有标签包含“experimental”或“beta”且延迟大于 2 秒的运行

client.list_runs(
  project_name="<your_project>",
  filter='and(or(has(tags, "experimental"), has(tags, "beta")), gt(latency, 2))'
)

按全文搜索追踪树

您可以使用不带任何特定字段的 search() 函数对运行中的所有字符串字段进行全文搜索。这使您能够快速找到与搜索词匹配的追踪。
client.list_runs(
  project_name="<your_project>",
  filter='search("image classification")'
)

检查元数据是否存在

如果要检查元数据是否存在,可以使用 eq 运算符,可以选择使用 and 语句按值匹配。这对于记录有关运行的更多结构化信息很有用。
to_search = {
    "user_id": ""
}

# Check for any run with the "user_id" metadata key
client.list_runs(
  project_name="default",
  filter="eq(metadata_key, 'user_id')"
)
# Check for runs with user_id=4070f233-f61e-44eb-bff1-da3c163895a3
client.list_runs(
  project_name="default",
  filter="and(eq(metadata_key, 'user_id'), eq(metadata_value, '4070f233-f61e-44eb-bff1-da3c163895a3'))"
)

检查元数据中的环境详细信息

一种常见的模式是通过元数据将环境信息添加到您的追踪中。如果您想筛选包含环境元数据的运行,可以使用与上述相同的模式
client.list_runs(
  project_name="default",
  filter="and(eq(metadata_key, 'environment'), eq(metadata_value, 'production'))"
)

检查元数据中的会话 ID

另一种将同一对话中的追踪关联起来的常见方法是使用共享的对话 ID。如果您想通过这种方式基于对话 ID 筛选运行,可以在元数据中搜索该 ID。
client.list_runs(
  project_name="default",
  filter="and(eq(metadata_key, 'conversation_id'), eq(metadata_value, 'a1b2c3d4-e5f6-7890'))"
)

键值对的否定过滤

您可以对元数据、输入和输出键值对使用否定过滤,以从结果中排除特定运行。以下是一些元数据键值对的示例,但相同的逻辑也适用于输入和输出键值对。
# Find all runs where the metadata does not contain a "conversation_id" key
client.list_runs(
  project_name="default",
  filter="and(neq(metadata_key, 'conversation_id'))"
)

# Find all runs where the conversation_id in metadata is not "a1b2c3d4-e5f6-7890"
client.list_runs(
  project_name="default",
  filter="and(eq(metadata_key, 'conversation_id'), neq(metadata_value, 'a1b2c3d4-e5f6-7890'))"
)

# Find all runs where there is no "conversation_id" metadata key and the "a1b2c3d4-e5f6-7890" value is not present
client.list_runs(
  project_name="default",
  filter="and(neq(metadata_key, 'conversation_id'), neq(metadata_value, 'a1b2c3d4-e5f6-7890'))"
)

# Find all runs where the conversation_id metadata key is not present but the "a1b2c3d4-e5f6-7890" value is present
client.list_runs(
  project_name="default",
  filter="and(neq(metadata_key, 'conversation_id'), eq(metadata_value, 'a1b2c3d4-e5f6-7890'))"
)

组合多个过滤器

如果您想组合多个条件来优化搜索,可以使用 and 运算符以及其他过滤函数。以下是如何搜索名为“ChatOpenAI”且元数据中包含特定 conversation_id 的运行的示例
client.list_runs(
  project_name="default",
  filter="and(eq(name, 'ChatOpenAI'), eq(metadata_key, 'conversation_id'), eq(metadata_value, '69b12c91-b1e2-46ce-91de-794c077e8151'))"
)

树形过滤器

列出所有名为“RetrieveDocs”的运行,其根运行的“user_score”反馈为 1,并且完整追踪中的任何运行名为“ExpandQuery”。 如果您想根据追踪中达到的各种状态或步骤来提取特定运行,这种类型的查询非常有用。
client.list_runs(
    project_name="<your_project>",
    filter='eq(name, "RetrieveDocs")',
    trace_filter='and(eq(feedback_key, "user_score"), eq(feedback_score, 1))',
    tree_filter='eq(name, "ExpandQuery")'
)

高级:导出包含子工具使用情况的平铺追踪视图

以下 Python 示例演示了如何导出追踪的平铺视图,包括代理在每个追踪中使用的工具(来自嵌套运行)的信息。这可用于分析代理在多个追踪中的行为。 此示例查询指定天数内所有工具运行,并按其父(根)运行 ID 分组。然后,它获取每个根运行的相关信息,例如运行名称、输入、输出,并将该信息与子运行信息结合起来。 为了优化查询,该示例:
  1. 查询工具运行时只选择必要的字段,以减少查询时间。
  2. 在并发处理工具运行时分批获取根运行。
from collections import defaultdict
from concurrent.futures import Future, ThreadPoolExecutor
from datetime import datetime, timedelta

from langsmith import Client
from tqdm.auto import tqdm

client = Client()
project_name = "my-project"
num_days = 30

# List all tool runs
tool_runs = client.list_runs(
    project_name=project_name,
    start_time=datetime.now() - timedelta(days=num_days),
    run_type="tool",
    # We don't need to fetch inputs, outputs, and other values that # may increase the query time
    select=["trace_id", "name", "run_type"],
)

data = []
futures: list[Future] = []
trace_cursor = 0
trace_batch_size = 50

tool_runs_by_parent = defaultdict(lambda: defaultdict(set))
# Do not exceed rate limit
with ThreadPoolExecutor(max_workers=2) as executor:
    # Group tool runs by parent run ID
    for run in tqdm(tool_runs):
        # Collect all tools invoked within a given trace
        tool_runs_by_parent[run.trace_id]["tools_involved"].add(run.name)
        # maybe send a batch of parent run IDs to the server
        # this lets us query for the root runs in batches
        # while still processing the tool runs
        if len(tool_runs_by_parent) % trace_batch_size == 0:
            if this_batch := list(tool_runs_by_parent.keys())[
                trace_cursor : trace_cursor + trace_batch_size
            ]:
                trace_cursor += trace_batch_size
                futures.append(
                    executor.submit(
                        client.list_runs,
                        project_name=project_name,
                        run_ids=this_batch,
                        select=["name", "inputs", "outputs", "run_type"],
                    )
                )
    if this_batch := list(tool_runs_by_parent.keys())[trace_cursor:]:
        futures.append(
            executor.submit(
                client.list_runs,
                project_name=project_name,
                run_ids=this_batch,
                select=["name", "inputs", "outputs", "run_type"],
            )
        )

for future in tqdm(futures):
    root_runs = future.result()
    for root_run in root_runs:
        root_data = tool_runs_by_parent[root_run.id]
        data.append(
            {
                "run_id": root_run.id,
                "run_name": root_run.name,
                "run_type": root_run.run_type,
                "inputs": root_run.inputs,
                "outputs": root_run.outputs,
                "tools_involved": list(root_data["tools_involved"]),
            }
        )

# (Optional): Convert to a pandas DataFrame
import pandas as pd

df = pd.DataFrame(data)
df.head()

高级:导出带反馈的追踪的检索器 IO

此查询有助于微调嵌入或根据检索器行为诊断端到端系统性能问题。以下 Python 示例演示了如何导出具有特定反馈分数的追踪中的检索器输入和输出。
from collections import defaultdict
from concurrent.futures import Future, ThreadPoolExecutor
from datetime import datetime, timedelta

import pandas as pd
from langsmith import Client
from tqdm.auto import tqdm

client = Client()
project_name = "your-project-name"
num_days = 1

# List all tool runs
retriever_runs = client.list_runs(
    project_name=project_name,
    start_time=datetime.now() - timedelta(days=num_days),
    run_type="retriever",
    # This time we do want to fetch the inputs and outputs, since they
    # may be adjusted by query expansion steps.
    select=["trace_id", "name", "run_type", "inputs", "outputs"],
    trace_filter='eq(feedback_key, "user_score")',
)

data = []
futures: list[Future] = []
trace_cursor = 0
trace_batch_size = 50

retriever_runs_by_parent = defaultdict(lambda: defaultdict(list))
# Do not exceed rate limit
with ThreadPoolExecutor(max_workers=2) as executor:
    # Group retriever runs by parent run ID
    for run in tqdm(retriever_runs):
        # Collect all retriever calls invoked within a given trace
        for k, v in run.inputs.items():
            retriever_runs_by_parent[run.trace_id][f"retriever.inputs.{k}"].append(v)
        for k, v in (run.outputs or {}).items():
            # Extend the docs
            retriever_runs_by_parent[run.trace_id][f"retriever.outputs.{k}"].extend(v)
        # maybe send a batch of parent run IDs to the server
        # this lets us query for the root runs in batches
        # while still processing the retriever runs
        if len(retriever_runs_by_parent) % trace_batch_size == 0:
            if this_batch := list(retriever_runs_by_parent.keys())[
                trace_cursor : trace_cursor + trace_batch_size
            ]:
                trace_cursor += trace_batch_size
                futures.append(
                    executor.submit(
                        client.list_runs,
                        project_name=project_name,
                        run_ids=this_batch,
                        select=[
                            "name",
                            "inputs",
                            "outputs",
                            "run_type",
                            "feedback_stats",
                        ],
                    )
                )
    if this_batch := list(retriever_runs_by_parent.keys())[trace_cursor:]:
        futures.append(
            executor.submit(
                client.list_runs,
                project_name=project_name,
                run_ids=this_batch,
                select=["name", "inputs", "outputs", "run_type"],
            )
        )

for future in tqdm(futures):
    root_runs = future.result()
    for root_run in root_runs:
        root_data = retriever_runs_by_parent[root_run.id]
        feedback = {
            f"feedback.{k}": v.get("avg")
            for k, v in (root_run.feedback_stats or {}).items()
        }
        inputs = {f"inputs.{k}": v for k, v in root_run.inputs.items()}
        outputs = {f"outputs.{k}": v for k, v in (root_run.outputs or {}).items()}
        data.append(
            {
                "run_id": root_run.id,
                "run_name": root_run.name,
                **inputs,
                **outputs,
                **feedback,
                **root_data,
            }
        )

# (Optional): Convert to a pandas DataFrame
import pandas as pd
df = pd.DataFrame(data)
df.head()

以编程方式连接这些文档到 Claude、VSCode 等,通过 MCP 获取实时答案。
© . This site is unofficial and not affiliated with LangChain, Inc.