SQLAlchemy Engine and Connection Reference
Complete reference for SQLAlchemy’s engine and connection management.

Engine Creation

create_engine()

Create a database engine.
from sqlalchemy import create_engine

# Basic usage
engine = create_engine("postgresql://user:pass@localhost/dbname")

# With options
engine = create_engine(
    "postgresql://user:pass@localhost/dbname",
    echo=True,
    pool_size=10,
    max_overflow=20
)
url
str | URL
required
Database URL string or URL object
echo
bool
default:"False"
Log all SQL statements
echo_pool
bool
default:"False"
Log connection pool events
pool_size
int
default:"5"
Number of connections to maintain in pool
max_overflow
int
default:"10"
Maximum overflow connections beyond pool_size
pool_timeout
float
default:"30.0"
Timeout for getting connection from pool
pool_recycle
int
default:"-1"
Recycle connections after N seconds
pool_pre_ping
bool
default:"False"
Test connections before using
isolation_level
str
Transaction isolation level

URL Format

dialect+driver://username:password@host:port/database
PostgreSQL:
# psycopg2 (default driver for "postgresql://")
"postgresql://user:pass@localhost/dbname"
"postgresql+psycopg2://user:pass@localhost/dbname"

# psycopg (version 3)
"postgresql+psycopg://user:pass@localhost/dbname"

# asyncpg (async)
"postgresql+asyncpg://user:pass@localhost/dbname"

# pg8000
"postgresql+pg8000://user:pass@localhost/dbname"
MySQL:
# mysqlclient (default)
"mysql://user:pass@localhost/dbname"
"mysql+mysqldb://user:pass@localhost/dbname"

# PyMySQL
"mysql+pymysql://user:pass@localhost/dbname"

# asyncmy (async)
"mysql+asyncmy://user:pass@localhost/dbname"
SQLite:
# File database
"sqlite:///path/to/database.db"

# In-memory
"sqlite:///:memory:"

# Async
"sqlite+aiosqlite:///path/to/database.db"
Oracle:
"oracle+oracledb://user:pass@localhost:1521/?service_name=XEPDB1"
SQL Server:
"mssql+pyodbc://user:pass@localhost/dbname?driver=ODBC+Driver+17+for+SQL+Server"

Engine Class

Methods

connect() - Get a connection from the pool
with engine.connect() as conn:
    result = conn.execute(text("SELECT 1"))
begin() - Start a transaction context
with engine.begin() as conn:
    conn.execute(users.insert().values(name="Alice"))
    # Auto-commits on context exit
dispose() - Dispose of connection pool
engine.dispose()
execution_options(**kwargs) - Return a new Engine with the given execution options set
engine = engine.execution_options(isolation_level="SERIALIZABLE")

Properties

  • dialect - Database dialect instance
  • driver - DBAPI driver name
  • name - Dialect name (“postgresql”, “mysql”, etc.)
  • pool - Connection pool instance
  • url - Database URL

Connection

Getting Connections

# Context manager (recommended)
with engine.connect() as conn:
    result = conn.execute(stmt)

# Manual management
conn = engine.connect()
try:
    result = conn.execute(stmt)
finally:
    conn.close()

Connection Methods

execute(statement, parameters=None) - Execute a statement
result = conn.execute(select(users))
result = conn.execute(text("SELECT * FROM users WHERE id = :id"), {"id": 1})
scalar(statement) - Execute and return scalar result
count = conn.scalar(select(func.count()).select_from(users))
scalars(statement) - Execute and return scalar results
names = conn.scalars(select(users.c.name)).all()
commit() - Commit current transaction
conn.execute(users.insert().values(name="Alice"))
conn.commit()
rollback() - Rollback current transaction
conn.execute(users.insert().values(name="Alice"))
conn.rollback()
begin() - Begin a new transaction
with conn.begin():
    conn.execute(users.insert().values(name="Alice"))
    # Auto-commits on success
begin_nested() - Begin a nested transaction (SAVEPOINT)
with conn.begin():
    conn.execute(users.insert().values(name="Alice"))
    with conn.begin_nested():  # SAVEPOINT
        conn.execute(users.insert().values(name="Bob"))
        # Can rollback just this savepoint
close() - Close connection and return to pool
conn.close()

Result

Result object returned from execution.

Result Methods

all() - Fetch all rows
rows = result.all()
first() - Fetch first row or None
row = result.first()
one() - Fetch exactly one row, raise if 0 or >1
row = result.one()
one_or_none() - Fetch one row or None, raise if >1
row = result.one_or_none()
scalar() - Return first column of first row
count = result.scalar()
scalars() - Return ScalarResult (first column)
names = result.scalars().all()
mappings() - Return MappingResult (dict-like rows)
for row in result.mappings():
    print(row["name"])
fetchone() - Fetch one row
row = result.fetchone()
fetchmany(size) - Fetch multiple rows
rows = result.fetchmany(10)
fetchall() - Fetch all rows
rows = result.fetchall()

Row Access

result = conn.execute(select(users))
row = result.first()

# Tuple-like access
name = row[0]
name = row.name

# Dict-like access (with mappings)
for row in result.mappings():
    name = row["name"]
    email = row["email"]

Transactions

Transaction Isolation Levels

# Engine-level
engine = create_engine(url, isolation_level="REPEATABLE READ")

# Connection-level
with engine.connect().execution_options(isolation_level="SERIALIZABLE") as conn:
    result = conn.execute(stmt)
Isolation levels:
  • READ UNCOMMITTED
  • READ COMMITTED
  • REPEATABLE READ
  • SERIALIZABLE

Two-Phase Commit

with engine.connect() as conn:
    trans = conn.begin_twophase()
    conn.execute(stmt1)
    trans.prepare()
    # Can coordinate with other resources
    trans.commit()
# Note: begin_twophase() must be called on a connection with no
# transaction already in progress (not inside engine.begin()).

Events

from sqlalchemy import event

@event.listens_for(engine, "connect")
def receive_connect(dbapi_conn, connection_record):
    print("New connection")

@event.listens_for(engine, "before_cursor_execute")
def receive_before_cursor_execute(conn, cursor, statement, parameters, context, executemany):
    print(f"Executing: {statement}")