数据库事务隔离与Alembic数据恢复的实战艺术


# 在FastAPI中设置隔离级别示例
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
DATABASE_URL = "postgresql://user:password@localhost/dbname"

# Every connection handed out by this engine runs at REPEATABLE READ.
engine = create_engine(DATABASE_URL, isolation_level="REPEATABLE READ")

# Session factory bound to the engine; flushing and committing stay explicit.
SessionLocal = sessionmaker(
    bind=engine,
    autoflush=False,
    autocommit=False,
)
# Start a new labelled branch. Alembic has no `branch` subcommand: create a
# revision on top of the current head and tag it with a branch label instead.
alembic revision -m "start new_feature" --head=head --branch-label=new_feature
# Merge all outstanding heads (including new_feature) into a single revision.
alembic merge -m "merge new_feature" heads
# Show the revision history.
alembic history --verbose
# Downgrade the database to a specific revision.
alembic downgrade ae1027a6acf
# Generate the rollback SQL for manual review/execution (offline mode), e.g.
# when the online downgrade hits a conflict. With --sql, Alembic requires an
# explicit <from>:<to> range; replace 1975ea83b712 with the revision the
# database is currently at.
alembic downgrade 1975ea83b712:ae1027a6acf --sql > rollback.sql
import subprocess

from fastapi import APIRouter, Depends, HTTPException
from pydantic import BaseModel
from sqlalchemy.orm import Session
class OrderRollbackRequest(BaseModel):
    # Request body accepted by the POST /orders/rollback endpoint.

    # Alembic revision identifier handed to `alembic downgrade`.
    target_version: str

    # Code passed to validate_rollback_permission() before the downgrade runs.
    verification_code: str
router = APIRouter()


@router.post("/orders/rollback")
async def rollback_orders(
    request: OrderRollbackRequest,
    db: Session = Depends(get_db),
):
    """Downgrade the schema to ``request.target_version`` via the Alembic CLI.

    Args:
        request: Payload carrying the target revision and the verification
            code that is passed to ``validate_rollback_permission``.
        db: Request-scoped SQLAlchemy session.

    Returns:
        ``{"status": "success"}`` once the downgrade command exits with
        status 0 and the session transaction has been committed.

    Raises:
        HTTPException: Re-raised unchanged when a step raises one itself;
            any other failure is reported as a 500 carrying the error text.
    """
    try:
        # The session begins its transaction automatically; request the
        # SERIALIZABLE level on its connection instead of sending a raw
        # "BEGIN ..." string, which SQLAlchemy 2.0 rejects and which cannot
        # change the level of a transaction that has already started.
        db.connection(execution_options={"isolation_level": "SERIALIZABLE"})

        validate_rollback_permission(request.verification_code)

        # Pass an argument list (no shell): a single command string is not a
        # valid program name on POSIX, and going through a shell would let
        # request data inject commands.
        # NOTE(review): the Alembic process uses its own database connection,
        # so the session transaction above does not wrap the migration; this
        # call also blocks the event loop while the subprocess runs.
        subprocess.run(
            ["alembic", "downgrade", request.target_version],
            check=True,
        )

        db.commit()
    except HTTPException:
        # Keep the status code chosen by whichever step raised it.
        db.rollback()
        raise
    except Exception as e:
        db.rollback()
        raise HTTPException(status_code=500, detail=str(e)) from e
    return {"status": "success"}
from sqlalchemy import text
def transfer_funds(sender_id: int, receiver_id: int, amount: float, db: Session):
    """Move ``amount`` from the sender's balance to the receiver's balance.

    Both account rows are locked with ``SELECT ... FOR UPDATE`` inside a
    SERIALIZABLE transaction, which is committed on success and rolled back
    on any error.

    Args:
        sender_id: Primary key of the account to debit.
        receiver_id: Primary key of the account to credit.
        amount: Amount to transfer; must be a positive number.
        db: SQLAlchemy session the transfer runs in.

    Raises:
        ValueError: If ``amount`` is not positive, both ids are the same,
            either account does not exist, or the sender's balance is lower
            than ``amount``.
    """
    # `not amount > 0` (rather than `amount <= 0`) also rejects NaN, which
    # would otherwise pass the balance check and corrupt both balances.
    if not amount > 0:
        raise ValueError("Transfer amount must be positive")
    if sender_id == receiver_id:
        raise ValueError("Sender and receiver must be different accounts")

    try:
        # Must be the first statement of the transaction to take effect.
        db.execute(text("SET TRANSACTION ISOLATION LEVEL SERIALIZABLE"))

        # Lock the rows in ascending id order so two opposite transfers
        # running concurrently cannot deadlock on each other's row locks.
        locked = {}
        for account_id in sorted((sender_id, receiver_id)):
            locked[account_id] = (
                db.query(Account)
                .filter(Account.id == account_id)
                .with_for_update()
                .first()
            )

        sender = locked[sender_id]
        receiver = locked[receiver_id]
        if sender is None:
            raise ValueError("Sender account not found")
        if receiver is None:
            raise ValueError("Receiver account not found")
        if sender.balance < amount:
            raise ValueError("Insufficient balance")

        sender.balance -= amount
        receiver.balance += amount
        db.commit()
    except Exception:
        db.rollback()
        raise
评论
发表评论