FastAPI安全异常处理:从401到422的奇妙冒险
title: FastAPI安全异常处理:从401到422的奇妙冒险


{
"detail": {
"code": "AUTH-001", # 自定义错误编码
"message": "Token expired", # 人类可读信息
"type": "token_expired" # 机器识别类型
}
}
from fastapi import FastAPI, HTTPException, Request
from fastapi.responses import JSONResponse
app = FastAPI()
@app.exception_handler(HTTPException)
async def custom_http_exception_handler(request: Request, exc: HTTPException):
return JSONResponse(
status_code=exc.status_code,
content={
"detail": {
"code": exc.headers.get("X-Error-Code", "UNKNOWN"),
"message": exc.detail,
"type": exc.headers.get("X-Error-Type", "unknown")
}
},
headers=exc.headers
)
异常类型 | 检测方法 | 推荐状态码 |
---|---|---|
签名篡改 | 签名验证失败 | 401 |
过期令牌 | 检查exp字段 | 401 |
格式错误 | Header/Payload格式解析失败 | 401 |
from jose import JWTError, jwt
from fastapi import Depends, HTTPException
from pydantic import BaseModel
class TokenData(BaseModel):
username: str | None = None
async def validate_token(token: str = Depends(oauth2_scheme)) -> TokenData:
credentials_exception = HTTPException(
status_code=401,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
headers={"X-Error-Code": "AUTH-003"}
)
try:
payload = jwt.decode(
token,
SECRET_KEY,
algorithms=[ALGORITHM]
)
if (exp := payload.get("exp")) is None or exp < datetime.utcnow().timestamp():
raise HTTPException(status_code=401, detail="Token expired")
return TokenData(**payload)
except JWTError as e:
raise credentials_exception from e
from datetime import datetime, timedelta
def create_tokens(username: str) -> dict:
access_expire = datetime.utcnow() + timedelta(minutes=15)
refresh_expire = datetime.utcnow() + timedelta(days=7)
access_payload = {"sub": username, "exp": access_expire, "type": "access"}
refresh_payload = {"sub": username, "exp": refresh_expire, "type": "refresh"}
return {
"access_token": jwt.encode(access_payload, SECRET_KEY, ALGORITHM),
"refresh_token": jwt.encode(refresh_payload, SECRET_KEY, ALGORITHM),
"expires_in": 900 # 秒数
}
# requirements.txt
fastapi == 0.68
.1
python - jose[cryptography] == 3.3
.0
passlib[bcrypt] == 1.7
.4
uvicorn == 0.15
.0
# main.py
from datetime import datetime, timedelta
from typing import Optional
from fastapi import FastAPI, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from jose import JWTError, jwt
from pydantic import BaseModel
# 配置参数
SECRET_KEY = "your-secret-key-here" # 生产环境应使用环境变量
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30
app = FastAPI()
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
class Token(BaseModel):
access_token: str
token_type: str
class TokenData(BaseModel):
username: Optional[str] = None
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):
to_encode = data.copy()
if expires_delta:
expire = datetime.utcnow() + expires_delta
else:
expire = datetime.utcnow() + timedelta(minutes=15)
to_encode.update({"exp": expire})
return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
async def get_current_user(token: str = Depends(oauth2_scheme)):
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
username: str = payload.get("sub")
if username is None:
raise credentials_exception
token_data = TokenData(username=username)
except JWTError as e:
error_type = "expired" if isinstance(e, jwt.ExpiredSignatureError) else "invalid"
raise HTTPException(
status_code=401,
detail=f"Token validation failed: {error_type}",
headers={"X-Error-Type": error_type}
) from e
return token_data
@app.post("/token")
async def login_for_access_token():
access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
access_token = create_access_token(
data={"sub": "fakeuser"}, expires_delta=access_token_expires
)
return {"access_token": access_token, "token_type": "bearer"}
@app.get("/protected/")
async def read_protected_route(current_user: TokenData = Depends(get_current_user)):
return {"message": "Secure content accessed"}
评论
发表评论