Skip to main content

DDL Operations

Data Definition Language (DDL) operations create, modify, and drop database schema objects. SQLAlchemy provides comprehensive DDL support through MetaData and DDL constructs.

Creating Tables

Basic CREATE TABLE

from sqlalchemy import MetaData, Table, Column, Integer, String, create_engine

engine = create_engine('postgresql://user:pass@localhost/db')
metadata = MetaData()

users = Table(
    'users',
    metadata,
    Column('id', Integer, primary_key=True),
    Column('name', String(50)),
    Column('email', String(100))
)

# Create single table
users.create(engine)

# Create with checkfirst (only if doesn't exist)
users.create(engine, checkfirst=True)

Create All Tables

# Create all tables in metadata
metadata.create_all(engine)

# Create only if they don't exist
metadata.create_all(engine, checkfirst=True)

# Create specific tables
metadata.create_all(engine, tables=[users, orders])
**`create_all`** (method)

Signature: `MetaData.create_all(bind: Engine, tables: Optional[List[Table]] = None, checkfirst: bool = True)`

Creates all tables (or only the specified tables) defined in the MetaData.

Generated SQL

# Preview DDL without executing
from sqlalchemy.schema import CreateTable

print(CreateTable(users).compile(engine))
# Output:
# CREATE TABLE users (
#     id INTEGER NOT NULL,
#     name VARCHAR(50),
#     email VARCHAR(100),
#     PRIMARY KEY (id)
# )

Dropping Tables

Basic DROP TABLE

# Drop single table
users.drop(engine)

# Drop with checkfirst
users.drop(engine, checkfirst=True)

# Drop all tables
metadata.drop_all(engine)

# Drop specific tables
metadata.drop_all(engine, tables=[users, orders])
drop_all() drops tables in reverse order of dependency to handle foreign keys correctly.

DROP CASCADE

from sqlalchemy import DDL

# PostgreSQL: Drop with cascade
with engine.begin() as conn:
    conn.execute(DDL('DROP TABLE IF EXISTS users CASCADE'))

# Plain Table.drop() has no cascade option; checkfirst=True just avoids an
# error when the table is already gone — use raw DDL (above) for CASCADE
users.drop(engine, checkfirst=True)

DDL Events

before_create / after_create

from sqlalchemy import event, DDL

# Execute arbitrary Python (or DDL) just before the CREATE TABLE is emitted
@event.listens_for(users, 'before_create')
def receive_before_create(target, connection, **kw):
    """Log the table name before its CREATE TABLE statement runs."""
    print(f"Creating table {target.name}")

# Execute DDL right after the table is created
@event.listens_for(users, 'after_create')
def receive_after_create(target, connection, **kw):
    """Create a supporting index immediately after CREATE TABLE."""
    # Runs on the same connection (and transaction, where supported)
    # as the CREATE TABLE itself
    connection.execute(DDL(
        "CREATE INDEX idx_email ON users (email)"
    ))

users.create(engine)

DDL Execution

from sqlalchemy import DDL

# Execute DDL on specific dialects
create_sequence = DDL(
    'CREATE SEQUENCE user_id_seq',
).execute_if(dialect='postgresql')

event.listen(
    users,
    'before_create',
    create_sequence
)

# Multiple dialect check
create_index = DDL(
    'CREATE INDEX idx_name ON users (name)'
).execute_if(
    dialect=('postgresql', 'mysql')
)

Sequences

Creating Sequences

from sqlalchemy import Sequence

# Define sequence
user_id_seq = Sequence('user_id_seq', start=1000, increment=1)

users = Table(
    'users',
    metadata,
    Column('id', Integer, user_id_seq, primary_key=True),
    Column('name', String(50))
)

# Create sequence explicitly
user_id_seq.create(engine)

# Sequence is created automatically with table
metadata.create_all(engine)
**`Sequence`** (class)

Signature: `Sequence(name: str, start: int = 1, increment: int = 1, minvalue: Optional[int] = None, maxvalue: Optional[int] = None, nominvalue: bool = False, nomaxvalue: bool = False, cycle: bool = False, schema: Optional[str] = None, cache: Optional[int] = None)`

Represents a database sequence object.

Using Sequences

from sqlalchemy import select, func

# Get next value
with engine.begin() as conn:
    next_id = conn.scalar(user_id_seq.next_value())
    print(f"Next ID: {next_id}")

# In INSERT
stmt = insert(users).values(
    id=user_id_seq.next_value(),
    name='alice'
)

Sequence Options

# PostgreSQL sequence with all options
seq = Sequence(
    'order_id_seq',
    start=1,
    increment=1,
    minvalue=1,
    maxvalue=999999999,
    cycle=False,
    cache=20,
    schema='public'
)

# Oracle sequence
seq = Sequence(
    'emp_id_seq',
    start=1,
    increment=1,
    order=True  # Oracle-specific
)

Indexes

Creating Indexes

from sqlalchemy import Index

# Create index separately
idx = Index('ix_users_email', users.c.email)
idx.create(engine)

# Indexes created with table
users = Table(
    'users',
    metadata,
    Column('id', Integer, primary_key=True),
    Column('email', String(100)),
    Index('ix_email', 'email')
)

metadata.create_all(engine)

Dropping Indexes

# Drop index
idx.drop(engine)

# Drop with DDL
with engine.begin() as conn:
    conn.execute(DDL('DROP INDEX IF EXISTS ix_users_email'))

ALTER TABLE Operations

Adding Columns

from sqlalchemy import DDL

# Add column
with engine.begin() as conn:
    conn.execute(DDL(
        'ALTER TABLE users ADD COLUMN phone VARCHAR(20)'
    ))

# Add with default
with engine.begin() as conn:
    conn.execute(DDL(
        'ALTER TABLE users ADD COLUMN is_active BOOLEAN DEFAULT true'
    ))

Modifying Columns

# Change column type
with engine.begin() as conn:
    conn.execute(DDL(
        'ALTER TABLE users ALTER COLUMN age TYPE INTEGER'
    ))

# Set NOT NULL
with engine.begin() as conn:
    conn.execute(DDL(
        'ALTER TABLE users ALTER COLUMN email SET NOT NULL'
    ))

# Drop NOT NULL
with engine.begin() as conn:
    conn.execute(DDL(
        'ALTER TABLE users ALTER COLUMN phone DROP NOT NULL'
    ))

# Set default
with engine.begin() as conn:
    conn.execute(DDL(
        "ALTER TABLE users ALTER COLUMN status SET DEFAULT 'active'"
    ))

Renaming Objects

# Rename table
with engine.begin() as conn:
    conn.execute(DDL('ALTER TABLE users RENAME TO customers'))

# Rename column (PostgreSQL)
with engine.begin() as conn:
    conn.execute(DDL(
        'ALTER TABLE users RENAME COLUMN name TO full_name'
    ))

Adding/Dropping Constraints

# Add constraint
with engine.begin() as conn:
    conn.execute(DDL(
        'ALTER TABLE users ADD CONSTRAINT ck_age CHECK (age >= 18)'
    ))

# Add foreign key
with engine.begin() as conn:
    conn.execute(DDL(
        'ALTER TABLE orders '
        'ADD CONSTRAINT fk_user_id '
        'FOREIGN KEY (user_id) REFERENCES users(id)'
    ))

# Drop constraint
with engine.begin() as conn:
    conn.execute(DDL('ALTER TABLE users DROP CONSTRAINT ck_age'))

Custom DDL

DDL Class

from sqlalchemy import DDL

# Simple DDL
create_view = DDL("""
    CREATE VIEW active_users AS
    SELECT * FROM users WHERE status = 'active'
""")

with engine.begin() as conn:
    conn.execute(create_view)

DDL with Parameters

# Parameterized DDL
# NOTE(review): %(table)s substitution in a DDL string is applied from the
# event target when the DDL is attached via event.listen / execute_if; it is
# not clear that passing a dict to conn.execute() interpolates it here —
# confirm against the SQLAlchemy DDL documentation before relying on this.
create_function = DDL("""
    CREATE FUNCTION get_user_count()
    RETURNS INTEGER AS $$
    BEGIN
        RETURN (SELECT COUNT(*) FROM %(table)s);
    END;
    $$ LANGUAGE plpgsql;
""")

with engine.begin() as conn:
    conn.execute(
        create_function,
        {'table': 'users'}
    )

Database-Specific DDL

# PostgreSQL extensions
with engine.begin() as conn:
    if conn.dialect.name == 'postgresql':
        conn.execute(DDL('CREATE EXTENSION IF NOT EXISTS "uuid-ossp"'))
        conn.execute(DDL('CREATE EXTENSION IF NOT EXISTS "pg_trgm"'))

# MySQL storage engine
if engine.dialect.name == 'mysql':
    users = Table(
        'users',
        metadata,
        Column('id', Integer, primary_key=True),
        mysql_engine='InnoDB',
        mysql_charset='utf8mb4'
    )

Schema Reflection

Reflecting Tables

from sqlalchemy import inspect

# Inspect database
inspector = inspect(engine)

# Get table names
table_names = inspector.get_table_names()
print(f"Tables: {table_names}")

# Reflect single table
users = Table('users', metadata, autoload_with=engine)

# Reflect all tables
metadata.reflect(bind=engine)

# Access reflected tables
for table_name in metadata.tables:
    table = metadata.tables[table_name]
    print(f"Table: {table.name}")
    for col in table.c:
        print(f"  {col.name}: {col.type}")

Inspector Methods

inspector = inspect(engine)

# Get columns
columns = inspector.get_columns('users')
for col in columns:
    print(f"{col['name']}: {col['type']}")

# Get primary keys
pk = inspector.get_pk_constraint('users')
print(f"Primary key: {pk['constrained_columns']}")

# Get foreign keys
fks = inspector.get_foreign_keys('orders')
for fk in fks:
    print(f"FK: {fk['constrained_columns']} -> {fk['referred_table']}.{fk['referred_columns']}")

# Get indexes
indexes = inspector.get_indexes('users')
for idx in indexes:
    print(f"Index: {idx['name']}, columns: {idx['column_names']}")

# Get unique constraints
uqs = inspector.get_unique_constraints('users')

# Get check constraints
cks = inspector.get_check_constraints('users')

# Get views
views = inspector.get_view_names()

Migration Strategies

Manual Migrations

def upgrade(engine):
    """Add the email column to the users table (no-op if already present)."""
    with engine.begin() as conn:
        # Reflect current columns so the migration is idempotent
        existing = {col['name'] for col in inspect(conn).get_columns('users')}
        if 'email' in existing:
            return
        conn.execute(DDL(
            'ALTER TABLE users ADD COLUMN email VARCHAR(100)'
        ))
        print("Added email column")

def downgrade(engine):
    """Remove the email column from the users table (no-op if absent).

    Mirrors upgrade(): checks for the column first so the migration is
    idempotent and a repeated run does not fail on DROP COLUMN.
    """
    with engine.begin() as conn:
        columns = [c['name'] for c in inspect(conn).get_columns('users')]
        if 'email' in columns:
            conn.execute(DDL(
                'ALTER TABLE users DROP COLUMN email'
            ))
            print("Removed email column")

Version Tracking

schema_version = Table(
    'schema_version',
    metadata,
    Column('version', Integer, primary_key=True),
    Column('applied_at', DateTime, server_default=func.now())
)

def get_current_version(engine):
    """Return the highest applied schema version, or 0 if none recorded."""
    stmt = select(func.max(schema_version.c.version))
    with engine.connect() as conn:
        latest = conn.execute(stmt).scalar()
    # MAX over an empty table yields NULL/None -> treat as version 0
    return latest or 0

def apply_migration(engine, version, upgrade_func):
    """Run upgrade_func and record *version*, unless already applied."""
    if get_current_version(engine) >= version:
        return
    # Migration and version bookkeeping share one transaction
    with engine.begin() as conn:
        upgrade_func(conn)
        conn.execute(
            insert(schema_version).values(version=version)
        )

Schema Comparison

from sqlalchemy import MetaData
from sqlalchemy.schema import CreateTable

def compare_schemas(engine, metadata):
    """Compare metadata schema with database"""
    # Reflect what actually exists in the database
    reflected = MetaData()
    reflected.reflect(bind=engine)

    declared = set(metadata.tables.keys())
    actual = set(reflected.tables.keys())

    # Declared but missing from the database / present but undeclared
    new_tables = declared - actual
    removed_tables = actual - declared

    print(f"New tables: {new_tables}")
    print(f"Removed tables: {removed_tables}")

    # Column-level diff for tables present on both sides,
    # iterating in metadata order
    for table_name in metadata.tables:
        if table_name not in reflected.tables:
            continue
        meta_cols = set(metadata.tables[table_name].c.keys())
        db_cols = set(reflected.tables[table_name].c.keys())

        new_cols = meta_cols - db_cols
        removed_cols = db_cols - meta_cols
        if not (new_cols or removed_cols):
            continue

        print(f"\nTable {table_name}:")
        if new_cols:
            print(f"  New columns: {new_cols}")
        if removed_cols:
            print(f"  Removed columns: {removed_cols}")
Common Patterns

Idempotent DDL

# Create only if not exists
with engine.begin() as conn:
    conn.execute(DDL('CREATE TABLE IF NOT EXISTS users (...)'))
    conn.execute(DDL('CREATE INDEX IF NOT EXISTS ix_email ON users (email)'))

# Drop only if exists
with engine.begin() as conn:
    conn.execute(DDL('DROP TABLE IF EXISTS temp_data'))
    conn.execute(DDL('DROP INDEX IF EXISTS ix_old_index'))

Transactional DDL

# PostgreSQL supports transactional DDL
with engine.begin() as conn:
    try:
        conn.execute(DDL('ALTER TABLE users ADD COLUMN age INTEGER'))
        conn.execute(DDL('UPDATE users SET age = 0'))
        conn.execute(DDL('ALTER TABLE users ALTER COLUMN age SET NOT NULL'))
    except Exception as e:
        # Automatically rolled back
        print(f"Migration failed: {e}")
        raise

Schema Versioning Table

migrations = Table(
    '_migrations',
    metadata,
    Column('id', Integer, primary_key=True),
    Column('version', String(50), unique=True, nullable=False),
    Column('name', String(200)),
    Column('applied_at', DateTime, server_default=func.now()),
    Column('checksum', String(64))
)

Using Alembic

For production migrations, use Alembic:
# Install Alembic
pip install alembic

# Initialize
alembic init migrations

# Create migration
alembic revision -m "add email column"

# Upgrade
alembic upgrade head

# Downgrade
alembic downgrade -1
Alembic is the recommended tool for database migrations in production SQLAlchemy applications.

Next Steps

Type System

Learn about SQLAlchemy data types

Engines

Configure database engines and connections