Signature:create_engine(url: str | URL, **kwargs) -> EngineCreates a new Engine instance. The first argument is a database URL, followed by optional configuration parameters.
# Test connectiontry: with engine.connect() as conn: result = conn.execute(text("SELECT 1")) print("Connection successful!")except Exception as e: print(f"Connection failed: {e}")# Get raw DBAPI connectionwith engine.connect() as conn: dbapi_conn = conn.connection.dbapi_connection print(f"Connected to: {dbapi_conn}")
# Close all connectionsengine.dispose()# Recommended for long-running applications# that need to recreate the pool# After dispose, new connections will be createdwith engine.connect() as conn: # Fresh connection from new pool result = conn.execute(text("SELECT 1"))
from sqlalchemy import event@event.listens_for(engine, "connect")def receive_connect(dbapi_connection, connection_record): """Called when a new connection is created.""" print("New connection established")@event.listens_for(engine, "checkout")def receive_checkout(dbapi_connection, connection_record, connection_proxy): """Called when a connection is retrieved from the pool.""" print("Connection checked out")@event.listens_for(engine, "checkin")def receive_checkin(dbapi_connection, connection_record): """Called when a connection is returned to the pool.""" print("Connection checked in")
from sqlalchemy import inspectinspector = inspect(engine)# Get table namestables = inspector.get_table_names()print(f"Tables: {tables}")# Get schema namesschemas = inspector.get_schema_names()print(f"Schemas: {schemas}")# Get columnscolumns = inspector.get_columns('users')for col in columns: print(f"{col['name']}: {col['type']}")
# Get dialectprint(engine.dialect.name) # postgresql, mysql, sqlite, etc.# Check dialect featuresprint(engine.dialect.supports_native_boolean) # True for PostgreSQLprint(engine.dialect.supports_sequences) # True for PostgreSQL, Oracleprint(engine.dialect.max_identifier_length) # 63 for PostgreSQL# Server versionwith engine.connect() as conn: version = conn.dialect.server_version_info print(f"Server version: {version}")
# Primary (write) engineprimary_engine = create_engine( 'postgresql://user:pass@primary-host/db')# Replica (read) enginereplica_engine = create_engine( 'postgresql://user:pass@replica-host/db', execution_options={"read_only": True})# Use primary for writeswith primary_engine.begin() as conn: conn.execute(users.insert().values(name='alice'))# Use replica for readswith replica_engine.connect() as conn: result = conn.execute(select(users))