FastAPI安全机制:从OAuth2到JWT的魔法通关秘籍
title: FastAPI安全机制:从OAuth2到JWT的魔法通关秘籍


# 安装依赖:pip install fastapi==0.78.0 uvicorn==0.18.3 python-jose[cryptography]==3.3.0 passlib[bcrypt]==1.7.4
from datetime import datetime, timedelta
from typing import Optional
from fastapi import Depends, FastAPI, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from jose import JWTError, jwt
from passlib.context import CryptContext
from pydantic import BaseModel
# 安全配置参数
SECRET_KEY = "your-secret-key-here" # 生产环境应从环境变量获取
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30
# 模拟数据库用户模型
class User(BaseModel):
username: str
hashed_password: str
disabled: Optional[bool] = None
class UserInDB(User):
password: str
# 密码加密上下文
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
# OAuth2 方案配置
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
app = FastAPI()
# 模拟数据库查询
def get_user(db, username: str):
if username in db:
user_dict = db[username]
return UserInDB(**user_dict)
# 密码验证函数
def authenticate_user(fake_db, username: str, password: str):
user = get_user(fake_db, username)
if not user or not pwd_context.verify(password, user.hashed_password):
return False
return user
# 创建访问令牌
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):
to_encode = data.copy()
expire = datetime.utcnow() + (expires_delta or timedelta(minutes=15))
to_encode.update({"exp": expire})
return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
# 令牌验证依赖项
async def get_current_user(token: str = Depends(oauth2_scheme)):
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
username: str = payload.get("sub")
if username is None:
raise credentials_exception
except JWTError:
raise credentials_exception
# 此处应查询真实数据库
user = get_user(fake_db, username=username)
if user is None:
raise credentials_exception
return user
# 登录端点
@app.post("/token")
async def login(form_data: OAuth2PasswordRequestForm = Depends()):
user = authenticate_user(fake_db, form_data.username, form_data.password)
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect username or password",
headers={"WWW-Authenticate": "Bearer"},
)
access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
access_token = create_access_token(
data={"sub": user.username}, expires_delta=access_token_expires
)
return {"access_token": access_token, "token_type": "bearer"}
# 受保护端点
@app.get("/users/me")
async def read_users_me(current_user: User = Depends(get_current_user)):
return current_user
客户端 → 请求头携带Bearer Token → FastAPI路由 → 依赖注入系统 → JWT解码 → 用户验证 → 业务逻辑处理
查看答案 正确答案:B 解析:401状态码表示身份验证失败。令牌过期会导致JWT验证失败,而格式错误通常会返回400 Bad Request。答案D属于服务器内部错误(5xx),答案C通常返回415 Unsupported Media Type。
查看答案 正确答案:D 完整的防护需要多层面措施:HTTPS保证传输安全,强签名算法防止伪造,定期更换密钥降低泄露风险。
{
"detail": [
{
"loc": [
"body",
"username"
],
"msg": "field required",
"type": "value_error.missing"
}
]
}
{
"detail": "Could not validate credentials"
}
评论
发表评论