DDL Operations
Data Definition Language (DDL) operations create, modify, and drop database schema objects. SQLAlchemy provides comprehensive DDL support through MetaData and DDL constructs.

Creating Tables
Basic CREATE TABLE
from sqlalchemy import MetaData, Table, Column, Integer, String, create_engine
engine = create_engine('postgresql://user:pass@localhost/db')
metadata = MetaData()
users = Table(
'users',
metadata,
Column('id', Integer, primary_key=True),
Column('name', String(50)),
Column('email', String(100))
)
# Create single table
users.create(engine)
# Create with checkfirst (only if doesn't exist)
users.create(engine, checkfirst=True)
Create All Tables
# Create all tables in metadata
metadata.create_all(engine)
# Create only if they don't exist
metadata.create_all(engine, checkfirst=True)
# Create specific tables
metadata.create_all(engine, tables=[users, orders])
Signature:
MetaData.create_all(bind: Engine, tables: Optional[List[Table]] = None, checkfirst: bool = True)

Creates all tables (or only the specified tables) defined in the MetaData.

Generated SQL
# Preview DDL without executing
from sqlalchemy.schema import CreateTable
print(CreateTable(users).compile(engine))
# Output:
# CREATE TABLE users (
# id INTEGER NOT NULL,
# name VARCHAR(50),
# email VARCHAR(100),
# PRIMARY KEY (id)
# )
Dropping Tables
Basic DROP TABLE
# Drop single table
users.drop(engine)
# Drop with checkfirst
users.drop(engine, checkfirst=True)
# Drop all tables
metadata.drop_all(engine)
# Drop specific tables
metadata.drop_all(engine, tables=[users, orders])
drop_all() drops tables in reverse order of dependency, so tables with foreign keys are dropped before the tables they reference.

DROP CASCADE
from sqlalchemy import DDL
# PostgreSQL: Drop with cascade
with engine.begin() as conn:
conn.execute(DDL('DROP TABLE IF EXISTS users CASCADE'))
# Note: Table.drop() has no CASCADE option; checkfirst=True only adds IF EXISTS.
# Use the raw DDL above when CASCADE behavior is required.
users.drop(engine, checkfirst=True)
DDL Events
before_create / after_create
from sqlalchemy import event, DDL
# Execute DDL before table creation
@event.listens_for(users, 'before_create')
def receive_before_create(target, connection, **kw):
    """Log which table is about to be created (fires before CREATE TABLE)."""
    # target is the Table object; connection is the live Connection in use.
    table_name = target.name
    print(f"Creating table {table_name}")
# Execute DDL after table creation
@event.listens_for(users, 'after_create')
def receive_after_create(target, connection, **kw):
    """Create the email index immediately after the users table exists."""
    # Runs on the same connection (and transaction, where supported)
    # as the CREATE TABLE itself.
    index_ddl = DDL("CREATE INDEX idx_email ON users (email)")
    connection.execute(index_ddl)
users.create(engine)
DDL Execution
- Conditional DDL
- Callable DDL
from sqlalchemy import DDL
# Execute DDL on specific dialects
create_sequence = DDL(
'CREATE SEQUENCE user_id_seq',
).execute_if(dialect='postgresql')
event.listen(
users,
'before_create',
create_sequence
)
# Multiple dialect check
create_index = DDL(
'CREATE INDEX idx_name ON users (name)'
).execute_if(
dialect=('postgresql', 'mysql')
)
# Dynamic DDL generation
def create_partition_ddl(target, connection, **kw):
    """After-create hook: attach a 2024 partition (PostgreSQL only).

    No-op on every other dialect, since PARTITION OF is
    PostgreSQL-specific syntax.
    """
    if connection.dialect.name != 'postgresql':
        return
    partition_sql = (
        f"CREATE TABLE {target.name}_2024 "
        f"PARTITION OF {target.name} "
        "FOR VALUES FROM ('2024-01-01') TO ('2025-01-01')"
    )
    connection.execute(DDL(partition_sql))
event.listen(logs, 'after_create', create_partition_ddl)
Sequences
Creating Sequences
from sqlalchemy import Sequence
# Define sequence
user_id_seq = Sequence('user_id_seq', start=1000, increment=1)
users = Table(
'users',
metadata,
Column('id', Integer, user_id_seq, primary_key=True),
Column('name', String(50))
)
# Create sequence explicitly
user_id_seq.create(engine)
# Sequence is created automatically with table
metadata.create_all(engine)
Signature:
Sequence(name: str, start: int = 1, increment: int = 1, minvalue: Optional[int] = None, maxvalue: Optional[int] = None, nominvalue: bool = False, nomaxvalue: bool = False, cycle: bool = False, schema: Optional[str] = None, cache: Optional[int] = None)

Represents a database sequence object.

Using Sequences
from sqlalchemy import select, func
# Get next value
with engine.begin() as conn:
next_id = conn.scalar(user_id_seq.next_value())
print(f"Next ID: {next_id}")
# In INSERT
stmt = insert(users).values(
id=user_id_seq.next_value(),
name='alice'
)
Sequence Options
# PostgreSQL sequence with all options
seq = Sequence(
'order_id_seq',
start=1,
increment=1,
minvalue=1,
maxvalue=999999999,
cycle=False,
cache=20,
schema='public'
)
# Oracle sequence
seq = Sequence(
'emp_id_seq',
start=1,
increment=1,
order=True # Oracle-specific
)
Indexes
Creating Indexes
from sqlalchemy import Index
# Create index separately
idx = Index('ix_users_email', users.c.email)
idx.create(engine)
# Indexes created with table
users = Table(
'users',
metadata,
Column('id', Integer, primary_key=True),
Column('email', String(100)),
Index('ix_email', 'email')
)
metadata.create_all(engine)
Dropping Indexes
# Drop index
idx.drop(engine)
# Drop with DDL
with engine.begin() as conn:
conn.execute(DDL('DROP INDEX IF EXISTS ix_users_email'))
ALTER TABLE Operations
Adding Columns
from sqlalchemy import DDL
# Add column
with engine.begin() as conn:
conn.execute(DDL(
'ALTER TABLE users ADD COLUMN phone VARCHAR(20)'
))
# Add with default
with engine.begin() as conn:
conn.execute(DDL(
'ALTER TABLE users ADD COLUMN is_active BOOLEAN DEFAULT true'
))
Modifying Columns
# Change column type
with engine.begin() as conn:
conn.execute(DDL(
'ALTER TABLE users ALTER COLUMN age TYPE INTEGER'
))
# Set NOT NULL
with engine.begin() as conn:
conn.execute(DDL(
'ALTER TABLE users ALTER COLUMN email SET NOT NULL'
))
# Drop NOT NULL
with engine.begin() as conn:
conn.execute(DDL(
'ALTER TABLE users ALTER COLUMN phone DROP NOT NULL'
))
# Set default
with engine.begin() as conn:
conn.execute(DDL(
"ALTER TABLE users ALTER COLUMN status SET DEFAULT 'active'"
))
Renaming Objects
# Rename table
with engine.begin() as conn:
conn.execute(DDL('ALTER TABLE users RENAME TO customers'))
# Rename column (PostgreSQL)
with engine.begin() as conn:
conn.execute(DDL(
'ALTER TABLE users RENAME COLUMN name TO full_name'
))
Adding/Dropping Constraints
# Add constraint
with engine.begin() as conn:
conn.execute(DDL(
'ALTER TABLE users ADD CONSTRAINT ck_age CHECK (age >= 18)'
))
# Add foreign key
with engine.begin() as conn:
conn.execute(DDL(
'ALTER TABLE orders '
'ADD CONSTRAINT fk_user_id '
'FOREIGN KEY (user_id) REFERENCES users(id)'
))
# Drop constraint
with engine.begin() as conn:
conn.execute(DDL('ALTER TABLE users DROP CONSTRAINT ck_age'))
Custom DDL
DDL Class
from sqlalchemy import DDL
# Simple DDL
create_view = DDL("""
CREATE VIEW active_users AS
SELECT * FROM users WHERE status = 'active'
""")
with engine.begin() as conn:
conn.execute(create_view)
DDL with Parameters
# Parameterized DDL
# NOTE(review): in SQLAlchemy 2.0, %(...)s placeholders in a DDL string are
# filled from the DDL event context (e.g. the target table), not from a
# parameter dict passed to conn.execute() — verify this example against the
# SQLAlchemy version in use.
create_function = DDL("""
CREATE FUNCTION get_user_count()
RETURNS INTEGER AS $$
BEGIN
RETURN (SELECT COUNT(*) FROM %(table)s);
END;
$$ LANGUAGE plpgsql;
""")
with engine.begin() as conn:
conn.execute(
create_function,
{'table': 'users'}
)
Database-Specific DDL
# PostgreSQL extensions
with engine.begin() as conn:
if conn.dialect.name == 'postgresql':
conn.execute(DDL('CREATE EXTENSION IF NOT EXISTS "uuid-ossp"'))
conn.execute(DDL('CREATE EXTENSION IF NOT EXISTS "pg_trgm"'))
# MySQL storage engine
if engine.dialect.name == 'mysql':
users = Table(
'users',
metadata,
Column('id', Integer, primary_key=True),
mysql_engine='InnoDB',
mysql_charset='utf8mb4'
)
Schema Reflection
Reflecting Tables
from sqlalchemy import inspect
# Inspect database
inspector = inspect(engine)
# Get table names
table_names = inspector.get_table_names()
print(f"Tables: {table_names}")
# Reflect single table
users = Table('users', metadata, autoload_with=engine)
# Reflect all tables
metadata.reflect(bind=engine)
# Access reflected tables
for table_name in metadata.tables:
table = metadata.tables[table_name]
print(f"Table: {table.name}")
for col in table.c:
print(f" {col.name}: {col.type}")
Inspector Methods
inspector = inspect(engine)
# Get columns
columns = inspector.get_columns('users')
for col in columns:
print(f"{col['name']}: {col['type']}")
# Get primary keys
pk = inspector.get_pk_constraint('users')
print(f"Primary key: {pk['constrained_columns']}")
# Get foreign keys
fks = inspector.get_foreign_keys('orders')
for fk in fks:
print(f"FK: {fk['constrained_columns']} -> {fk['referred_table']}.{fk['referred_columns']}")
# Get indexes
indexes = inspector.get_indexes('users')
for idx in indexes:
print(f"Index: {idx['name']}, columns: {idx['column_names']}")
# Get unique constraints
uqs = inspector.get_unique_constraints('users')
# Get check constraints
cks = inspector.get_check_constraints('users')
# Get views
views = inspector.get_view_names()
Migration Strategies
Manual Migrations
def upgrade(engine):
    """Add the email column to the users table (idempotent).

    Inspects the live schema first so re-running the migration
    is harmless.
    """
    with engine.begin() as conn:
        existing = {col['name'] for col in inspect(conn).get_columns('users')}
        if 'email' in existing:
            # Column already present — nothing to do.
            return
        conn.execute(DDL(
            'ALTER TABLE users ADD COLUMN email VARCHAR(100)'
        ))
        print("Added email column")
def downgrade(engine):
    """Remove the email column from the users table."""
    drop_column = DDL('ALTER TABLE users DROP COLUMN email')
    with engine.begin() as conn:
        conn.execute(drop_column)
        print("Removed email column")
Version Tracking
schema_version = Table(
'schema_version',
metadata,
Column('version', Integer, primary_key=True),
Column('applied_at', DateTime, server_default=func.now())
)
def get_current_version(engine):
    """Return the highest recorded schema version, or 0 if none exist."""
    stmt = select(func.max(schema_version.c.version))
    with engine.connect() as conn:
        version = conn.execute(stmt).scalar()
    # MAX over an empty table yields NULL/None.
    return version if version is not None else 0
def apply_migration(engine, version, upgrade_func):
    """Apply a migration if its version is newer than the recorded one.

    upgrade_func receives the open Connection; the upgrade and the
    version bookkeeping share one transaction, so a failed upgrade
    leaves no version row behind.
    """
    if get_current_version(engine) >= version:
        return
    with engine.begin() as conn:
        upgrade_func(conn)
        conn.execute(
            insert(schema_version).values(version=version)
        )
Schema Comparison
from sqlalchemy import MetaData
from sqlalchemy.schema import CreateTable
def compare_schemas(engine, metadata):
    """Report table- and column-level differences between *metadata*
    (the Python-side schema) and the schema reflected from *engine*.

    Diffs are printed, not returned.
    """
    reflected = MetaData()
    reflected.reflect(bind=engine)
    defined = set(metadata.tables.keys())
    in_database = set(reflected.tables.keys())
    new_tables = defined - in_database
    removed_tables = in_database - defined
    print(f"New tables: {new_tables}")
    print(f"Removed tables: {removed_tables}")
    # Column-by-column comparison for tables present on both sides.
    for table_name in metadata.tables:
        if table_name not in reflected.tables:
            continue
        meta_cols = set(metadata.tables[table_name].c.keys())
        db_cols = set(reflected.tables[table_name].c.keys())
        new_cols = meta_cols - db_cols
        removed_cols = db_cols - meta_cols
        if not new_cols and not removed_cols:
            continue
        print(f"\nTable {table_name}:")
        if new_cols:
            print(f" New columns: {new_cols}")
        if removed_cols:
            print(f" Removed columns: {removed_cols}")
Common Patterns
Idempotent DDL
# Create only if not exists
with engine.begin() as conn:
conn.execute(DDL('CREATE TABLE IF NOT EXISTS users (...)'))
conn.execute(DDL('CREATE INDEX IF NOT EXISTS ix_email ON users (email)'))
# Drop only if exists
with engine.begin() as conn:
conn.execute(DDL('DROP TABLE IF EXISTS temp_data'))
conn.execute(DDL('DROP INDEX IF EXISTS ix_old_index'))
Transactional DDL
# PostgreSQL supports transactional DDL
with engine.begin() as conn:
try:
conn.execute(DDL('ALTER TABLE users ADD COLUMN age INTEGER'))
conn.execute(DDL('UPDATE users SET age = 0'))
conn.execute(DDL('ALTER TABLE users ALTER COLUMN age SET NOT NULL'))
except Exception as e:
# Automatically rolled back
print(f"Migration failed: {e}")
raise
Schema Versioning Table
migrations = Table(
'_migrations',
metadata,
Column('id', Integer, primary_key=True),
Column('version', String(50), unique=True, nullable=False),
Column('name', String(200)),
Column('applied_at', DateTime, server_default=func.now()),
Column('checksum', String(64))
)
Using Alembic
For production migrations, use Alembic:

# Install Alembic
pip install alembic
# Initialize
alembic init migrations
# Create migration
alembic revision -m "add email column"
# Upgrade
alembic upgrade head
# Downgrade
alembic downgrade -1
Alembic is the recommended tool for database migrations in production SQLAlchemy applications.
Next Steps
Type System
Learn about SQLAlchemy data types
Engines
Configure database engines and connections