FastAPI路由专家课:微服务架构下的路由艺术与工程实践 🌐
title: FastAPI路由专家课:微服务架构下的路由艺术与工程实践 🌐
from fastapi import APIRouter
v1_router = APIRouter(prefix="/v1")
v2_router = APIRouter(prefix="/v2")
@v1_router.get("/users")
async def legacy_api():
return {"format": "XML"}
@v2_router.get("/users")
async def new_api():
return {"format": "JSON"}
app.include_router(v1_router)
app.include_router(v2_router)
@app.middleware("http")
async def auth_middleware(request: Request, call_next):
start_time = time.time()
# JWT令牌验证
if not verify_token(request.headers.get("Authorization")):
return JSONResponse({"error": "未授权"}, 401)
response = await call_next(request)
# 添加性能监控头
response.headers["X-Process-Time"] = str(time.time() - start_time)
return response
class GalaxyResponse(BaseModel):
code: int = 200
data: Any
pagination: Optional[dict] = None
@app.get("/planets")
async def list_planets() -> GalaxyResponse:
return GalaxyResponse(
data=db.query(Planet).all(),
pagination={"total": 100, "page": 1}
)
@app.exception_handler(AuthError)
async def custom_exception_handler(request, exc):
return JSONResponse(
status_code=401,
content={"code": 1001, "msg": "访问令牌已过期"}
)
from fastapi.testclient import TestClient
def test_user_flow():
with TestClient(app) as client:
# 创建测试用户
resp = client.post("/v2/users", json={"name": "测试员"})
assert resp.json()["code"] == 200
# 验证用户存在
user_id = resp.json()["data"]["id"]
resp = client.get(f(f"/v2/users/{user_id}")
assert resp.status_code == 200
# locustfile.py
from locust import HttpUser, task
class ApiUser(HttpUser):
@task
def access_data(self):
self.client.get("/products?category=electronics")
@app.on_event("startup")
async def register_service():
# 向Consul注册服务
consul_client.register(
name="user-service",
address=os.getenv("HOST"),
port=os.getenv("PORT")
)
# Kong网关配置示例
routes:
- name: user - service
paths: ["/api/v2/users*"]
service: user - service
plugins:
- name: rate - limiting
config:
minute = 10000
# 要求:
# 1. 根据Header中的实验分组返回不同内容
# 2. 实验组返回新版接口,对照组返回旧版
@app.get("/recommend")
async def ab_test(recommend_version: str = Header(None)):
# 你的代码
# 当订单服务失败率>50%时,自动切换备用方案
@app.get("/orders")
async def get_orders():
if circuit_breaker.state == "open":
return cached_orders()
else:
try:
return fetch_live_orders()
except Exception:
circuit_breaker.fail()
错误现象 | 原因 | 解决方案 |
---|---|---|
401 Unauthorized | 中间件未放行OPTIONS请求 | 添加CORS中间件到路由前 |
406 Not Acceptable | 响应格式不匹配 | 检查Accept头与produces声明 |
504 Gateway Timeout | 服务注册信息过期 | 增加Consul健康检查频率 |
评论
发表评论