SQL Server Dialect

Overview

The SQL Server dialect provides comprehensive support for Microsoft SQL Server, including support for IDENTITY columns, SQL Server-specific types, and various connection methods.
Version Support
  • Normal support: SQL Server 2012+
  • Best effort: SQL Server 2005+

Installation

pip install sqlalchemy pyodbc
Requires ODBC driver installation on your system.

ODBC Driver Installation

For pyodbc, you need an ODBC driver installed on your system, such as Microsoft's "ODBC Driver 17 for SQL Server" or "ODBC Driver 18 for SQL Server".

Connecting to SQL Server

DSN Connection

from sqlalchemy import create_engine

# Using a configured DSN
engine = create_engine("mssql+pyodbc://scott:tiger@my_dsn")

# DSN without credentials (uses Windows Auth); note the "@" before the DSN name
engine = create_engine("mssql+pyodbc://@my_dsn")

Hostname Connection

from sqlalchemy import create_engine

engine = create_engine(
    "mssql+pyodbc://user:pass@hostname:1433/database?"
    "driver=ODBC+Driver+18+for+SQL+Server"
)

Using URL.create()

from sqlalchemy.engine import URL

connection_url = URL.create(
    "mssql+pyodbc",
    username="user",
    password="pass",
    host="hostname",
    port=1433,
    database="mydb",
    query={
        "driver": "ODBC Driver 18 for SQL Server",
        "TrustServerCertificate": "yes",
        "Encrypt": "yes"
    }
)

engine = create_engine(connection_url)

pymssql Connection

engine = create_engine(
    "mssql+pymssql://user:pass@hostname:1433/database"
)

Async Connection

from sqlalchemy.ext.asyncio import create_async_engine

engine = create_async_engine(
    "mssql+aioodbc://user:pass@hostname/database?"
    "driver=ODBC+Driver+18+for+SQL+Server"
)

IDENTITY Columns

SQL Server uses IDENTITY for auto-increment:
from sqlalchemy import Table, Column, Integer, String

# Automatic IDENTITY on first integer primary key
users = Table(
    't',
    metadata,
    Column('id', Integer, primary_key=True),  # Gets IDENTITY
    Column('x', Integer)
)
Generates:
CREATE TABLE t (
    id INTEGER NOT NULL IDENTITY,
    x INTEGER NULL,
    PRIMARY KEY (id)
)

Controlling IDENTITY

users = Table(
    't',
    metadata,
    Column('id', Integer, primary_key=True, autoincrement=False),
    Column('x', Integer)
)

SQL Server-Specific Types

String Types

from sqlalchemy.dialects.mssql import (
    VARCHAR,       # Variable-length non-Unicode
    NVARCHAR,      # Variable-length Unicode
    CHAR,          # Fixed-length non-Unicode
    NCHAR,         # Fixed-length Unicode
    TEXT,          # Large non-Unicode text (deprecated)
    NTEXT          # Large Unicode text (deprecated)
)

users = Table(
    'users',
    metadata,
    Column('username', VARCHAR(50)),
    Column('display_name', NVARCHAR(100)),  # Use for Unicode
    Column('code', CHAR(10))
)
Use NVARCHAR for Unicode text. VARCHAR stores non-Unicode data and may lose characters.

Binary Types

from sqlalchemy.dialects.mssql import (
    BINARY,        # Fixed-length binary
    VARBINARY,     # Variable-length binary
    IMAGE          # Large binary (deprecated, use VARBINARY(MAX))
)

files = Table(
    'files',
    metadata,
    Column('id', Integer, primary_key=True),
    Column('hash', BINARY(32)),
    Column('data', VARBINARY("max"))  # "max" maps to VARBINARY(MAX)
)

Numeric Types

from sqlalchemy.dialects.mssql import (
    TINYINT,       # 0 to 255
    SMALLINT,      # -32,768 to 32,767
    INTEGER,       # -2^31 to 2^31-1
    BIGINT,        # -2^63 to 2^63-1
    DECIMAL,       # Fixed precision
    NUMERIC,       # Fixed precision
    FLOAT,         # Floating point
    REAL,          # Single precision float
    MONEY,         # Currency
    SMALLMONEY     # Small currency
)

products = Table(
    'products',
    metadata,
    Column('id', INTEGER, primary_key=True),
    Column('price', MONEY),
    Column('quantity', SMALLINT),
    Column('weight', DECIMAL(10, 2))
)

Date and Time Types

from sqlalchemy.dialects.mssql import (
    DATE,              # Date only (SQL Server 2008+)
    TIME,              # Time only (SQL Server 2008+)
    DATETIME,          # Date and time (legacy)
    DATETIME2,         # Date and time (SQL Server 2008+, preferred)
    SMALLDATETIME,     # Date and time (less precision)
    DATETIMEOFFSET     # Date and time with timezone
)

events = Table(
    'events',
    metadata,
    Column('id', Integer, primary_key=True),
    Column('created_legacy', DATETIME),
    Column('created', DATETIME2),  # Preferred
    Column('created_tz', DATETIMEOFFSET),
    Column('event_date', DATE),
    Column('event_time', TIME)
)
Use DATETIME2 over DATETIME for better precision and range (0001-01-01 to 9999-12-31).

Other SQL Server Types

from sqlalchemy import text
from sqlalchemy.dialects.mssql import (
    BIT,               # Boolean (0 or 1)
    UNIQUEIDENTIFIER,  # GUID/UUID
    XML,               # XML data
    JSON,              # JSON (SQL Server 2016+)
    ROWVERSION,        # Row version for concurrency
    TIMESTAMP,         # Alias for ROWVERSION
    SQL_VARIANT        # Variant type
)

mixed = Table(
    'mixed',
    metadata,
    Column('id', UNIQUEIDENTIFIER, primary_key=True, server_default=text('NEWID()')),
    Column('is_active', BIT),
    Column('data', JSON),
    Column('xml_data', XML),
    Column('version', ROWVERSION)
)

Working with JSON

SQL Server 2016+ supports JSON:
from sqlalchemy.dialects.mssql import JSON
from sqlalchemy import func, select

products = Table(
    'products',
    metadata,
    Column('id', Integer, primary_key=True),
    Column('data', JSON)
)

# Query JSON data
stmt = select(products).where(
    func.json_value(products.c.data, '$.price').cast(Integer) > 100
)

Working with XML

from sqlalchemy.dialects.mssql import XML
from sqlalchemy import text

documents = Table(
    'documents',
    metadata,
    Column('id', Integer, primary_key=True),
    Column('content', XML)
)

# Insert XML
conn.execute(
    documents.insert(),
    {'content': '<root><item>value</item></root>'}
)

# Query XML with XQuery
stmt = text(
    "SELECT id FROM documents WHERE content.exist('/root/item') = 1"
)

MERGE Statement

SQL Server’s UPSERT operation:
from sqlalchemy import text

merge_stmt = text("""
    MERGE INTO users AS target
    USING (VALUES (:id, :name, :email)) AS source (id, name, email)
    ON target.id = source.id
    WHEN MATCHED THEN
        UPDATE SET name = source.name, email = source.email
    WHEN NOT MATCHED THEN
        INSERT (id, name, email) VALUES (source.id, source.name, source.email);
""")

connection.execute(
    merge_stmt,
    {'id': 1, 'name': 'Alice', 'email': 'alice@example.com'}
)

OUTPUT Clause

Retrieve inserted/updated/deleted values:
from sqlalchemy import insert, text

# Using raw SQL
stmt = text("""
    INSERT INTO users (name, email)
    OUTPUT INSERTED.id, INSERTED.name
    VALUES (:name, :email)
""")

result = connection.execute(stmt, {'name': 'Alice', 'email': 'alice@example.com'})
inserted = result.fetchone()

TOP Clause

from sqlalchemy import select

# LIMIT is translated to TOP
stmt = select(users).limit(10)
# Generates: SELECT TOP 10 * FROM users

# With OFFSET (SQL Server 2012+)
stmt = select(users).limit(10).offset(20)
# Generates: SELECT * FROM users ORDER BY ... OFFSET 20 ROWS FETCH NEXT 10 ROWS ONLY

Window Functions

from sqlalchemy import func, over

# ROW_NUMBER()
row_num = func.row_number().over(
    partition_by=users.c.department,
    order_by=users.c.salary.desc()
).label('row_num')

stmt = select(users, row_num)

# RANK(), DENSE_RANK(), etc.
rank = func.rank().over(order_by=users.c.score.desc())

Common Table Expressions (CTEs)

from sqlalchemy import select, literal

# Recursive CTE for hierarchical data
employees_cte = select(
    employees.c.id,
    employees.c.manager_id,
    employees.c.name,
    literal(0).label('level')
).where(
    employees.c.manager_id.is_(None)
).cte('employees_cte', recursive=True)

recursive_part = select(
    employees.c.id,
    employees.c.manager_id,
    employees.c.name,
    (employees_cte.c.level + 1).label('level')
).join(
    employees_cte,
    employees.c.manager_id == employees_cte.c.id
)

employees_cte = employees_cte.union_all(recursive_part)

stmt = select(employees_cte)

TRY_CAST Function

from sqlalchemy import try_cast

# Safely cast values, returns NULL on failure
stmt = select(
    try_cast(users.c.age_string, Integer)
).where(
    try_cast(users.c.age_string, Integer) > 18
)

Schema Specification

# Specify schema (database owner)
users = Table(
    'users',
    metadata,
    Column('id', Integer, primary_key=True),
    schema='dbo'  # Default schema
)

# Cross-database queries
remote_table = Table(
    'products',
    metadata,
    schema='OtherDatabase.dbo'
)

Transaction Isolation Levels

from sqlalchemy import create_engine

engine = create_engine(
    "mssql+pyodbc://...",
    isolation_level="READ COMMITTED"
)

# Or per-connection
with engine.connect().execution_options(
    isolation_level="SERIALIZABLE"
) as conn:
    # Execute queries
    pass
Available levels:
  • READ UNCOMMITTED
  • READ COMMITTED (default)
  • REPEATABLE READ
  • SERIALIZABLE
  • SNAPSHOT

Performance Tips

  1. Use NOLOCK hint for read-heavy workloads (with caution):
    stmt = select(users).with_hint(users, "WITH (NOLOCK)")
    
  2. Enable connection pooling:
    engine = create_engine(
        "mssql+pyodbc://...",
        pool_size=20,
        max_overflow=40
    )
    
  3. Use DATETIME2 over DATETIME for greater precision, a wider date range, and more efficient storage
  4. Create proper indexes on frequently queried columns
  5. Use query hints when needed:
    stmt = select(users).with_hint(users, "WITH (INDEX(idx_name))")
    

Common Issues

Unicode Strings

Always use NVARCHAR for Unicode:
# Wrong - may lose data
Column('name', VARCHAR(50))

# Correct
Column('name', NVARCHAR(50))

Fast Executemany

Enable fast executemany for bulk inserts:
engine = create_engine(
    "mssql+pyodbc://...",
    fast_executemany=True
)

Connection Timeout

engine = create_engine(
    "mssql+pyodbc://...?timeout=30"
)

See Also

Dialect Overview

Learn about SQLAlchemy’s dialect system

SQL Server Documentation

Official SQL Server documentation