FastAPI依赖注入与上下文管理
title: FastAPI依赖注入与上下文管理


import time

from fastapi import Depends, FastAPI, Header, HTTPException
# Application instance created with default settings (no arguments are passed).
app = FastAPI()
async def verify_token(authorization: str = Header(...)):
    """Extract the bearer token from the ``Authorization`` request header.

    Args:
        authorization: Raw value of the ``Authorization`` header. The header
            is declared as required via ``Header(...)``.

    Returns:
        The text that follows the ``"Bearer "`` prefix.

    Raises:
        HTTPException: 401 when the header does not start with ``"Bearer "``
            or when nothing but whitespace follows the prefix.
    """
    scheme = "Bearer "
    token = authorization[len(scheme):] if authorization.startswith(scheme) else ""
    # An empty token must be rejected too; otherwise "Bearer " alone would be
    # accepted and handed to the routes as an empty string.
    if not token.strip():
        raise HTTPException(
            status_code=401,
            detail="Invalid or missing bearer token",
            # Tells the client which authentication scheme is expected.
            headers={"WWW-Authenticate": "Bearer"},
        )
    return token
# Register verify_token as an application-level dependency of this app.
app = FastAPI(dependencies=[Depends(verify_token)])
| 层级类型 | 作用范围 | 典型应用场景 |
|---|---|---|
| 全局依赖 | 所有路由 | 身份认证、请求日志 |
| 路由组依赖 | 指定路由组 | API版本控制、权限分级 |
| 单路由依赖 | 单个路由 | 特殊参数校验、业务级权限 |
from contextlib import asynccontextmanager
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
from sqlalchemy.orm import sessionmaker
# Connection URL using the postgresql+asyncpg scheme.
# NOTE(review): the credentials are hard-coded in this literal; presumably a
# placeholder -- confirm and load the URL from configuration in real deployments.
DATABASE_URL = "postgresql+asyncpg://user:password@localhost/db"
# Async engine built from the URL above.
engine = create_async_engine(DATABASE_URL)
# Session factory bound to the engine; it produces AsyncSession objects and is
# configured with expire_on_commit=False.
async_session = sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Create the database tables on startup and dispose the engine on shutdown.

    Args:
        app: The FastAPI application this lifespan handler is attached to.

    Yields:
        Nothing; control is handed back to the application between the
        startup and shutdown phases.
    """
    # Executed when the application starts.
    # NOTE(review): ``Base`` is not defined in the visible source; presumably
    # the declarative base of the ORM models -- confirm it is imported.
    async with engine.begin() as conn:
        await conn.run_sync(Base.metadata.create_all)
    try:
        yield
    finally:
        # Executed when the application shuts down. The ``finally`` ensures
        # the engine is disposed even if an exception or cancellation is
        # thrown into the generator at the ``yield``.
        await engine.dispose()
async def get_db():
    """Provide a database session to the caller and finalise its transaction.

    Yields:
        A session created by ``async_session``. After the consumer finishes,
        the session is committed; if an ``Exception`` propagates, the session
        is rolled back and the exception is re-raised.
    """
    async with async_session() as db_session:
        try:
            yield db_session
            # Reached only when the consumer finished without raising.
            await db_session.commit()
        except Exception:
            # Undo pending changes, then let the error continue upwards.
            await db_session.rollback()
            raise
# Application that uses the lifespan handler and declares get_db as an
# application-level dependency.
app = FastAPI(lifespan=lifespan, dependencies=[Depends(get_db)])
from fastapi import FastAPI
from contextlib import asynccontextmanager
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Open the Redis pool on startup and close it on shutdown.

    The pool is stored on ``app.state.redis``.

    Args:
        app: The FastAPI application whose ``state`` receives the pool.

    Yields:
        Nothing; control is handed back to the application between the
        startup and shutdown phases.
    """
    # Initialise the Redis connection pool at startup.
    # NOTE(review): ``create_redis_pool`` is not defined in the visible source;
    # confirm where it comes from and that its ``close()`` is awaitable.
    app.state.redis = await create_redis_pool()
    try:
        yield
    finally:
        # Release the pool on shutdown, including when an exception or
        # cancellation is thrown into the generator at the ``yield``.
        await app.state.redis.close()
# Application whose startup and shutdown work is delegated to the lifespan handler.
app = FastAPI(lifespan=lifespan)
from fastapi import FastAPI, Request
# Application instance to which the HTTP middleware below is attached.
app = FastAPI()
@app.middleware("http")
async def add_process_time_header(request: Request, call_next):
    """Time each request, expose the duration as a header and count requests.

    Args:
        request: The incoming request.
        call_next: Callable that forwards the request and returns the response.

    Returns:
        The response produced by ``call_next``, with an ``X-Process-Time``
        header holding the elapsed seconds as a string.
    """
    start_time = time.perf_counter()
    response = await call_next(request)
    process_time = time.perf_counter() - start_time
    # Publish the measurement; previously it was computed and then discarded.
    response.headers["X-Process-Time"] = str(process_time)

    # Record in the application-wide state. Start from zero when the counter
    # has not been initialised yet instead of failing with AttributeError.
    state = request.app.state
    state.request_count = getattr(state, "request_count", 0) + 1
    return response
from fastapi import APIRouter, Depends
from pydantic import BaseModel
class ProductCreate(BaseModel):
    # Request body accepted by the product-creation endpoint.
    # Documented with comments rather than a docstring so the generated
    # schema description stays unchanged.
    name: str  # product name
    price: float  # product price
    stock: int  # stock quantity
# Router created with the "/products" path prefix.
router = APIRouter(prefix="/products")
@router.post("")
async def create_product(
    product_data: ProductCreate,
    db: AsyncSession = Depends(get_db),
    redis=Depends(get_redis),
):
    """Create a product unless one with the same name already exists.

    Args:
        product_data: Validated request body describing the new product.
        db: Database session supplied by ``get_db``.
        redis: Redis client supplied by ``get_redis``.

    Returns:
        A dict of the form ``{"id": <new product id>}``.

    Raises:
        HTTPException: 400 when a product with the same name is found.
    """
    # Check for a duplicate product name.
    duplicate_query = select(Product).filter(Product.name == product_data.name)
    lookup = await db.execute(duplicate_query)
    if lookup.scalar():
        raise HTTPException(status_code=400, detail="Product name exists")

    # Write the new row to the database.
    product = Product(**product_data.dict())
    db.add(product)
    await db.commit()

    # Invalidate the cached product list.
    await redis.delete("product_list")
    return {"id": product.id}
async def auth_dependency():
    """Dependency that turns unexpected failures into a 401 response.

    Yields:
        Nothing; the dependency only wraps the request in error handling.

    Raises:
        HTTPException: Re-raised unchanged when one is already in flight;
            otherwise a 401 whose detail is the text of the caught exception.
    """
    try:
        # Verification logic goes here.
        yield
    except HTTPException:
        # Keep the status code chosen by whoever raised it.
        raise
    except Exception as e:
        # A generator dependency cannot return a response object (and
        # ``return <value>`` is a syntax error in an async generator), so
        # the failure is reported by raising instead.
        raise HTTPException(status_code=401, detail=str(e)) from e
# Async engine configured with explicit connection-pool settings.
engine = create_async_engine(
DATABASE_URL,
# Pool sizing arguments forwarded to the engine's connection pool.
pool_size=20,
max_overflow=10,
# NOTE(review): presumably the number of seconds to wait for a free connection -- confirm.
pool_timeout=30
)
# Send a POST request with a JSON body to http://localhost:8000/items.
curl -X POST http://localhost:8000/items \
-H "Content-Type: application/json" \
-d '{"name":"example", "price": 9.99}'
评论
发表评论