Comprehensive Alembic database migration management for customer support systems
This skill provides comprehensive guidance for managing database migrations using Alembic in customer support environments. It covers everything from initial setup through complex production deployment scenarios, with a focus on maintaining data integrity and minimizing downtime for support operations.
Alembic is a lightweight database migration tool for use with SQLAlchemy. It provides a way to manage changes to your database schema over time through version-controlled migration scripts. For customer support systems, this means:
# Install Alembic with PostgreSQL support
pip install alembic psycopg2-binary sqlalchemy
# Or add to requirements.txt
alembic>=1.13.0
sqlalchemy>=2.0.0
psycopg2-binary>=2.9.0
# Initialize Alembic (creates alembic/ directory and alembic.ini)
alembic init alembic
# For multiple database support
alembic init --template multidb alembic
This creates:
This creates: `alembic/` — directory containing migration configuration; `alembic/versions/` — where individual migration files live; `alembic/env.py` — migration environment configuration; `alembic.ini` — Alembic configuration file. Edit alembic.ini to set your database URL:
# For development
sqlalchemy.url = postgresql://user:password@localhost/support_dev
# For production (use environment variables)
sqlalchemy.url = postgresql://%(DB_USER)s:%(DB_PASSWORD)s@%(DB_HOST)s/%(DB_NAME)s
Better approach - use environment variables in env.py:
import os
from logging.config import fileConfig

from sqlalchemy import engine_from_config, pool

from alembic import context

# Application models must be importable so autogenerate can diff them.
from myapp.models import Base

# Alembic's runtime configuration object (backed by alembic.ini).
config = context.config

# Prefer the DATABASE_URL environment variable over the value hard-coded in
# alembic.ini, falling back to a local development database.
database_url = os.getenv('DATABASE_URL', 'postgresql://localhost/support_dev')
config.set_main_option('sqlalchemy.url', database_url)

# Metadata that `alembic revision --autogenerate` compares to the database.
target_metadata = Base.metadata
Create a migration manually when you need precise control:
# Create empty migration file
alembic revision -m "add ticket priority column"
This generates a file like versions/abc123_add_ticket_priority_column.py:
"""add ticket priority column

Revision ID: abc123
Revises: def456
Create Date: 2025-01-15 10:30:00.000000
"""
from alembic import op
import sqlalchemy as sa

# Revision identifiers used by Alembic.
revision = 'abc123'
down_revision = 'def456'
branch_labels = None
depends_on = None


def upgrade() -> None:
    """Add a nullable `priority` column (defaulting to 'normal') plus an index."""
    op.add_column(
        'tickets',
        sa.Column('priority', sa.String(20), nullable=True, server_default='normal'),
    )
    # Index supports filtering ticket queues by priority.
    op.create_index('ix_tickets_priority', 'tickets', ['priority'])


def downgrade() -> None:
    """Reverse upgrade(): drop the index before the column it covers."""
    op.drop_index('ix_tickets_priority', 'tickets')
    op.drop_column('tickets', 'priority')
Let Alembic detect schema changes automatically:
# Generate migration by comparing models to database
alembic revision --autogenerate -m "add customer satisfaction table"
Important: Always review autogenerated migrations! They may miss: server-side defaults, check constraints, column renames (detected as a drop plus an add), index/constraint naming, and any required data migration.
Example autogenerated migration:
"""add customer satisfaction table

Revision ID: xyz789
Revises: abc123
Create Date: 2025-01-15 11:00:00.000000
"""
from alembic import op
import sqlalchemy as sa

revision = 'xyz789'
down_revision = 'abc123'
branch_labels = None
depends_on = None


def upgrade() -> None:
    """Create the customer_satisfaction table (autogenerated -- review first!)."""
    op.create_table(
        'customer_satisfaction',
        sa.Column('id', sa.Integer(), nullable=False),
        sa.Column('ticket_id', sa.Integer(), nullable=False),
        sa.Column('rating', sa.Integer(), nullable=False),
        sa.Column('feedback', sa.Text(), nullable=True),
        sa.Column('created_at', sa.DateTime(), nullable=False),
        # Ratings are removed together with their parent ticket.
        sa.ForeignKeyConstraint(['ticket_id'], ['tickets.id'], ondelete='CASCADE'),
        sa.PrimaryKeyConstraint('id'),
    )
    # Indexes for ticket look-ups and time-ranged reporting queries.
    for index_name, indexed_column in (
        ('ix_satisfaction_ticket_id', 'ticket_id'),
        ('ix_satisfaction_created_at', 'created_at'),
    ):
        op.create_index(index_name, 'customer_satisfaction', [indexed_column])


def downgrade() -> None:
    """Drop the indexes first, then the table itself."""
    op.drop_index('ix_satisfaction_created_at', 'customer_satisfaction')
    op.drop_index('ix_satisfaction_ticket_id', 'customer_satisfaction')
    op.drop_table('customer_satisfaction')
When you need to transform existing data:
"""convert ticket status to new enum

Revision ID: data001
Revises: xyz789
Create Date: 2025-01-15 12:00:00.000000
"""
from alembic import op
import sqlalchemy as sa
from sqlalchemy.sql import table, column

revision = 'data001'
down_revision = 'xyz789'


def upgrade() -> None:
    """Rewrite lowercase status values as the new uppercase enum names."""
    # Stage converted values in a temporary column alongside the old one.
    op.add_column(
        'tickets',
        sa.Column('status_new', sa.String(50), nullable=True),
    )

    tickets = table(
        'tickets',
        column('status', sa.String),
        column('status_new', sa.String),
    )

    # Old status -> new enum name.
    status_mapping = {
        'open': 'OPEN',
        'in_progress': 'IN_PROGRESS',
        'pending': 'WAITING_ON_CUSTOMER',
        'resolved': 'RESOLVED',
        'closed': 'CLOSED',
    }

    bind = op.get_bind()
    for old_status, new_status in status_mapping.items():
        bind.execute(
            tickets.update()
            .where(tickets.c.status == old_status)
            .values(status_new=new_status)
        )

    # Every row has been mapped, so the staging column can become required.
    op.alter_column('tickets', 'status_new', nullable=False)

    # Swap the staging column into place of the original.
    op.drop_column('tickets', 'status')
    op.alter_column('tickets', 'status_new', new_column_name='status')


def downgrade() -> None:
    """Reverse upgrade(): map uppercase enum names back to lowercase values."""
    op.add_column(
        'tickets',
        sa.Column('status_old', sa.String(50), nullable=True),
    )

    tickets = table(
        'tickets',
        column('status', sa.String),
        column('status_old', sa.String),
    )

    # New enum name -> old status.
    reverse_mapping = {
        'OPEN': 'open',
        'IN_PROGRESS': 'in_progress',
        'WAITING_ON_CUSTOMER': 'pending',
        'RESOLVED': 'resolved',
        'CLOSED': 'closed',
    }

    bind = op.get_bind()
    for new_status, old_status in reverse_mapping.items():
        bind.execute(
            tickets.update()
            .where(tickets.c.status == new_status)
            .values(status_old=old_status)
        )

    op.alter_column('tickets', 'status_old', nullable=False)
    op.drop_column('tickets', 'status')
    op.alter_column('tickets', 'status_old', new_column_name='status')
For large tables, process data in batches:
"""add computed resolution time to tickets

Revision ID: data002
Revises: data001
Create Date: 2025-01-15 13:00:00.000000
"""
from alembic import op
import sqlalchemy as sa
from sqlalchemy.sql import table, column, select

revision = 'data002'
down_revision = 'data001'


def upgrade() -> None:
    """Add `resolution_time_seconds` and backfill it for resolved tickets.

    The backfill runs in id-ordered batches so very large tables are never
    loaded into memory at once. Keyset pagination (``id > last_id``) is used
    instead of LIMIT/OFFSET: rows updated in a batch drop out of the
    ``IS NULL`` filter, so advancing an OFFSET over the shrinking result set
    would silently skip a batch of rows on every iteration.
    """
    op.add_column(
        'tickets',
        sa.Column('resolution_time_seconds', sa.Integer(), nullable=True)
    )

    connection = op.get_bind()
    tickets = table(
        'tickets',
        column('id', sa.Integer),
        column('created_at', sa.DateTime),
        column('resolved_at', sa.DateTime),
        column('resolution_time_seconds', sa.Integer)
    )

    batch_size = 1000
    last_id = 0
    while True:
        # Fetch the next batch of resolved-but-unprocessed tickets beyond the
        # last id we have seen. Advancing by id also guarantees termination
        # even for rows we cannot compute (e.g. NULL created_at).
        batch = connection.execute(
            select(
                tickets.c.id,
                tickets.c.created_at,
                tickets.c.resolved_at
            ).where(
                sa.and_(
                    tickets.c.id > last_id,
                    tickets.c.resolved_at.isnot(None),
                    tickets.c.resolution_time_seconds.is_(None)
                )
            ).order_by(tickets.c.id).limit(batch_size)
        ).fetchall()
        if not batch:
            break

        for row in batch:
            if row.resolved_at and row.created_at:
                resolution_time = (row.resolved_at - row.created_at).total_seconds()
                connection.execute(
                    tickets.update().where(
                        tickets.c.id == row.id
                    ).values(resolution_time_seconds=int(resolution_time))
                )

        last_id = batch[-1].id

    # Rows that could not be computed (still-open tickets, missing timestamps)
    # are still NULL; give them the default value *before* tightening the
    # constraint, otherwise ALTER ... SET NOT NULL fails on those rows.
    connection.execute(
        tickets.update().where(
            tickets.c.resolution_time_seconds.is_(None)
        ).values(resolution_time_seconds=0)
    )

    # Future rows default to 0 until their migration/trigger fills them in.
    op.alter_column('tickets', 'resolution_time_seconds',
                    nullable=False, server_default='0')


def downgrade() -> None:
    """Drop the computed column; the raw timestamps are untouched."""
    op.drop_column('tickets', 'resolution_time_seconds')
# Upgrade to latest revision (head)
alembic upgrade head
# See what would be executed (SQL only, don't run)
alembic upgrade head --sql
# Upgrade one step at a time
alembic upgrade +1
# Upgrade to specific revision
alembic upgrade abc123
# Downgrade one revision
alembic downgrade -1
# Downgrade to specific revision
alembic downgrade abc123
# Downgrade to base (empty database)
alembic downgrade base
# Generate SQL for downgrade without executing
alembic downgrade -1 --sql
# Show current database revision
alembic current
# Show current revision with details
alembic current --verbose
# Show migration history
alembic history
# Show history with current revision marked
alembic history --indicate-current
# Show specific revision range
alembic history -r base:head
In customer support systems, you might have multiple migration branches — for example, a main branch for the core ticketing schema and a separate branch for reporting features:
# Create base for new branch
alembic revision -m "create reporting branch" \
--head=base \
--branch-label=reporting \
--version-path=alembic/versions/reporting
# Add migration to specific branch
alembic revision -m "add report tables" \
--head=reporting@head
Example branch structure:
base
├── main branch
│ ├── abc123: initial schema
│ ├── def456: add tickets
│ └── ghi789: add users
└── reporting branch
├── rep001: create reports table
└── rep002: add scheduled reports
# Show all branch heads
alembic heads
# Show branch points
alembic branches
# Upgrade specific branch
alembic upgrade reporting@head
# Upgrade all branches
alembic upgrade heads
When features are ready to merge:
# Merge two branches
alembic merge -m "merge reporting into main" \
main@head reporting@head
Generated merge migration:
"""merge reporting into main

Revision ID: merge001
Revises: ghi789, rep002
Create Date: 2025-01-15 14:00:00.000000
"""
from alembic import op
import sqlalchemy as sa

revision = 'merge001'
# A merge revision carries several parents: one head from each branch.
down_revision = ('ghi789', 'rep002')
branch_labels = None
depends_on = None


def upgrade() -> None:
    """No-op: the merge only joins the two branch histories.

    Add reconciliation code here only when the branches made conflicting
    schema changes.
    """


def downgrade() -> None:
    """No-op: nothing to undo for a pure history merge."""
When one branch depends on another:
# Create migration that depends on specific revision from another branch
alembic revision -m "reporting needs user table" \
--head=reporting@head \
--depends-on=def456 # Revision from main branch
# tests/test_migrations.py
import pytest
from alembic import command
from alembic.config import Config
from sqlalchemy import create_engine, inspect
from sqlalchemy.orm import sessionmaker
@pytest.fixture
def alembic_config():
    """Alembic Config pointed at the dedicated test database."""
    cfg = Config("alembic.ini")
    # Override the URL from alembic.ini so tests never touch dev/prod.
    cfg.set_main_option(
        "sqlalchemy.url",
        "postgresql://localhost/support_test"
    )
    return cfg
@pytest.fixture
def test_db(alembic_config):
    """Yield an engine for a test DB migrated to head; tear down to base after."""
    test_db_url = alembic_config.get_main_option("sqlalchemy.url")
    engine = create_engine(test_db_url)

    # Bring the schema fully up to date before the test body runs.
    command.upgrade(alembic_config, "head")

    yield engine

    # Teardown: strip the schema back down and release pooled connections.
    command.downgrade(alembic_config, "base")
    engine.dispose()
def test_migration_creates_tickets_table(test_db):
    """All core support tables must exist after upgrading to head."""
    table_names = inspect(test_db).get_table_names()
    for expected_table in ('tickets', 'users', 'customer_satisfaction'):
        assert expected_table in table_names
def test_tickets_table_structure(test_db):
    """The tickets table must expose the expected columns and types."""
    column_info = {}
    for col in inspect(test_db).get_columns('tickets'):
        column_info[col['name']] = col

    for required in ('id', 'priority', 'status',
                     'created_at', 'resolution_time_seconds'):
        assert required in column_info

    # Both enum-like fields are stored as plain strings.
    assert column_info['priority']['type'].python_type == str
    assert column_info['status']['type'].python_type == str
def test_migration_upgrade_downgrade_cycle(alembic_config):
    """A base -> head -> -1 -> head round trip must complete without errors."""
    steps = (
        (command.downgrade, "base"),   # start from an empty schema
        (command.upgrade, "head"),     # apply everything
        (command.downgrade, "-1"),     # step back once
        (command.upgrade, "head"),     # and forward again
    )
    for run, target in steps:
        run(alembic_config, target)
def test_data_migration_preserves_data(test_db):
    """A ticket written before a data migration must survive it intact."""
    from sqlalchemy.orm import sessionmaker
    from myapp.models import Ticket

    Session = sessionmaker(bind=test_db)

    # Seed a single ticket and remember its primary key.
    session = Session()
    seeded = Ticket(
        title="Test ticket",
        status="OPEN",
        priority="high"
    )
    session.add(seeded)
    session.commit()
    seeded_id = seeded.id
    session.close()

    # Run a migration that modifies the tickets table here, e.g.:
    # command.upgrade(alembic_config, "specific_revision")

    # The row must still be present and unchanged afterwards.
    session = Session()
    survivor = session.query(Ticket).filter_by(id=seeded_id).first()
    assert survivor is not None
    assert survivor.title == "Test ticket"
    session.close()
# tests/test_migration_integration.py
import pytest
from alembic import command
from alembic.config import Config
from alembic.script import ScriptDirectory
from alembic.runtime.migration import MigrationContext
def test_no_pending_migrations(alembic_config, test_db):
    """Fail when the database revision lags behind the migration scripts."""
    script = ScriptDirectory.from_config(alembic_config)
    with test_db.connect() as connection:
        migration_ctx = MigrationContext.configure(connection)
        # Heads recorded in the database vs. heads defined on disk.
        current_heads = set(migration_ctx.get_current_heads())
        script_heads = set(script.get_heads())
        assert current_heads == script_heads, \
            f"Database has pending migrations. Current: {current_heads}, Expected: {script_heads}"
def test_migration_order_is_valid(alembic_config):
    """Every revision's down_revision(s) must resolve to a known revision."""
    script = ScriptDirectory.from_config(alembic_config)
    for revision in script.walk_revisions():
        down = revision.down_revision
        if down is None:
            continue  # base revision: nothing below it
        # Merge revisions carry a tuple of parents; normalize to a tuple.
        parents = down if isinstance(down, tuple) else (down,)
        for parent in parents:
            assert script.get_revision(parent) is not None
def test_check_command_detects_drift(alembic_config, test_db):
    """`alembic check` must report no drift between the models and the DB.

    `command.check` raises when autogenerate would emit operations, i.e.
    when the database schema has drifted from the SQLAlchemy models, so a
    clean return IS the success condition -- no explicit assert is needed
    (the original dead `assert True` has been removed).
    """
    try:
        command.check(alembic_config)
    except Exception as e:  # alembic raises AutogenerateDiffsDetected on drift
        pytest.fail(f"Schema drift detected: {e}")
# tests/test_migration_performance.py
import time
import pytest
from alembic import command
def test_migration_completes_within_time_limit(alembic_config):
    """Guard against migrations that would block a deploy for too long."""
    # Reset to an empty schema so the complete upgrade path is timed.
    command.downgrade(alembic_config, "base")

    start = time.time()
    command.upgrade(alembic_config, "head")
    duration = time.time() - start

    # 60s budget: anything slower needs batching or an online strategy.
    assert duration < 60, f"Migration took {duration}s, exceeds 60s limit"
@pytest.mark.slow
def test_data_migration_with_large_dataset(alembic_config, test_db):
    """Time the data migration against a realistic 10k-ticket fixture."""
    from sqlalchemy.orm import sessionmaker
    from myapp.models import Ticket

    Session = sessionmaker(bind=test_db)
    session = Session()
    # Seed 10,000 tickets in a single bulk insert.
    session.bulk_save_objects([
        Ticket(
            title=f"Test ticket {i}",
            status="OPEN",
            priority="normal"
        )
        for i in range(10000)
    ])
    session.commit()
    session.close()

    # NOTE(review): the test_db fixture already upgrades to head, so
    # upgrading to "data002" may be a no-op here -- confirm the fixture
    # stops before this revision when using this test.
    start = time.time()
    command.upgrade(alembic_config, "data002")  # Specific data migration
    duration = time.time() - start

    # 10k rows should be processed comfortably within the budget.
    assert duration < 30, f"Data migration took {duration}s for 10k records"
# .github/workflows/migrations.yml