Skip to main content

SELECT Statements

The select() function is the primary way to construct SELECT queries in SQLAlchemy Core. It provides a fluent API for building complex queries.

Basic SELECT

Simple Queries

from sqlalchemy import select

# SELECT all columns
stmt = select(users)
# SELECT users.id, users.name, users.email FROM users

# SELECT specific columns
stmt = select(users.c.id, users.c.name)
# SELECT users.id, users.name FROM users

# SELECT with expressions
stmt = select(users.c.name, func.length(users.c.name).label('name_length'))
select
function
Signature: select(*entities) -> SelectCreates a SELECT statement. Accepts table objects, columns, or any column expressions.

WHERE Clause

# Single condition
stmt = select(users).where(users.c.name == 'alice')

# Multiple conditions (AND)
stmt = select(users).where(
    users.c.age >= 18,
    users.c.status == 'active'
)
# Same as:
stmt = select(users).where(users.c.age >= 18).where(users.c.status == 'active')

# OR conditions
from sqlalchemy import or_

stmt = select(users).where(
    or_(
        users.c.role == 'admin',
        users.c.role == 'moderator'
    )
)

ORDER BY, LIMIT, OFFSET

from sqlalchemy import desc

# ORDER BY
stmt = select(users).order_by(users.c.name)
stmt = select(users).order_by(desc(users.c.created_at))

# Multiple ORDER BY
stmt = select(users).order_by(
    users.c.last_name,
    users.c.first_name
)

# LIMIT and OFFSET
stmt = select(users).limit(10)
stmt = select(users).offset(20)

# Pagination
page = 3
per_page = 10
stmt = select(users).limit(per_page).offset((page - 1) * per_page)

JOIN Operations

Inner Joins

# Using join() method
stmt = (
    select(users.c.name, orders.c.total)
    .select_from(users)
    .join(orders, users.c.id == orders.c.user_id)
)
# SELECT users.name, orders.total 
# FROM users JOIN orders ON users.id = orders.user_id

# Implicit foreign key join
stmt = (
    select(users.c.name, orders.c.total)
    .select_from(users)
    .join(orders)  # Automatically uses foreign key
)

Outer Joins

# LEFT OUTER JOIN
stmt = (
    select(users.c.name, func.count(orders.c.id))
    .select_from(users)
    .outerjoin(orders, users.c.id == orders.c.user_id)
    .group_by(users.c.name)
)
# SELECT users.name, count(orders.id)
# FROM users LEFT OUTER JOIN orders ON users.id = orders.user_id
# GROUP BY users.name

# Using outerjoin() function
from sqlalchemy import outerjoin

stmt = (
    select(users.c.name, orders.c.total)
    .select_from(outerjoin(users, orders))
)

# RIGHT OUTER JOIN (some databases)
stmt = (
    select(users.c.name, orders.c.total)
    .select_from(orders)
    .outerjoin(users, orders.c.user_id == users.c.id, full=False)
)

Complex Join Conditions

# Multiple conditions in join
stmt = (
    select(users.c.name, orders.c.total)
    .select_from(users)
    .join(
        orders,
        and_(
            users.c.id == orders.c.user_id,
            orders.c.status == 'completed',
            orders.c.created_at >= '2024-01-01'
        )
    )
)

# Self join
managers = users.alias('managers')
stmt = (
    select(users.c.name, managers.c.name.label('manager_name'))
    .select_from(users)
    .join(managers, users.c.manager_id == managers.c.id)
)

Subqueries

Scalar Subqueries

# Subquery in SELECT
order_count = (
    select(func.count(orders.c.id))
    .where(orders.c.user_id == users.c.id)
    .scalar_subquery()
)

stmt = select(
    users.c.name,
    order_count.label('order_count')
)
# SELECT users.name, 
#        (SELECT count(orders.id) FROM orders WHERE orders.user_id = users.id) AS order_count
# FROM users
Use .scalar_subquery() to create a scalar subquery that returns a single value.

Subqueries in FROM

# Create subquery
subq = (
    select(
        orders.c.user_id,
        func.sum(orders.c.total).label('total_spent')
    )
    .group_by(orders.c.user_id)
    .subquery()
)

# Use in main query
stmt = (
    select(users.c.name, subq.c.total_spent)
    .select_from(users)
    .join(subq, users.c.id == subq.c.user_id)
    .where(subq.c.total_spent > 1000)
)
# SELECT users.name, anon_1.total_spent
# FROM users JOIN (
#     SELECT orders.user_id, sum(orders.total) AS total_spent
#     FROM orders GROUP BY orders.user_id
# ) AS anon_1 ON users.id = anon_1.user_id
# WHERE anon_1.total_spent > 1000

Correlated Subqueries

# Correlated subquery with EXISTS
from sqlalchemy import exists

has_orders = (
    exists()
    .where(orders.c.user_id == users.c.id)
    .where(orders.c.total > 100)
)

stmt = select(users).where(has_orders)
# SELECT users.* FROM users
# WHERE EXISTS (
#     SELECT * FROM orders 
#     WHERE orders.user_id = users.id AND orders.total > 100
# )

# Using select().exists()
has_orders = (
    select(orders.c.id)
    .where(orders.c.user_id == users.c.id)
    .where(orders.c.total > 100)
    .exists()
)

stmt = select(users).where(has_orders)

IN with Subquery

# Subquery in WHERE IN
active_user_ids = (
    select(users.c.id)
    .where(users.c.status == 'active')
    .scalar_subquery()
)

stmt = (
    select(orders)
    .where(orders.c.user_id.in_(active_user_ids))
)

Common Table Expressions (CTEs)

Basic CTE

from sqlalchemy import select

# Define CTE
active_users_cte = (
    select(users.c.id, users.c.name)
    .where(users.c.status == 'active')
    .cte('active_users')
)

# Use CTE in main query
stmt = (
    select(active_users_cte.c.name, func.count(orders.c.id))
    .select_from(active_users_cte)
    .join(orders, active_users_cte.c.id == orders.c.user_id)
    .group_by(active_users_cte.c.name)
)
# WITH active_users AS (
#     SELECT users.id, users.name FROM users WHERE users.status = 'active'
# )
# SELECT active_users.name, count(orders.id)
# FROM active_users JOIN orders ON active_users.id = orders.user_id
# GROUP BY active_users.name
cte
method
Signature: .cte(name: Optional[str] = None, recursive: bool = False) -> CTECreates a Common Table Expression from a SELECT statement.

Multiple CTEs

# First CTE
active_users = (
    select(users)
    .where(users.c.status == 'active')
    .cte('active_users')
)

# Second CTE using first
user_totals = (
    select(
        active_users.c.id,
        func.sum(orders.c.total).label('total')
    )
    .select_from(active_users)
    .join(orders, active_users.c.id == orders.c.user_id)
    .group_by(active_users.c.id)
    .cte('user_totals')
)

# Main query using both
stmt = (
    select(active_users.c.name, user_totals.c.total)
    .select_from(active_users)
    .join(user_totals, active_users.c.id == user_totals.c.id)
)

Recursive CTEs

# Recursive CTE for hierarchical data
included_parts = (
    select(
        parts.c.id,
        parts.c.name,
        parts.c.parent_id
    )
    .where(parts.c.name == 'root')
    .cte('included_parts', recursive=True)
)

# Recursive term
included_parts = included_parts.union_all(
    select(
        parts.c.id,
        parts.c.name,
        parts.c.parent_id
    )
    .select_from(parts)
    .join(included_parts, parts.c.parent_id == included_parts.c.id)
)

stmt = select(included_parts)
# WITH RECURSIVE included_parts AS (
#     SELECT parts.id, parts.name, parts.parent_id 
#     FROM parts WHERE parts.name = 'root'
#     UNION ALL
#     SELECT parts.id, parts.name, parts.parent_id
#     FROM parts JOIN included_parts ON parts.parent_id = included_parts.id
# )
# SELECT * FROM included_parts

Aggregation and Grouping

GROUP BY and HAVING

from sqlalchemy import func

# Simple GROUP BY
stmt = (
    select(
        users.c.country,
        func.count(users.c.id).label('user_count')
    )
    .group_by(users.c.country)
)

# GROUP BY with HAVING
stmt = (
    select(
        users.c.country,
        func.count(users.c.id).label('user_count')
    )
    .group_by(users.c.country)
    .having(func.count(users.c.id) > 10)
)

# Multiple grouping columns
stmt = (
    select(
        orders.c.user_id,
        orders.c.status,
        func.sum(orders.c.total).label('total')
    )
    .group_by(orders.c.user_id, orders.c.status)
)

Window Functions

# ROW_NUMBER() with PARTITION BY
stmt = select(
    orders.c.user_id,
    orders.c.total,
    func.row_number().over(
        partition_by=orders.c.user_id,
        order_by=desc(orders.c.total)
    ).label('rank_in_user')
).order_by(orders.c.user_id, 'rank_in_user')

# Running totals
stmt = select(
    sales.c.date,
    sales.c.amount,
    func.sum(sales.c.amount).over(
        order_by=sales.c.date,
        rows=(None, 0)  # UNBOUNDED PRECEDING to CURRENT ROW
    ).label('running_total')
)

Set Operations

UNION and UNION ALL

from sqlalchemy import union, union_all

# UNION (removes duplicates)
active = select(users).where(users.c.status == 'active')
premium = select(users).where(users.c.premium == True)

stmt = union(active, premium)
# SELECT users.* FROM users WHERE users.status = 'active'
# UNION
# SELECT users.* FROM users WHERE users.premium = true

# UNION ALL (keeps duplicates)
stmt = union_all(active, premium)

# Method syntax
stmt = active.union(premium)
stmt = active.union_all(premium)

INTERSECT and EXCEPT

from sqlalchemy import intersect, except_

# INTERSECT
stmt = intersect(
    select(users.c.id).where(users.c.status == 'active'),
    select(orders.c.user_id).where(orders.c.total > 100)
)

# EXCEPT (difference)
stmt = except_(
    select(users.c.id),
    select(banned_users.c.user_id)
)

Advanced Patterns

Lateral Joins (PostgreSQL)

from sqlalchemy import lateral

# Latest order per user
latest_order = (
    select(orders)
    .where(orders.c.user_id == users.c.id)
    .order_by(desc(orders.c.created_at))
    .limit(1)
    .lateral()
)

stmt = (
    select(users.c.name, latest_order.c.total)
    .select_from(users)
    .join(latest_order, True)  # LATERAL join
)

Table Sampling

from sqlalchemy import tablesample

# Random sample (PostgreSQL)
stmt = select(users).select_from(
    tablesample(users, 10, seed=12345)  # 10% sample
)

Dynamic Column Selection

def build_select(table, columns=None, filters=None):
    # Select specific columns or all
    if columns:
        cols = [getattr(table.c, col) for col in columns]
        stmt = select(*cols)
    else:
        stmt = select(table)
    
    # Apply filters
    if filters:
        for column, value in filters.items():
            stmt = stmt.where(getattr(table.c, column) == value)
    
    return stmt

# Usage
stmt = build_select(users, columns=['id', 'name'], filters={'status': 'active'})

Executing SELECT Statements

from sqlalchemy import create_engine

engine = create_engine('postgresql://user:pass@localhost/db')

stmt = select(users).where(users.c.status == 'active')

# Execute and fetch results
with engine.connect() as conn:
    result = conn.execute(stmt)
    
    # Fetch all rows
    rows = result.fetchall()
    for row in rows:
        print(row.id, row.name)
    
    # Fetch one row
    result = conn.execute(stmt)
    row = result.fetchone()
    
    # Iterate results
    result = conn.execute(stmt)
    for row in result:
        print(row)

Next Steps

INSERT/UPDATE/DELETE

Learn data manipulation statements

Connections

Execute statements and handle results