Overview
While SQLAlchemy includes many dialects, you may need to create a custom dialect for:
Proprietary database systems
Database variants or forks
Specialized data storage systems with SQL interfaces
Cloud-based database services
Embedded databases
Custom dialects should be published as external projects rather than being included in SQLAlchemy core.
Dialect Structure
A typical dialect project has the following structure:
sqlalchemy-mydialect/
├── setup.py
├── setup.cfg
├── sqlalchemy_mydialect/
│   ├── __init__.py
│   ├── base.py
│   ├── mydriver.py
│   └── requirements.py
└── test/
    ├── __init__.py
    ├── conftest.py
    ├── test_suite.py
    └── test_mydialect_specific.py
Setting Up Entry Points
Register your dialect using setuptools entry points:
from setuptools import setup, find_packages

# Packaging metadata for the external dialect distribution.
setup(
    name='sqlalchemy-mydialect',
    version='1.0.0',
    description='SQLAlchemy dialect for MyDatabase',
    packages=find_packages(),
    # Runtime dependencies: SQLAlchemy itself plus the database driver.
    install_requires=[
        'sqlalchemy>=2.0',
        'mydb-driver>=1.0'
    ],
    # Entries in the 'sqlalchemy.dialects' group map a dialect name
    # ("mydialect" or "mydialect.mydriver") to a "module:Class" path.
    entry_points={
        'sqlalchemy.dialects': [
            'mydialect = sqlalchemy_mydialect.base:MyDialect',
            'mydialect.mydriver = sqlalchemy_mydialect.mydriver:MyDialect_mydriver',
        ]
    },
    classifiers=[
        'Development Status :: 4 - Beta',
        'Intended Audience :: Developers',
        'Programming Language :: Python :: 3',
        'Programming Language :: Python :: 3.8',
        'Programming Language :: Python :: 3.9',
        'Programming Language :: Python :: 3.10',
        'Programming Language :: Python :: 3.11',
    ]
)
Runtime Registration
Alternatively, register dialects at runtime:
from sqlalchemy.dialects import registry
import pytest

# Register dialect without entry points
# The three arguments are the dialect name, the module that holds the
# dialect, and the name of the dialect class inside that module.
registry.register(
    "mydialect.mydriver",
    "sqlalchemy_mydialect.mydriver",
    "MyDialect_mydriver"
)

# Ask pytest to rewrite asserts in SQLAlchemy's assertion helpers; this is
# done before the plugin import below.
pytest.register_assert_rewrite("sqlalchemy.testing.assertions")

# Pull SQLAlchemy's pytest plugin into this module (star import is the
# mechanism used here, so it is kept as-is).
from sqlalchemy.testing.plugin.pytestplugin import *
Creating the Base Dialect
from sqlalchemy.engine import default
from sqlalchemy.sql import compiler
from sqlalchemy import types as sqltypes
class MyDialect(default.DefaultDialect):
    """Base SQLAlchemy dialect for MyDatabase.

    Declares the database's capabilities, the type mappings used for
    binding and reflection, and the hooks SQLAlchemy calls to import the
    DBAPI, build connection arguments, and commit or roll back.
    """

    name = 'mydialect'
    driver = 'mydriver'

    # Opt in to SQLAlchemy's compiled-SQL cache. Without this flag the
    # dialect still works, but every statement is recompiled and a warning
    # is emitted. It must be set directly on each dialect class.
    supports_statement_cache = True

    # Capabilities
    supports_alter = True
    supports_native_boolean = True
    supports_sequences = True
    supports_default_values = True
    supports_empty_insert = True

    # Connection settings
    default_paramstyle = 'qmark'

    # Type mappings: generic SQLAlchemy types -> dialect-specific versions.
    # MyBoolean and MyDateTime must already be defined or imported when this
    # class body executes.
    colspecs = {
        sqltypes.Boolean: MyBoolean,
        sqltypes.DateTime: MyDateTime,
    }

    # Reflection mappings: type names reported by the database -> types.
    ischema_names = {
        'INTEGER': sqltypes.INTEGER,
        'VARCHAR': sqltypes.VARCHAR,
        'BOOLEAN': sqltypes.BOOLEAN,
        'TIMESTAMP': sqltypes.TIMESTAMP,
    }

    def __init__(self, **kwargs):
        """Forward all keyword arguments to ``DefaultDialect``."""
        super().__init__(**kwargs)

    @classmethod
    def import_dbapi(cls):
        """Import and return the DBAPI module.

        This is the hook name used by SQLAlchemy 2.0; the import is done
        lazily so the driver is only required when an engine is created.
        """
        import mydb_driver
        return mydb_driver

    @classmethod
    def dbapi(cls):
        """Legacy (pre-2.0) name for :meth:`import_dbapi`, kept for callers."""
        return cls.import_dbapi()

    def create_connect_args(self, url):
        """Build connection arguments from URL.

        Returns ``([], opts)``: no positional arguments, and a keyword
        dict made of the translated URL fields (the URL username is passed
        as ``user``) updated with the URL's query-string parameters.
        """
        opts = url.translate_connect_args(username='user')
        opts.update(url.query)
        return ([], opts)

    def do_rollback(self, dbapi_connection):
        """Perform rollback on connection."""
        dbapi_connection.rollback()

    def do_commit(self, dbapi_connection):
        """Perform commit on connection."""
        dbapi_connection.commit()
from sqlalchemy import types as sqltypes
class MyBoolean(sqltypes.Boolean):
    """Boolean type that is sent to MyDatabase as an integer flag."""

    def bind_processor(self, dialect):
        """Return a converter from a Python value to ``1``/``0`` (or None)."""

        def to_db(flag):
            # NULL is passed through untouched; everything else is reduced
            # to its truthiness.
            if flag is not None:
                return 1 if flag else 0
            return None

        return to_db

    def result_processor(self, dialect, coltype):
        """Return a converter from a fetched value to ``bool`` (or None)."""

        def from_db(raw):
            return None if raw is None else bool(raw)

        return from_db
class MyDateTime(sqltypes.DateTime):
    """Custom datetime type stored as ``YYYY-MM-DD HH:MM:SS`` text."""

    # Single format shared by both directions so a value written by
    # bind_processor can always be parsed back by result_processor.
    _storage_format = '%Y-%m-%d %H:%M:%S'

    def bind_processor(self, dialect):
        """Return a converter from ``datetime`` to the database string."""
        fmt = self._storage_format

        def process(value):
            if value is None:
                return None
            # Convert to database format
            return value.strftime(fmt)

        return process

    def result_processor(self, dialect, coltype):
        """Return a converter from the database string to ``datetime``."""
        # Imported once per processor rather than once per row.
        from datetime import datetime

        fmt = self._storage_format

        def process(value):
            if value is None:
                return None
            # Parse from database format
            return datetime.strptime(value, fmt)

        return process
Type Compiler
Implement DDL generation for types:
from sqlalchemy.sql import compiler
class MyTypeCompiler(compiler.GenericTypeCompiler):
    """Compile types to DDL.

    Each ``visit_<TYPE>`` method returns the type name MyDatabase expects
    in DDL for the corresponding SQLAlchemy type.
    """

    def visit_BOOLEAN(self, type_, **kw):
        """Render booleans as ``BOOL``."""
        return "BOOL"

    def visit_VARCHAR(self, type_, **kw):
        """Render ``VARCHAR(n)``, or bare ``VARCHAR`` when no length is set."""
        if type_.length:
            return f"VARCHAR({type_.length})"
        return "VARCHAR"

    def visit_TEXT(self, type_, **kw):
        """Render text columns as ``LONGTEXT``."""
        return "LONGTEXT"

    def visit_DATETIME(self, type_, **kw):
        """Render datetimes as ``TIMESTAMP``."""
        return "TIMESTAMP"
# Add to dialect
class MyDialect(default.DefaultDialect):
    # ...
    # SQLAlchemy 2.0 names this hook ``type_compiler_cls``; the dialect
    # instantiates the class itself. (``type_compiler`` is the legacy name.)
    type_compiler_cls = MyTypeCompiler
SQL Compiler
Customize SQL generation:
class MyCompiler(compiler.SQLCompiler):
    """Compile SQL statements."""

    def visit_select(self, select, **kwargs):
        """Custom SELECT compilation.

        Currently defers entirely to the base compiler; it is the place to
        add dialect-specific SELECT handling.
        """
        # Add custom logic
        return super().visit_select(select, **kwargs)

    def limit_clause(self, select, **kw):
        """Custom LIMIT clause.

        Returns the ``LIMIT``/``OFFSET`` text for *select*; either part is
        omitted when the statement does not set it, and an empty string is
        returned when neither is set.
        """
        text = ""
        if select._limit_clause is not None:
            text += f" LIMIT {self.process(select._limit_clause, **kw)}"
        if select._offset_clause is not None:
            text += f" OFFSET {self.process(select._offset_clause, **kw)}"
        return text
class MyDDLCompiler(compiler.DDLCompiler):
    """Compile DDL statements."""

    def visit_create_table(self, create, **kw):
        """Custom CREATE TABLE.

        Renders ``CREATE TABLE <name> (<columns and constraints>)`` for the
        table carried by *create*. ``**kw`` is accepted so the signature
        matches the base compiler's visit method.
        """
        table = create.element
        preparer = self.preparer

        # Add columns. get_column_specification() already renders the
        # quoted column name followed by its type, so the name must not be
        # prepended a second time.
        column_specs = [
            self.get_column_specification(column)
            for column in table.columns
        ]

        # Add constraints.
        # NOTE(review): only *named* constraints are rendered, as in the
        # original example; unnamed ones (e.g. an unnamed primary key) are
        # skipped. Confirm this is what the target database needs.
        column_specs.extend(
            self.process(constraint)
            for constraint in table.constraints
            if constraint.name is not None
        )

        body = ",\n".join(column_specs)
        return f"\nCREATE TABLE {preparer.format_table(table)} ({body}\n)"
# Add to dialect
class MyDialect(default.DefaultDialect):
    # ...
    # Compiler classes the dialect uses for SQL statements and for DDL.
    statement_compiler = MyCompiler
    ddl_compiler = MyDDLCompiler
Database Reflection
Implement introspection methods:
from sqlalchemy import text
from sqlalchemy.engine import reflection


class MyDialect(default.DefaultDialect):
    # ...

    @reflection.cache
    def get_table_names(self, connection, schema=None, **kw):
        """Get list of table names.

        When *schema* is given, only tables in that schema are returned.
        """
        query = "SELECT table_name FROM information_schema.tables"
        params = {}
        if schema:
            # Bound parameter instead of string interpolation: the schema
            # name may come from user input.
            query += " WHERE table_schema = :schema"
            params["schema"] = schema
        result = connection.execute(text(query), params)
        return [row[0] for row in result]

    @reflection.cache
    def get_columns(self, connection, table_name, schema=None, **kw):
        """Get column information for a table.

        Returns a list of dicts with the keys ``name``, ``type``,
        ``nullable`` and ``default``. Type names that are missing from
        ``ischema_names`` are reported as ``sqltypes.NULLTYPE``.
        """
        query = (
            "SELECT column_name, data_type, is_nullable, column_default "
            "FROM information_schema.columns "
            "WHERE table_name = :table_name"
        )
        params = {"table_name": table_name}
        if schema:
            query += " AND table_schema = :schema"
            params["schema"] = schema
        result = connection.execute(text(query), params)
        return [
            {
                'name': row[0],
                'type': self.ischema_names.get(row[1], sqltypes.NULLTYPE),
                'nullable': row[2] == 'YES',
                'default': row[3],
            }
            for row in result
        ]

    @reflection.cache
    def get_pk_constraint(self, connection, table_name, schema=None, **kw):
        """Get primary key constraint.

        Placeholder: reports "no primary key". Replace with a query that
        fills in the constrained column names and the constraint name.
        """
        # Query database for PK information
        # Return {'constrained_columns': [...], 'name': '...'}
        return {'constrained_columns': [], 'name': None}

    @reflection.cache
    def get_foreign_keys(self, connection, table_name, schema=None, **kw):
        """Get foreign key constraints.

        Placeholder: reports no foreign keys.
        """
        # Query database for FK information
        # Return list of FK dicts
        return []

    @reflection.cache
    def get_indexes(self, connection, table_name, schema=None, **kw):
        """Get indexes for a table.

        Placeholder: reports no indexes.
        """
        # Query database for index information
        # Return list of index dicts
        return []
DBAPI Adapter
For drivers that don’t follow PEP 249:
class MyDialect_mydriver(MyDialect):
    """MyDatabase dialect bound to the ``mydriver`` DBAPI."""

    driver = 'mydriver'

    # The caching opt-in is not inherited for this purpose: it has to be
    # set directly on every dialect subclass.
    supports_statement_cache = True

    @classmethod
    def import_dbapi(cls):
        """Import and return the ``mydriver`` module (SQLAlchemy 2.0 hook)."""
        import mydriver
        return mydriver

    @classmethod
    def dbapi(cls):
        """Legacy (pre-2.0) name for :meth:`import_dbapi`, kept for callers."""
        return cls.import_dbapi()

    def create_connect_args(self, url):
        """Adapt URL to driver-specific format.

        Returns ``([], opts)`` where *opts* holds host, port, database,
        user and password taken from the URL, updated with any
        query-string parameters.
        """
        opts = {
            # Fallbacks used when the URL omits host or port.
            'host': url.host or 'localhost',
            'port': url.port or 5432,
            'database': url.database,
            'user': url.username,
            'password': url.password,
        }
        # Add query parameters
        opts.update(url.query)
        return ([], opts)

    def do_execute(self, cursor, statement, parameters, context=None):
        """Execute a statement."""
        # Adapt to driver's execute method
        cursor.execute(statement, parameters)

    def do_executemany(self, cursor, statement, parameters, context=None):
        """Execute a statement multiple times."""
        cursor.executemany(statement, parameters)
AsyncIO Support
For async drivers:
from sqlalchemy.engine import AdaptedConnection
from sqlalchemy.dialects.postgresql.asyncpg import AsyncAdapt_asyncpg_connection
from sqlalchemy.util.concurrency import await_only


class AsyncAdapt_mydriver_connection(AdaptedConnection):
    """Adapt async driver to PEP 249 interface."""

    # Helper used to run the driver's coroutines from the synchronous-style
    # methods of the adapter; stored as a staticmethod so it is not bound
    # to the instance.
    await_ = staticmethod(await_only)

    def __init__(self, dbapi_connection):
        """Wrap the driver's own connection object."""
        self._connection = dbapi_connection

    @property
    def autocommit(self):
        """Autocommit flag, proxied to the wrapped connection."""
        return self._connection.autocommit

    @autocommit.setter
    def autocommit(self, value):
        self._connection.autocommit = value

    def cursor(self):
        """Return a cursor adapter for the wrapped connection."""
        # NOTE(review): AsyncAdapt_mydriver_cursor is not defined in this
        # example; the dialect has to provide it.
        return AsyncAdapt_mydriver_cursor(self._connection)
class MyDialect_mydriver_async(MyDialect_mydriver):
    """Asyncio variant of the ``mydriver`` dialect."""

    is_async = True
    supports_statement_cache = True

    @classmethod
    def import_dbapi(cls):
        """Import and return the ``mydriver`` module (SQLAlchemy 2.0 hook)."""
        return __import__('mydriver')

    @classmethod
    def dbapi(cls):
        """Legacy (pre-2.0) name for :meth:`import_dbapi`, kept for callers."""
        return cls.import_dbapi()

    @classmethod
    def get_pool_class(cls, url):
        """Return the connection pool class to use for this dialect."""
        from sqlalchemy.pool import AsyncAdaptedQueuePool
        return AsyncAdaptedQueuePool

    def get_driver_connection(self, connection):
        """Return the driver's own connection held by the adapter."""
        return connection._connection
Testing Your Dialect
Use SQLAlchemy’s test suite:
# The star import brings the suite's test classes into this module.
from sqlalchemy.testing.suite import *

# Include all standard tests
# The original class is re-imported under a private alias so that the
# subclass below can reuse its public name.
from sqlalchemy.testing.suite import (
    ComponentReflectionTest as _ComponentReflectionTest
)


# Override specific tests if needed
class ComponentReflectionTest(_ComponentReflectionTest):
    """Suite test class with selected tests overridden for this dialect."""

    @testing.skip("mydialect")
    def test_not_supported_feature(self):
        """Overridden with an empty body and marked to skip on mydialect."""
        pass
Requirements File
from sqlalchemy.testing.requirements import SuiteRequirements
from sqlalchemy.testing import exclusions
class Requirements(SuiteRequirements):
    """Feature declarations for the dialect under test.

    Each property returns ``exclusions.open()`` or ``exclusions.closed()``
    to state whether the named feature is available.
    """

    @property
    def nullable_booleans(self):
        """Database supports nullable boolean columns."""
        return exclusions.open()

    @property
    def returning(self):
        """Database supports RETURNING clause."""
        # closed(): this requirement is declared as not met.
        return exclusions.closed()

    @property
    def sequences(self):
        """Database supports sequences."""
        return exclusions.open()

    @property
    def window_functions(self):
        """Database supports window functions."""
        return exclusions.open()
Test Configuration
[tool:pytest]
addopts = --tb native -v -r fxX --maxfail=25 -p no:warnings
python_files = test/test_*.py
[sqla_testing]
requirement_cls = sqlalchemy_mydialect.requirements:Requirements
profile_file = test/profiles.txt
[db]
default = mydialect+mydriver://user:pass@localhost/testdb
sqlite = sqlite:///:memory:
Best Practices
Use Type Migration Guidelines
Follow SQLAlchemy’s type migration guidelines (see Type Migration) for naming and implementing types.
Properly implement all reflection methods (get_table_names, get_columns, etc.) for full introspection support.
Run the dialect compliance suite to ensure compatibility with SQLAlchemy’s expectations.
Document Connection Strings
Provide clear documentation on connection string format and supported parameters.
Clearly document which database versions are supported.
Publishing Your Dialect
Package structure: Follow the standard structure shown above
Documentation: Include a comprehensive README and docs
Testing: Include a test suite and CI configuration
PyPI: Publish to PyPI with clear naming (sqlalchemy-*)
Versioning: Use semantic versioning
License: Choose an appropriate open-source license
Example: Minimal Working Dialect
sqlalchemy_mydialect/mydriver.py
from sqlalchemy.engine import default
from sqlalchemy import types as sqltypes
class MyDialect(default.DefaultDialect):
    """Smallest useful dialect: names, paramstyle, DBAPI import, connect args."""

    name = 'mydialect'
    driver = 'mydriver'
    default_paramstyle = 'qmark'
    supports_native_boolean = True

    # Opt in to SQLAlchemy's compiled-SQL cache; without it the dialect
    # runs uncached and a warning is emitted.
    supports_statement_cache = True

    @classmethod
    def import_dbapi(cls):
        """Import and return the ``mydriver`` module (SQLAlchemy 2.0 hook)."""
        import mydriver
        return mydriver

    @classmethod
    def dbapi(cls):
        """Legacy (pre-2.0) name for :meth:`import_dbapi`, kept for callers."""
        return cls.import_dbapi()

    def create_connect_args(self, url):
        """Return ``([], opts)`` with *opts* translated from the URL fields."""
        opts = url.translate_connect_args()
        return ([], opts)


# Module-level alias exposing the dialect class under the name ``dialect``.
dialect = MyDialect
Usage:
from sqlalchemy import create_engine

# The URL scheme names the dialect and the driver: "mydialect+mydriver".
engine = create_engine("mydialect+mydriver://user:pass@host/db")
See Also
Type Migration Guidelines Rules for implementing dialect types
Dialect Overview Understanding the dialect system