异步日志分析:MongoDB与FastAPI的高效存储揭秘
title: 异步日志分析:MongoDB与FastAPI的高效存储揭秘


pip install fastapi==0.103.2 motor==3.3.2 pydantic==2.5.3 python-dotenv==1.0.0
MONGODB_URL=mongodb://localhost:27017
DB_NAME=logs_db
from fastapi import FastAPI
from motor.motor_asyncio import AsyncIOMotorClient
from pydantic import BaseModel
import os
from dotenv import load_dotenv
load_dotenv()
app = FastAPI()
class LogItem(BaseModel):
level: str
message: str
timestamp: str
source: str
@app.on_event("startup")
async def startup_db_client():
app.mongodb_client = AsyncIOMotorClient(os.getenv("MONGODB_URL"))
app.mongodb = app.mongodb_client[os.getenv("DB_NAME")]
@app.on_event("shutdown")
async def shutdown_db_client():
app.mongodb_client.close()
@app.post("/logs/")
async def create_log(log: LogItem):
log_dict = log.model_dump()
result = await app.mongodb.logs.insert_one(log_dict)
return {"id": str(result.inserted_id)}
@app.get("/logs/stats/level")
async def get_log_level_stats():
pipeline = [
{"$match": {"timestamp": {"$gte": "2024-01-01"}}},
{"$group": {
"_id": "$level",
"count": {"$sum": 1},
"last_occurrence": {"$last": "$timestamp"}
}},
{"$sort": {"count": -1}}
]
results = []
async for doc in app.mongodb.logs.aggregate(pipeline):
results.append({
"level": doc["_id"],
"count": doc["count"],
"last_occurred": doc["last_occurrence"]
})
return results
# 在启动时创建索引
@app.on_event("startup")
async def create_indexes():
await app.mongodb.logs.create_index([("timestamp", 1), ("level", 1)])
await app.mongodb.logs.create_index([("source", "text")])
# 分析查询性能
async def analyze_query():
explain_result = await app.mongodb.logs.find(
{"level": "ERROR"}
).explain()
print(explain_result["queryPlanner"]["winningPlan"])
class EnhancedLogItem(LogItem):
trace_id: str | None = None
user_id: str | None = None
@app.get("/logs/errors")
async def get_error_logs(limit: int = 100):
error_logs = []
async for doc in app.mongodb.logs.find(
{"level": "ERROR"},
{"_id": 0, "message": 1, "timestamp": 1, "source": 1}
).sort("timestamp", -1).limit(limit):
error_logs.append(doc)
return error_logs
@app.get("/logs/search")
async def search_logs(keyword: str):
results = []
async for doc in app.mongodb.logs.find(
{"$text": {"$search": keyword}},
{"score": {"$meta": "textScore"}}
).sort([("score", {"$meta": "textScore"})]):
results.append({
"message": doc["message"],
"score": doc["score"]
})
return results
{
"detail": [
{
"type": "missing",
"loc": [
"body",
"level"
],
"msg": "Field required"
}
]
}
TimeoutError: Timed out connecting to localhost:27017
AsyncIOMotorClient(os.getenv("MONGODB_URL"), serverSelectionTimeoutMS=5000)
评论
发表评论