The StateManager provides persistent key-value storage for workflow state. Each value is stored within a scope (group) and identified by a key. State is automatically persisted and available across step executions.
Accessing state
Access the state manager through the flow context:
from motia import FlowContext
async def handler(input, ctx: FlowContext) -> None:
    """Read a value through the state manager exposed on the flow context."""
    state = ctx.state  # access via context
    await state.get("users", "user-123")
Or import the global instance:
from motia import stateManager
# Use the global instance (requires III initialization).
# NOTE: `await` is only valid inside an async function, so run this call
# from an async handler rather than at module top level.
await stateManager.get("users", "user-123")
Methods
get()
Retrieve a value from state.
async def get(self, scope: str, key: str) -> Any | None
scope (str): The state scope/group to read from.
key (str): The key within the scope to retrieve.
Returns: The stored value, or None if the key doesn’t exist.
Example
async def handler(input, ctx: FlowContext) -> None:
    """Look up one user record and log whether it was found."""
    user = await ctx.state.get("users", "user-123")

    # Handle the missing (or empty) record first, then the normal path.
    if not user:
        ctx.logger.info("User not found")
        return

    ctx.logger.info(f"Found user: {user['name']}")
set()
Store a value in state.
async def set(self, scope: str, key: str, value: Any) -> Any
scope (str): The state scope/group to write to.
key (str): The key within the scope to set.
value (Any): The value to store. Must be JSON-serializable.
Returns: The stored value (echoed back).
Example
async def handler(input, ctx: FlowContext) -> None:
    """Store a user record under key "user-123" in the "users" scope."""
    # Build the record first so the write itself stays a one-liner.
    new_user = {
        "id": "user-123",
        "name": "Alice",
        "email": "alice@example.com",
        "created_at": "2024-01-15T10:30:00Z",
    }
    await ctx.state.set("users", "user-123", new_user)
    ctx.logger.info("User created")
update()
Update a value using atomic update operations.
async def update(self, scope: str, key: str, ops: list[dict[str, Any]]) -> Any
scope (str): The state scope/group containing the value.
key (str): The key within the scope to update.
ops (list[dict[str, Any]]): List of update operations to apply. Each operation is a dict with op, path, and optionally value.
Update operations
Supported operations are modeled on JSON Patch (RFC 6902); increment is an extension that is not part of the RFC:
{"op": "add", "path": "/field", "value": ...} - Add or replace a field
{"op": "remove", "path": "/field"} - Remove a field
{"op": "replace", "path": "/field", "value": ...} - Replace a field value
{"op": "increment", "path": "/count", "value": 1} - Increment a numeric field
Example
async def handler(input, ctx: FlowContext) -> None:
    """Apply three separate update batches to the same user record."""
    scope, key = "users", "user-123"

    # Increment a counter (and stamp the last login in the same batch)
    login_ops = [
        {"op": "increment", "path": "/login_count", "value": 1},
        {"op": "replace", "path": "/last_login", "value": "2024-01-15T14:30:00Z"},
    ]
    await ctx.state.update(scope, key, login_ops)

    # Add a new field
    verify_ops = [{"op": "add", "path": "/verified", "value": True}]
    await ctx.state.update(scope, key, verify_ops)

    # Remove a field
    cleanup_ops = [{"op": "remove", "path": "/temp_token"}]
    await ctx.state.update(scope, key, cleanup_ops)
delete()
Remove a value from state.
async def delete(self, scope: str, key: str) -> Any | None
scope (str): The state scope/group to delete from.
key (str): The key within the scope to delete.
Returns: The deleted value, or None if the key didn’t exist.
Example
async def handler(input, ctx: FlowContext) -> None:
    """Delete one user record and log the outcome."""
    deleted_user = await ctx.state.delete("users", "user-123")

    # Nothing came back: there was no (non-empty) record to delete.
    if not deleted_user:
        ctx.logger.info("User not found")
        return

    ctx.logger.info(f"Deleted user: {deleted_user['name']}")
list()
List all values within a scope.
async def list(self, scope: str) -> list[Any]
scope (str): The state scope/group to list.
Returns: List of all values in the scope. Each value typically includes an id field.
Example
async def handler(input, ctx: FlowContext) -> None:
    """Fetch every value in the "users" scope and log one line per user."""
    all_users = await ctx.state.list("users")

    logger = ctx.logger
    logger.info(f"Found {len(all_users)} users")
    for user in all_users:
        # "id" is read by subscript, "name" via .get(), so a missing name logs as None.
        logger.info(f"User: {user['id']} - {user.get('name')}")
list_groups()
List the names of all scopes that contain state.
async def list_groups(self) -> list[str]
Returns: List of all scope names that have been created.
Example
async def handler(input, ctx: FlowContext) -> None:
    """Log every scope name, then the number of items held in each scope."""
    state = ctx.state

    scopes = await state.list_groups()
    ctx.logger.info(f"State scopes: {scopes}")
    # Example output: ["users", "orders", "sessions"]

    # List all items across all scopes
    for scope in scopes:
        items = await state.list(scope)
        ctx.logger.info(f"Scope '{scope}': {len(items)} items")
clear()
Delete all values within a scope.
async def clear(self, scope: str) -> None
scope (str): The state scope/group to clear.
Example
async def handler(input, ctx: FlowContext) -> None:
    """Remove every value stored in the "sessions" scope."""
    # Clear all temporary session data
    session_scope = "sessions"
    await ctx.state.clear(session_scope)
    ctx.logger.info("All sessions cleared")
State triggers
Steps can be triggered when state changes occur. Use the state() trigger factory:
from motia import state, step, FlowContext, StateTriggerInput
def only_user_scope(input: StateTriggerInput, ctx: FlowContext) -> bool:
    """Trigger condition: accept only state changes whose group is "users"."""
    group = input.group_id
    return group == "users"


my_step = step({
    "name": "OnUserChange",
    "triggers": [state(condition=only_user_scope)],
})


# NOTE(review): the complete example attaches handlers with
# `<step>.handle(<handler>)`; confirm whether this snippet also needs
# `my_step.handle(handler)` for the handler to run.
async def handler(input: StateTriggerInput, ctx: FlowContext) -> None:
    """Log the changed item's id together with its old and new values."""
    message = f"User {input.item_id} changed"
    change = {
        "old_value": input.old_value,
        "new_value": input.new_value,
    }
    ctx.logger.info(message, change)
See Triggers for more information.
Complete example
from motia import step, http, state, FlowContext, ApiRequest, ApiResponse, StateTriggerInput
# Step 1: Create user via API
user_api = step({
    "name": "CreateUser",
    "triggers": [http("POST", "/users")],
})


async def create_handler(request: ApiRequest, ctx: FlowContext) -> ApiResponse:
    """Create a user from the request body and persist it in the "users" scope.

    The user ID is derived from the first 8 characters of the trace ID.
    Returns a 201 response whose body is the stored user record.
    """
    # Local import keeps the example self-contained.
    from datetime import datetime, timezone

    body = request.body or {}  # tolerate a missing request body
    user_id = f"user-{ctx.trace_id[:8]}"

    # Create user in state
    user = {
        "id": user_id,
        "name": body.get("name"),
        "email": body.get("email"),
        "login_count": 0,
        # Store a real UTC ISO-8601 timestamp; the trace ID is an identifier,
        # not a point in time.
        "created_at": datetime.now(timezone.utc).isoformat(),
    }
    await ctx.state.set("users", user_id, user)

    ctx.logger.info("User created", {"user_id": user_id})
    return ApiResponse(status=201, body=user)


user_api.handle(create_handler)
# Step 2: Update user login count
login_step = step({
    "name": "RecordLogin",
    "triggers": [http("POST", "/users/:id/login")],
})


async def login_handler(request: ApiRequest, ctx: FlowContext) -> ApiResponse:
    """Record a login for the user named by the ``:id`` path parameter.

    Returns 400 when the ID is missing, 404 when no such user is stored,
    and otherwise 200 with the value returned by the state update.
    """
    # Local import keeps the example self-contained.
    from datetime import datetime, timezone

    user_id = request.path_params.get("id")
    if not user_id:
        return ApiResponse(status=400, body={"error": "User ID required"})

    # Check if user exists
    user = await ctx.state.get("users", user_id)
    if not user:
        return ApiResponse(status=404, body={"error": "User not found"})

    # Increment login count atomically
    updated = await ctx.state.update("users", user_id, [
        {"op": "increment", "path": "/login_count", "value": 1},
        # "add" (add-or-replace) rather than "replace": a freshly created
        # user record has no last_login field yet. The value is a real UTC
        # ISO-8601 timestamp, not the trace ID.
        {
            "op": "add",
            "path": "/last_login",
            "value": datetime.now(timezone.utc).isoformat(),
        },
    ])

    ctx.logger.info("Login recorded", {"user_id": user_id, "count": updated["login_count"]})
    return ApiResponse(status=200, body=updated)


login_step.handle(login_handler)
# Step 3: React to user changes
def only_users(input: StateTriggerInput, ctx: FlowContext) -> bool:
    """Trigger condition: pass only state changes in the "users" group."""
    group = input.group_id
    return group == "users"


user_change_step = step({
    "name": "OnUserChange",
    "triggers": [state(condition=only_users)],
    "enqueues": ["user.changed"],
})


async def change_handler(input: StateTriggerInput, ctx: FlowContext) -> None:
    """Enqueue a "user.changed" message when a user's login_count has grown."""
    previous, current = input.old_value, input.new_value

    # A missing/empty side of the change counts as zero logins.
    old_count = previous.get("login_count", 0) if previous else 0
    new_count = current.get("login_count", 0) if current else 0

    # Only an increase is treated as a login; anything else is ignored.
    if not new_count > old_count:
        return

    ctx.logger.info(f"User {input.item_id} logged in (total: {new_count})")
    message = {
        "topic": "user.changed",
        "data": {
            "user_id": input.item_id,
            "login_count": new_count,
        },
    }
    await ctx.enqueue(message)


user_change_step.handle(change_handler)
# Step 4: List all users
list_users = step({
    "name": "ListUsers",
    "triggers": [http("GET", "/users")],
})


async def list_handler(request: ApiRequest, ctx: FlowContext) -> ApiResponse:
    """Return every record in the "users" scope together with a count."""
    users = await ctx.state.list("users")
    ctx.logger.info(f"Retrieved {len(users)} users")

    payload = {"users": users, "count": len(users)}
    return ApiResponse(status=200, body=payload)


list_users.handle(list_handler)