FastAPI权限验证依赖项究竟藏着什么秘密?
title: FastAPI权限验证依赖项究竟藏着什么秘密?


from fastapi import Depends
async def common_parameters(q: str = None, skip: int = 0, limit: int = 100):
return {"q": q, "skip": skip, "limit": limit}
@app.get("/items/")
async def read_items(commons: dict = Depends(common_parameters)):
return commons
from fastapi.security import OAuth2PasswordBearer
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
class User(BaseModel):
username: str
email: str | None = None
disabled: bool | None = None
class TokenData(BaseModel):
username: str | None = None
from jose import JWTError, jwt
from passlib.context import CryptContext
SECRET_KEY = "your-secret-key"
ALGORITHM = "HS256"
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
def verify_password(plain_password: str, hashed_password: str):
return pwd_context.verify(plain_password, hashed_password)
def get_password_hash(password: str):
return pwd_context.hash(password)
from fastapi import HTTPException, status
async def get_current_user(token: str = Depends(oauth2_scheme)):
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="无法验证凭证",
headers={"WWW-Authenticate": "Bearer"},
)
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
username: str = payload.get("sub")
if username is None:
raise credentials_exception
token_data = TokenData(username=username)
except JWTError:
raise credentials_exception
user = get_user(fake_users_db, username=token_data.username)
if user is None:
raise credentials_exception
return user
class RoleChecker:
def __init__(self, allowed_roles: list):
self.allowed_roles = allowed_roles
def __call__(self, user: User = Depends(get_current_user)):
if user.role not in self.allowed_roles:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="权限不足"
)
return user
admin_permission = RoleChecker(["admin"])
editor_permission = RoleChecker(["editor", "admin"])
@app.get("/admin/dashboard", dependencies=[Depends(admin_permission)])
async def admin_dashboard():
return {"message": "欢迎来到管理控制台"}
@app.post("/articles/")
async def create_article(
user: User = Depends(editor_permission),
article: ArticleCreate
):
return {
"message": "文章创建成功",
"author": user.username,
"content": article.content
}
{
"detail": [
{
"loc": [
"header",
"authorization"
],
"msg": "field required",
"type": "value_error.missing"
}
]
}
fastapi==0.95.2
pydantic==1.10.7
python-jose[cryptography]==3.3.0
passlib[bcrypt]==1.7.4
uvicorn==0.22.0
pip install fastapi uvicorn python-jose[cryptography] passlib[bcrypt]
评论
发表评论