from langchain_auth import Clientfrom langchain.tools import tool, ToolRuntimeauth_client = Client()# Inside your agent's tool:@toolasync def github_action(runtime: ToolRuntime): """Perform an action on behalf of the user via GitHub.""" auth_result = await auth_client.authenticate( provider="github", scopes=["repo", "read:org"], user_id=runtime.server_info.user.identity, ) # Use auth_result.token for GitHub API calls on the user's behalf
沙箱凭据注入。 如果您的智能体在调用外部 API 的 沙箱 内运行代码,沙箱身份验证代理 可以自动将凭据注入到传出请求中,因此沙箱代码永远不会收到原始 API 密钥。有关设置详情,请参阅 管理密钥。工作空间机密 (Workspace secrets)。 对于所有用户共享的 API 密钥(例如您组织的 LLM 提供商密钥、搜索 API 密钥),请将它们作为 LangSmith 中的 工作空间机密 存储。有关详情,请参阅 管理密钥。
from deepagents import create_deep_agentfrom deepagents.backends import CompositeBackend, StateBackend, StoreBackendagent = create_deep_agent( model="google_genai:gemini-3.1-pro-preview", backend=CompositeBackend( default=StateBackend(), routes={ "/memories/": StoreBackend( namespace=lambda rt: ( rt.server_info.assistant_id, rt.server_info.user.identity, ), ), }, ), system_prompt="""You have persistent memory at /memories/. Read /memories/instructions.txt at the start of each conversation for accumulated knowledge and preferences. When you learn something that should persist, update that file.""",)
from langgraph_sdk import get_clientclient = get_client(url="<DEPLOYMENT_URL>", api_key="<LANGSMITH_API_KEY>")# Conversation 1: install pandas and analyze datathread_1 = await client.threads.create()async for chunk in client.runs.stream( thread_1["thread_id"], "agent", input={"messages": [{"role": "human", "content": "Install pandas and analyze sales_data.csv"}]}, stream_mode="updates",): print(chunk.data)# Follow-up in the same conversation — pandas is still installedasync for chunk in client.runs.stream( thread_1["thread_id"], "agent", input={"messages": [{"role": "human", "content": "Now plot the results"}]}, stream_mode="updates",): print(chunk.data)# Conversation 2: fresh sandbox — pandas is NOT installed, no files from conversation 1thread_2 = await client.threads.create()async for chunk in client.runs.stream( thread_2["thread_id"], "agent", input={"messages": [{"role": "human", "content": "What packages are installed?"}]}, stream_mode="updates",): print(chunk.data)
from langgraph_sdk import get_clientclient = get_client(url="<DEPLOYMENT_URL>", api_key="<LANGSMITH_API_KEY>")# Conversation 1: clone and set up the projectthread_1 = await client.threads.create()async for chunk in client.runs.stream( thread_1["thread_id"], "agent", input={"messages": [{"role": "human", "content": "Clone https://github.com/org/repo and install dependencies"}]}, stream_mode="updates",): print(chunk.data)# Conversation 2: repo and dependencies are still therethread_2 = await client.threads.create()async for chunk in client.runs.stream( thread_2["thread_id"], "agent", input={"messages": [{"role": "human", "content": "Run the test suite and fix any failures"}]}, stream_mode="updates",): print(chunk.data)
from deepagents import create_deep_agentfrom langchain.agents.middleware import AgentMiddleware, AgentStatefrom langgraph.runtime import Runtimedef _safe_filename(key: str) -> str: """Reject keys that contain path traversal or glob characters.""" name = key.split("/")[-1] if ".." in name or any(c in name for c in ("*", "?")): raise ValueError(f"Invalid key: {key}") return nameclass SandboxSyncMiddleware(AgentMiddleware): """Sync skills and memories between the store and the sandbox.""" def __init__(self, backend: CompositeBackend): super().__init__() self.backend = backend async def abefore_agent(self, state: AgentState, runtime: Runtime) -> None: """Upload skill scripts and memories into the sandbox.""" user_id = runtime.server_info.user.identity store = runtime.store files = [] for item in await store.asearch(("skills", user_id)): name = _safe_filename(item.key) files.append((f"/skills/{name}", item.value["content"].encode())) for item in await store.asearch(("memories", user_id)): name = _safe_filename(item.key) files.append((f"/memories/{name}", item.value["content"].encode())) if files: await self.backend.upload_files(files) async def aafter_agent(self, state: AgentState, runtime: Runtime) -> None: """Sync updated memories back to the store.""" user_id = runtime.server_info.user.identity store = runtime.store items = await store.asearch(("memories", user_id)) results = await self.backend.download_files( [f"/memories/{item.key}" for item in items] ) for result in results: if result.content is not None: await store.aput( ("memories", user_id), result.path.split("/")[-1], {"content": result.content.decode()}, )backend = CompositeBackend( default=DaytonaSandbox(sandbox=sandbox), routes={ "/skills/": StoreBackend( rt, namespace=lambda rt: ("skills", rt.server_info.user.identity), ), "/memories/": StoreBackend( rt, namespace=lambda rt: ("memories", rt.server_info.user.identity), ), },)agent = create_deep_agent( model="google_genai:gemini-3.1-pro-preview", backend=backend, middleware=[SandboxSyncMiddleware(backend)],)
沙箱是隔离的容器,因此宿主机的环境变量在沙箱内不可用。有两种方法可以向沙箱代码提供 API 密钥和其他机密信息:身份验证代理 (推荐)。沙箱身份验证代理 会拦截来自沙箱的传出请求并自动注入身份验证标头。沙箱代码正常调用外部 API,代理根据目标主机添加正确的凭据。这意味着 API 密钥永远不会出现在沙箱代码、环境变量或日志中。
from deepagents import create_deep_agentfrom langchain.agents.middleware import ( ModelFallbackMiddleware, ModelRetryMiddleware, ToolRetryMiddleware,)agent = create_deep_agent( model="google_genai:gemini-3.1-pro-preview", middleware=[ # Retry model calls on rate limits, timeouts, and 5xx errors ModelRetryMiddleware(max_retries=3, backoff_factor=2.0, initial_delay=1.0), # If the primary model is fully down, fall back to an alternative ModelFallbackMiddleware("gpt-5.4"), # Retry specific tools that hit external APIs (not all tools) ToolRetryMiddleware( max_retries=2, tools=["search", "fetch_url"], retry_on=(TimeoutError, ConnectionError), ), ],)