如何在FastAPI中玩转跨服务权限校验的魔法?
title: 如何在FastAPI中玩转跨服务权限校验的魔法?


# 安装依赖
# fastapi==0.68.0
# python-jose[cryptography]==3.3.0
# httpx==0.23.0
import hashlib
import hmac
import time
from typing import List, Optional, Union

from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from jose import JWTError, jwt
from pydantic import BaseModel
# Shared auth configuration model
class AuthConfig(BaseModel):
    """Settings used both to issue and to verify service-to-service JWTs.

    The defaults are tutorial placeholders; override them when constructing
    the model.
    """

    # Symmetric key used to sign and verify tokens.
    # NOTE(review): placeholder value — load the real key from the
    # environment or a secret store instead of relying on this default.
    secret_key: str = "your-256bit-secret"
    # Signing algorithm name handed to jose.
    algorithm: str = "HS256"
    # Value written to / expected in the "iss" claim.
    issuer: str = "https://auth.service"
    # Value written to / expected in the "aud" claim. The default is a list,
    # so the annotation accepts a list of names as well as a single string
    # (the field was previously annotated as plain `str`).
    audience: Union[str, List[str]] = ["order.service", "payment.service"]
def create_access_token(
    subject: str,
    service_scopes: list,
    config: AuthConfig,
    expires_in: Optional[int] = None,
) -> str:
    """Build and sign a JWT that carries the caller's service scopes.

    Args:
        subject: Identity placed in the "sub" claim.
        service_scopes: Scopes stored under the custom "service_scopes" claim.
        config: Supplies the issuer, audience, signing key and algorithm.
        expires_in: Token lifetime in seconds. When given, an "exp" claim is
            added so the token stops validating after that time. When
            omitted, no "exp" claim is written and the token never expires —
            pass a lifetime for anything beyond a demo.

    Returns:
        The encoded token as produced by ``jwt.encode``.
    """
    payload = {
        "iss": config.issuer,
        "sub": subject,
        "aud": config.audience,
        "service_scopes": service_scopes,
    }
    if expires_in is not None:
        # "exp" is expressed as seconds since the Unix epoch.
        payload["exp"] = int(time.time()) + expires_in
    return jwt.encode(payload, config.secret_key, algorithm=config.algorithm)
async def validate_service_token(
    token: str = Depends(OAuth2PasswordBearer(tokenUrl="token")),
    config: AuthConfig = Depends(get_auth_config),
):
    """Verify a bearer token and return the service scopes it grants.

    The signature, issuer and audience are checked, then the custom
    "service_scopes" claim is extracted.

    Args:
        token: Bearer token extracted from the request by FastAPI.
        config: Auth settings. NOTE(review): ``get_auth_config`` is not
            defined in the visible file — confirm where it comes from.

    Returns:
        The list stored in the token's "service_scopes" claim.

    Raises:
        HTTPException: 401 when the token fails verification, matches none
            of the configured audiences, or has no list-valued
            "service_scopes" claim.
    """
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )

    configured = config.audience
    # jose only accepts a single string (or None) as ``audience`` and raises
    # JWTError for anything else, so a list of accepted audiences has to be
    # checked by hand after decoding.
    jose_checks_audience = configured is None or isinstance(configured, str)

    try:
        payload = jwt.decode(
            token,
            config.secret_key,
            algorithms=[config.algorithm],
            audience=configured if jose_checks_audience else None,
            issuer=config.issuer,
            options={"verify_aud": jose_checks_audience},
        )
    except JWTError as err:
        raise credentials_exception from err

    if not jose_checks_audience:
        token_audiences = payload.get("aud")
        if isinstance(token_audiences, str):
            token_audiences = [token_audiences]
        # Accept the token only if it names at least one configured audience.
        if not isinstance(token_audiences, list) or not any(
            aud in configured for aud in token_audiences
        ):
            raise credentials_exception

    scopes = payload.get("service_scopes")
    # A string here would turn later `"x" in scopes` checks into substring
    # matches, so anything but a list is treated as an invalid token.
    if not isinstance(scopes, list):
        raise credentials_exception
    return scopes
class ServiceClient:
    """Async HTTP client that calls another service with a bearer token.

    The underlying ``httpx.AsyncClient`` holds a connection pool, so release
    it with ``await client.aclose()`` or by using the instance in an
    ``async with`` block.

    NOTE(review): ``httpx`` is not imported in the visible import block —
    confirm it is imported at module level.
    """

    def __init__(self, base_url: str, token: str):
        """Create the client.

        Args:
            base_url: Base URL that endpoints passed to ``call_service`` are
                resolved against.
            token: Token sent as ``Authorization: Bearer <token>`` on every
                request.
        """
        self.client = httpx.AsyncClient(
            base_url=base_url,
            headers={"Authorization": f"Bearer {token}"},
        )

    async def call_service(self, endpoint: str):
        """GET *endpoint* and return the decoded JSON body.

        Raises whatever ``response.raise_for_status()`` raises when the
        response status indicates an error.
        """
        response = await self.client.get(endpoint)
        response.raise_for_status()
        return response.json()

    async def aclose(self) -> None:
        """Close the underlying HTTP client and its connections."""
        await self.client.aclose()

    async def __aenter__(self):
        """Return the instance so it can be used in ``async with``."""
        return self

    async def __aexit__(self, exc_type, exc, tb) -> None:
        """Close the client when the ``async with`` block exits."""
        await self.aclose()
# Usage inside a route
@app.post("/place-order")
async def place_order(
    scopes: list = Depends(validate_service_token),
    service_client: ServiceClient = Depends(get_service_client),
):
    """Create an order after checking the caller's scope and calling payments.

    Args:
        scopes: Service scopes returned by ``validate_service_token``.
        service_client: Client used for the downstream call.
            NOTE(review): ``get_service_client`` and ``app`` are not defined
            in the visible file — confirm where they come from.

    Returns:
        ``{"status": "order_created"}`` once the downstream call has returned.

    Raises:
        HTTPException: 403 when "order.write" is not among the scopes.
    """
    if "order.write" not in scopes:
        raise HTTPException(status_code=status.HTTP_403_FORBIDDEN)
    # The response body is not used; the call is made so that an error
    # raised by the payments request prevents the order from being reported
    # as created.
    await service_client.call_service("/payments")
    return {"status": "order_created"}
# HMAC signing example
def sign_request(data: bytes, key: Union[str, bytes]) -> str:
    """Return the hex-encoded HMAC-SHA256 of *data* under *key*.

    Args:
        data: Raw request bytes to sign.
        key: Shared secret. A ``str`` is encoded as UTF-8; ``bytes`` are
            used as-is.

    Returns:
        The 64-character lowercase hexadecimal digest.

    When verifying a received signature, compare it with
    ``hmac.compare_digest`` rather than ``==`` to avoid timing leaks.
    """
    key_bytes = key.encode("utf-8") if isinstance(key, str) else key
    return hmac.new(key_bytes, data, digestmod=hashlib.sha256).hexdigest()
# Generate the signature on the client side before making the call, then
# attach it to the outgoing request as the "X-Signature" header.
# NOTE(review): `payload` and `headers` are not defined in the visible file —
# presumably the request body bytes and the outgoing header mapping; confirm
# against the calling code.
# NOTE(review): the signing key is a hard-coded literal here; in real use it
# should come from configuration or a secret store.
signature = sign_request(payload, "secret-sign-key")
headers["X-Signature"] = signature
评论
发表评论