飞行中的引擎更换:生产环境数据库迁移的艺术与科学
title: 飞行中的引擎更换:生产环境数据库迁移的艺术与科学


[alembic]
# 禁止自动生成迁移(仅允许手动审核)
file_template = %%(year)d_%(month).2d_%(day).2d_%%(hour).2d%%(minute).2d-%%(slug)s
version_locations = migrations/versions
sqlalchemy.url = ${PRODUCTION_DB_URL} # 通过环境变量注入
[post_write_hooks]
# 自动生成变更校验脚本
hooks = pg_dump_verify
pg_dump_verify.executable = scripts/verify_changes.sh
# 创建新迁移(开发环境)
alembic revision -m "add_user_phone_column" --autogenerate
# 生成SQL预览
alembic upgrade head --sql > migration_script.sql
# 生产环境执行(需审核后)
alembic upgrade head
# versions/2023_07_20_1430-add_phone_column.py
def upgrade():
op.add_column('users',
sa.Column('phone',
sa.String(20),
nullable=True,
comment='用户联系电话',
server_default=text("''")
)
)
# 添加索引优化查询
op.create_index('ix_users_phone', 'users', ['phone'], unique=False)
def downgrade():
with op.batch_alter_table('users') as batch_op:
batch_op.drop_index('ix_users_phone')
batch_op.drop_column('phone')
# 蓝绿部署迁移示例
from fastapi import Depends
from sqlalchemy import text
async def migrate_user_data(conn=Depends(get_db)):
# 1. 创建新表
await conn.execute(text("""
CREATE TABLE new_users (
id SERIAL PRIMARY KEY,
name VARCHAR(50),
phone VARCHAR(20)
)
"""))
# 2. 双写数据
await conn.execute(text("""
INSERT INTO new_users (id, name, phone)
SELECT id, name, phone FROM users
"""))
# 3. 原子切换(事务保障)
async with conn.begin():
await conn.execute(text("ALTER TABLE users RENAME TO old_users"))
await conn.execute(text("ALTER TABLE new_users RENAME TO users"))
# 迁移验证脚本
import pytest
from sqlalchemy import inspect
def test_migration_consistency():
inspector = inspect(engine)
# 验证表结构
assert 'phone' in inspector.get_columns('users')
# 验证索引
indexes = inspector.get_indexes('users')
assert any(idx['name'] == 'ix_users_phone' for idx in indexes)
# 验证数据总量
result = engine.execute("SELECT COUNT(*) FROM users")
assert result.scalar() > 0
ERROR [alembic.util.messaging] Can't locate revision identified by 'e3a1e3a1e3a1'
答案解析 正确答案:C
答案解析 正确答案:B+C
TimeoutError: QueuePool limit overflow
# 在env.py中配置连接池
context.configure(
connection=engine.connect(),
target_metadata=target_metadata,
transaction_per_migration=True, # 每个迁移独立事务
pool_pre_ping=True, # 自动重连
pool_size=5,
max_overflow=10
)
sa.exc.ProgrammingError: (psycopg2.errors.CannotCoerce)
cannot cast type integer to boolean
def upgrade():
# 分阶段变更类型
with op.batch_alter_table('settings') as batch_op:
batch_op.add_column(sa.Column('new_flag', sa.Boolean))
batch_op.execute("UPDATE settings SET new_flag = (old_flag != 0)")
batch_op.drop_column('old_flag')
batch_op.alter_column('new_flag', new_column_name='flag')
# 添加条件索引示例
op.create_index(
'idx_active_users',
'users',
['last_login'],
postgresql_where=text("status = 'active'")
)
评论
发表评论