SELECT Statements
Theselect() function is the primary way to construct SELECT queries in SQLAlchemy Core. It provides a fluent API for building complex queries.
Basic SELECT
Simple Queries
from sqlalchemy import select
# SELECT all columns
stmt = select(users)
# SELECT users.id, users.name, users.email FROM users
# SELECT specific columns
stmt = select(users.c.id, users.c.name)
# SELECT users.id, users.name FROM users
# SELECT with expressions
stmt = select(users.c.name, func.length(users.c.name).label('name_length'))
Signature:
select(*entities) -> SelectCreates a SELECT statement. Accepts table objects, columns, or any column expressions.WHERE Clause
# Single condition
stmt = select(users).where(users.c.name == 'alice')
# Multiple conditions (AND)
stmt = select(users).where(
users.c.age >= 18,
users.c.status == 'active'
)
# Same as:
stmt = select(users).where(users.c.age >= 18).where(users.c.status == 'active')
# OR conditions
from sqlalchemy import or_
stmt = select(users).where(
or_(
users.c.role == 'admin',
users.c.role == 'moderator'
)
)
ORDER BY, LIMIT, OFFSET
from sqlalchemy import desc
# ORDER BY
stmt = select(users).order_by(users.c.name)
stmt = select(users).order_by(desc(users.c.created_at))
# Multiple ORDER BY
stmt = select(users).order_by(
users.c.last_name,
users.c.first_name
)
# LIMIT and OFFSET
stmt = select(users).limit(10)
stmt = select(users).offset(20)
# Pagination
page = 3
per_page = 10
stmt = select(users).limit(per_page).offset((page - 1) * per_page)
JOIN Operations
Inner Joins
- Explicit JOIN
- join() Function
# Using join() method
stmt = (
select(users.c.name, orders.c.total)
.select_from(users)
.join(orders, users.c.id == orders.c.user_id)
)
# SELECT users.name, orders.total
# FROM users JOIN orders ON users.id = orders.user_id
# Implicit foreign key join
stmt = (
select(users.c.name, orders.c.total)
.select_from(users)
.join(orders) # Automatically uses foreign key
)
from sqlalchemy import join
# Create join object
user_orders = join(users, orders, users.c.id == orders.c.user_id)
# Use in SELECT
stmt = select(users.c.name, orders.c.total).select_from(user_orders)
# Multiple joins
j = (
join(users, orders)
.join(products, orders.c.product_id == products.c.id)
)
stmt = select(users.c.name, products.c.name).select_from(j)
Outer Joins
# LEFT OUTER JOIN
stmt = (
select(users.c.name, func.count(orders.c.id))
.select_from(users)
.outerjoin(orders, users.c.id == orders.c.user_id)
.group_by(users.c.name)
)
# SELECT users.name, count(orders.id)
# FROM users LEFT OUTER JOIN orders ON users.id = orders.user_id
# GROUP BY users.name
# Using outerjoin() function
from sqlalchemy import outerjoin
stmt = (
select(users.c.name, orders.c.total)
.select_from(outerjoin(users, orders))
)
# RIGHT OUTER JOIN (some databases)
stmt = (
select(users.c.name, orders.c.total)
.select_from(orders)
.outerjoin(users, orders.c.user_id == users.c.id, full=False)
)
Complex Join Conditions
# Multiple conditions in join
stmt = (
select(users.c.name, orders.c.total)
.select_from(users)
.join(
orders,
and_(
users.c.id == orders.c.user_id,
orders.c.status == 'completed',
orders.c.created_at >= '2024-01-01'
)
)
)
# Self join
managers = users.alias('managers')
stmt = (
select(users.c.name, managers.c.name.label('manager_name'))
.select_from(users)
.join(managers, users.c.manager_id == managers.c.id)
)
Subqueries
Scalar Subqueries
# Subquery in SELECT
order_count = (
select(func.count(orders.c.id))
.where(orders.c.user_id == users.c.id)
.scalar_subquery()
)
stmt = select(
users.c.name,
order_count.label('order_count')
)
# SELECT users.name,
# (SELECT count(orders.id) FROM orders WHERE orders.user_id = users.id) AS order_count
# FROM users
Use
.scalar_subquery() to create a scalar subquery that returns a single value.Subqueries in FROM
# Create subquery
subq = (
select(
orders.c.user_id,
func.sum(orders.c.total).label('total_spent')
)
.group_by(orders.c.user_id)
.subquery()
)
# Use in main query
stmt = (
select(users.c.name, subq.c.total_spent)
.select_from(users)
.join(subq, users.c.id == subq.c.user_id)
.where(subq.c.total_spent > 1000)
)
# SELECT users.name, anon_1.total_spent
# FROM users JOIN (
# SELECT orders.user_id, sum(orders.total) AS total_spent
# FROM orders GROUP BY orders.user_id
# ) AS anon_1 ON users.id = anon_1.user_id
# WHERE anon_1.total_spent > 1000
Correlated Subqueries
# Correlated subquery with EXISTS
from sqlalchemy import exists
has_orders = (
exists()
.where(orders.c.user_id == users.c.id)
.where(orders.c.total > 100)
)
stmt = select(users).where(has_orders)
# SELECT users.* FROM users
# WHERE EXISTS (
# SELECT * FROM orders
# WHERE orders.user_id = users.id AND orders.total > 100
# )
# Using select().exists()
has_orders = (
select(orders.c.id)
.where(orders.c.user_id == users.c.id)
.where(orders.c.total > 100)
.exists()
)
stmt = select(users).where(has_orders)
IN with Subquery
# Subquery in WHERE IN
active_user_ids = (
select(users.c.id)
.where(users.c.status == 'active')
.scalar_subquery()
)
stmt = (
select(orders)
.where(orders.c.user_id.in_(active_user_ids))
)
Common Table Expressions (CTEs)
Basic CTE
from sqlalchemy import select
# Define CTE
active_users_cte = (
select(users.c.id, users.c.name)
.where(users.c.status == 'active')
.cte('active_users')
)
# Use CTE in main query
stmt = (
select(active_users_cte.c.name, func.count(orders.c.id))
.select_from(active_users_cte)
.join(orders, active_users_cte.c.id == orders.c.user_id)
.group_by(active_users_cte.c.name)
)
# WITH active_users AS (
# SELECT users.id, users.name FROM users WHERE users.status = 'active'
# )
# SELECT active_users.name, count(orders.id)
# FROM active_users JOIN orders ON active_users.id = orders.user_id
# GROUP BY active_users.name
Signature:
.cte(name: Optional[str] = None, recursive: bool = False) -> CTECreates a Common Table Expression from a SELECT statement.Multiple CTEs
# First CTE
active_users = (
select(users)
.where(users.c.status == 'active')
.cte('active_users')
)
# Second CTE using first
user_totals = (
select(
active_users.c.id,
func.sum(orders.c.total).label('total')
)
.select_from(active_users)
.join(orders, active_users.c.id == orders.c.user_id)
.group_by(active_users.c.id)
.cte('user_totals')
)
# Main query using both
stmt = (
select(active_users.c.name, user_totals.c.total)
.select_from(active_users)
.join(user_totals, active_users.c.id == user_totals.c.id)
)
Recursive CTEs
# Recursive CTE for hierarchical data
included_parts = (
select(
parts.c.id,
parts.c.name,
parts.c.parent_id
)
.where(parts.c.name == 'root')
.cte('included_parts', recursive=True)
)
# Recursive term
included_parts = included_parts.union_all(
select(
parts.c.id,
parts.c.name,
parts.c.parent_id
)
.select_from(parts)
.join(included_parts, parts.c.parent_id == included_parts.c.id)
)
stmt = select(included_parts)
# WITH RECURSIVE included_parts AS (
# SELECT parts.id, parts.name, parts.parent_id
# FROM parts WHERE parts.name = 'root'
# UNION ALL
# SELECT parts.id, parts.name, parts.parent_id
# FROM parts JOIN included_parts ON parts.parent_id = included_parts.id
# )
# SELECT * FROM included_parts
Aggregation and Grouping
GROUP BY and HAVING
from sqlalchemy import func
# Simple GROUP BY
stmt = (
select(
users.c.country,
func.count(users.c.id).label('user_count')
)
.group_by(users.c.country)
)
# GROUP BY with HAVING
stmt = (
select(
users.c.country,
func.count(users.c.id).label('user_count')
)
.group_by(users.c.country)
.having(func.count(users.c.id) > 10)
)
# Multiple grouping columns
stmt = (
select(
orders.c.user_id,
orders.c.status,
func.sum(orders.c.total).label('total')
)
.group_by(orders.c.user_id, orders.c.status)
)
Window Functions
# ROW_NUMBER() with PARTITION BY
stmt = select(
orders.c.user_id,
orders.c.total,
func.row_number().over(
partition_by=orders.c.user_id,
order_by=desc(orders.c.total)
).label('rank_in_user')
).order_by(orders.c.user_id, 'rank_in_user')
# Running totals
stmt = select(
sales.c.date,
sales.c.amount,
func.sum(sales.c.amount).over(
order_by=sales.c.date,
rows=(None, 0) # UNBOUNDED PRECEDING to CURRENT ROW
).label('running_total')
)
Set Operations
UNION and UNION ALL
from sqlalchemy import union, union_all
# UNION (removes duplicates)
active = select(users).where(users.c.status == 'active')
premium = select(users).where(users.c.premium == True)
stmt = union(active, premium)
# SELECT users.* FROM users WHERE users.status = 'active'
# UNION
# SELECT users.* FROM users WHERE users.premium = true
# UNION ALL (keeps duplicates)
stmt = union_all(active, premium)
# Method syntax
stmt = active.union(premium)
stmt = active.union_all(premium)
INTERSECT and EXCEPT
from sqlalchemy import intersect, except_
# INTERSECT
stmt = intersect(
select(users.c.id).where(users.c.status == 'active'),
select(orders.c.user_id).where(orders.c.total > 100)
)
# EXCEPT (difference)
stmt = except_(
select(users.c.id),
select(banned_users.c.user_id)
)
Advanced Patterns
Lateral Joins (PostgreSQL)
from sqlalchemy import lateral
# Latest order per user
latest_order = (
select(orders)
.where(orders.c.user_id == users.c.id)
.order_by(desc(orders.c.created_at))
.limit(1)
.lateral()
)
stmt = (
select(users.c.name, latest_order.c.total)
.select_from(users)
.join(latest_order, True) # LATERAL join
)
Table Sampling
from sqlalchemy import tablesample
# Random sample (PostgreSQL)
stmt = select(users).select_from(
tablesample(users, 10, seed=12345) # 10% sample
)
Dynamic Column Selection
def build_select(table, columns=None, filters=None):
# Select specific columns or all
if columns:
cols = [getattr(table.c, col) for col in columns]
stmt = select(*cols)
else:
stmt = select(table)
# Apply filters
if filters:
for column, value in filters.items():
stmt = stmt.where(getattr(table.c, column) == value)
return stmt
# Usage
stmt = build_select(users, columns=['id', 'name'], filters={'status': 'active'})
Executing SELECT Statements
from sqlalchemy import create_engine
engine = create_engine('postgresql://user:pass@localhost/db')
stmt = select(users).where(users.c.status == 'active')
# Execute and fetch results
with engine.connect() as conn:
result = conn.execute(stmt)
# Fetch all rows
rows = result.fetchall()
for row in rows:
print(row.id, row.name)
# Fetch one row
result = conn.execute(stmt)
row = result.fetchone()
# Iterate results
result = conn.execute(stmt)
for row in result:
print(row)
Next Steps
INSERT/UPDATE/DELETE
Learn data manipulation statements
Connections
Execute statements and handle results