FastAPI安全认证:从密码到令牌的魔法之旅
title: FastAPI安全认证:从密码到令牌的魔法之旅


import os
from datetime import datetime, timedelta, timezone

from fastapi import APIRouter, Depends, HTTPException, status
from jose import JWTError, jwt
from passlib.context import CryptContext
from pydantic import BaseModel
# Router that groups the authentication endpoints under one OpenAPI tag.
router = APIRouter(tags=["Authentication"])

# Password hashing configuration (bcrypt scheme).
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

# JWT settings. The signing key is read from the JWT_SECRET_KEY environment
# variable when it is set to a non-empty value; the literal fallback is only a
# placeholder for local experiments and must not be used in a deployment.
SECRET_KEY = os.environ.get("JWT_SECRET_KEY") or "your-secret-key-keep-it-secret!"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30
# User model: the credentials a client submits to the login endpoint.
# (A `#` comment is used instead of a docstring so the generated schema
# description stays unchanged.)
class UserCreate(BaseModel):
    username: str  # account name to authenticate
    password: str  # password as submitted by the client
# Token response model: the shape of the body returned by the /token route.
# (A `#` comment is used instead of a docstring so the generated schema
# description stays unchanged.)
class Token(BaseModel):
    access_token: str  # the encoded JWT
    token_type: str  # token scheme; the login route returns "bearer"
# Demo credential store: bcrypt hash of the sample admin password ("secret").
# A real application would load the stored hash from its user database instead
# of computing it at import time.
_DEMO_ADMIN_PASSWORD_HASH = pwd_context.hash("secret")


@router.post("/token", response_model=Token)
async def login_for_access_token(form_data: UserCreate):
    """Authenticate the demo admin user and issue a JWT access token.

    Args:
        form_data: Username and password submitted in the request body.

    Returns:
        A dict with ``access_token`` (the encoded JWT whose ``sub`` claim is
        the username) and ``token_type`` set to ``"bearer"``.

    Raises:
        HTTPException: 401 with a ``WWW-Authenticate: Bearer`` header when the
            username or the password does not match.
    """
    # NOTE(review): OAuth2PasswordBearer(tokenUrl="/token") is declared in this
    # file, and OAuth2 password-flow clients normally send form data, while
    # this route reads a JSON body — confirm this is intended.

    # passlib's signature is verify(plaintext, stored_hash). The check runs for
    # every request, not only for a matching username, so the response time
    # does not depend on whether the username exists.
    password_ok = pwd_context.verify(form_data.password, _DEMO_ADMIN_PASSWORD_HASH)
    if form_data.username != "admin" or not password_ok:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="用户名或密码错误",
            headers={"WWW-Authenticate": "Bearer"},
        )

    # Issue the JWT with the configured lifetime.
    access_token = create_access_token(
        data={"sub": form_data.username},
        expires_delta=timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES),
    )
    return {"access_token": access_token, "token_type": "bearer"}
def create_access_token(data: dict, expires_delta: timedelta):
    """Return a signed JWT containing ``data`` plus an ``exp`` claim.

    Args:
        data: Claims to embed in the token; the dict is copied, not mutated.
        expires_delta: Lifetime of the token, counted from the current time.

    Returns:
        The token encoded with ``SECRET_KEY`` using ``ALGORITHM``.
    """
    claims = data.copy()
    # Timezone-aware "now" in UTC; datetime.utcnow() is deprecated and returns
    # a naive value that is easy to misinterpret as local time.
    claims["exp"] = datetime.now(timezone.utc) + expires_delta
    return jwt.encode(claims, SECRET_KEY, algorithm=ALGORITHM)
# 安装依赖库(版本需严格对应)
fastapi==0.68.1
uvicorn==0.15.0
python-jose[cryptography]==3.3.0
passlib[bcrypt]==1.7.4
from fastapi.security import OAuth2PasswordBearer
# Dependency that supplies the bearer token string to get_current_user;
# tokenUrl names the /token login route declared in this file.
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/token")
async def get_current_user(token: str = Depends(oauth2_scheme)):
    """Resolve the username carried by a bearer token.

    Args:
        token: Encoded JWT provided by the ``oauth2_scheme`` dependency.

    Returns:
        The value of the token's ``sub`` claim.

    Raises:
        HTTPException: 401 with a ``WWW-Authenticate: Bearer`` header when the
            token cannot be decoded, has no ``sub`` claim, or names a user
            other than the demo ``admin`` account.
    """
    unauthorized = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="无法验证凭证",
        headers={"WWW-Authenticate": "Bearer"},
    )

    try:
        claims = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
    except JWTError:
        raise unauthorized

    subject = claims.get("sub")
    # A real application would look the user up in its database here; the
    # example only recognises the static "admin" account.
    if subject is None or subject != "admin":
        raise unauthorized
    return subject
# Example of a protected route: reachable only with a valid bearer token.
@router.get("/protected")
async def protected_route(current_user: str = Depends(get_current_user)):
    """Return a greeting addressed to the authenticated user."""
    greeting = f"欢迎您, {current_user}"
    return {"message": greeting}
评论
发表评论