Complete reference for SQLAlchemy’s engine and connection management.
Engine Creation
create_engine()
Create a database engine.
from sqlalchemy import create_engine
# Basic usage
engine = create_engine("postgresql://user:pass@localhost/dbname")
# With options
engine = create_engine(
    "postgresql://user:pass@localhost/dbname",
    echo=True,
    pool_size=10,
    max_overflow=20,
)
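url - Database URL string or URL object
echo_pool - Log connection pool events (echo, used above, logs the SQL statements themselves)
pool_size - Number of connections to maintain in the pool
max_overflow - Maximum number of overflow connections beyond pool_size
pool_timeout - Timeout in seconds for getting a connection from the pool
pool_recycle - Recycle connections after N seconds
pool_pre_ping - Test connections for liveness before using them
isolation_level - Transaction isolation level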
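As a rough illustration of how the pool-related options fit together, the sketch below builds an engine against an in-memory SQLite database so that it runs without a server. The use of SQLite, the explicit poolclass=QueuePool, and the specific numbers are assumptions made for the example, not recommended settings.

```python
from sqlalchemy import create_engine, text
from sqlalchemy.pool import QueuePool

# Assumption: in-memory SQLite with an explicit QueuePool, so that
# pool_size / max_overflow / pool_timeout are accepted by the pool.
engine = create_engine(
    "sqlite://",
    poolclass=QueuePool,
    pool_size=5,         # connections kept open in the pool
    max_overflow=10,     # extra connections allowed under load
    pool_timeout=30,     # seconds to wait for a free connection
    pool_recycle=1800,   # replace connections older than 30 minutes
    pool_pre_ping=True,  # check liveness on checkout
)

with engine.connect() as conn:
    value = conn.execute(text("SELECT 1")).scalar()

pool_size = engine.pool.size()
engine.dispose()
```

With a server database such as PostgreSQL, QueuePool is typically the default, so poolclass would not normally need to be passed.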
dialect+driver://username:password@host:port/database
PostgreSQL:
# psycopg2 (default)
"postgresql://user:pass@localhost/dbname"
# psycopg (version 3)
"postgresql+psycopg://user:pass@localhost/dbname"
# asyncpg (async)
"postgresql+asyncpg://user:pass@localhost/dbname"
# pg8000
"postgresql+pg8000://user:pass@localhost/dbname"
MySQL:
# mysqlclient (default)
"mysql://user:pass@localhost/dbname"
"mysql+mysqldb://user:pass@localhost/dbname"
# PyMySQL
"mysql+pymysql://user:pass@localhost/dbname"
# asyncmy (async)
"mysql+asyncmy://user:pass@localhost/dbname"
SQLite:
# File database
"sqlite:///path/to/database.db"
# In-memory
"sqlite:///:memory:"
# Async
"sqlite+aiosqlite:///path/to/database.db"
Oracle:
"oracle+oracledb://user:pass@localhost:1521/?service_name=XEPDB1"
SQL Server:
"mssql+pyodbc://user:pass@localhost/dbname?driver=ODBC+Driver+17+for+SQL+Server"
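The URL format above can also be handled programmatically, which helps when a password contains characters such as "@" or "/". The following is a minimal sketch using make_url() and URL.create(); the credentials and host are placeholder values, and no database driver needs to be installed for it to run.

```python
from sqlalchemy.engine import URL, make_url

# Parse a URL string into its components.
parsed = make_url("postgresql+psycopg2://user:pass@localhost:5432/dbname")
backend = parsed.get_backend_name()  # dialect part of "dialect+driver"
driver = parsed.get_driver_name()    # driver part of "dialect+driver"

# Build a URL from parts; special characters in the password are escaped
# when the URL is rendered as a string.
built = URL.create(
    "postgresql+psycopg2",
    username="user",
    password="p@ss/word",  # placeholder password with special characters
    host="localhost",
    port=5432,
    database="dbname",
)
masked = built.render_as_string(hide_password=True)
round_trip = make_url(built.render_as_string(hide_password=False))
```

A URL object can be passed to create_engine() in place of the string form.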
Engine Class
Methods
connect() - Get a connection from the pool
from sqlalchemy import text

with engine.connect() as conn:
    result = conn.execute(text("SELECT 1"))
begin() - Start a transaction context
with engine.begin() as conn:
    conn.execute(users.insert().values(name="Alice"))
    # Auto-commits on context exit
dispose() - Dispose of connection pool
execution_options(**kwargs) - Return a copy of the engine with the given execution options set
engine = engine.execution_options(isolation_level="SERIALIZABLE")
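To show how begin(), connect(), and dispose() relate to one another, here is a small end-to-end sketch. It assumes an in-memory SQLite database with StaticPool so that every checkout sees the same data; the users table and its columns are made up for the example.

```python
from sqlalchemy import create_engine, text
from sqlalchemy.pool import StaticPool

# Assumption: StaticPool keeps a single in-memory SQLite connection alive,
# so data written in one block is visible in the next.
engine = create_engine("sqlite://", poolclass=StaticPool)

# begin(): the block runs in a transaction that commits on success.
with engine.begin() as conn:
    conn.execute(text("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT)"))
    conn.execute(text("INSERT INTO users (name) VALUES (:name)"), {"name": "Alice"})

# connect(): a plain connection; nothing is committed unless commit() is called.
with engine.connect() as conn:
    names = conn.execute(text("SELECT name FROM users")).scalars().all()

# dispose(): close the pooled connections held by this engine.
engine.dispose()
```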
Properties
dialect - Database dialect instance
driver - DBAPI driver name
name - Dialect name ("postgresql", "mysql", etc.)
pool - Connection pool instance
url - Database URL
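The properties listed above can be inspected without opening a connection. A short sketch, assuming an in-memory SQLite engine (the summary dict is just an illustrative container):

```python
from sqlalchemy import create_engine

engine = create_engine("sqlite://")

# Collect the engine's descriptive properties; no connection is opened here.
summary = {
    "name": engine.name,                  # dialect name
    "driver": engine.driver,              # DBAPI driver name
    "dialect": engine.dialect.name,       # same name, via the dialect instance
    "url": str(engine.url),               # URL rendered as a string
    "pool": type(engine.pool).__name__,   # pool class in use
}
```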
Connection
Getting Connections
# Context manager (recommended)
with engine.connect() as conn:
    result = conn.execute(stmt)
# Manual management
conn = engine.connect()
try:
    result = conn.execute(stmt)
finally:
    conn.close()
Connection Methods
execute(statement, parameters=None) - Execute a statement
result = conn.execute(select(users))
result = conn.execute(text("SELECT * FROM users WHERE id = :id"), {"id": 1})
scalar(statement) - Execute and return scalar result
count = conn.scalar(select(func.count()).select_from(users))
scalars(statement) - Execute and return scalar results
names = conn.scalars(select(users.c.name)).all()
commit() - Commit current transaction
conn.execute(users.insert().values(name="Alice"))
conn.commit()
rollback() - Rollback current transaction
conn.execute(users.insert().values(name="Alice"))
conn.rollback()
begin() - Begin a new transaction
with conn.begin():
    conn.execute(users.insert().values(name="Alice"))
    # Auto-commits on success
begin_nested() - Begin a nested transaction (SAVEPOINT)
with conn.begin():
    conn.execute(users.insert().values(name="Alice"))
    with conn.begin_nested():  # SAVEPOINT
        conn.execute(users.insert().values(name="Bob"))
        # Can rollback just this savepoint
close() - Close connection and return to pool
Result
Result object returned from execution.
Result Methods
all() - Fetch all rows
first() - Fetch first row or None
one() - Fetch exactly one row, raise if 0 or >1
one_or_none() - Fetch one row or None, raise if >1
row = result.one_or_none()
scalar() - Return first column of first row
scalars() - Return ScalarResult (first column)
names = result.scalars().all()
mappings() - Return MappingResult (dict-like rows)
for row in result.mappings():
    print(row["name"])
fetchone() - Fetch one row
fetchmany(size) - Fetch multiple rows
rows = result.fetchmany(10)
fetchall() - Fetch all rows
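The following sketch exercises several of these methods side by side. It assumes an in-memory SQLite database and a made-up users table; each call to execute() produces a fresh Result, since a Result is consumed as rows are fetched.

```python
from sqlalchemy import create_engine, text

engine = create_engine("sqlite://")

with engine.connect() as conn:
    conn.execute(text("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT)"))
    conn.execute(
        text("INSERT INTO users (name) VALUES (:name)"),
        [{"name": "Alice"}, {"name": "Bob"}, {"name": "Carol"}],
    )
    query = text("SELECT id, name FROM users ORDER BY id")

    all_rows = [tuple(row) for row in conn.execute(query).all()]
    first_row = conn.execute(query).first()
    batch = conn.execute(query).fetchmany(2)
    as_dicts = [dict(m) for m in conn.execute(query).mappings()]

    # scalars() yields the first column of each row.
    names = conn.execute(text("SELECT name FROM users ORDER BY id")).scalars().all()

    # one_or_none() returns None when no row matches.
    missing = conn.execute(
        text("SELECT id FROM users WHERE name = :name"), {"name": "Zed"}
    ).one_or_none()
```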
Row Access
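result = conn.execute(select(users))
row = result.first()
# Index access (tuple-like)
first_value = row[0]
# Attribute access (named-tuple style)
name = row.name
# Dict-like access (with mappings); first() closes the result, so execute again
for row in conn.execute(select(users)).mappings():
    name = row["name"]
    email = row["email"]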
Transactions
Transaction Isolation Levels
# Engine-level
engine = create_engine(url, isolation_level="REPEATABLE READ")
# Connection-level
with engine.connect().execution_options(isolation_level="SERIALIZABLE") as conn:
    result = conn.execute(stmt)
Isolation levels (availability depends on the database backend):
READ UNCOMMITTED
READ COMMITTED
REPEATABLE READ
SERIALIZABLE
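One way to confirm which level a connection is actually using is Connection.get_isolation_level(). The sketch below assumes an in-memory SQLite database, which supports only a subset of the levels listed (SERIALIZABLE and READ UNCOMMITTED are used here); other backends will report different defaults.

```python
from sqlalchemy import create_engine

engine = create_engine("sqlite://")

# Default level reported by the backend.
with engine.connect() as conn:
    default_level = conn.get_isolation_level()

# Per-connection override; it applies to this checkout only and is
# reset when the connection goes back to the pool.
with engine.connect().execution_options(
    isolation_level="READ UNCOMMITTED"
) as conn:
    overridden_level = conn.get_isolation_level()
```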
Two-Phase Commit
# Use connect(), not begin(): begin_twophase() starts its own transaction
with engine.connect() as conn:
    trans = conn.begin_twophase()
    conn.execute(stmt1)
    trans.prepare()
    # Can coordinate with other resources
    trans.commit()
Events
from sqlalchemy import event

@event.listens_for(engine, "connect")
def receive_connect(dbapi_conn, connection_record):
    print("New connection")

@event.listens_for(engine, "before_cursor_execute")
def receive_before_cursor_execute(conn, cursor, statement, parameters, context, executemany):
    print(f"Executing: {statement}")
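The same two hooks can collect data instead of printing it, which is convenient in tests. This is a minimal sketch assuming an in-memory SQLite engine; the list names and listener function names are arbitrary.

```python
from sqlalchemy import create_engine, event, text

engine = create_engine("sqlite://")

new_connections = []  # one entry per new DBAPI connection
statements = []       # SQL strings seen just before execution

@event.listens_for(engine, "connect")
def record_connect(dbapi_conn, connection_record):
    new_connections.append(dbapi_conn)

@event.listens_for(engine, "before_cursor_execute")
def record_statement(conn, cursor, statement, parameters, context, executemany):
    statements.append(statement)

with engine.connect() as conn:
    conn.execute(text("SELECT 1"))
```

Listeners must be registered before the first connection is made if the "connect" event for that connection is to be observed.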