异步之舞:FastAPI与MongoDB的极致性能优化之旅
title: 异步之舞:FastAPI与MongoDB的极致性能优化之旅


# 安装依赖
# pip install motor==3.1.1 fastapi==0.103.2 pydantic==2.5.3
from motor.motor_asyncio import AsyncIOMotorClient
from fastapi import Depends
# Process-wide Motor client, created on first use by get_db().
_mongo_client = None


async def get_db():
    """Return the ``blog_db`` database handle for use as a FastAPI dependency.

    A single ``AsyncIOMotorClient`` is shared by all calls. Constructing a
    new client per request would open a fresh connection pool (up to
    ``maxPoolSize=100`` connections) every time and never close it.

    Returns:
        The ``blog_db`` attribute of the shared client.
    """
    global _mongo_client
    # Lazy creation: the client is built inside a running coroutine rather
    # than at import time. There is no ``await`` between the check and the
    # assignment, so concurrent requests on one event loop cannot both
    # create a client.
    if _mongo_client is None:
        _mongo_client = AsyncIOMotorClient(
            "mongodb://localhost:27017", maxPoolSize=100
        )
    return _mongo_client.blog_db
# 依赖注入使用示例
@app.post("/comments")
async def create_comment(
    comment: CommentModel,
    db: AsyncIOMotorDatabase = Depends(get_db),
):
    """Insert one comment into the ``comments`` collection.

    Args:
        comment: Request body, already validated as a ``CommentModel``.
        db: Database handle supplied by the ``get_db`` dependency.

    Returns:
        ``{"inserted_id": <id as str>}`` for the newly inserted document.
    """
    # The file pins pydantic 2.x, where ``.dict()`` is deprecated in favour
    # of ``.model_dump()``; the produced dict is the same.
    document = comment.model_dump()
    result = await db.comments.insert_one(document)
    # The id is stringified so the response is JSON-serialisable.
    return {"inserted_id": str(result.inserted_id)}
from pydantic import BaseModel
from typing import List
class UserAction(BaseModel):
    """Request model describing one action performed by a user.

    ``user_id`` and ``action_type`` are required strings. ``timestamp`` is
    optional and, when omitted, is filled by calling ``datetime.now`` at
    model construction time.
    """

    user_id: str
    action_type: str
    # default_factory (not a plain default) so each instance gets its own
    # "now" instead of the time the class was defined.
    timestamp: datetime = Field(default_factory=datetime.now)
@app.post("/user_actions/bulk")
async def bulk_insert_actions(
    actions: List[UserAction],
    db: AsyncIOMotorDatabase = Depends(get_db),
):
    """Insert a list of user actions with a single ``bulk_write`` call.

    Args:
        actions: Validated ``UserAction`` items from the request body.
        db: Database handle supplied by the ``get_db`` dependency.

    Returns:
        A dict with ``inserted_count`` (as reported by the bulk result) and
        ``batch_size`` (number of actions received).
    """
    batch_size = len(actions)
    if not actions:
        # The driver refuses a bulk write with no operations, so an empty
        # request body is answered directly instead of surfacing as an
        # unhandled server error.
        return {"inserted_count": 0, "batch_size": batch_size}

    # ``model_dump()`` is the pydantic 2.x replacement for the deprecated
    # ``.dict()``.
    operations = [InsertOne(action.model_dump()) for action in actions]
    result = await db.user_actions.bulk_write(operations)
    return {
        "inserted_count": result.inserted_count,
        "batch_size": batch_size,
    }
@app.get("/activity/hourly")
async def get_hourly_activity(db: AsyncIOMotorDatabase = Depends(get_db)):
    """Aggregate ``user_actions`` into per-hour activity statistics.

    The pipeline extracts the hour from each document's ``timestamp``,
    groups by that hour to count actions and collect the distinct
    ``action_type`` values, and sorts the groups by hour ascending.

    Returns:
        ``{"hourly_data": [...]}`` holding up to 1000 aggregated rows.
    """
    # Stage 1: keep only the hour of the timestamp and the action type.
    extract_hour = {
        "$project": {
            "hour": {"$hour": "$timestamp"},
            "action_type": 1,
        }
    }
    # Stage 2: one group per hour with a count and the set of action types.
    summarise_by_hour = {
        "$group": {
            "_id": "$hour",
            "total_actions": {"$sum": 1},
            "unique_actions": {"$addToSet": "$action_type"},
        }
    }
    # Stage 3: ascending by the group key (the hour).
    order_by_hour = {"$sort": {"_id": 1}}

    cursor = db.user_actions.aggregate(
        [extract_hour, summarise_by_hour, order_by_hour]
    )
    hourly_rows = await cursor.to_list(1000)
    return {"hourly_data": hourly_rows}
# Create a compound index in the background (so as not to affect service
# availability): user_id ascending, then timestamp descending, registered
# under the name "user_activity_idx".
# NOTE(review): this is a bare `await`, so it presumably belongs inside an
# async function (e.g. a startup hook) that is not shown here, with `db`
# bound in that scope — confirm against the full file.
await db.user_actions.create_index(
[("user_id", 1), ("timestamp", -1)],
background=True,
name="user_activity_idx"
)
class LogEntry(BaseModel):
    """Request model for a single log record.

    ``level``, ``message`` and ``service`` are required strings.
    ``context`` is an optional free-form dict that defaults to empty, and
    ``created_at`` is filled by calling ``datetime.now`` when not supplied.
    """

    level: str
    message: str
    service: str
    context: dict = {}
    # default_factory so the timestamp is taken per instance.
    created_at: datetime = Field(default_factory=datetime.now)
@app.post("/logs/batch")
async def batch_logs(
    logs: List[LogEntry],
    db: AsyncIOMotorDatabase = Depends(get_db),
):
    """Insert log entries into the ``logs`` collection in chunks of 500.

    Each chunk is written with an unordered ``insert_many``. If some
    documents in a chunk are rejected, the documents that were written are
    still counted and the remaining chunks are still processed.

    Args:
        logs: Validated ``LogEntry`` items from the request body.
        db: Database handle supplied by the ``get_db`` dependency.

    Returns:
        ``{"accepted": <number of documents actually inserted>}``.
    """
    # Local import: the driver's bulk error type is needed only here.
    from pymongo.errors import BulkWriteError

    batch_size = 500
    inserted_count = 0
    for start in range(0, len(logs), batch_size):
        # ``model_dump()`` is the pydantic 2.x replacement for ``.dict()``.
        documents = [
            log.model_dump() for log in logs[start:start + batch_size]
        ]
        try:
            result = await db.logs.insert_many(
                documents,
                ordered=False,  # keep inserting past individual failures
            )
        except BulkWriteError as exc:
            # ordered=False lets the driver attempt every document, but it
            # still raises afterwards. Without this handler one bad
            # document would abort the request and drop later chunks.
            inserted_count += exc.details.get("nInserted", 0)
        else:
            inserted_count += len(result.inserted_ids)
    return {"accepted": inserted_count}
ValidationError: 1 validation error for CommentModel
content
field required (type=value_error.missing)
from fastapi import HTTPException
@app.post("/comments")
async def create_comment(data: dict):
    """Validate a raw request body against ``CommentModel`` by hand.

    Args:
        data: The request body as a plain dict.

    Raises:
        HTTPException: With status 400 and the validation error text as
            ``detail`` when ``data`` does not satisfy ``CommentModel``.
    """
    try:
        validated = CommentModel(**data)
    except ValidationError as e:
        # Chain explicitly so the original validation error stays attached
        # as ``__cause__`` in logs and tracebacks.
        raise HTTPException(400, detail=str(e)) from e
    # Handle the validated data...
# Register ValidationErrorMiddleware on the app, passing http_error_handler
# as the only entry in its ``handlers`` list.
app.add_middleware(ValidationErrorMiddleware, handlers=[http_error_handler])
评论
发表评论