from enum import Enum
from pydantic import BaseModel
class UserRole(str, Enum):
    """Roles a user can hold.

    Mixing in ``str`` makes every member compare equal to its plain string
    value (e.g. ``UserRole.ADMIN == "admin"``).
    """

    GUEST = "guest"
    USER = "user"
    ADMIN = "admin"
    SUPER_ADMIN = "super_admin"


class User(BaseModel):
    """A user record: a name, exactly one role, and a list of permission names."""

    username: str
    role: UserRole
    # Permission names held by the user; empty unless supplied explicitly.
    permissions: list[str] = []
权限验证流程
1.3 创建权限依赖项
基础用户获取依赖
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")


async def get_current_user(token: str = Depends(oauth2_scheme)) -> User:
    """Resolve the bearer token supplied by ``oauth2_scheme`` to a ``User``.

    The token is used directly as the lookup key into an in-memory stand-in
    for a user store.

    Args:
        token: Bearer token extracted from the request by ``oauth2_scheme``.

    Returns:
        The ``User`` registered under ``token``.

    Raises:
        HTTPException: 401 when ``token`` matches no known user.
    """
    # Simulated database query. Built on every call, so each call hands out
    # fresh User instances instead of shared ones.
    fake_users_db = {
        "user1": User(username="user1", role=UserRole.USER),
        "admin1": User(username="admin1", role=UserRole.ADMIN),
    }
    user = fake_users_db.get(token)
    if user is None:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="无效的认证信息",
            # Tells the client which authentication scheme is expected.
            headers={"WWW-Authenticate": "Bearer"},
        )
    return user
权限检查依赖
from typing import List
def require_role(required_role: UserRole):
    """Build a dependency that only admits users holding ``required_role``.

    Matching is exact: a user passes only when their role equals
    ``required_role`` or is ``UserRole.SUPER_ADMIN``.

    Args:
        required_role: The role the current user must hold.

    Returns:
        A ready-made ``Depends(...)`` object. Use it as-is (as a parameter
        default or as an entry of a route's ``dependencies`` list); it must
        not be wrapped in another ``Depends``.
    """

    async def role_checker(user: User = Depends(get_current_user)):
        """Return the current user, or raise 403 when the role does not match."""
        if user.role not in (required_role, UserRole.SUPER_ADMIN):
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail="权限不足",
            )
        return user

    return Depends(role_checker)


def require_permissions(required_perms: List[str]):
    """Build a dependency that requires every permission in ``required_perms``.

    Users whose role is ``UserRole.SUPER_ADMIN`` pass regardless of the
    permissions they hold.

    Args:
        required_perms: Permission names that must all appear in
            ``user.permissions``.

    Returns:
        A ready-made ``Depends(...)`` object. Use it as-is; it must not be
        wrapped in another ``Depends``.
    """

    async def perm_checker(user: User = Depends(get_current_user)):
        """Return the current user, or raise 403 listing the missing permissions."""
        missing = [
            perm for perm in required_perms if perm not in user.permissions
        ]
        if missing and user.role != UserRole.SUPER_ADMIN:
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail=f"缺少权限: {', '.join(missing)}",
            )
        return user

    return Depends(perm_checker)
1.4 路由集成示例
from fastapi import APIRouter
router = APIRouter()


# require_role() and require_permissions() already return Depends(...) objects,
# so they are placed in `dependencies` directly. Wrapping them in a second
# Depends() would give FastAPI a non-callable dependency.
@router.get("/user-data", dependencies=[require_role(UserRole.USER)])
async def get_user_data():
    """Return the sample payload for callers that pass the USER role check."""
    return {"data": "普通用户数据"}


@router.get(
    "/admin-report",
    dependencies=[
        require_role(UserRole.ADMIN),
        require_permissions(["report_view"]),
    ],
)
async def get_admin_report():
    """Return the sample report for callers that pass both the ADMIN role
    check and the ``report_view`` permission check."""
    return {"report": "管理员专属报表"}
1.5 高级配置技巧
动态权限加载
from functools import lru_cache
@lru_cache()
def _permissions_for_role(role: UserRole) -> tuple[str, ...]:
    """Return the permission names granted to ``role``, cached per role.

    The cache lives on this synchronous helper because caching the coroutine
    function itself would require hashing the ``User`` argument and would
    store a coroutine object that can only be awaited once.
    """
    # Simulated database query.
    perm_map = {
        UserRole.USER: ("data_view",),
        UserRole.ADMIN: ("data_view", "report_view"),
    }
    # Roles without an entry get no permissions.
    return perm_map.get(role, ())


async def load_permissions(user: User):
    """Overwrite ``user.permissions`` with the permissions of ``user.role``.

    Args:
        user: The user to update; it is mutated in place.

    Returns:
        The same ``user`` object.
    """
    # Copy into a fresh list so the cached tuple is never shared with callers.
    user.permissions = list(_permissions_for_role(user.role))
    return user


def dynamic_permission(perm_name: str):
    """Build a dependency that loads the user's permissions, then requires
    ``perm_name`` to be among them.

    Args:
        perm_name: The permission name the current user must hold.

    Returns:
        A ready-made ``Depends(...)`` object; use it as-is.
    """

    async def checker(user: User = Depends(get_current_user)):
        """Return the current user, or raise 403 when ``perm_name`` is missing."""
        await load_permissions(user)
        # NOTE(review): roles absent from the permission map (GUEST,
        # SUPER_ADMIN) end up with no permissions and are always rejected
        # here - confirm whether SUPER_ADMIN should bypass this check.
        if perm_name not in user.permissions:
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail="动态权限不足",
            )
        return user

    return Depends(checker)
评论
发表评论