SQLAlchemy Connection Pooling Reference
Complete reference for SQLAlchemy’s connection pooling system.

Pool Types

QueuePool

Default pool with configurable size and overflow.
from sqlalchemy import create_engine

engine = create_engine(
    "postgresql://user:pass@localhost/db",
    pool_size=10,        # Persistent connections
    max_overflow=20,     # Additional connections when pool exhausted
    pool_timeout=30,     # Timeout waiting for connection
    pool_recycle=3600,   # Recycle connections after 1 hour
    pool_pre_ping=True   # Test connections before using
)
pool_size (int, default: 5)
    Number of persistent connections to maintain.
max_overflow (int, default: 10)
    Maximum overflow connections beyond pool_size.
pool_timeout (float, default: 30.0)
    Seconds to wait for a connection before raising TimeoutError.
pool_recycle (int, default: -1)
    Recycle connections after N seconds (-1 = never).
pool_pre_ping (bool, default: False)
    Test connection validity before using.

NullPool

No pooling - creates new connection for each request.
from sqlalchemy.pool import NullPool

engine = create_engine(
    "postgresql://user:pass@localhost/db",
    poolclass=NullPool
)
Useful for:
  • Development/testing environments
  • Serverless functions (AWS Lambda, etc.)
  • Applications with infrequent database access

StaticPool

Single connection for all requests.
from sqlalchemy.pool import StaticPool

engine = create_engine(
    "sqlite:///:memory:",
    poolclass=StaticPool
)
Commonly used with SQLite in-memory databases where you need to maintain a single connection.

SingletonThreadPool

One connection per thread.
from sqlalchemy.pool import SingletonThreadPool

engine = create_engine(
    "sqlite:///database.db",
    poolclass=SingletonThreadPool
)

AsyncAdaptedQueuePool

Async version of QueuePool for async engines.
from sqlalchemy.ext.asyncio import create_async_engine

engine = create_async_engine(
    "postgresql+asyncpg://user:pass@localhost/db",
    pool_size=10,
    max_overflow=20
)

Pool Configuration

Connection Lifecycle

pool_pre_ping - Test connections before use
engine = create_engine(
    url,
    pool_pre_ping=True  # SELECT 1 before each checkout
)
pool_recycle - Recycle old connections
engine = create_engine(
    url,
    pool_recycle=3600  # Recycle after 1 hour
)
pool_reset_on_return - Reset connection state on return
engine = create_engine(
    url,
    pool_reset_on_return="rollback"  # "rollback" (default), "commit", or None
)

Pool Size

pool_size - Persistent connection count
engine = create_engine(url, pool_size=20)
max_overflow - Temporary overflow connections
engine = create_engine(
    url,
    pool_size=10,
    max_overflow=30  # Max 40 total connections
)

Timeouts

pool_timeout - Wait timeout for connection
engine = create_engine(
    url,
    pool_timeout=60  # Wait up to 60 seconds
)

Pool Methods

Engine Pool Access

engine = create_engine(url)

# Get pool instance
pool = engine.pool

# Pool statistics
print(pool.size())        # Configured pool size (the pool_size setting)
print(pool.checkedin())   # Connections idle in the pool
print(pool.checkedout())  # Connections currently in use
print(pool.overflow())    # Current overflow connection count

# Dispose of pool
engine.dispose()

Pool Events

from sqlalchemy import event

@event.listens_for(engine, "connect")
def receive_connect(dbapi_conn, connection_record):
    print(f"New connection: {dbapi_conn}")

@event.listens_for(engine, "checkout")
def receive_checkout(dbapi_conn, connection_record, connection_proxy):
    print(f"Connection checked out")

@event.listens_for(engine, "checkin")
def receive_checkin(dbapi_conn, connection_record):
    print(f"Connection returned to pool")

@event.listens_for(engine, "close")
def receive_close(dbapi_conn, connection_record):
    print(f"Connection closed")

@event.listens_for(engine, "detach")
def receive_detach(dbapi_conn, connection_record):
    print(f"Connection detached from pool")

@event.listens_for(engine, "close_detached")
def receive_close_detached(dbapi_conn):
    print(f"Detached connection closed")

Pool Monitoring

Pool Statistics

from sqlalchemy import create_engine, event

engine = create_engine(url, echo_pool=True)

# Monitor pool events
@event.listens_for(engine, "checkout")
def receive_checkout(dbapi_conn, connection_record, connection_proxy):
    pool = engine.pool
    print(f"Pool size: {pool.size()}")
    print(f"Checked in: {pool.checkedin()}")
    print(f"Checked out: {pool.checkedout()}")
    print(f"Overflow: {pool.overflow()}")

Connection Info

import time

@event.listens_for(engine, "connect")
def receive_connect(dbapi_conn, connection_record):
    # connection_record.info is a dict for storing metadata
    connection_record.info["created_at"] = time.time()

@event.listens_for(engine, "checkout")
def receive_checkout(dbapi_conn, connection_record, connection_proxy):
    created_at = connection_record.info.get("created_at")
    if created_at:
        age = time.time() - created_at
        print(f"Connection age: {age} seconds")

Common Patterns

Serverless Functions

# AWS Lambda, Google Cloud Functions, etc.
from sqlalchemy.pool import NullPool

engine = create_engine(
    url,
    poolclass=NullPool  # No persistent connections
)

Connection Pooling Best Practices

1

Set appropriate pool size

engine = create_engine(
    url,
    pool_size=10,      # Based on expected concurrent users
    max_overflow=20    # Handle traffic spikes
)
2

Enable pre-ping for reliability

engine = create_engine(
    url,
    pool_pre_ping=True  # Detect stale connections
)
3

Recycle connections regularly

engine = create_engine(
    url,
    pool_recycle=3600  # Recycle every hour
)
4

Set appropriate timeouts

engine = create_engine(
    url,
    pool_timeout=30  # Fail fast if pool exhausted
)

Disposing of Pools

# Application shutdown
engine.dispose()

# Force reconnect
engine.dispose()
engine.connect()  # Creates new pool

Troubleshooting

Pool Exhaustion

If you see TimeoutError: QueuePool limit exceeded, too many connections are checked out concurrently. Solutions:
  • Increase pool_size and max_overflow
  • Ensure connections are properly closed (prefer context managers)
  • Share a single engine across the application, or place an external pooler (e.g. PgBouncer) in front of the database

Stale Connections

# Enable pre-ping to detect stale connections
engine = create_engine(
    url,
    pool_pre_ping=True,
    pool_recycle=3600
)

Memory Leaks

Always close connections properly:
# Use context managers (recommended)
with engine.connect() as conn:
    result = conn.execute(stmt)

# Or explicit close
conn = engine.connect()
try:
    result = conn.execute(stmt)
finally:
    conn.close()