Connection pooling maintains a cache of database connections, improving performance by reusing connections instead of creating new ones for each request.
StaticPool maintains a single connection that is shared by all threads; it is used for SQLite :memory: databases.
from sqlalchemy import create_engine
from sqlalchemy.pool import StaticPool

# SQLite in-memory with StaticPool: every request reuses the one and only
# connection. This is required for :memory: databases to persist data —
# each new connection would otherwise see a fresh, empty database.
engine = create_engine(
    'sqlite:///:memory:',
    poolclass=StaticPool,
    # SQLite connections are thread-bound by default; disable the check so
    # the single shared connection can be used from any thread.
    connect_args={'check_same_thread': False},
)
from sqlalchemy import create_engine
from sqlalchemy.pool import SingletonThreadPool

# SingletonThreadPool: each thread gets its own dedicated connection.
# NOTE: this is SQLAlchemy's default pool for SQLite *:memory:* databases;
# file-based SQLite databases default to QueuePool (NullPool before 2.0).
engine = create_engine(
    'sqlite:///db.db',
    poolclass=SingletonThreadPool,
)
AssertionPool ensures that at most one connection is checked out at a time; it is used in testing to debug connection leaks.
from sqlalchemy import create_engine
from sqlalchemy.pool import AssertionPool

# AssertionPool: allows at most one connection checked out at a time and
# raises AssertionError if a second checkout is attempted.
# Useful for debugging connection leaks in tests.
engine = create_engine(
    'postgresql://user:pass@localhost/db',
    poolclass=AssertionPool,
)
from sqlalchemy import create_engine

# Test connections before checkout ("pessimistic" disconnect handling).
# pool_pre_ping issues a lightweight round-trip (e.g. SELECT 1) before each
# use, preventing "MySQL server has gone away" errors from stale connections.
# Small per-checkout cost, but never hands out a dead connection.
engine = create_engine(
    url,  # `url` is assumed to be defined elsewhere
    pool_pre_ping=True,
)
from sqlalchemy import create_engine

# Use LIFO (last in, first out) checkout instead of the default FIFO.
# LIFO keeps reusing the most recently returned connections, so during low
# traffic the older, excess connections sit idle and can be timed out or
# recycled — reducing the overall connection count.
engine = create_engine(
    url,  # `url` is assumed to be defined elsewhere
    pool_use_lifo=True,
)
from sqlalchemy import event, create_engine

engine = create_engine('postgresql://user:pass@localhost/db')


@event.listens_for(engine, "connect")
def receive_connect(dbapi_conn, connection_record):
    """Fires once when the pool opens a brand-new DBAPI connection."""
    print(f"New connection created: {id(dbapi_conn)}")


@event.listens_for(engine, "checkout")
def receive_checkout(dbapi_conn, connection_record, connection_proxy):
    """Fires each time a pooled connection is handed out for use."""
    print(f"Connection checked out: {id(dbapi_conn)}")


@event.listens_for(engine, "checkin")
def receive_checkin(dbapi_conn, connection_record):
    """Fires when a connection is returned to the pool."""
    print(f"Connection returned: {id(dbapi_conn)}")


@event.listens_for(engine, "close")
def receive_close(dbapi_conn, connection_record):
    """Fires when the pool closes a DBAPI connection for good."""
    print(f"Connection closed: {id(dbapi_conn)}")
from sqlalchemy import text

# Close all pooled connections and discard the pool; a fresh pool is created
# lazily on the next connection attempt. Useful when:
#   - database connection parameters change
#   - recovering from connection errors
#   - application restart/reload
engine.dispose()

with engine.connect() as conn:
    # Fresh connection from the newly created pool.
    result = conn.execute(text("SELECT 1"))
from sqlalchemy import text


def recreate_pool(engine):
    """Dispose of *engine*'s connection pool and verify a fresh one works.

    Use after a database failover or configuration change: dispose() closes
    all pooled connections, and the test query forces the lazily rebuilt
    pool to hand out — and thereby validate — a brand-new connection.
    """
    engine.dispose()
    # Test the new pool with a trivial round-trip query.
    with engine.connect() as conn:
        conn.execute(text("SELECT 1"))
    print("Pool recreated successfully")


recreate_pool(engine)
# Celery worker configuration: workers are isolated processes, each holding
# its own pool, so keep the pool small and defensively refresh connections.
engine = create_engine(
    DATABASE_URL,
    pool_size=2,         # low base — workers are isolated
    max_overflow=5,      # small burst headroom
    pool_recycle=1800,   # recycle every 30 min
    pool_pre_ping=True,  # validate connections before use
)
from sqlalchemy.pool import NullPool

# AWS Lambda: skip pooling entirely — a connection is created per invocation,
# since a frozen/reused container cannot safely keep connections alive.
# Alternatively, put RDS Proxy in front to do the pooling server-side.
engine = create_engine(
    DATABASE_URL,
    poolclass=NullPool,
)
from sqlalchemy import create_engine, event, text

# Prevent stale connections: pessimistic pre-ping plus periodic recycling.
engine = create_engine(
    url,  # `url` is assumed to be defined elsewhere
    pool_pre_ping=True,  # test each connection before use
    pool_recycle=3600,   # recycle connections older than an hour
)


# Alternative: perform the ping by hand with an engine_connect listener.
@event.listens_for(engine, "engine_connect")
def ping_connection(connection, branch):
    # NOTE(review): the `branch` argument belongs to the legacy 1.x event
    # signature ("branched" sub-connections); it is deprecated in SQLAlchemy
    # 1.4 — confirm the installed version before reusing this recipe.
    if branch:
        return
    try:
        connection.scalar(text("SELECT 1"))
    except Exception:
        # Connection is stale: invalidate it so the pool replaces it,
        # then re-raise so the caller still sees the failure.
        connection.invalidate()
        raise