如何在FastAPI中实现权限隔离并让用户乖乖听话?
title: 如何在FastAPI中实现权限隔离并让用户乖乖听话?


请求处理流程:客户端请求 -> [认证中间件] -> [角色依赖注入] -> [路由处理器]
# 安装依赖
# pip install fastapi==0.68.0 uvicorn==0.15.0 python-jose[cryptography]==3.3.0 passlib[bcrypt]==1.7.4
import os
from datetime import datetime, timedelta, timezone

from fastapi import Depends, FastAPI, HTTPException, Request, status
from fastapi.exceptions import RequestValidationError
from fastapi.responses import JSONResponse
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from jose import JWTError, jwt
from passlib.context import CryptContext
from pydantic import BaseModel
# Configuration parameters
# The JWT signing key may be supplied through the SECRET_KEY environment
# variable; the literal is only a development placeholder and is used when
# the variable is unset or empty.
SECRET_KEY = os.environ.get("SECRET_KEY") or "your-secret-key-here"
# Signing algorithm handed to jwt.encode / jwt.decode.
ALGORITHM = "HS256"
# Lifetime of an issued access token, in minutes.
ACCESS_TOKEN_EXPIRE_MINUTES = 30
class User(BaseModel):
    """User model with the fields that identify an account and its role."""

    username: str
    # Role name of the user (field added for role-based checks).
    role: str
class UserInDB(User):
    """User model extended with the stored password hash."""

    hashed_password: str
# Shared bcrypt password context. It hashes the sample passwords below and is
# the object the login route calls to verify a submitted password.
pwd_context = CryptContext(schemes=["bcrypt"])

# Simulated database: maps a username to its stored user record.
fake_users_db = {
    "admin": {
        "username": "admin",
        "hashed_password": pwd_context.hash("secret"),
        "role": "admin",
    },
    "user1": {
        "username": "user1",
        "hashed_password": pwd_context.hash("password"),
        "role": "user",
    },
}
# Create a JWT access token
def create_access_token(data: dict):
    """Encode ``data`` together with an expiry claim into a signed JWT.

    Args:
        data: Claims to embed in the token. The dict is copied, so the
            caller's object is not modified.

    Returns:
        The value returned by ``jwt.encode`` for the claims, signed with
        SECRET_KEY using ALGORITHM.
    """
    to_encode = data.copy()
    # Use a timezone-aware UTC timestamp: datetime.utcnow() is deprecated
    # since Python 3.12 and returns a naive value.
    expire = datetime.now(timezone.utc) + timedelta(
        minutes=ACCESS_TOKEN_EXPIRE_MINUTES
    )
    to_encode["exp"] = expire
    return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
# Application instance. It must exist before the first @app route decorator
# below is evaluated.
app = FastAPI()


# User login endpoint
@app.post("/login")
async def login(form_data: OAuth2PasswordRequestForm = Depends()):
    """Check the submitted credentials and issue a bearer token.

    Args:
        form_data: Login form carrying ``username`` and ``password``.

    Returns:
        A dict with the encoded ``access_token`` and ``token_type`` "bearer".

    Raises:
        HTTPException: 400 when the username is unknown or the password does
            not match the stored hash.
    """
    user_dict = fake_users_db.get(form_data.username)
    # Same error for "no such user" and "wrong password".
    if not user_dict or not pwd_context.verify(
        form_data.password, user_dict["hashed_password"]
    ):
        raise HTTPException(status_code=400, detail="用户名或密码错误")

    # The role is stored in the token alongside the subject.
    access_token = create_access_token(
        data={"sub": user_dict["username"], "role": user_dict["role"]}
    )
    return {"access_token": access_token, "token_type": "bearer"}
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
# Bearer-token scheme used as a dependency to obtain the request's token.
# tokenUrl names the "login" path, matching the @app.post("/login") route.
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="login")
async def get_current_user(token: str = Depends(oauth2_scheme)):
    """Decode the bearer token and return the caller's identity.

    Args:
        token: Token string supplied by the ``oauth2_scheme`` dependency.

    Returns:
        A dict with the ``username`` (from the "sub" claim) and ``role``.

    Raises:
        HTTPException: 401 when the token cannot be decoded or either claim
            is missing.
    """
    unauthorized = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="无法验证凭据",
        headers={"WWW-Authenticate": "Bearer"},
    )

    try:
        claims = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
    except JWTError:
        raise unauthorized

    username = claims.get("sub")
    role = claims.get("role")
    if username is None or role is None:
        raise unauthorized

    return {"username": username, "role": role}
# Role-check dependency factory
def require_role(required_role: str):
    """Build a dependency that only admits users holding an allowed role.

    Args:
        required_role: A single role name, or several names separated by
            commas (for example "user,admin"). Whitespace around each name
            is ignored and empty entries are dropped.

    Returns:
        A dependency callable that returns the current user dict when the
        user's role is allowed and raises a 403 HTTPException otherwise.
    """
    # Parse the role list once, when the route is declared, instead of
    # re-splitting the string on every request.
    allowed_roles = tuple(
        name.strip() for name in required_role.split(",") if name.strip()
    )

    def role_checker(current_user: dict = Depends(get_current_user)):
        if current_user["role"] not in allowed_roles:
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail="权限不足",
            )
        return current_user

    return role_checker
# Admin-only endpoint
@app.get(
    "/admin/dashboard",
    dependencies=[Depends(require_role("admin"))],
)
async def admin_dashboard():
    """Return the welcome message for the admin dashboard."""
    welcome = {"message": "欢迎进入管理面板"}
    return welcome
# Endpoint open to both the "user" and "admin" roles
@app.get("/user/profile")
async def user_profile(
    current_user: dict = Depends(require_role("user,admin")),
):
    """Return the username of the authenticated caller."""
    username = current_user["username"]
    return {"username": username}
def data_permission_check(resource_owner: str):
    """Build a dependency restricting a resource to its owner or an admin.

    Args:
        resource_owner: Username that owns the resource being accessed.

    Returns:
        A dependency callable that returns the current user dict when access
        is allowed and raises a 403 HTTPException otherwise.
    """

    def checker(current_user: dict = Depends(get_current_user)):
        # Admins may read anything; other users only their own resources.
        if (
            current_user["role"] == "admin"
            or current_user["username"] == resource_owner
        ):
            return current_user
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="无权访问该资源",
        )

    return checker
def _require_order_access(
    user_id: str,
    current_user: dict = Depends(get_current_user),
):
    """Apply the owner-or-admin check to the ``user_id`` path parameter."""
    # The path parameter only exists per request, so the checker is built
    # here rather than in a default argument evaluated at definition time.
    return data_permission_check(user_id)(current_user)


@app.get("/orders/{user_id}")
async def get_orders(
    user_id: str,
    current_user: dict = Depends(_require_order_access),
):
    """Return the orders belonging to ``user_id``.

    Args:
        user_id: Owner of the requested orders, taken from the URL path.
        current_user: Authenticated caller, already checked to be either an
            admin or the owner.

    Returns:
        A dict with an ``orders`` list.
    """
    # TODO: replace with the real order lookup. An empty list is returned
    # because the previous ``[...]`` placeholder held an Ellipsis object,
    # which cannot be serialized to JSON.
    return {"orders": []}
from fastapi.testclient import TestClient
# Test client bound to the application; shared by the test functions below.
client = TestClient(app)
def test_admin_access():
    """An admin token is accepted by the admin dashboard."""
    # Log in as the administrator to obtain a token.
    login_response = client.post(
        "/login",
        data={"username": "admin", "password": "secret"},
    )
    token = login_response.json()["access_token"]

    auth_headers = {"Authorization": f"Bearer {token}"}
    response = client.get("/admin/dashboard", headers=auth_headers)

    assert response.status_code == 200
def test_user_access_admin_area():
    """A token with the "user" role is refused by the admin dashboard."""
    login_response = client.post(
        "/login",
        data={"username": "user1", "password": "password"},
    )
    token = login_response.json()["access_token"]

    auth_headers = {"Authorization": f"Bearer {token}"}
    response = client.get("/admin/dashboard", headers=auth_headers)

    assert response.status_code == 403
# Registered as an exception handler rather than an "http" middleware:
# request-validation errors are resolved by the framework's exception layer,
# so an ``except`` around ``call_next`` in a middleware never sees them.
@app.exception_handler(RequestValidationError)
async def validation_errors(request: Request, exc: RequestValidationError):
    """Render request-validation failures as a 422 JSON response.

    Args:
        request: The request that failed validation (unused).
        exc: The validation error raised for the request.

    Returns:
        A JSONResponse with status 422 and body ``{"errors": [...]}``.
    """
    detail = {"errors": exc.errors()}
    return JSONResponse(status_code=422, content=detail)
评论
发表评论