Overview
This page documents the core methods of the Session class for managing objects and executing queries.
Object Management Methods
add()
Add an object to the session.
session.add(instance: object, *, _warn: bool = True) -> None
instance: The mapped object instance to add to the session.
_warn: Internal parameter for warning control.
Behavior
- Transient objects move to pending state
- Detached objects move to persistent state
- Already attached objects are ignored
Example
from sqlalchemy.orm import Session
session = Session(engine)
# Add a new object (transient → pending)
user = User(name="Alice", email="alice@example.com")
session.add(user)
# After flush/commit → persistent
session.flush()
print(user.id) # Auto-generated ID now available
Changes cascade to related objects based on cascade relationship settings. Default cascade includes “save-update”.
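The state transitions listed above can be observed with inspect(). The following is a minimal sketch, assuming a hypothetical two-column User mapping and an in-memory SQLite engine, neither of which comes from this page.

```python
from sqlalchemy import Column, Integer, String, create_engine, inspect
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()


class User(Base):  # hypothetical minimal mapping, for illustration only
    __tablename__ = "users"
    id = Column(Integer, primary_key=True)
    name = Column(String)


engine = create_engine("sqlite://")  # assumed in-memory database
Base.metadata.create_all(engine)

with Session(engine) as session:
    user = User(name="Alice")
    was_transient = inspect(user).transient  # not yet known to any session

    session.add(user)
    was_pending = inspect(user).pending  # queued for INSERT at the next flush

    session.flush()
    is_persistent = inspect(user).persistent  # row exists in the open transaction
    generated_id = user.id  # primary key populated by the INSERT

    session.commit()
```

Each flag is read at the point where the corresponding state is expected, so all three should be true once the block finishes.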
add_all()
Add multiple objects to the session at once.
session.add_all(instances: Iterable[object]) -> None
Collection of mapped instances to add.
Example
users = [
    User(name="Alice"),
    User(name="Bob"),
    User(name="Charlie"),
]
session.add_all(users)
session.commit()
delete()
Mark an instance for deletion.
session.delete(instance: object) -> None
The persistent or detached instance to delete.
Behavior
- Object must be persistent or detached with a primary key
- Moves to deleted state after flush
- Becomes detached after commit
- DELETE statement issued during flush
Example
user = session.get(User, 1)
session.delete(user)
# Object still in session until flush
print(user in session.deleted) # True
session.flush()
# DELETE statement executed
session.commit()
# user is now detached
Deleting an object does not cascade by default. Configure cascade="all, delete-orphan" on relationships to enable cascade deletion.
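As a rough illustration of the cascade setting mentioned above, the sketch below deletes a parent and counts the child rows before and after. The User and Address mappings and the SQLite engine are assumptions made for the example.

```python
from sqlalchemy import Column, ForeignKey, Integer, String, create_engine, func, select
from sqlalchemy.orm import Session, declarative_base, relationship

Base = declarative_base()


class User(Base):  # hypothetical mapping
    __tablename__ = "users"
    id = Column(Integer, primary_key=True)
    name = Column(String)
    # Deleting a User also deletes its Address rows
    addresses = relationship("Address", cascade="all, delete-orphan")


class Address(Base):  # hypothetical mapping
    __tablename__ = "addresses"
    id = Column(Integer, primary_key=True)
    email = Column(String)
    user_id = Column(ForeignKey("users.id"))


engine = create_engine("sqlite://")  # assumed in-memory database
Base.metadata.create_all(engine)

count_addresses = select(func.count()).select_from(Address)

with Session(engine) as session:
    user = User(
        name="Alice",
        addresses=[Address(email="a@example.com"), Address(email="b@example.com")],
    )
    session.add(user)
    session.commit()
    addresses_before = session.scalar(count_addresses)

    session.delete(user)  # cascades to both Address objects
    session.commit()
    addresses_after = session.scalar(count_addresses)
```

Without the cascade option, the same delete would leave the address rows in place.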
delete_all()
Delete multiple instances.
session.delete_all(instances: Iterable[object]) -> None
Collection of instances to delete.
Example
users_to_delete = session.execute(
    select(User).where(User.inactive == True)
).scalars().all()
session.delete_all(users_to_delete)
session.commit()
expunge()
Remove an instance from the session without deleting it.
session.expunge(instance: object) -> None
The instance to remove from the session.
Behavior
- Removes object from session’s identity map
- Object becomes detached or transient
- Cascades according to “expunge” cascade rule
- Does not affect database
Example
user = session.get(User, 1)
# Remove from session
session.expunge(user)
# user is now detached
print(user in session) # False
# Modifications won't be tracked
user.name = "Modified"
session.commit() # Change not saved
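To confirm that changes to an expunged object never reach the database, the value can be read back after the commit. This sketch assumes a hypothetical User mapping and an in-memory SQLite engine.

```python
from sqlalchemy import Column, Integer, String, create_engine, inspect
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()


class User(Base):  # hypothetical minimal mapping
    __tablename__ = "users"
    id = Column(Integer, primary_key=True)
    name = Column(String)


engine = create_engine("sqlite://")  # assumed in-memory database
Base.metadata.create_all(engine)

with Session(engine) as session:
    session.add(User(id=1, name="Alice"))
    session.commit()

    user = session.get(User, 1)
    session.expunge(user)
    in_session = user in session  # no longer tracked
    is_detached = inspect(user).detached

    user.name = "Modified"  # the session does not see this change
    session.commit()

    # A fresh load returns the value that is actually stored
    stored_name = session.get(User, 1).name
```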
merge()
Copy the state of an instance into the session.
session.merge(
    instance: _O,
    *,
    load: bool = True,
    options: Optional[Sequence[ORMOption]] = None,
) -> _O
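instance: The instance whose state is copied into the session.
load: Whether to load the object from the database if it is not already in the session.
options: Loader options to apply when loading the existing instance.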
Behavior
1. Check identity map: looks for an instance with the same primary key in the session.
2. Load from database: if not found and load=True, queries the database by primary key.
3. Create new instance: if still not found, creates a new instance.
4. Copy state: copies attribute values from the source to the target instance.
5. Return target: returns the merged (target) instance, leaving the source unchanged.
Example
# Object built outside the session, carrying an existing primary key
detached_user = User(id=1, name="Updated Name")
# Merge into current session
with Session(engine) as session:
    merged_user = session.merge(detached_user)
    # merged_user is in this session
    # detached_user remains unchanged
    print(merged_user in session) # True
    print(detached_user in session) # False
    session.commit()
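# Skip database load for performance
# cached_user (hypothetical name): a clean instance loaded earlier, e.g. from a cache.
# A newly constructed User(...) is transient and is rejected with load=False.
merged = session.merge(cached_user, load=False)
# No SELECT issued; the cached state is copied into the session as-is
session.commit()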
With load=False, the object must be “clean” (no pending changes) and have a complete primary key.
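The default load=True path can be sketched end to end: one merge that finds an existing row and one that does not. The User mapping and the SQLite engine are assumptions for illustration.

```python
from sqlalchemy import Column, Integer, String, create_engine
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()


class User(Base):  # hypothetical minimal mapping
    __tablename__ = "users"
    id = Column(Integer, primary_key=True)
    name = Column(String)


engine = create_engine("sqlite://")  # assumed in-memory database
Base.metadata.create_all(engine)

with Session(engine) as session:
    session.add(User(id=1, name="Alice"))
    session.commit()

with Session(engine) as session:
    incoming = User(id=1, name="Updated Name")  # source object, never added
    merged = session.merge(incoming)  # row 1 is loaded, then state is copied
    is_copy = merged is not incoming
    merged_tracked = merged in session
    source_tracked = incoming in session

    created = session.merge(User(id=2, name="Bob"))  # no row 2: new instance
    created_is_pending = created in session.new

    session.commit()
    stored_name = session.get(User, 1).name
```

The first merge results in an UPDATE at commit, the second in an INSERT.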
merge_all()
Merge multiple instances.
session.merge_all(
    instances: Iterable[_O],
    *,
    load: bool = True,
    options: Optional[Sequence[ORMOption]] = None,
) -> Sequence[_O]
Query and Execution Methods
execute()
Execute a SQL statement and return results.
session.execute(
    statement: Executable,
    params: Optional[_CoreAnyExecuteParams] = None,
    *,
    execution_options: Optional[OrmExecuteOptionsParameter] = None,
    bind_arguments: Optional[Dict[str, Any]] = None,
    **kw: Any,
) -> Result
statement: The SQL statement to execute (select, insert, update, delete).
params: Parameters to bind to the statement.
execution_options: Execution options like synchronize_session, populate_existing.
bind_arguments: Arguments to determine which engine/connection to use.
Example: SELECT Statement
from sqlalchemy import select
# Execute a select statement
stmt = select(User).where(User.name == "Alice")
result = session.execute(stmt)
# Get results
users = result.scalars().all()
for user in users:
    print(user.name)
Example: With Parameters
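from sqlalchemy import bindparam, select
# Named placeholder; the value is supplied at execution time
stmt = select(User).where(User.id == bindparam("user_id"))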
result = session.execute(stmt, {"user_id": 1})
user = result.scalar_one()
Example: UPDATE Statement
from sqlalchemy import update
stmt = update(User).where(User.id == 1).values(name="Updated")
result = session.execute(stmt)
print(result.rowcount) # Number of rows updated
session.commit()
scalars()
Execute a statement and return scalar results.
result = session.scalars(select(User).where(User.active == True))
users = result.all()
Equivalent to:
result = session.execute(select(User).where(User.active == True))
users = result.scalars().all()
query()
Create a Query object for the given entities.
session.query(*entities, **kwargs) -> Query
The query() API is in “legacy” status. Use select() with execute() or scalars() for new code.
Example
# Legacy Query API
users = session.query(User).filter(User.name.like("A%")).all()
# Modern approach (preferred)
from sqlalchemy import select
users = session.scalars(select(User).where(User.name.like("A%"))).all()
Data Retrieval Methods
get()
Retrieve an object by primary key.
session.get(
    entity: Type[_O],
    ident: Union[Any, Tuple[Any, ...]],
    *,
    options: Optional[Sequence[ORMOption]] = None,
    populate_existing: bool = False,
    with_for_update: Optional[ForUpdateParameter] = None,
    identity_token: Optional[Any] = None,
    execution_options: Optional[OrmExecuteOptionsParameter] = None,
) -> Optional[_O]
entity: The mapped class to retrieve.
ident (required): Primary key value(s), as a scalar or a tuple.
options: Loader options like joinedload(), selectinload().
populate_existing: Refresh the object even if already in session.
with_for_update: Enable SELECT…FOR UPDATE locking.
Example
# Simple primary key
user = session.get(User, 1)
if user:
    print(user.name)
# Composite primary key
record = session.get(CompositeKeyTable, (1, 2))
# With eager loading
from sqlalchemy.orm import joinedload
user = session.get(User, 1, options=[joinedload(User.addresses)])
refresh()
Expire and immediately reload an object’s attributes.
session.refresh(
    instance: object,
    attribute_names: Optional[Iterable[str]] = None,
    *,
    with_for_update: Optional[ForUpdateParameter] = None,
) -> None
instance: The persistent instance to refresh.
attribute_names: Specific attributes to refresh. If None, refreshes all.
Example
user = session.get(User, 1)
print(user.name) # "Alice"
# Another process updates the database
# ...
# Refresh to get latest data
session.refresh(user)
print(user.name) # "Alice Updated" (if changed in DB)
# Refresh only specific attributes
session.refresh(user, ["email"])
Transaction Control Methods
commit()
Flush changes and commit the transaction.
See Session Basics - Commit Operations for details.
rollback()
Rollback the current transaction.
session.rollback() -> None
See Session Basics - Rollback Operations for details.
flush()
Synchronize changes to the database without committing.
session.flush(objects: Optional[Sequence[Any]] = None) -> None
objects: Deprecated. Restricts flush to specific objects.
Example
user = User(name="Alice")
session.add(user)
# Flush to database (INSERT issued)
session.flush()
# ID is now available
print(user.id) # 1
# But can still rollback
session.rollback()
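That a flushed row is still provisional can be checked by counting rows after the rollback. The sketch below assumes a hypothetical User mapping and an in-memory SQLite engine.

```python
from sqlalchemy import Column, Integer, String, create_engine, func, select
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()


class User(Base):  # hypothetical minimal mapping
    __tablename__ = "users"
    id = Column(Integer, primary_key=True)
    name = Column(String)


engine = create_engine("sqlite://")  # assumed in-memory database
Base.metadata.create_all(engine)

count_users = select(func.count()).select_from(User)

with Session(engine) as session:
    user = User(name="Alice")
    session.add(user)
    session.flush()  # INSERT sent, transaction still open
    flushed_id = user.id
    count_after_flush = session.scalar(count_users)

    session.rollback()  # the INSERT is undone
    count_after_rollback = session.scalar(count_users)
```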
begin()
Begin a transaction or nested transaction.
session.begin(nested: bool = False) -> SessionTransaction
begin_nested()
Begin a SAVEPOINT transaction.
session.begin_nested() -> SessionTransaction
close()
Close the session and release resources.
Expiration and Refresh Methods
expire()
Mark instance attributes as stale, to be reloaded on next access.
session.expire(
    instance: object,
    attribute_names: Optional[Iterable[str]] = None,
) -> None
Specific attributes to expire. If None, expires all.
Example
user = session.get(User, 1)
print(user.name) # "Alice" (loaded)
# Mark as stale
session.expire(user)
# Next access triggers reload
print(user.name) # SELECT issued to reload
# Expire specific attributes
session.expire(user, ["email", "created_at"])
Accessing expired attributes on a detached object (for example, after session.close()) raises DetachedInstanceError, because no session is left to reload them.
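The reload-on-access behavior can be sketched by changing a row behind the ORM's back. Here a raw UPDATE stands in for another process. The User mapping and the SQLite engine are assumptions for the example.

```python
from sqlalchemy import Column, Integer, String, create_engine, text
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()


class User(Base):  # hypothetical minimal mapping
    __tablename__ = "users"
    id = Column(Integer, primary_key=True)
    name = Column(String)


engine = create_engine("sqlite://")  # assumed in-memory database
Base.metadata.create_all(engine)

with Session(engine) as session:
    session.add(User(id=1, name="Alice"))
    session.commit()

    user = session.get(User, 1)

    # Raw SQL that the ORM does not track; stands in for an external writer
    session.execute(text("UPDATE users SET name = 'Alice Updated' WHERE id = 1"))
    stale_name = user.name  # still the value held in memory

    session.expire(user)
    fresh_name = user.name  # attribute access triggers a reload
```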
expire_all()
Expire all instances in the session.
session.expire_all() -> None
Example
# After bulk external changes
some_external_process_modifies_database()
# Expire everything to force reload
session.expire_all()
# All subsequent attribute access will reload from DB
Identity Map Methods
get_bind()
Determine the engine/connection for a given entity or clause.
session.get_bind(
    mapper: Optional[Mapper] = None,
    clause: Optional[ClauseElement] = None,
    bind: Optional[_SessionBind] = None,
    **kw: Any,
) -> Union[Engine, Connection]
Example
# Get bind for User entity
engine = session.get_bind(mapper=User.__mapper__)
# Get bind for specific table
engine = session.get_bind(clause=user_table)
identity_map
Access the session’s identity map.
# Iterate over all objects in session
for obj in session.identity_map.values():
    print(obj)
# Check if specific identity exists
from sqlalchemy.orm.util import identity_key
key = identity_key(User, [1])
if key in session.identity_map:
    print("User 1 is in session")
identity_key()
Generate an identity key for an instance.
key = session.identity_key(
    class_=User,
    ident=1,
)
print(key) # (User, (1,), None)
# From instance
user = User(id=1, name="Alice")
key = session.identity_key(instance=user)
Inspection Methods
is_modified()
Check if an instance has pending changes.
session.is_modified(
    instance: object,
    include_collections: bool = True,
) -> bool
Whether to check collection modifications.
Example
user = session.get(User, 1)
print(session.is_modified(user)) # False
user.name = "Modified"
print(session.is_modified(user)) # True
user.addresses.append(Address(email="new@example.com"))
print(session.is_modified(user, include_collections=True)) # True
in_transaction()
Check if session is in a transaction.
if session.in_transaction():
    print("Transaction active")
in_nested_transaction()
Check if session is in a nested transaction (SAVEPOINT).
if session.in_nested_transaction():
    print("In SAVEPOINT")
Session State Collections
new
Set of pending (new) objects.
user = User(name="Alice")
session.add(user)
print(user in session.new) # True
session.flush()
print(user in session.new) # False (now persistent)
dirty
Set of modified objects.
user = session.get(User, 1)
user.name = "Modified"
print(user in session.dirty) # True
session.commit()
print(user in session.dirty) # False
deleted
Set of objects marked for deletion.
user = session.get(User, 1)
session.delete(user)
print(user in session.deleted) # True
session.commit()
print(user in session.deleted) # False (now detached)
Utility Methods
no_autoflush
Context manager to temporarily disable autoflush.
with session.no_autoflush:
    # Queries won't trigger flush
    user = session.get(User, 1)
    user.name = "Modified"
    # This won't flush the change
    other_users = session.scalars(select(User)).all()
# Autoflush resumes
enable_relationship_loading()
Associate an object with the session so that its relationships can lazy load, without adding it to the session. The method returns None.
user = session.get(User, 1)
session.close()
# user is now detached
# Enable loading for this specific instance (do not assign the result; it is None)
session.enable_relationship_loading(user)
# Can now lazy load relationships
addresses = user.addresses # SELECT issued
Bulk Operations
bulk_save_objects()
Bulk insert/update without full ORM overhead.
session.bulk_save_objects(
    objects: Iterable[object],
    return_defaults: bool = False,
    update_changed_only: bool = True,
) -> None
Example
users = [User(name=f"User{i}") for i in range(1000)]
# Fast bulk insert
session.bulk_save_objects(users)
session.commit()
Bulk operations bypass many ORM features: events, relationship cascades, and the unit of work pattern. Use only when necessary for performance.
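One visible consequence of bypassing the unit of work is that the saved objects are never attached to the session. The sketch below checks this, assuming a hypothetical User mapping and an in-memory SQLite engine.

```python
from sqlalchemy import Column, Integer, String, create_engine, func, select
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()


class User(Base):  # hypothetical minimal mapping
    __tablename__ = "users"
    id = Column(Integer, primary_key=True)
    name = Column(String)


engine = create_engine("sqlite://")  # assumed in-memory database
Base.metadata.create_all(engine)

with Session(engine) as session:
    users = [User(name=f"User{i}") for i in range(3)]
    session.bulk_save_objects(users)
    session.commit()

    # Rows were written, but the objects were never tracked by the session
    any_attached = any(user in session for user in users)
    row_count = session.scalar(select(func.count()).select_from(User))
```

Code that needs the objects afterwards would have to query them again, which is part of the trade-off described above.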
bulk_insert_mappings()
Bulk insert from dictionaries.
mappings = [
    {"name": "Alice", "email": "alice@example.com"},
    {"name": "Bob", "email": "bob@example.com"},
]
session.bulk_insert_mappings(User, mappings)
session.commit()
bulk_update_mappings()
Bulk update from dictionaries.
mappings = [
    {"id": 1, "name": "Alice Updated"},
    {"id": 2, "name": "Bob Updated"},
]
session.bulk_update_mappings(User, mappings)
session.commit()