from langgraph_sdk import Auth

auth = Auth()


@auth.authenticate
async def authenticate(headers: dict) -> Auth.types.MinimalUserDict:
    """Validate the request's API key and return the caller's user info.

    Args:
        headers: Request headers. The API key is looked up under the
            ``b"x-api-key"`` key.

    Returns:
        A user dict. ``identity`` is the only required field; the other
        fields shown here are optional or custom.

    Raises:
        Auth.exceptions.HTTPException: With status 401 when the key is
            missing or ``is_valid_key`` rejects it.
    """
    # Validate credentials (e.g., API key, JWT token).
    # NOTE: is_valid_key is not defined in this snippet; the application
    # must supply it.
    api_key = headers.get(b"x-api-key")
    if not api_key or not is_valid_key(api_key):
        raise Auth.exceptions.HTTPException(
            status_code=401,
            detail="Invalid API key",
        )

    # Return user info - only "identity" is required.
    # Add any additional fields you need for authorization.
    return {
        "identity": "user-123",  # Required: unique user identifier
        "is_authenticated": True,  # Optional: assumed True by default
        "permissions": ["read", "write"],  # Optional: for permission-based auth
        # You can add more custom fields if you want to implement other
        # auth patterns
        "role": "admin",
        "org_id": "org-456",
    }
@auth.on
async def add_owner(
    ctx: Auth.types.AuthContext,
    value: dict,  # The payload being sent to this access method
) -> dict:  # Returns a filter dict that restricts access to resources
    """Authorize all access to threads, runs, crons, and assistants.

    This handler does two things:
        - Writes the caller's identity into the payload's metadata, so it
          is persisted with the resource and can be filtered on later.
        - Returns a filter that restricts access to existing resources.

    Args:
        ctx: Authentication context; the caller's identity is read from
            ``ctx.user.identity``.
        value: The request payload sent to the endpoint. For creation
            operations, this contains the resource parameters. For read
            operations, this contains the resource being accessed.

    Returns:
        A filter dictionary that LangGraph uses to restrict access to
        resources. See [Filter Operations](#filter-operations) for
        supported operators.
    """
    owner_id = ctx.user.identity

    # Get or create the payload's metadata dict and record the owner on it.
    # On create/update operations this is saved with the resource, which is
    # what makes the owner filter below usable on later reads.
    value.setdefault("metadata", {})["owner"] = owner_id

    # The same owner filter is returned for every operation (create, read,
    # update, search, etc.) so users can only access their own resources.
    return {"owner": owner_id}
# Generic / global handler catches calls that aren't handled by more specific handlers
@auth.on
async def reject_unhandled_requests(ctx: Auth.types.AuthContext, value: dict) -> None:
    """Reject every request that no more specific handler has claimed.

    Raises:
        Auth.exceptions.HTTPException: Always, with status 403.
    """
    # NOTE(review): confirm that AuthContext exposes `path` in the installed
    # SDK version before relying on this log line.
    print(f"Request to {ctx.path} by {ctx.user.identity}")
    raise Auth.exceptions.HTTPException(
        status_code=403,
        detail="Forbidden",
    )


# Matches the "thread" resource and all actions - create, read, update, delete, search
# Since this is **more specific** than the generic @auth.on handler, it will take precedence
# over the generic handler for all actions on the "threads" resource
@auth.on.threads
async def on_thread(
    ctx: Auth.types.AuthContext,
    value: Auth.types.on.threads.value,
):
    """Stamp the caller as owner on the payload and return an owner filter."""
    # Setting metadata on the thread being created
    # will ensure that the resource contains an "owner" field
    # Then any time a user tries to access this thread or runs within the thread,
    # we can filter by owner
    metadata = value.setdefault("metadata", {})
    metadata["owner"] = ctx.user.identity
    return {"owner": ctx.user.identity}


# Thread creation. This will match only on thread create actions
# Since this is **more specific** than both the generic @auth.on handler and the @auth.on.threads handler,
# it will take precedence for any "create" actions on the "threads" resources
@auth.on.threads.create
async def on_thread_create(
    ctx: Auth.types.AuthContext,
    value: Auth.types.on.threads.create.value,
):
    """Allow thread creation only for users holding the "write" permission.

    Raises:
        Auth.exceptions.HTTPException: With status 403 when "write" is not
            in ``ctx.permissions``.
    """
    # Reject if the user does not have write access
    if "write" not in ctx.permissions:
        raise Auth.exceptions.HTTPException(
            status_code=403,
            detail="User lacks the required permissions.",
        )
    # Setting metadata on the thread being created
    # will ensure that the resource contains an "owner" field
    # Then any time a user tries to access this thread or runs within the thread,
    # we can filter by owner
    metadata = value.setdefault("metadata", {})
    metadata["owner"] = ctx.user.identity
    return {"owner": ctx.user.identity}


# Reading a thread. Since this is also more specific than the generic @auth.on handler,
# and the @auth.on.threads handler, it will take precedence for any "read" actions
# on the "threads" resource
@auth.on.threads.read
async def on_thread_read(
    ctx: Auth.types.AuthContext,
    value: Auth.types.on.threads.read.value,
):
    """Return an owner filter so users only see their own threads."""
    # Since we are reading (and not creating) a thread,
    # we don't need to set metadata. We just need to
    # return a filter to ensure users can only see their own threads
    return {"owner": ctx.user.identity}


# Run creation, streaming, updates, etc.
# This takes precedence over the generic @auth.on handler and the @auth.on.threads handler
@auth.on.threads.create_run
async def on_run_create(
    ctx: Auth.types.AuthContext,
    value: Auth.types.on.threads.create_run.value,
):
    """Stamp the caller as owner on the run payload and return an owner filter."""
    metadata = value.setdefault("metadata", {})
    metadata["owner"] = ctx.user.identity
    # Inherit thread's access control
    return {"owner": ctx.user.identity}


# Assistant creation
@auth.on.assistants.create
async def on_assistant_create(
    ctx: Auth.types.AuthContext,
    value: Auth.types.on.assistants.create.value,
):
    """Allow assistant creation only with the "assistants:create" permission.

    Raises:
        Auth.exceptions.HTTPException: With status 403 when
            "assistants:create" is not in ``ctx.permissions``.
    """
    if "assistants:create" not in ctx.permissions:
        raise Auth.exceptions.HTTPException(
            status_code=403,
            detail="User lacks the required permissions.",
        )
类型安全:每个处理程序的 value 参数都有对应的类型提示,可通过 Auth.types.on.<resource>.<action>.value 获取。例如:
复制
向 AI 提问
@auth.on.threads.create
async def on_thread_create(
    ctx: Auth.types.AuthContext,
    value: Auth.types.on.threads.create.value,  # Specific type for thread creation
):
    """Placeholder handler for thread creation; body intentionally omitted."""
    ...


@auth.on.threads
async def on_threads(
    ctx: Auth.types.AuthContext,
    value: Auth.types.on.threads.value,  # Union type of all thread actions
):
    """Placeholder handler for every thread action; body intentionally omitted."""
    ...


@auth.on
async def on_all(
    ctx: Auth.types.AuthContext,
    value: dict,  # Union type of all possible actions
):
    """Placeholder catch-all handler; body intentionally omitted."""
    ...