FastAPI数据库连接池配置与监控


# 配置Tortoise-ORM连接池示例
TORTOISE_ORM = {
"connections": {
"default": {
"engine": "tortoise.backends.asyncpg",
"credentials": {
"host": "localhost",
"port": "5432",
"user": "postgres",
"password": "secret",
"database": "mydb",
"minsize": 3, # 最小保持连接数
"maxsize": 20, # 最大连接数
"max_inactive_connection_lifetime": 300 # 空闲连接存活时间(秒)
}
}
},
"apps": {
"models": {
"models": ["models"],
"default_connection": "default"
}
}
}
from tortoise import Tortoise
@app.get("/pool-status")
async def get_pool_status():
pool = Tortoise.get_connection("default")._pool
return {
"current_size": pool._size,
"idle": len(pool._holders),
"in_use": pool._size - len(pool._holders)
}
pip install prometheus-client prometheus-fastapi-instrumentator
from prometheus_client import make_asgi_app
from prometheus_fastapi_instrumentator import Instrumentator
# 添加Prometheus中间件
metrics_app = make_asgi_app()
app.mount("/metrics", metrics_app)
# 自定义连接池指标
class DatabaseMetrics:
def __init__(self):
self.connections_in_use = Gauge(
'db_connections_in_use',
'Current active connections'
)
async def update_metrics(self):
pool = Tortoise.get_connection("default")._pool
self.connections_in_use.set(pool._size - len(pool._holders))
# 启动定时任务
@app.on_event("startup")
async def start_metrics_task():
metrics = DatabaseMetrics()
async def _task():
while True:
await metrics.update_metrics()
await asyncio.sleep(5)
asyncio.create_task(_task())
from fastapi import APIRouter
from models import User_Pydantic, UserIn_Pydantic, Users
router = APIRouter()
@router.post("/users", response_model=User_Pydantic)
async def create_user(user: UserIn_Pydantic):
try:
# 自动获取连接执行操作
user_obj = await Users.create(**user.dict())
return await User_Pydantic.from_tortoise_orm(user_obj)
except Exception as e:
# 记录异常但不干扰连接池
logger.error(f"Create user failed: {str(e)}")
raise HTTPException(status_code=400, detail="User creation failed")
# 临时获取连接池状态
from tortoise import Tortoise
async def check_pool():
conn = Tortoise.get_connection("default")
print(f"Max size: {conn._pool._maxsize}")
print(f"Current size: {conn._pool._size}")
print(f"Available: {len(conn._pool._holders)}")
credentials = {
...
"timeout": 30 # 等待连接超时时间(秒)
}
async with in transaction():
# 数据库操作
await User.create(...)
评论
发表评论