Connection Pooling

Connection pooling maintains a cache of database connections, improving performance by reusing connections instead of creating new ones for each request.

Why Connection Pooling?

Performance

Creating database connections is expensive. Pooling reuses existing connections.

Resource Management

Limits the number of concurrent database connections.

Connection Overhead

Eliminates connection setup/teardown time for each query.

Scalability

Serves many concurrent requests from a bounded set of connections instead of opening one per request.

Pool Types

QueuePool (Default)

QueuePool is the default pool for most database backends. The main exception is SQLite :memory: databases, which default to SingletonThreadPool.
from sqlalchemy import create_engine
from sqlalchemy.pool import QueuePool

# Default pool (QueuePool)
engine = create_engine(
    'postgresql://user:pass@localhost/db'
)

# Explicit QueuePool
engine = create_engine(
    'postgresql://user:pass@localhost/db',
    poolclass=QueuePool,
    pool_size=5,          # Keep 5 connections
    max_overflow=10,      # Allow 10 additional connections
    pool_timeout=30,      # Wait 30s for a connection
    pool_recycle=3600,    # Recycle connections after 1 hour
    pool_pre_ping=True    # Test connections before use
)
QueuePool class parameters (with create_engine(), timeout, recycle, pre_ping, and use_lifo take a pool_ prefix, e.g. pool_timeout):
  • pool_size: Number of connections to maintain (default: 5)
  • max_overflow: Additional connections allowed beyond pool_size (default: 10)
  • timeout: Seconds to wait for a connection (default: 30)
  • recycle: Seconds before recycling connections (default: -1, disabled)
  • pre_ping: Test connection before checkout (default: False)
  • use_lifo: Use LIFO instead of FIFO (default: False)
How QueuePool Works:

1. Initial State

Pool starts empty. Connections are created on demand, up to pool_size.

2. Connection Request

When a connection is requested:
  • If one is available in the pool, return it
  • If fewer than pool_size connections exist, create a new connection
  • If at pool_size but under max_overflow, create an overflow connection
  • If at the limit, wait up to timeout seconds, then raise TimeoutError

3. Connection Return

When a connection is returned:
  • If the pool holds fewer than pool_size idle connections, keep it in the pool
  • Otherwise (an overflow connection), close and discard it
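
The checkout and return rules above can be sketched as a toy model. This is not SQLAlchemy's implementation: the class name ToyQueuePool, the string connection ids, and raising immediately instead of waiting for timeout seconds are all simplifications assumed for illustration.

```python
from collections import deque


class ToyQueuePool:
    """Toy model of the checkout/return rules; not SQLAlchemy's implementation."""

    def __init__(self, pool_size=5, max_overflow=10):
        self.pool_size = pool_size
        self.max_overflow = max_overflow
        self.idle = deque()          # connections waiting in the pool
        self.open_connections = 0    # idle + checked out
        self._next_id = 0

    def checkout(self):
        if self.idle:
            # An idle connection is available: reuse it (FIFO, the default order).
            return self.idle.popleft()
        if self.open_connections < self.pool_size + self.max_overflow:
            # Below pool_size, or in the overflow range: open a new connection.
            self.open_connections += 1
            self._next_id += 1
            return f"conn-{self._next_id}"
        # At the limit. The real pool waits up to `timeout` seconds first.
        raise TimeoutError("pool limit reached")

    def checkin(self, conn):
        if len(self.idle) < self.pool_size:
            self.idle.append(conn)       # keep it for reuse
        else:
            self.open_connections -= 1   # overflow: close and discard


pool = ToyQueuePool(pool_size=2, max_overflow=1)
held = [pool.checkout() for _ in range(3)]   # two pooled connections plus one overflow
for conn in held:
    pool.checkin(conn)
print(list(pool.idle), pool.open_connections)  # ['conn-1', 'conn-2'] 2
```

With pool_size=2 and max_overflow=1, the third checkout is an overflow connection, and it is the one discarded when everything is returned.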

NullPool

No connection pooling - creates new connection for each request.
from sqlalchemy.pool import NullPool

# No pooling
engine = create_engine(
    'postgresql://user:pass@localhost/db',
    poolclass=NullPool
)

# Useful for:
# - Testing
# - Serverless environments (AWS Lambda)
# - Applications with infrequent database access

StaticPool

A single connection shared by all threads. Commonly used with SQLite :memory: databases when more than one thread needs to see the same data.
from sqlalchemy.pool import StaticPool

# SQLite in-memory with StaticPool
engine = create_engine(
    'sqlite:///:memory:',
    poolclass=StaticPool,
    connect_args={'check_same_thread': False}
)

# All requests use the same connection
# Needed when several threads share one :memory: database, because each
# new SQLite connection to :memory: opens a separate, empty database
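
Why a single shared connection matters for :memory: can be seen with the standard-library sqlite3 module alone, without SQLAlchemy. The sketch below opens two connections and shows that they do not share data; the table name items and the helper table_names are made up for the example.

```python
import sqlite3

# Two separate connections to ":memory:" are two separate databases.
first = sqlite3.connect(":memory:")
second = sqlite3.connect(":memory:")

first.execute("CREATE TABLE items (name TEXT)")
first.execute("INSERT INTO items VALUES ('widget')")


def table_names(conn):
    """List the tables visible through this connection."""
    rows = conn.execute("SELECT name FROM sqlite_master WHERE type = 'table'")
    return [row[0] for row in rows]


print(table_names(first))   # ['items']
print(table_names(second))  # []
```

A pool that opened a new connection for each checkout would behave like second here, which is why a one-connection pool is the usual choice for in-memory test databases.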

SingletonThreadPool

Maintains one connection per thread.
from sqlalchemy.pool import SingletonThreadPool

# One connection per thread
engine = create_engine(
    'sqlite:///db.db',
    poolclass=SingletonThreadPool
)

# Default for SQLite :memory: databases
# (SQLite file databases default to QueuePool in SQLAlchemy 2.0, NullPool in 1.x)
# Each thread gets its own connection
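
The one-connection-per-thread idea can be illustrated with threading.local from the standard library. This is a conceptual sketch, not how SingletonThreadPool is written; get_connection and seen_from_other_thread are hypothetical helper names.

```python
import sqlite3
import threading

_local = threading.local()   # each thread sees its own attributes on this object


def get_connection(path=":memory:"):
    """Return the calling thread's connection, opening it on first use."""
    conn = getattr(_local, "conn", None)
    if conn is None:
        conn = sqlite3.connect(path)
        _local.conn = conn
    return conn


def seen_from_other_thread(main_conn):
    """Report whether a worker thread gets the same connection object."""
    result = []

    def worker():
        conn = get_connection()
        result.append(conn is main_conn)
        conn.close()   # tidy up the worker's own connection

    thread = threading.Thread(target=worker)
    thread.start()
    thread.join()
    return result[0]


main_conn = get_connection()
print(get_connection() is main_conn)      # True: same thread, same connection
print(seen_from_other_thread(main_conn))  # False: the worker opened its own
```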

AssertionPool

Ensures at most one connection checked out at a time. Used for testing.
from sqlalchemy.pool import AssertionPool

# Only one connection at a time
engine = create_engine(
    'postgresql://user:pass@localhost/db',
    poolclass=AssertionPool
)

# Raises AssertionError if multiple checkouts attempted
# Useful for debugging connection leaks

Pool Configuration

Basic Configuration

engine = create_engine(
    'postgresql://user:pass@localhost/db',
    
    # Pool size configuration
    pool_size=10,           # Maintain 10 connections
    max_overflow=20,        # Allow 20 additional connections
    
    # Connection lifetime
    pool_recycle=3600,      # Recycle after 1 hour
    pool_pre_ping=True,     # Test connections before use
    
    # Checkout behavior
    pool_timeout=30,        # Wait 30s for connection
    pool_use_lifo=True,     # Use most recently returned connection
)

pool_size and max_overflow

# Conservative (small application)
engine = create_engine(
    url,
    pool_size=5,
    max_overflow=5
)
# Total max connections: 10

# Moderate (medium application)
engine = create_engine(
    url,
    pool_size=10,
    max_overflow=20
)
# Total max connections: 30

# Aggressive (large application)
engine = create_engine(
    url,
    pool_size=20,
    max_overflow=40
)
# Total max connections: 60

# Unlimited overflow (use with caution!)
engine = create_engine(
    url,
    pool_size=10,
    max_overflow=-1  # No limit on overflow
)
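Set pool_size based on your database's connection limit and application concurrency. PostgreSQL's max_connections defaults to 100, and that limit is shared by every process that connects, so budget pool_size + max_overflow for each application process.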
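
The "Total max connections" comments above are simple arithmetic, and the same arithmetic applies across processes, since each process has its own pool. A minimal sketch, with hypothetical helper names and a reserved parameter that is an assumption for leaving headroom for other clients:

```python
def max_connections_used(pool_size, max_overflow, processes=1):
    """Upper bound on connections opened by `processes` identical engines."""
    if max_overflow < 0:
        raise ValueError("max_overflow=-1 means unlimited; there is no upper bound")
    return processes * (pool_size + max_overflow)


def fits_within_limit(pool_size, max_overflow, processes, db_limit, reserved=0):
    """True if the worst case still leaves `reserved` connections for other clients."""
    return max_connections_used(pool_size, max_overflow, processes) <= db_limit - reserved


print(max_connections_used(10, 20))               # 30
print(max_connections_used(10, 20, processes=4))  # 120
print(fits_within_limit(5, 5, processes=4, db_limit=100, reserved=10))  # True
```

Four worker processes with the "moderate" settings can reach 120 connections, which is more than a database limited to 100 will accept.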

pool_recycle

# Recycle connections every 30 minutes
engine = create_engine(
    url,
    pool_recycle=1800
)

# Recommended for:
# - Databases that close idle connections
# - Load-balanced database setups
# - Long-running applications

# MySQL example (closes connections after 8 hours by default)
engine = create_engine(
    'mysql://user:pass@localhost/db',
    pool_recycle=3600  # Recycle every hour (well before 8 hours)
)
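
Conceptually, pool_recycle is an age check made when a connection is checked out: a connection older than the setting is closed and replaced at that point, not in the background. The helper below is an illustrative sketch of that comparison, not SQLAlchemy's code; needs_recycle and its parameters are hypothetical names.

```python
import time


def needs_recycle(created_at, pool_recycle, now=None):
    """True if a connection opened at `created_at` is older than pool_recycle seconds."""
    if pool_recycle < 0:      # -1 disables recycling
        return False
    if now is None:
        now = time.time()
    return now - created_at > pool_recycle


opened = 0                    # pretend the connection was opened at t = 0
print(needs_recycle(opened, 3600, now=1800))     # False: 30 minutes old
print(needs_recycle(opened, 3600, now=7200))     # True: 2 hours old
print(needs_recycle(opened, -1, now=8 * 3600))   # False: recycling disabled
```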

pool_pre_ping

# Test connections before checkout
engine = create_engine(
    url,
    pool_pre_ping=True
)

# Prevents "MySQL server has gone away" errors
# Runs a lightweight liveness check (typically SELECT 1 or a driver-level ping) on each checkout
# Small performance cost, but stale connections are replaced before your code sees them
When to use pool_pre_ping:
  • Applications with long idle periods
  • Databases that close idle connections
  • Load-balanced database setups
  • Production environments (recommended)
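
The idea behind pre-ping can be shown with the standard-library sqlite3 module: test the connection with a trivial query and hand out a fresh one if the test fails. This is a conceptual sketch only; is_alive, checkout_with_ping, and connect are hypothetical helpers, and closing the connection stands in for a server dropping it.

```python
import sqlite3


def is_alive(conn):
    """Ping the connection with a trivial query; False if it is unusable."""
    try:
        conn.execute("SELECT 1").fetchone()
        return True
    except sqlite3.Error:
        return False


def checkout_with_ping(conn, connect):
    """Return `conn` if it answers the ping, otherwise a fresh connection."""
    return conn if is_alive(conn) else connect()


def connect():
    return sqlite3.connect(":memory:")


healthy = connect()
stale = connect()
stale.close()   # simulate a connection that was dropped while idle

print(checkout_with_ping(healthy, connect) is healthy)  # True: reused as-is
print(checkout_with_ping(stale, connect) is stale)      # False: replaced
```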

pool_use_lifo

# Use LIFO (Last In, First Out)
engine = create_engine(
    url,
    pool_use_lifo=True
)

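# Default is FIFO (First In, First Out)
# LIFO keeps reusing the most recently returned connection, so during low
# traffic the surplus connections sit idle and server-side idle timeouts can close them
# Pair it with pool_pre_ping or pool_recycle so those closed connections are handled cleanly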
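
The difference between the two orders shows up under light, sequential traffic. The simulation below is a toy model using a deque, not the pool itself; connections_touched and the connection names are made up for the example.

```python
from collections import deque


def connections_touched(idle_ids, requests, lifo):
    """Run `requests` sequential checkout/checkin cycles; return the ids that were used."""
    idle = deque(idle_ids)
    used = set()
    for _ in range(requests):
        conn = idle.pop() if lifo else idle.popleft()   # checkout
        used.add(conn)
        idle.append(conn)                               # checkin
    return used


ids = ["c1", "c2", "c3"]
print(sorted(connections_touched(ids, 6, lifo=False)))  # ['c1', 'c2', 'c3']
print(sorted(connections_touched(ids, 6, lifo=True)))   # ['c3']
```

FIFO rotates through every pooled connection and keeps them all active, while LIFO leaves c1 and c2 untouched, so they can age out.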

Pool Events

Connection Lifecycle Events

from sqlalchemy import event, create_engine

engine = create_engine('postgresql://user:pass@localhost/db')

@event.listens_for(engine, "connect")
def receive_connect(dbapi_conn, connection_record):
    """Called when new connection is created."""
    print(f"New connection created: {id(dbapi_conn)}")

@event.listens_for(engine, "checkout")
def receive_checkout(dbapi_conn, connection_record, connection_proxy):
    """Called when connection is retrieved from pool."""
    print(f"Connection checked out: {id(dbapi_conn)}")

@event.listens_for(engine, "checkin")
def receive_checkin(dbapi_conn, connection_record):
    """Called when connection is returned to pool."""
    print(f"Connection returned: {id(dbapi_conn)}")

@event.listens_for(engine, "close")
def receive_close(dbapi_conn, connection_record):
    """Called when connection is closed."""
    print(f"Connection closed: {id(dbapi_conn)}")

Pool Reset Events

from sqlalchemy.pool import reset_rollback, reset_commit, reset_none

# Rollback on return (default)
engine = create_engine(
    url,
    pool_reset_on_return='rollback'  # or reset_rollback
)

# Commit on return
engine = create_engine(
    url,
    pool_reset_on_return='commit'  # or reset_commit
)

# No reset
engine = create_engine(
    url,
    pool_reset_on_return=None  # or reset_none
)

# Custom reset (SQLAlchemy 2.0+; pair with pool_reset_on_return=None)
@event.listens_for(engine, "reset")
def receive_reset(dbapi_conn, connection_record, reset_state):
    """Custom connection reset logic."""
    if not reset_state.terminate_only:
        # Connection is going back to the pool: put custom cleanup here
        pass
    # Roll back so the DBAPI connection is left in a clean state
    dbapi_conn.rollback()

Pool Monitoring

Pool Status

# Get pool status
pool = engine.pool

print(f"Pool size: {pool.size()}")
print(f"Checked out: {pool.checkedout()}")
print(f"Overflow: {pool.overflow()}")  # negative until pool_size connections have been opened
print(f"Checked in: {pool.checkedin()}")

# Pool statistics
from sqlalchemy import event

pool_stats = {
    'checkouts': 0,
    'checkins': 0,
    'connects': 0,
    'disconnects': 0
}

@event.listens_for(engine, "checkout")
def on_checkout(dbapi_conn, connection_record, connection_proxy):
    pool_stats['checkouts'] += 1

@event.listens_for(engine, "checkin")
def on_checkin(dbapi_conn, connection_record):
    pool_stats['checkins'] += 1

@event.listens_for(engine, "connect")
def on_connect(dbapi_conn, connection_record):
    pool_stats['connects'] += 1

@event.listens_for(engine, "close")
def on_close(dbapi_conn, connection_record):
    pool_stats['disconnects'] += 1

# Check stats
print(pool_stats)
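
Counters like these are easier to read once reduced to a few derived numbers. The helper below is a rough sketch: summarize and its output keys are hypothetical, and the figures are only approximate if listeners were attached after the engine was already in use.

```python
def summarize(stats):
    """Derive rough usage figures from checkout/checkin/connect/close counters."""
    checkouts = stats['checkouts']
    return {
        # Checked out and not yet returned
        'in_use': checkouts - stats['checkins'],
        # Opened and not yet closed
        'open': stats['connects'] - stats['disconnects'],
        # Share of checkouts served without opening a new connection
        'reuse_ratio': 1 - stats['connects'] / checkouts if checkouts else 0.0,
    }


example = {'checkouts': 100, 'checkins': 97, 'connects': 25, 'disconnects': 5}
print(summarize(example))  # {'in_use': 3, 'open': 20, 'reuse_ratio': 0.75}
```

An in_use value that keeps growing suggests connections are not being returned, and a low reuse_ratio suggests the pool is too small or connections are being recycled too aggressively.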

Pool Exhaustion Detection

from sqlalchemy.exc import TimeoutError
import time

def get_connection_with_retry(engine, max_retries=3):
    """Get connection with retry logic."""
    for attempt in range(max_retries):
        try:
            return engine.connect()
        except TimeoutError:
            if attempt == max_retries - 1:
                raise
            print(f"Pool exhausted, retrying... ({attempt + 1}/{max_retries})")
            time.sleep(1)

# Usage
try:
    conn = get_connection_with_retry(engine)
    # Use connection
except TimeoutError:
    print("Could not get connection after retries")
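
A fixed one-second sleep retries at a constant rate, which can keep pressure on a pool that is already exhausted. One possible variation is exponential backoff. The sketch below is generic and library-independent; retry_with_backoff, its parameters, and the injectable sleep argument are assumptions made for illustration.

```python
import time


def retry_with_backoff(connect, retryable, max_retries=3, base_delay=0.5, sleep=time.sleep):
    """Call connect(); on a retryable error wait base_delay * 2**attempt, then try again."""
    for attempt in range(max_retries):
        try:
            return connect()
        except retryable:
            if attempt == max_retries - 1:
                raise                          # out of retries: let the caller see the error
            sleep(base_delay * 2 ** attempt)   # 0.5s, 1s, 2s, ...


# Demonstration with a stand-in that fails twice before succeeding.
attempts = []


def flaky_connect():
    attempts.append(1)
    if len(attempts) < 3:
        raise TimeoutError("pool exhausted")
    return "connection"


waits = []
print(retry_with_backoff(flaky_connect, TimeoutError, sleep=waits.append))  # connection
print(waits)  # [0.5, 1.0]
```

With SQLAlchemy this would be called as retry_with_backoff(engine.connect, sqlalchemy.exc.TimeoutError). Retrying only hides short spikes; repeated timeouts usually point to a pool that is too small or to connections that are never returned.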

Pool Management

Disposing Pool

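from sqlalchemy import text

# Close all idle (checked-in) connections and replace the pool.
# Connections that are currently checked out are not closed by dispose();
# they are simply no longer associated with the engine's new pool.
engine.dispose()

# Useful when:
# - Database connection parameters change
# - Recovering from connection errors
# - Application restart/reload

# After dispose, new connections will be created
with engine.connect() as conn:
    # Fresh connection from new pool
    result = conn.execute(text("SELECT 1"))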

Pool Recreation

from sqlalchemy import text

def recreate_pool(engine):
    """Recreate connection pool."""
    engine.dispose()
    # Test new pool
    with engine.connect() as conn:
        conn.execute(text("SELECT 1"))
    print("Pool recreated successfully")

# Use after database failover or configuration change
recreate_pool(engine)

Common Patterns

Web Application Pool

import os
from sqlalchemy import create_engine

# Flask/FastAPI application
engine = create_engine(
    os.getenv('DATABASE_URL'),
    pool_size=10,           # Base connections
    max_overflow=20,        # Handle traffic spikes
    pool_recycle=3600,      # Recycle hourly
    pool_pre_ping=True,     # Prevent stale connections
    pool_use_lifo=True,     # Reduce idle connections
)

Celery Worker Pool

# Celery worker configuration
engine = create_engine(
    DATABASE_URL,
    pool_size=2,            # Low base (workers are isolated)
    max_overflow=5,         # Small overflow
    pool_recycle=1800,      # Recycle every 30 min
    pool_pre_ping=True
)

AWS Lambda (Serverless)

from sqlalchemy.pool import NullPool

# No pooling for Lambda
engine = create_engine(
    DATABASE_URL,
    poolclass=NullPool  # Create connection per invocation
)

# Or use RDS Proxy for connection pooling

Testing Pool

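# Test configuration: in-memory SQLite shared through a single connection
# (check_same_thread is a sqlite3 option; PostgreSQL drivers reject it)
engine = create_engine(
    'sqlite:///:memory:',
    poolclass=StaticPool,      # Single connection
    connect_args={'check_same_thread': False}
)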

# Or assertion pool
engine = create_engine(
    'postgresql://user:pass@localhost/test_db',
    poolclass=AssertionPool  # Detect connection leaks
)

Troubleshooting

Connection Leaks

# Detect connection leaks
from sqlalchemy.pool import AssertionPool

engine = create_engine(
    url,
    poolclass=AssertionPool
)

# This will raise AssertionError if connection not returned
conn1 = engine.connect()
conn2 = engine.connect()  # AssertionError!

# Fix: Always use context managers
with engine.connect() as conn:
    # Connection automatically returned
    pass

Pool Exhaustion

# Monitor pool exhaustion
import logging
from sqlalchemy import event

logging.basicConfig()
logger = logging.getLogger('pool_monitor')
logger.setLevel(logging.WARNING)

@event.listens_for(engine, "checkout")
def on_checkout(dbapi_conn, connection_record, connection_proxy):
    pool = engine.pool
    checked_out = pool.checkedout()
    
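    # Warn once more than 80% of pool_size is checked out.
    # pool.overflow() is the current overflow count, not the max_overflow setting.
    if checked_out > pool.size() * 0.8:
        logger.warning(
            f"{checked_out} connections checked out "
            f"(pool_size={pool.size()}, overflow in use={max(pool.overflow(), 0)})"
        )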

Stale Connections

# Prevent stale connections
engine = create_engine(
    url,
    pool_pre_ping=True,      # Test before use
    pool_recycle=3600,       # Recycle hourly
)

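# Or use a custom ping (pool_pre_ping is usually the simpler choice)
from sqlalchemy import event, text

@event.listens_for(engine, "engine_connect")
def ping_connection(connection):
    # SQLAlchemy 2.0 signature; 1.x also passed a `branch` argument
    try:
        connection.scalar(text("SELECT 1"))
    except Exception:
        # Connection is stale, invalidate it so it is not reused
        connection.invalidate()
        raise
    else:
        # End the transaction that the ping implicitly began
        connection.rollback()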

Performance Tips

1. Right-size pool_size

Set it based on concurrent requests and database limits. Start conservative (5-10).

2. Enable pool_pre_ping in production

Small overhead, but it prevents stale connection errors.

3. Set pool_recycle

Recycle connections before the database times them out. Use 50-75% of the database timeout.

4. Monitor pool usage

Use events to track checkout/checkin patterns and adjust pool_size.

5. Use LIFO for variable traffic

pool_use_lifo=True keeps reusing the most recently returned connections, so surplus connections stay idle and can be timed out or recycled.

6. Always use context managers

This ensures connections are returned to the pool promptly.
