数据库事务回滚:FastAPI中的存档与读档大法
title: 数据库事务回滚:FastAPI中的存档与读档大法


from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
# Database connection setup (SQLite is used here as an example).
DATABASE_URL = "sqlite:///./test.db"
engine = create_engine(DATABASE_URL)

# Session factory bound to the engine above; sessions neither auto-flush
# nor auto-commit, so transaction boundaries are controlled by the caller.
SessionLocal = sessionmaker(bind=engine, autoflush=False, autocommit=False)
# Pydantic模型
from pydantic import BaseModel
class UserCreate(BaseModel):
    """Request payload carrying the fields needed to create a user."""

    # User name as a plain string.
    username: str
    # E-mail address as a plain string (no format validation is applied here).
    email: str
# 数据库模型
from sqlalchemy import Column, Integer, String
from database import Base
class User(Base):
    """ORM mapping for rows of the ``users`` table."""

    __tablename__ = "users"

    # Integer primary key.
    id = Column(Integer, primary_key=True)
    # User name, up to 50 characters, declared unique.
    username = Column(String(50), unique=True)
    # E-mail address, up to 100 characters.
    email = Column(String(100))
def create_user_with_profile(user_data: UserCreate):
    """Create a user and its profile row atomically.

    Both inserts run inside one transaction: either the user and the
    profile are committed together, or neither is persisted.

    Args:
        user_data: Validated fields for the new user.

    Returns:
        The persisted ``User`` instance, with its attributes loaded so it
        remains readable after the session has been closed.

    Raises:
        HTTPException: With status 400 and the underlying error message as
            detail when any step fails; the original exception is chained.
    """
    # Local imports keep this block self-contained: neither name is imported
    # at module level in this file.
    from fastapi import HTTPException
    from sqlalchemy import text

    db = SessionLocal()
    try:
        with db.begin():
            # Create the user.
            new_user = User(**user_data.dict())
            db.add(new_user)
            # Flush so the database assigns the primary key; without this
            # new_user.id is still None when the profile row is inserted.
            db.flush()

            # Create the user's profile in the same transaction.
            # Raw SQL must be wrapped in text() for SQLAlchemy 1.4+/2.x.
            db.execute(
                text("INSERT INTO profiles (user_id) VALUES (:user_id)"),
                {"user_id": new_user.id},
            )

        # The commit expires loaded attributes; reload them now so the
        # returned object is still usable once the session is closed.
        db.refresh(new_user)
        return new_user
    except Exception as e:
        # No-op if the transaction block already rolled back; also covers
        # failures that happen after the block has exited.
        db.rollback()
        raise HTTPException(status_code=400, detail=str(e)) from e
    finally:
        db.close()
# Install Alembic and scaffold a migration environment in ./migrations
pip install alembic
alembic init migrations
# alembic.ini setting (not a shell command): database URL used by the migrations
sqlalchemy.url = sqlite:///./test.db
# Generate a new revision script with the given message
alembic revision -m "add phone_number column"
def upgrade():
    """Apply the migration: add a ``phone_number`` column to ``users``."""
    phone_number = Column('phone_number', String(20))
    op.add_column('users', phone_number)
def downgrade():
    """Revert the migration: drop ``phone_number`` from ``users``."""
    # NOTE(review): SQLite releases before 3.35 have no ALTER TABLE ... DROP
    # COLUMN; confirm the target database supports this or use batch mode.
    table, column = 'users', 'phone_number'
    op.drop_column(table, column)
# Production migration: use prod_alembic.ini as the config and upgrade to the latest revision
ALEMBIC_CONFIG=prod_alembic.ini alembic upgrade head
# Roll back one revision relative to the current one
alembic downgrade -1
from fastapi import APIRouter, Depends
# Router on which the /register endpoint is declared.
router = APIRouter()
@router.post("/register")
async def register_user(user: UserCreate):
    """Register a user together with its profile row in one transaction.

    The username check, the user insert and the profile insert all run in
    the same transaction, so a failure at any step leaves nothing behind.

    Args:
        user: Validated registration payload.

    Returns:
        ``{"message": "Registration successful"}`` once the transaction has
        committed, or ``{"error": <reason>}`` when the username is taken.

    Raises:
        Exception: Anything other than ``ValueError`` (for example a
            database error) propagates after the transaction is rolled back.
    """
    # NOTE(review): the session calls below are synchronous and run inside an
    # ``async def`` handler - confirm blocking the event loop is acceptable.
    # Local import keeps this block self-contained.
    from sqlalchemy import text

    db = SessionLocal()
    try:
        with db.begin():
            # Reject the request if the username is already in use.
            existing = db.query(User).filter(User.username == user.username).first()
            if existing:
                raise ValueError("Username already exists")

            # Create the user.
            new_user = User(**user.dict())
            db.add(new_user)
            # Flush so the primary key is assigned; without this new_user.id
            # is still None and the profile row would get a NULL user_id.
            db.flush()

            # Create the associated profile record.
            # Raw SQL must be wrapped in text() for SQLAlchemy 1.4+/2.x.
            db.execute(
                text("INSERT INTO profiles (user_id, status) VALUES (:user_id, :status)"),
                {"user_id": new_user.id, "status": "active"},
            )
        # Reached only after the transaction block has committed.
        return {"message": "Registration successful"}
    except ValueError as ve:
        # No-op when the transaction block has already rolled back.
        db.rollback()
        return {"error": str(ve)}
    finally:
        db.close()
评论
发表评论