FastAPI权限迷宫:RBAC与多层级依赖的魔法通关秘籍
title: FastAPI权限迷宫:RBAC与多层级依赖的魔法通关秘籍


# 权限模型定义
from pydantic import BaseModel
from typing import List
class User(BaseModel):
username: str
roles: List[str] = []
class Permission(BaseModel):
name: str
description: str
class Role(BaseModel):
name: str
permissions: List[Permission] = []
from fastapi import FastAPI, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
app = FastAPI()
# 模拟数据库存储
fake_users_db = {
"admin": {
"username": "admin",
"roles": ["admin"],
"permissions": ["create_post", "delete_user"]
},
"editor": {
"username": "editor",
"roles": ["editor"],
"permissions": ["edit_post"]
}
}
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
async def get_current_user(token: str = Depends(oauth2_scheme)):
user_data = fake_users_db.get(token)
if not user_data:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid authentication credentials"
)
return User(**user_data)
def has_permission(required_permission: str):
def permission_checker(user: User = Depends(get_current_user)):
if required_permission not in user.permissions:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Insufficient permissions"
)
return user
return permission_checker
@app.get("/admin/dashboard", dependencies=[Depends(has_permission("delete_user"))])
async def admin_dashboard():
return {"message": "Welcome to admin dashboard"}
# 组合多个权限检查
from fastapi import Security
def require_roles(required_roles: List[str]):
def role_checker(user: User = Depends(get_current_user)):
if not any(role in required_roles for role in user.roles):
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Required role missing"
)
return user
return role_checker
@app.get("/premium/content")
async def premium_content(
user: User = Security(has_permission("premium_access")),
_: User = Security(require_roles(["vip", "premium_user"]))
):
return {"content": "Premium content here"}
# 权限组合验证器
from functools import wraps
def combine_permissions(*dependencies):
def decorator(func):
@wraps(func)
async def wrapper(*args, **kwargs):
for dep in dependencies:
await dep.dependency(*args, **kwargs)
return await func(*args, **kwargs)
return wrapper
return decorator
# 使用示例
admin_and_audit = combine_permissions(
Depends(has_permission("admin_access")),
Depends(require_roles(["auditor"]))
)
@app.get("/system/logs")
@admin_and_audit
async def system_logs():
return {"logs": [...]}
# 安装依赖
pip install fastapi==0.68.0 pydantic==1.10.7 uvicorn==0.15.0
# 运行服务
uvicorn main:app --reload
评论
发表评论