FastAPI与MongoDB分片集群:异步数据路由与聚合优化
title: FastAPI与MongoDB分片集群:异步数据路由与聚合优化


# requirements.txt
motor == 3.1
.1
fastapi == 0.95
.2
pydantic == 1.10
.7
# database.py
from motor.motor_asyncio import AsyncIOMotorClient
from contextlib import asynccontextmanager
class MongoDBShardClient:
def __init__(self, uri: str, max_pool_size: int = 100):
self.client = AsyncIOMotorClient(
uri,
maxPoolSize=max_pool_size,
connectTimeoutMS=3000,
socketTimeoutMS=5000
)
@asynccontextmanager
async def get_sharded_db(self, db_name: str):
try:
yield self.client[db_name]
finally:
# 连接自动归还连接池
pass
# 配置分片集群连接(包含3个mongos路由)
shard_client = MongoDBShardClient(
"mongodb://mongos1:27017,mongos2:27017,mongos3:27017/"
"?replicaSet=shardReplSet"
)
# models.py
from pydantic import BaseModel
from datetime import datetime
class OrderShardKey(BaseModel):
region: str # 地域前缀
order_id: str # 哈希分片依据
class OrderDocument(OrderShardKey):
user_id: int
total_amount: float
items: list[dict]
created_at: datetime = datetime.now()
# repository.py
class OrderShardRepository:
def __init__(self, db):
self.orders = db["orders"]
async def insert_order(self, order: OrderDocument):
# 自动路由到对应分片
return await self.orders.insert_one(order.dict())
sh.enableSharding("ecommerce")
sh.shardCollection("ecommerce.orders", {"region": 1, "order_id": "hashed"})
async def get_regional_sales(start_date: datetime):
pipeline = [
{"$match": {
"created_at": {"$gte": start_date},
"region": {"$exists": True}
}},
{"$group": {
"_id": "$region",
"total_sales": {"$sum": "$total_amount"},
"avg_order": {"$avg": "$total_amount"}
}},
{"$sort": {"total_sales": -1}},
{"$limit": 10}
]
async with shard_client.get_sharded_db("ecommerce") as db:
repo = OrderShardRepository(db)
return await repo.orders.aggregate(pipeline).to_list(1000)
# 创建复合索引
async def create_shard_indexes():
index_model = [
("region", 1),
("created_at", -1),
("user_id", 1)
]
async with shard_client.get_sharded_db("ecommerce") as db:
await db.orders.create_index(
index_model,
name="region_created_user",
background=True
)
motor.errors.ServerSelectionTimeoutError: No primary server available
Error 291: Cannot $sort with non-equality query on shard key
Error 50: Operation exceeded time limit
评论
发表评论