FastAPI与MongoDB Change Stream的实时数据交响曲
title: FastAPI与MongoDB Change Stream的实时数据交响曲


# 创建虚拟环境
python -m venv env
source env/bin/activate # Linux/Mac
env\Scripts\activate # Windows
# 安装依赖
pip install fastapi==0.68.0 motor==3.3.2 pydantic==1.10.7 uvicorn==0.15.0 websockets==10.4
from fastapi import FastAPI
from motor.motor_asyncio import AsyncIOMotorClient
from pydantic import BaseModel
import asyncio
app = FastAPI()
# 配置MongoDB连接
@app.on_event("startup")
async def startup_db():
app.mongodb = AsyncIOMotorClient("mongodb://localhost:27017")
app.collection = app.mongodb.mydb.orders
# 启动后台监听任务
asyncio.create_task(watch_collection())
# 定义Pydantic数据模型
class OrderUpdate(BaseModel):
operation_type: str
document_key: dict
update_description: dict = None
# Change Stream监听核心逻辑
async def watch_collection():
pipeline = [{"$match": {"operationType": {"$in": ["insert", "update"]}}}]
async with app.collection.watch(pipeline) as stream:
async for change in stream:
print(f"捕获到变更事件: {change}")
# 此处添加业务处理逻辑
# 例如调用消息队列或更新缓存
@app.get("/orders/{order_id}")
async def get_order(order_id: str):
return await app.collection.find_one({"_id": order_id})
from fastapi import WebSocket
@app.websocket("/ws/order-updates")
async def websocket_endpoint(websocket: WebSocket):
await websocket.accept()
try:
async with app.collection.watch() as stream:
async for change in stream:
validated = OrderUpdate(**change).dict()
await websocket.send_json(validated)
except Exception as e:
print(f"WebSocket错误: {e}")
finally:
await websocket.close()
# 创建组合索引加速变更查询
await app.collection.create_index([("_id", 1), ("clusterTime", -1)])
async with app.collection.watch(
max_await_time_ms=5000, # 每5秒批量获取一次
batch_size=100
) as stream:
# ...
# 限制Change Stream内存使用
client = AsyncIOMotorClient(max_pool_size=100, waitQueueTimeoutMS=30000)
pipeline = [
{"$match": {
"operationType": "update",
"fullDocument.user_id": "user123"
}}
]
async with collection.watch(resume_after=last_token) as stream:
@app.on_event("shutdown")
async def shutdown_event():
await app.mongodb.close()
class OrderUpdate(BaseModel):
class Config:
extra = "ignore" # 忽略额外字段
评论
发表评论