Overview
The SQL Server dialect provides comprehensive support for Microsoft SQL Server, including support for IDENTITY columns, SQL Server-specific types, and various connection methods.
Version Support
Normal support: SQL Server 2012+
Best effort: SQL Server 2005+
Installation
pyodbc (Default)
pymssql
aioodbc (Async)
pyodbc requires an ODBC driver installed on your system. pymssql uses FreeTDS, so no ODBC driver is required. aioodbc provides async support for SQL Server (and also requires an ODBC driver).
ODBC Driver Installation
For pyodbc, you need an ODBC driver:
Windows
Linux (Debian/Ubuntu)
macOS
curl https://packages.microsoft.com/keys/microsoft.asc | sudo apt-key add -
curl https://packages.microsoft.com/config/ubuntu/$(lsb_release -rs)/prod.list | \
sudo tee /etc/apt/sources.list.d/mssql-release.list
sudo apt-get update
sudo ACCEPT_EULA=Y apt-get install -y msodbcsql18
brew tap microsoft/mssql-release https://github.com/Microsoft/homebrew-mssql-release
brew update
brew install msodbcsql18 mssql-tools18
Connecting to SQL Server
DSN Connection
from sqlalchemy import create_engine
# Using a configured DSN
engine = create_engine( "mssql+pyodbc://scott:tiger@my_dsn" )
# DSN without credentials (uses Windows Auth)
engine = create_engine( "mssql+pyodbc://my_dsn" )
Hostname Connection
pyodbc
With Additional Parameters
Windows Authentication
Azure Active Directory
from sqlalchemy import create_engine
engine = create_engine(
"mssql+pyodbc://user:pass@hostname:1433/database?"
"driver=ODBC+Driver+18+for+SQL+Server"
)
Using URL.create()
from sqlalchemy.engine import URL
connection_url = URL.create(
"mssql+pyodbc" ,
username = "user" ,
password = "pass" ,
host = "hostname" ,
port = 1433 ,
database = "mydb" ,
query = {
"driver" : "ODBC Driver 18 for SQL Server" ,
"TrustServerCertificate" : "yes" ,
"Encrypt" : "yes"
}
)
engine = create_engine(connection_url)
pymssql Connection
engine = create_engine(
"mssql+pymssql://user:pass@hostname:1433/database"
)
Async Connection
from sqlalchemy.ext.asyncio import create_async_engine
engine = create_async_engine(
"mssql+aioodbc://user:pass@hostname/database?"
"driver=ODBC+Driver+18+for+SQL+Server"
)
IDENTITY Columns
SQL Server uses IDENTITY for auto-increment:
from sqlalchemy import Table, Column, Integer, String
# Automatic IDENTITY on first integer primary key
users = Table(
't' ,
metadata,
Column( 'id' , Integer, primary_key = True ), # Gets IDENTITY
Column( 'x' , Integer)
)
Generates:
CREATE TABLE t (
id INTEGER NOT NULL IDENTITY ,
x INTEGER NULL ,
PRIMARY KEY (id)
)
Controlling IDENTITY
users = Table(
't' ,
metadata,
Column( 'id' , Integer, primary_key = True , autoincrement = False ),
Column( 'x' , Integer)
)
users = Table(
't' ,
metadata,
Column( 'id' , Integer, primary_key = True , autoincrement = False ),
Column( 'x' , Integer, autoincrement = True ) # IDENTITY here
)
from sqlalchemy import Identity
users = Table(
't' ,
metadata,
Column(
'id' ,
Integer,
Identity( start = 100 , increment = 10 ),
primary_key = True
)
)
SQL Server-Specific Types
String Types
from sqlalchemy.dialects.mssql import (
VARCHAR , # Variable-length non-Unicode
NVARCHAR , # Variable-length Unicode
CHAR , # Fixed-length non-Unicode
NCHAR , # Fixed-length Unicode
TEXT , # Large non-Unicode text (deprecated)
NTEXT # Large Unicode text (deprecated)
)
users = Table(
'users' ,
metadata,
Column( 'username' , VARCHAR( 50 )),
Column( 'display_name' , NVARCHAR( 100 )), # Use for Unicode
Column( 'code' , CHAR( 10 ))
)
Use NVARCHAR for Unicode text. VARCHAR stores non-Unicode data and may lose characters.
Binary Types
from sqlalchemy.dialects.mssql import (
BINARY , # Fixed-length binary
VARBINARY , # Variable-length binary
IMAGE # Large binary (deprecated, use VARBINARY(MAX))
)
files = Table(
'files' ,
metadata,
Column( 'id' , Integer, primary_key = True ),
Column( 'hash' , BINARY( 32 )),
Column( 'data' , VARBINARY ) # or VARBINARY(max)
)
Numeric Types
from sqlalchemy.dialects.mssql import (
TINYINT , # 0 to 255
SMALLINT , # -32,768 to 32,767
INTEGER , # -2^31 to 2^31-1
BIGINT , # -2^63 to 2^63-1
DECIMAL , # Fixed precision
NUMERIC , # Fixed precision
FLOAT , # Floating point
REAL , # Single precision float
MONEY , # Currency
SMALLMONEY # Small currency
)
products = Table(
'products' ,
metadata,
Column( 'id' , INTEGER , primary_key = True ),
Column( 'price' , MONEY ),
Column( 'quantity' , SMALLINT ),
Column( 'weight' , DECIMAL( 10 , 2 ))
)
Date and Time Types
from sqlalchemy.dialects.mssql import (
DATE , # Date only (SQL Server 2008+)
TIME , # Time only (SQL Server 2008+)
DATETIME , # Date and time (legacy)
DATETIME2 , # Date and time (SQL Server 2008+, preferred)
SMALLDATETIME , # Date and time (less precision)
DATETIMEOFFSET # Date and time with timezone
)
events = Table(
'events' ,
metadata,
Column( 'id' , Integer, primary_key = True ),
Column( 'created_legacy' , DATETIME ),
Column( 'created' , DATETIME2 ), # Preferred
Column( 'created_tz' , DATETIMEOFFSET ),
Column( 'event_date' , DATE ),
Column( 'event_time' , TIME )
)
Use DATETIME2 over DATETIME for better precision and range (0001-01-01 to 9999-12-31).
Other SQL Server Types
from sqlalchemy.dialects.mssql import (
BIT , # Boolean (0 or 1)
UNIQUEIDENTIFIER , # GUID/UUID
XML , # XML data
JSON , # JSON (SQL Server 2016+)
ROWVERSION , # Row version for concurrency
TIMESTAMP , # Alias for ROWVERSION
SQL_VARIANT # Variant type
)
mixed = Table(
'mixed' ,
metadata,
Column( 'id' , UNIQUEIDENTIFIER , primary_key = True , default = text( 'NEWID()' )),
Column( 'is_active' , BIT ),
Column( 'data' , JSON ),
Column( 'xml_data' , XML ),
Column( 'version' , ROWVERSION )
)
Working with JSON
SQL Server 2016+ supports JSON:
from sqlalchemy.dialects.mssql import JSON
from sqlalchemy import func
products = Table(
'products' ,
metadata,
Column( 'id' , Integer, primary_key = True ),
Column( 'data' , JSON )
)
# Query JSON data
stmt = select(products).where(
func.json_value(products.c.data, '$.price' ).cast(Integer) > 100
)
Working with XML
from sqlalchemy.dialects.mssql import XML
from sqlalchemy import text
documents = Table(
'documents' ,
metadata,
Column( 'id' , Integer, primary_key = True ),
Column( 'content' , XML )
)
# Insert XML
conn.execute(
documents.insert(),
{ 'content' : '<root><item>value</item></root>' }
)
# Query XML with XQuery
stmt = text(
"SELECT id FROM documents WHERE content.exist('/root/item') = 1"
)
MERGE Statement
SQL Server’s UPSERT operation:
from sqlalchemy import text
merge_stmt = text( """
MERGE INTO users AS target
USING (VALUES (:id, :name, :email)) AS source (id, name, email)
ON target.id = source.id
WHEN MATCHED THEN
UPDATE SET name = source.name, email = source.email
WHEN NOT MATCHED THEN
INSERT (id, name, email) VALUES (source.id, source.name, source.email);
""" )
connection.execute(
merge_stmt,
{ 'id' : 1 , 'name' : 'Alice' , 'email' : 'alice@example.com' }
)
OUTPUT Clause
Retrieve inserted/updated/deleted values:
from sqlalchemy import insert, text
# Using raw SQL
stmt = text( """
INSERT INTO users (name, email)
OUTPUT INSERTED.id, INSERTED.name
VALUES (:name, :email)
""" )
result = connection.execute(stmt, { 'name' : 'Alice' , 'email' : 'alice@example.com' })
inserted = result.fetchone()
TOP Clause
from sqlalchemy import select
# LIMIT is translated to TOP
stmt = select(users).limit( 10 )
# Generates: SELECT TOP 10 * FROM users
# With OFFSET (SQL Server 2012+)
stmt = select(users).limit( 10 ).offset( 20 )
# Generates: SELECT * FROM users ORDER BY ... OFFSET 20 ROWS FETCH NEXT 10 ROWS ONLY
Window Functions
from sqlalchemy import func, over
# ROW_NUMBER()
row_num = func.row_number().over(
partition_by = users.c.department,
order_by = users.c.salary.desc()
).label( 'row_num' )
stmt = select(users, row_num)
# RANK(), DENSE_RANK(), etc.
rank = func.rank().over( order_by = users.c.score.desc())
Common Table Expressions (CTEs)
from sqlalchemy import select
# Recursive CTE for hierarchical data
employees_cte = select(
employees.c.id,
employees.c.manager_id,
employees.c.name,
literal( 0 ).label( 'level' )
).where(
employees.c.manager_id.is_( None )
).cte( 'employees_cte' , recursive = True )
recursive_part = select(
employees.c.id,
employees.c.manager_id,
employees.c.name,
(employees_cte.c.level + 1 ).label( 'level' )
).join(
employees_cte,
employees.c.manager_id == employees_cte.c.id
)
employees_cte = employees_cte.union_all(recursive_part)
stmt = select(employees_cte)
TRY_CAST Function
from sqlalchemy import try_cast
# Safely cast values, returns NULL on failure
stmt = select(
try_cast(users.c.age_string, Integer)
).where(
try_cast(users.c.age_string, Integer) > 18
)
Schema Specification
# Specify schema (database owner)
users = Table(
'users' ,
metadata,
Column( 'id' , Integer, primary_key = True ),
schema = 'dbo' # Default schema
)
# Cross-database queries
remote_table = Table(
'products' ,
metadata,
schema = 'OtherDatabase.dbo'
)
Transaction Isolation Levels
from sqlalchemy import create_engine
engine = create_engine(
"mssql+pyodbc://..." ,
isolation_level = "READ COMMITTED"
)
# Or per-connection
with engine.connect().execution_options(
isolation_level = "SERIALIZABLE"
) as conn:
# Execute queries
pass
Available levels:
READ UNCOMMITTED
READ COMMITTED (default)
REPEATABLE READ
SERIALIZABLE
SNAPSHOT
Use NOLOCK hint for read-heavy workloads (with caution):
stmt = select(users).with_hint(users, "WITH (NOLOCK)" )
Enable connection pooling:
engine = create_engine(
"mssql+pyodbc://..." ,
pool_size = 20 ,
max_overflow = 40
)
Use DATETIME2 over DATETIME for better performance
Create proper indexes on frequently queried columns
Use query hints when needed:
stmt = select(users).with_hint(users, "WITH (INDEX(idx_name))" )
Common Issues
Unicode Strings
Always use NVARCHAR for Unicode:
# Wrong - may lose data
Column( 'name' , VARCHAR( 50 ))
# Correct
Column( 'name' , NVARCHAR( 50 ))
Fast Executemany
Enable fast executemany for bulk inserts:
engine = create_engine(
"mssql+pyodbc://..." ,
fast_executemany = True
)
Connection Timeout
engine = create_engine(
"mssql+pyodbc://...?timeout=30"
)
See Also
Dialect Overview Learn about SQLAlchemy’s dialect system
SQL Server Documentation Official SQL Server documentation