FastAPI依赖注入实践:工厂模式与实例复用的优化策略
title: FastAPI依赖注入实践:工厂模式与实例复用的优化策略


@app.get("/items/")
def read_items(service: ItemService = Depends()):
return service.get_items()
class DatabaseConfig:
def __init__(self, url: str = "sqlite:///test.db"):
self.url = url
class DatabaseService:
def __init__(self, config: DatabaseConfig):
self.connection = self.create_connection(config.url)
def create_connection(self, url):
# 模拟数据库连接
print(f"Creating new connection to {url}")
return f"Connection_{id(self)}"
def get_db_service(config: DatabaseConfig = Depends()) -> DatabaseService:
return DatabaseService(config)
@app.get("/users/")
def get_users(service: DatabaseService = Depends(get_db_service)):
return {"connection": service.connection}
from fastapi import Depends
from functools import lru_cache
class AnalysisService:
def __init__(self, config: dict):
self.model = self.load_ai_model(config["model_path"])
def load_ai_model(self, path):
print(f"Loading AI model from {path}")
return f"Model_{id(self)}"
@lru_cache(maxsize=1)
def get_analysis_service(config: dict = {"model_path": "models/v1"}) -> AnalysisService:
return AnalysisService(config)
@app.get("/predict")
def make_prediction(service: AnalysisService = Depends(get_analysis_service)):
return {"model": service.model}
from contextlib import contextmanager
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
class DatabaseSingleton:
_instance = None
def __new__(cls, dsn: str):
if not cls._instance:
cls._instance = super().__new__(cls)
cls._instance.engine = create_engine(dsn)
cls._instance.Session = sessionmaker(bind=cls._instance.engine)
return cls._instance
@contextmanager
def get_db_session(dsn: str = "sqlite:///test.db"):
db = DatabaseSingleton(dsn)
session = db.Session()
try:
yield session
session.commit()
except Exception as e:
session.rollback()
raise e
finally:
session.close()
@app.get("/transactions")
def get_transactions(session=Depends(get_db_session)):
return {"status": "success"}
from fastapi import Request
class RequestTracker:
def __init__(self, request: Request):
self.request = request
self.start_time = time.time()
@property
def duration(self):
return time.time() - self.start_time
def get_tracker(request: Request) -> RequestTracker:
if not hasattr(request.state, "tracker"):
request.state.tracker = RequestTracker(request)
return request.state.tracker
@app.get("/status")
def get_status(tracker: RequestTracker = Depends(get_tracker)):
return {"duration": tracker.duration}
from pydantic import BaseSettings
class AppSettings(BaseSettings):
env: str = "dev"
api_version: str = "v1"
class Config:
env_file = ".env"
def config_factory() -> AppSettings:
return AppSettings()
def get_http_client(settings: AppSettings = Depends(config_factory)):
timeout = 30 if settings.env == "prod" else 100
return httpx.Client(timeout=timeout)
class TenantContext:
def __init__(self, tenant_id: str):
self.tenant_id = tenant_id
self.config = self.load_tenant_config()
def load_tenant_config(self):
# 模拟从数据库加载配置
return {
"db_url": f"sqlite:///tenant_{self.tenant_id}.db",
"theme": "dark" if self.tenant_id == "acme" else "light"
}
def tenant_factory(tenant_id: str = Header(...)) -> TenantContext:
return TenantContext(tenant_id)
@app.get("/dashboard")
def get_dashboard(ctx: TenantContext = Depends(tenant_factory)):
return {"theme": ctx.config["theme"]}
{
"detail": [
{
"loc": [
"header",
"x-tenant-id"
],
"msg": "field required",
"type": "value_error.missing"
}
]
}
RuntimeError: Unable to initialize service - missing config
评论
发表评论