Complete reference for SQLAlchemy’s connection pooling system.
Pool Types
QueuePool
Default pool with configurable size and overflow.
from sqlalchemy import create_engine
engine = create_engine(
"postgresql://user:pass@localhost/db",
pool_size=10, # Persistent connections
max_overflow=20, # Additional connections when pool exhausted
pool_timeout=30, # Timeout waiting for connection
pool_recycle=3600, # Recycle connections after 1 hour
pool_pre_ping=True # Test connections before using
)
pool_size - Number of persistent connections to maintain
max_overflow - Maximum overflow connections beyond pool_size
pool_timeout - Seconds to wait for connection before raising TimeoutError
pool_recycle - Recycle connections after N seconds (-1 = never)
pool_pre_ping - Test connection validity before using
NullPool
No pooling - creates new connection for each request.
from sqlalchemy.pool import NullPool
engine = create_engine(
"postgresql://user:pass@localhost/db",
poolclass=NullPool
)
Useful for:
- Development/testing environments
- Serverless functions (AWS Lambda, etc.)
- Applications with infrequent database access
StaticPool
Single connection for all requests.
from sqlalchemy.pool import StaticPool
engine = create_engine(
"sqlite:///:memory:",
poolclass=StaticPool
)
Commonly used with SQLite in-memory databases where you need to maintain a single connection.
SingletonThreadPool
One connection per thread.
from sqlalchemy.pool import SingletonThreadPool
engine = create_engine(
"sqlite:///database.db",
poolclass=SingletonThreadPool
)
AsyncAdaptedQueuePool
Async version of QueuePool for async engines.
from sqlalchemy.ext.asyncio import create_async_engine
engine = create_async_engine(
"postgresql+asyncpg://user:pass@localhost/db",
pool_size=10,
max_overflow=20
)
Pool Configuration
Connection Lifecycle
pool_pre_ping - Test connections before use
engine = create_engine(
url,
pool_pre_ping=True # SELECT 1 before each checkout
)
pool_recycle - Recycle old connections
engine = create_engine(
url,
pool_recycle=3600 # Recycle after 1 hour
)
pool_reset_on_return - Reset connection state on return
from sqlalchemy.pool import PoolResetState
engine = create_engine(
url,
pool_reset_on_return="rollback" # rollback, commit, or None
)
Pool Size
pool_size - Persistent connection count
engine = create_engine(url, pool_size=20)
max_overflow - Temporary overflow connections
engine = create_engine(
url,
pool_size=10,
max_overflow=30 # Max 40 total connections
)
Timeouts
pool_timeout - Wait timeout for connection
engine = create_engine(
url,
pool_timeout=60 # Wait up to 60 seconds
)
Pool Methods
Engine Pool Access
engine = create_engine(url)
# Get pool instance
pool = engine.pool
# Pool statistics
print(pool.size()) # Current pool size
print(pool.checkedin()) # Checked in connections
print(pool.checkedout()) # Checked out connections
print(pool.overflow()) # Overflow connections
# Dispose of pool
engine.dispose()
Pool Events
from sqlalchemy import event
@event.listens_for(engine, "connect")
def receive_connect(dbapi_conn, connection_record):
    """Handle the "connect" event by printing the new DBAPI connection."""
    print(f"New connection: {dbapi_conn}")


@event.listens_for(engine, "checkout")
def receive_checkout(dbapi_conn, connection_record, connection_proxy):
    """Handle the "checkout" event by printing a notice."""
    print("Connection checked out")


@event.listens_for(engine, "checkin")
def receive_checkin(dbapi_conn, connection_record):
    """Handle the "checkin" event by printing a notice."""
    print("Connection returned to pool")


@event.listens_for(engine, "close")
def receive_close(dbapi_conn, connection_record):
    """Handle the "close" event by printing a notice."""
    print("Connection closed")


@event.listens_for(engine, "detach")
def receive_detach(dbapi_conn, connection_record):
    """Handle the "detach" event by printing a notice."""
    print("Connection detached from pool")


# Unlike the other handlers, "close_detached" receives only the DBAPI connection.
@event.listens_for(engine, "close_detached")
def receive_close_detached(dbapi_conn):
    """Handle the "close_detached" event by printing a notice."""
    print("Detached connection closed")
Pool Monitoring
Pool Statistics
from sqlalchemy import create_engine, event
engine = create_engine(url, echo_pool=True)
# Report the pool counters every time a connection is checked out
@event.listens_for(engine, "checkout")
def receive_checkout(dbapi_conn, connection_record, connection_proxy):
    """Print size, checked-in, checked-out and overflow counts of the engine's pool."""
    active_pool = engine.pool
    print(f"Pool size: {active_pool.size()}")
    print(f"Checked in: {active_pool.checkedin()}")
    print(f"Checked out: {active_pool.checkedout()}")
    print(f"Overflow: {active_pool.overflow()}")
Connection Info
import time


@event.listens_for(engine, "connect")
def receive_connect(dbapi_conn, connection_record):
    """Stamp the connection record with the time the connection was created."""
    # connection_record.info is a dict for storing metadata
    connection_record.info["created_at"] = time.time()


@event.listens_for(engine, "checkout")
def receive_checkout(dbapi_conn, connection_record, connection_proxy):
    """Print the age of the connection being checked out, if it was stamped."""
    created_at = connection_record.info.get("created_at")
    # Explicit None check: only a missing stamp is skipped, never a real timestamp.
    if created_at is not None:
        age = time.time() - created_at
        print(f"Connection age: {age} seconds")
Common Patterns
Serverless Functions
# AWS Lambda, Google Cloud Functions, etc.
from sqlalchemy.pool import NullPool
engine = create_engine(
url,
poolclass=NullPool # No persistent connections
)
Connection Pooling Best Practices
Set appropriate pool size
engine = create_engine(
url,
pool_size=10, # Based on expected concurrent users
max_overflow=20 # Handle traffic spikes
)
Enable pre-ping for reliability
engine = create_engine(
url,
pool_pre_ping=True # Detect stale connections
)
Recycle connections regularly
engine = create_engine(
url,
pool_recycle=3600 # Recycle every hour
)
Set appropriate timeouts
engine = create_engine(
url,
pool_timeout=30 # Fail fast if pool exhausted
)
Disposing of Pools
# Application shutdown
engine.dispose()
# Force reconnect
engine.dispose()
engine.connect() # Opens a fresh connection; dispose() already replaced the pool
Troubleshooting
Pool Exhaustion
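If you see TimeoutError: QueuePool limit of size N overflow M reached, connection timed out, you have too many concurrent connections: every pooled and overflow connection was in use and none was returned within pool_timeout. Solutions:
- Increase pool_size and max_overflow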
- Ensure connections are properly closed
- Use connection pooling at application level
Stale Connections
# Enable pre-ping to detect stale connections
engine = create_engine(
url,
pool_pre_ping=True,
pool_recycle=3600
)
Memory Leaks
Always close connections properly:
# Use context managers (recommended)
with engine.connect() as conn:
result = conn.execute(stmt)
# Or explicit close
conn = engine.connect()
try:
result = conn.execute(stmt)
finally:
conn.close()