INSERT, UPDATE, DELETE Statements
SQLAlchemy Core provides the insert(), update(), and delete() functions for data manipulation language (DML) operations.
INSERT Statements
Basic INSERT
from sqlalchemy import insert

# Single row insert: column values are passed to values() as keyword arguments.
stmt = insert(users).values(
    name='alice',
    email='alice@example.com',
    age=30,
)

with engine.connect() as conn:
    result = conn.execute(stmt)
    conn.commit()
    # inserted_primary_key carries the primary-key value(s) of the new row.
    print(f"Inserted row id: {result.inserted_primary_key}")
Signature:
insert(table: TableClause) -> Insert
Creates an INSERT statement for the specified table.
Multiple Insertion Patterns
- values() Method
- Bulk Insert
- Parameter Binding
# Column names as keyword arguments
stmt = insert(users).values(name='bob', email='bob@example.com')

# Plain dictionary keyed by column name
charlie_row = {'name': 'charlie', 'email': 'charlie@example.com'}
stmt = insert(users).values(charlie_row)

# Dictionary keyed by Column objects
diana_row = {
    users.c.name: 'diana',
    users.c.email: 'diana@example.com',
}
stmt = insert(users).values(diana_row)
# Execute with a list of dicts: one parameter dictionary per row.
stmt = insert(users)

with engine.connect() as conn:
    conn.execute(stmt, [
        {'name': 'alice', 'email': 'alice@example.com'},
        {'name': 'bob', 'email': 'bob@example.com'},
        {'name': 'charlie', 'email': 'charlie@example.com'},
    ])
    conn.commit()

# executemany() style: the parameter list can be generated programmatically.
# It runs in its own connection scope and is committed explicitly, so the
# rows are not discarded when the connection is released.
with engine.connect() as conn:
    conn.execute(
        insert(users),
        [
            {'name': f'user_{i}', 'email': f'user_{i}@example.com'}
            for i in range(1000)
        ],
    )
    conn.commit()
from sqlalchemy import bindparam

# Named parameters: each bindparam() name is the key expected in the
# parameter dictionaries supplied at execution time.
stmt = insert(users).values(
    name=bindparam('user_name'),
    email=bindparam('user_email'),
)

with engine.connect() as conn:
    conn.execute(stmt, [
        {'user_name': 'alice', 'user_email': 'alice@example.com'},
        {'user_name': 'bob', 'user_email': 'bob@example.com'},
    ])
    conn.commit()
INSERT with SELECT
from sqlalchemy import select

# Insert from SELECT: the column-name list gives the target columns, in the
# same order as the columns produced by the SELECT.
select_stmt = select(archived_users.c.name, archived_users.c.email)
stmt = insert(users).from_select(
    ['name', 'email'],
    select_stmt,
)

with engine.connect() as conn:
    conn.execute(stmt)
    conn.commit()

# With WHERE clause
select_stmt = (
    select(temp_users.c.name, temp_users.c.email)
    .where(temp_users.c.verified == True)
)
stmt = insert(users).from_select(['name', 'email'], select_stmt)
RETURNING Clause
# PostgreSQL, Oracle, SQL Server
stmt = (
    insert(users)
    .values(name='alice', email='alice@example.com')
    .returning(users.c.id, users.c.created_at)
)

with engine.connect() as conn:
    result = conn.execute(stmt)
    # Read the returned row before committing.
    row = result.fetchone()
    print(f"New user ID: {row.id}, created: {row.created_at}")
    conn.commit()

# Return all columns
stmt = insert(users).values(name='bob').returning(users)
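The RETURNING clause is database-specific. PostgreSQL, Oracle, and SQL Server support it. Recent versions of SQLite (3.35+) and MariaDB (10.5+) support it at least partially, while MySQL does not support it at all, so check your backend and SQLAlchemy version before relying on it.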
INSERT ON CONFLICT (Upsert)
from sqlalchemy.dialects.postgresql import insert

# INSERT ... ON CONFLICT DO NOTHING
stmt = insert(users).values(
    id=1,
    name='alice',
    email='alice@example.com',
)
stmt = stmt.on_conflict_do_nothing(
    index_elements=['email'],  # Conflict target
)

# INSERT ... ON CONFLICT DO UPDATE
# Keep the plain INSERT under its own name: an Insert construct accepts only
# one ON CONFLICT clause, so each variant below is derived from this base
# statement instead of from an already-modified one.
insert_stmt = insert(users).values(
    id=1,
    name='alice',
    email='alice@example.com',
    login_count=1,
)
stmt = insert_stmt.on_conflict_do_update(
    index_elements=['email'],
    set_=dict(
        # "excluded" refers to the row that was proposed for insertion.
        name=insert_stmt.excluded.name,
        login_count=users.c.login_count + 1,
    ),
)

# With WHERE clause: only update when the existing row is older.
stmt = insert_stmt.on_conflict_do_update(
    index_elements=['email'],
    set_=dict(name=insert_stmt.excluded.name),
    where=(users.c.updated_at < insert_stmt.excluded.updated_at),
)
UPDATE Statements
Basic UPDATE
from sqlalchemy import update

# Update with WHERE
stmt = (
    update(users)
    .where(users.c.id == 1)
    .values(name='Alice Updated', email='newemail@example.com')
)

with engine.connect() as conn:
    result = conn.execute(stmt)
    # rowcount reports how many rows matched the WHERE clause.
    print(f"Updated {result.rowcount} rows")
    conn.commit()

# Update multiple rows
stmt = (
    update(users)
    .where(users.c.status == 'pending')
    .values(status='active')
)
Signature:
update(table: TableClause) -> Update
Creates an UPDATE statement for the specified table.
UPDATE with Expressions
# Increment value: the new value is computed by the database from the
# column's current value.
incremented_count = users.c.login_count + 1
stmt = update(users).where(users.c.id == 1).values(login_count=incremented_count)

# Update with SQL functions
from sqlalchemy import func

stale_login = users.c.last_login < '2024-01-01'
stmt = (
    update(users)
    .where(stale_login)
    .values(status='inactive', updated_at=func.now())
)

# Conditional update using CASE
from sqlalchemy import case

tier_by_points = case(
    (users.c.points < 100, 'bronze'),
    (users.c.points < 500, 'silver'),
    else_='gold',
)
stmt = update(users).values(tier=tier_by_points)
UPDATE with Subqueries
# Update from subquery: the scalar subquery is correlated to each users row.
scores_of_user = scores.c.user_id == users.c.id
avg_score = select(func.avg(scores.c.value)).where(scores_of_user).scalar_subquery()
stmt = update(users).values(avg_score=avg_score)

# Update based on joined data
orders_of_user = orders.c.user_id == users.c.id
max_order = select(func.max(orders.c.total)).where(orders_of_user).scalar_subquery()
big_spender_ids = select(orders.c.user_id).where(orders.c.total > 1000)
stmt = (
    update(users)
    .where(users.c.id.in_(big_spender_ids))
    .values(vip=True, max_order_amount=max_order)
)
UPDATE with FROM (PostgreSQL)
# PostgreSQL: UPDATE with FROM clause
# Referring to a second table in WHERE makes it part of the statement.
stmt = (
    update(users)
    .where(users.c.id == orders.c.user_id)
    .where(orders.c.total > 1000)
    .values(vip=True)
)

# Explicit table in UPDATE FROM
# The core update() construct is used here as well; the PostgreSQL dialect
# package does not export an update() of its own, so no dialect-specific
# import is needed.
stmt = (
    update(users)
    .values(total_spent=orders.c.total)
    .where(users.c.id == orders.c.user_id)
)
Bulk UPDATE
from sqlalchemy import bindparam

# Update multiple rows with different values: the statement is built once and
# executed with one parameter dictionary per row.
stmt = (
    update(users)
    .where(users.c.id == bindparam('user_id'))
    .values(name=bindparam('user_name'))
)

with engine.connect() as conn:
    conn.execute(stmt, [
        {'user_id': 1, 'user_name': 'Alice'},
        {'user_id': 2, 'user_name': 'Bob'},
        {'user_id': 3, 'user_name': 'Charlie'},
    ])
    conn.commit()
UPDATE with RETURNING
# PostgreSQL: Return updated values
stmt = (
    update(users)
    .where(users.c.status == 'pending')
    .values(status='active', activated_at=func.now())
    .returning(users.c.id, users.c.name, users.c.activated_at)
)

with engine.connect() as conn:
    result = conn.execute(stmt)
    # Each result row describes one updated user.
    for row in result:
        print(f"Activated user {row.id}: {row.name}")
    conn.commit()
DELETE Statements
Basic DELETE
from sqlalchemy import delete

# Delete with WHERE
stmt = delete(users).where(users.c.id == 1)

with engine.connect() as conn:
    result = conn.execute(stmt)
    print(f"Deleted {result.rowcount} rows")
    conn.commit()

# Delete multiple rows
stmt = delete(users).where(users.c.status == 'deleted')

# Delete all (use with caution!): without a WHERE clause every row is removed.
stmt = delete(users)
Signature:
delete(table: TableClause) -> Delete
Creates a DELETE statement for the specified table.
DELETE with Complex Conditions
from sqlalchemy import and_, or_

# Multiple conditions: both must hold for a row to be deleted.
is_inactive = users.c.status == 'inactive'
not_seen_since_2023 = users.c.last_login < '2023-01-01'
stmt = delete(users).where(and_(is_inactive, not_seen_since_2023))

# OR conditions: either one is enough for a row to be deleted.
is_debug = logs.c.level == 'debug'
is_old = logs.c.created_at < '2024-01-01'
stmt = delete(logs).where(or_(is_debug, is_old))
DELETE with Subqueries
# Delete based on subquery
stale_users = select(users.c.id).where(users.c.last_login < '2023-01-01')
inactive_user_ids = stale_users.scalar_subquery()
stmt = delete(sessions).where(sessions.c.user_id.in_(inactive_user_ids))

# Delete with NOT EXISTS
from sqlalchemy import exists

order_belongs_to_user = orders.c.user_id == users.c.id
has_orders = exists().where(order_belongs_to_user)
# "~" negates the EXISTS, selecting users that have no orders.
stmt = delete(users).where(~has_orders)
DELETE with RETURNING
# PostgreSQL: Return deleted rows
stmt = (
    delete(users)
    .where(users.c.status == 'banned')
    .returning(users.c.id, users.c.name, users.c.email)
)

with engine.connect() as conn:
    result = conn.execute(stmt)
    # Materialize the rows first so they can be both counted and listed.
    deleted_users = result.fetchall()
    print(f"Deleted {len(deleted_users)} users")
    for row in deleted_users:
        print(f"Deleted: {row.name} ({row.email})")
    conn.commit()
DELETE with JOIN (MySQL, PostgreSQL)
# PostgreSQL: DELETE with USING
# Referring to a second table in WHERE makes it part of the statement.
stmt = (
    delete(users)
    .where(users.c.id == orders.c.user_id)
    .where(orders.c.status == 'fraud')
)

# MySQL: multi-table DELETE
# The same core delete() construct is used; the MySQL dialect package does
# not export a delete() of its own, so no dialect-specific import is needed.
stmt = (
    delete(users)
    .where(users.c.id == orders.c.user_id)
    .where(orders.c.fraudulent == True)
)
Transaction Management
Explicit Transactions
# Manual commit: both statements belong to the same transaction.
with engine.connect() as conn:
    conn.execute(insert(users).values(name='alice'))
    conn.execute(update(users).where(users.c.id == 1).values(status='active'))
    conn.commit()

# Rollback on error: undo both inserts if either one fails, then re-raise so
# the caller still sees the original exception.
with engine.connect() as conn:
    try:
        conn.execute(insert(users).values(name='alice'))
        conn.execute(insert(users).values(name='bob'))
        conn.commit()
    except Exception:
        conn.rollback()
        raise
begin() Context Manager
# Automatic commit/rollback
with engine.begin() as conn:
    conn.execute(insert(users).values(name='alice'))
    conn.execute(insert(orders).values(user_id=1, total=100))
    # Automatically commits on success, rolls back on exception

# Nested transactions
with engine.begin() as conn:
    conn.execute(insert(users).values(name='alice'))

    # Savepoint
    with conn.begin_nested():
        conn.execute(insert(users).values(name='bob'))
        # Can rollback to this savepoint independently
Performance Optimization
Bulk Operations
# Efficient bulk insert: one execute() call with a list of parameter sets.
with engine.begin() as conn:
    conn.execute(
        insert(users),
        [
            {'name': f'user_{i}', 'email': f'user_{i}@example.com'}
            for i in range(10000)
        ],
    )

# Bulk update with executemany
# 'uid' feeds the bindparam() in the WHERE clause; the remaining key
# ('status') names the column to set.
with engine.begin() as conn:
    conn.execute(
        update(users).where(users.c.id == bindparam('uid')),
        [{'uid': i, 'status': 'active'} for i in range(1, 1001)],
    )
Batch Processing
def batch_insert(conn, data, batch_size=1000):
    """Insert ``data`` into ``users`` in batches of at most ``batch_size`` rows.

    Args:
        conn: Connection whose ``execute()`` method runs each batch.
        data: Iterable of parameter dictionaries, one per row. Any iterable
            is accepted, including generators; it is consumed lazily so the
            full dataset never has to be sliced or copied.
        batch_size: Maximum number of rows sent per ``execute()`` call.

    Raises:
        ValueError: If ``batch_size`` is smaller than 1.
    """
    from itertools import islice

    # A non-positive size would otherwise insert nothing without any error.
    if batch_size < 1:
        raise ValueError(f"batch_size must be at least 1, got {batch_size!r}")

    rows = iter(data)
    while True:
        # Pull the next chunk; an empty chunk means the input is exhausted.
        batch = list(islice(rows, batch_size))
        if not batch:
            break
        conn.execute(insert(users), batch)
# All batches run inside one transaction, committed when the block exits.
with engine.begin() as conn:
    large_dataset = [{'name': f'user_{i}'} for i in range(100000)]
    batch_insert(conn, large_dataset)
Compiled Statements
# Pre-compile for reuse
stmt = (
    insert(users)
    .values(name=bindparam('n'), email=bindparam('e'))
)
# Compile against the engine so the SQL string and parameter style match the
# database actually in use; a bare compile() targets a generic default dialect.
# NOTE: SQLAlchemy 1.4+ also caches compiled statements automatically, so
# executing ``stmt`` directly is normally just as fast.
compiled = stmt.compile(engine)

with engine.begin() as conn:
    for i in range(1000):
        conn.execute(compiled, {'n': f'user_{i}', 'e': f'user_{i}@example.com'})
Common Patterns
Soft Delete
# Soft delete pattern: mark the row as deleted instead of removing it.
soft_delete_values = {'deleted_at': func.now(), 'status': 'deleted'}
stmt = update(users).where(users.c.id == 1).values(soft_delete_values)

# Query excluding soft-deleted rows (those whose deleted_at is still NULL).
not_deleted = users.c.deleted_at.is_(None)
stmt = select(users).where(not_deleted)
Audit Trail
# Update with audit fields: record when and by whom the row was changed.
audited_changes = {
    'name': 'Alice',
    'updated_at': func.now(),
    'updated_by': 'admin',
}
stmt = update(users).where(users.c.id == 1).values(audited_changes)
Conditional Insert
# Insert only if not exists
from sqlalchemy import exists

exists_stmt = (
    exists()
    .where(users.c.email == 'alice@example.com')
)

# NOTE: check-then-insert is not atomic. Another transaction can insert the
# same email between the SELECT and the INSERT, so pair this with a unique
# constraint (or use an upsert) when duplicates must be impossible.
with engine.connect() as conn:
    if not conn.scalar(select(exists_stmt)):
        conn.execute(
            insert(users).values(
                name='alice',
                email='alice@example.com',
            )
        )
        conn.commit()
Next Steps
Schema Definition
Learn about tables and metadata
Connections
Master connection and execution patterns