如何在FastAPI中轻松实现OAuth2认证并保护你的API?
title: 如何在FastAPI中轻松实现OAuth2认证并保护你的API?


pip install fastapi==0.103.1
pip install python-jose[cryptography]==3.3.0
pip install passlib[bcrypt]==1.7.4
from datetime import datetime, timedelta
from jose import JWTError, jwt
from passlib.context import CryptContext
# 安全配置参数
SECRET_KEY = "your-secret-key-here"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE = 30 # 分钟
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
def verify_password(plain_password: str, hashed_password: str):
"""验证密码与哈希值是否匹配"""
return pwd_context.verify(plain_password, hashed_password)
def get_password_hash(password: str):
"""生成密码哈希值"""
return pwd_context.hash(password)
def create_access_token(data: dict):
"""生成JWT访问令牌"""
to_encode = data.copy()
expire = datetime.utcnow() + timedelta(minutes=ACCESS_TOKEN_EXPIRE)
to_encode.update({"exp": expire})
return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
from fastapi import APIRouter, Depends, HTTPException
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from pydantic import BaseModel
router = APIRouter()
# 模拟数据库中的用户数据
fake_users_db = {
"johndoe": {
"username": "johndoe",
"hashed_password": "$2b$12$EixZaYVK1fsbw1ZfbX3OXePaWxn96p36WQoeG6Lruj3vjPGga7lCy", # secret
"disabled": False,
}
}
class User(BaseModel):
username: str
disabled: bool = None
class UserInDB(User):
hashed_password: str
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
async def get_current_user(token: str = Depends(oauth2_scheme)):
"""解析并验证JWT令牌"""
credentials_exception = HTTPException(
status_code=401,
detail="无效的身份凭证",
headers={"WWW-Authenticate": "Bearer"},
)
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
username: str = payload.get("sub")
if username is None:
raise credentials_exception
except JWTError:
raise credentials_exception
user = fake_users_db.get(username)
if user is None:
raise credentials_exception
return UserInDB(**user)
@router.post("/token")
async def login(form_data: OAuth2PasswordRequestForm = Depends()):
"""用户登录接口"""
user = fake_users_db.get(form_data.username)
if not user or not verify_password(form_data.password, user["hashed_password"]):
raise HTTPException(status_code=400, detail="用户名或密码错误")
access_token = create_access_token(data={"sub": user["username"]})
return {"access_token": access_token, "token_type": "bearer"}
@router.get("/users/me")
async def read_users_me(current_user: User = Depends(get_current_user)):
"""获取当前用户信息的受保护路由"""
return current_user
from fastapi import Depends
@app.get("/protected-route")
async def protected_route(current_user: User = Depends(get_current_user)):
"""需要认证的受保护路由示例"""
return {
"message": "您已成功访问受保护资源",
"current_user": current_user.username
}
app = FastAPI(debug=True)
# 在令牌中加入角色声明
token_data = {"sub": username, "role": "admin"}
# 验证角色中间件
def require_admin(user: User = Depends(get_current_user)):
if user.role != "admin":
raise HTTPException(403, "需要管理员权限")
from fastapi.middleware import Middleware
from slowapi import Limiter
from slowapi.util import get_remote_address
limiter = Limiter(key_func=get_remote_address)
app = FastAPI(middleware=[Middleware(limiter)])
评论
发表评论