异步之舞:Motor驱动与MongoDB的CRUD交响曲
title: 异步之舞:Motor驱动与MongoDB的CRUD交响曲


# 安装依赖
# pip install fastapi==0.78.0 motor==2.5.0 pydantic==1.10.7
from fastapi import FastAPI
from motor.motor_asyncio import AsyncIOMotorClient
from pydantic import BaseModel
app = FastAPI()
# MongoDB 连接配置
DATABASE_URL = "mongodb://localhost:27017"
client = AsyncIOMotorClient(DATABASE_URL)
db = client["mydatabase"]
users_collection = db["users"]
class UserCreate(BaseModel):
name: str
age: int
email: str
class UserResponse(UserCreate):
id: str
@app.post("/users", response_model=UserResponse)
async def create_user(user: UserCreate):
user_dict = user.dict()
result = await users_collection.insert_one(user_dict)
created_user = await users_collection.find_one({"_id": result.inserted_id})
return {**created_user, "id": str(created_user["_id"])}
@app.get("/users", response_model=list[UserResponse])
async def get_users(skip: int = 0, limit: int = 10):
users = []
query = {"age": {"$gte": 18}} # 查询18岁以上用户
projection = {"_id": 0, "id": {"$toString": "$_id"}, "name": 1, "age": 1} # 字段投影
async for user in users_collection.find(query).skip(skip).limit(limit).project(projection):
users.append(user)
return users
@app.put("/users/{user_id}")
async def update_user(user_id: str, user_update: UserCreate):
update_result = await users_collection.update_one(
{"_id": user_id},
{"$set": user_update.dict(exclude_unset=True)}
)
return {"modified_count": update_result.modified_count}
@app.delete("/users/{user_id}")
async def delete_user(user_id: str):
result = await users_collection.update_one(
{"_id": user_id},
{"$set": {"is_deleted": True}}
)
return {"modified_count": result.modified_count}
@app.get("/users/age-stats")
async def get_age_stats():
pipeline = [
{"$match": {"is_deleted": {"$ne": True}}},
{"$group": {
"_id": None,
"averageAge": {"$avg": "$age"},
"minAge": {"$min": "$age"},
"maxAge": {"$max": "$age"}
}}
]
result = await users_collection.aggregate(pipeline).to_list(1)
return result[0] if result else {}
# 创建索引
async def create_indexes():
await users_collection.create_index("email", unique=True)
await users_collection.create_index([("name", "text")])
# 针对常用查询字段创建复合索引
await users_collection.create_index([("age", 1), ("is_deleted", 1)])
答案 B) 正确。异步驱动通过非阻塞 I/O 允许事件循环在处理数据库操作等待期间继续处理其他请求,提升并发处理能力。
答案 D) 正确。最佳实践是同时使用数据库唯一索引(A)和业务逻辑校验(B),在并发场景下可配合事务(C)保证数据一致性。
motor.motor_asyncio.ServerSelectionTimeoutError: ...
{
"detail": [
{
"loc": [
"body",
"age"
],
"msg": "field required",
"type": "value_error.missing"
}
]
}
pymongo.errors.DuplicateKeyError: E11000 duplicate key error...
评论
发表评论