Skip to main content
The StateManager provides persistent key-value storage for workflow state. Each value is stored within a scope (group) and identified by a key. State is automatically persisted and available across step executions.

Accessing state

Access the state manager through the flow context:
from motia import FlowContext

async def handler(input, ctx: FlowContext) -> None:
    """Look up "user-123" in the "users" scope through the flow context."""
    state = ctx.state  # the state manager is exposed on the context
    await state.get("users", "user-123")
Or import the global instance:
from motia import stateManager

# Use global instance (requires III initialization)
async def main() -> None:
    """Read "user-123" from the "users" scope via the global instance.

    ``await`` is only valid inside a coroutine, so the call lives in an
    async function rather than at module top level.
    """
    await stateManager.get("users", "user-123")

Methods

get()

Retrieve a value from state.
async def get(self, scope: str, key: str) -> Any | None
scope
string
required
The state scope/group to read from.
key
string
required
The key within the scope to retrieve.
return
Any | None
The stored value, or None if the key doesn’t exist.

Example

async def handler(input, ctx: FlowContext) -> None:
    """Fetch "user-123" from the "users" scope and log whether it exists."""
    user = await ctx.state.get("users", "user-123")

    # Guard clause: any falsy result is reported as a missing user.
    if not user:
        ctx.logger.info("User not found")
        return

    ctx.logger.info(f"Found user: {user['name']}")

set()

Store a value in state.
async def set(self, scope: str, key: str, value: Any) -> Any
scope
string
required
The state scope/group to write to.
key
string
required
The key within the scope to set.
value
Any
required
The value to store. Must be JSON-serializable.
return
Any
The stored value (echoed back).

Example

async def handler(input, ctx: FlowContext) -> None:
    """Store a user record under "users"/"user-123" and log the creation."""
    user_id = "user-123"

    # The record carries its own key in the "id" field.
    record = {
        "id": user_id,
        "name": "Alice",
        "email": "alice@example.com",
        "created_at": "2024-01-15T10:30:00Z",
    }
    await ctx.state.set("users", user_id, record)

    ctx.logger.info("User created")

update()

Update a value using atomic update operations.
async def update(self, scope: str, key: str, ops: list[dict[str, Any]]) -> Any
scope
string
required
The state scope/group containing the value.
key
string
required
The key within the scope to update.
ops
list[dict]
required
List of update operations to apply. Each operation is a dict with op, path, and optionally value.
return
Any
The updated value.

Update operations

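Supported operations are modeled on JSON Patch (RFC 6902); add, remove, and replace come from that standard, while increment is an extension that RFC 6902 does not define: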
  • {"op": "add", "path": "/field", "value": ...} - Add or replace a field
  • {"op": "remove", "path": "/field"} - Remove a field
  • {"op": "replace", "path": "/field", "value": ...} - Replace a field value
  • {"op": "increment", "path": "/count", "value": 1} - Increment a numeric field

Example

async def handler(input, ctx: FlowContext) -> None:
    """Apply three separate update() calls to the "users"/"user-123" entry."""
    scope, key = "users", "user-123"

    # Increment a counter (and replace the last-login value in the same call)
    login_ops = [
        {"op": "increment", "path": "/login_count", "value": 1},
        {"op": "replace", "path": "/last_login", "value": "2024-01-15T14:30:00Z"},
    ]
    await ctx.state.update(scope, key, login_ops)

    # Add a new field
    verify_ops = [{"op": "add", "path": "/verified", "value": True}]
    await ctx.state.update(scope, key, verify_ops)

    # Remove a field
    cleanup_ops = [{"op": "remove", "path": "/temp_token"}]
    await ctx.state.update(scope, key, cleanup_ops)

delete()

Remove a value from state.
async def delete(self, scope: str, key: str) -> Any | None
scope
string
required
The state scope/group to delete from.
key
string
required
The key within the scope to delete.
return
Any | None
The deleted value, or None if the key didn’t exist.

Example

async def handler(input, ctx: FlowContext) -> None:
    """Delete "user-123" from the "users" scope and log what was removed."""
    deleted_user = await ctx.state.delete("users", "user-123")

    # Guard clause: a falsy result is reported as nothing having been deleted.
    if not deleted_user:
        ctx.logger.info("User not found")
        return

    ctx.logger.info(f"Deleted user: {deleted_user['name']}")

list()

List all values within a scope.
async def list(self, scope: str) -> list[Any]
scope
string
required
The state scope/group to list.
return
list[Any]
List of all values in the scope. Each value typically includes an id field.

Example

async def handler(input, ctx: FlowContext) -> None:
    """Log how many values the "users" scope holds, then one line per value."""
    all_users = await ctx.state.list("users")

    ctx.logger.info(f"Found {len(all_users)} users")

    for user in all_users:
        # Values only *typically* carry an "id" field, so read it with .get()
        # (like "name") instead of raising KeyError on a record without one.
        ctx.logger.info(f"User: {user.get('id')} - {user.get('name')}")

list_groups()

List all scope IDs that contain state.
async def list_groups(self) -> list[str]
return
list[str]
List of all scope names that have been created.

Example

async def handler(input, ctx: FlowContext) -> None:
    """Log every state scope, then the number of items held in each one."""
    state = ctx.state

    scopes = await state.list_groups()
    ctx.logger.info(f"State scopes: {scopes}")
    # Example output: ["users", "orders", "sessions"]

    # List all items across all scopes
    for scope in scopes:
        items = await state.list(scope)
        ctx.logger.info(f"Scope '{scope}': {len(items)} items")

clear()

Delete all values within a scope.
async def clear(self, scope: str) -> None
scope
string
required
The state scope/group to clear.
return
None
No return value.

Example

async def handler(input, ctx: FlowContext) -> None:
    """Remove every value in the "sessions" scope and log the result."""
    scope = "sessions"  # holds the temporary session data
    await ctx.state.clear(scope)

    ctx.logger.info("All sessions cleared")

State triggers

Steps can be triggered when state changes occur. Use the state() trigger factory:
from motia import state, step, FlowContext, StateTriggerInput

def only_user_scope(input: StateTriggerInput, ctx: FlowContext) -> bool:
    """Return True only for state changes whose group_id is "users"."""
    watched_scope = "users"
    return input.group_id == watched_scope

# Declare the "OnUserChange" step with a single state trigger; the
# only_user_scope callback is passed as the trigger's condition.
my_step = step({
    "name": "OnUserChange",
    "triggers": [
        state(condition=only_user_scope),
    ],
})

async def handler(input: StateTriggerInput, ctx: FlowContext) -> None:
    """Log the changed item's ID together with its old and new values."""
    ctx.logger.info(
        f"User {input.item_id} changed",
        {
            "old_value": input.old_value,
            "new_value": input.new_value,
        },
    )


# Attach the handler to the step, matching the step.handle(...) calls in the
# complete example below.
# NOTE(review): assumes handlers must be attached explicitly — confirm.
my_step.handle(handler)
See Triggers for more information.

Complete example

from motia import step, http, state, FlowContext, ApiRequest, ApiResponse, StateTriggerInput

# Step 1: Create user via API
# Declares the "CreateUser" step with an HTTP POST /users trigger.
user_api = step({
    "name": "CreateUser",
    "triggers": [http("POST", "/users")],
})

async def create_handler(request: ApiRequest, ctx: FlowContext) -> ApiResponse:
    """Create a user record in the "users" scope from the request body.

    A missing request body is treated as an empty dict, so "name" and
    "email" fall back to None. Returns a 201 response whose body is the
    stored user dict.
    """
    # Function-scope import keeps the snippet self-contained.
    from datetime import datetime, timezone

    body = request.body or {}

    # Derive a short user ID from the first 8 characters of the trace ID.
    user_id = f"user-{ctx.trace_id[:8]}"

    # Create user in state
    user = {
        "id": user_id,
        "name": body.get("name"),
        "email": body.get("email"),
        "login_count": 0,
        # Record an actual UTC timestamp; the trace ID identifies the request
        # and says nothing about when the user was created.
        "created_at": datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"),
    }

    await ctx.state.set("users", user_id, user)

    ctx.logger.info("User created", {"user_id": user_id})
    return ApiResponse(status=201, body=user)

# Attach create_handler to the CreateUser step.
user_api.handle(create_handler)

# Step 2: Update user login count
# Declares the "RecordLogin" step with an HTTP POST /users/:id/login trigger.
login_step = step({
    "name": "RecordLogin",
    "triggers": [http("POST", "/users/:id/login")],
})

async def login_handler(request: ApiRequest, ctx: FlowContext) -> ApiResponse:
    """Record a login for the user named by the ``id`` path parameter.

    Returns 400 when the ID is missing, 404 when no such user is stored,
    and 200 with the updated user record otherwise.
    """
    # Function-scope import keeps the snippet self-contained.
    from datetime import datetime, timezone

    user_id = request.path_params.get("id")

    if not user_id:
        return ApiResponse(status=400, body={"error": "User ID required"})

    # Check if user exists
    user = await ctx.state.get("users", user_id)
    if not user:
        return ApiResponse(status=404, body={"error": "User not found"})

    # Use a real UTC timestamp for last_login; the trace ID is not a time.
    now = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")

    # Increment login count atomically
    updated = await ctx.state.update("users", user_id, [
        {"op": "increment", "path": "/login_count", "value": 1},
        # "add" is documented as add-or-replace. A freshly created user has
        # no last_login field yet, which a strict "replace" would reject.
        {"op": "add", "path": "/last_login", "value": now},
    ])

    ctx.logger.info("Login recorded", {"user_id": user_id, "count": updated["login_count"]})
    return ApiResponse(status=200, body=updated)

# Attach login_handler to the RecordLogin step.
login_step.handle(login_handler)

# Step 3: React to user changes
def only_users(input: StateTriggerInput, ctx: FlowContext) -> bool:
    """Return True only for state changes whose group_id is "users"."""
    watched_scope = "users"
    return input.group_id == watched_scope

# Declares the "OnUserChange" step: a state trigger conditioned on
# only_users, with "user.changed" listed under "enqueues".
user_change_step = step({
    "name": "OnUserChange",
    "triggers": [state(condition=only_users)],
    "enqueues": ["user.changed"],
})

async def change_handler(input: StateTriggerInput, ctx: FlowContext) -> None:
    """Enqueue a "user.changed" message when a user's login_count has grown."""

    def login_count(value):
        # A missing (falsy) old/new value counts as zero logins.
        return value.get("login_count", 0) if value else 0

    old_count = login_count(input.old_value)
    new_count = login_count(input.new_value)

    # Nothing to do unless the count actually increased.
    if not new_count > old_count:
        return

    ctx.logger.info(f"User {input.item_id} logged in (total: {new_count})")

    payload = {
        "user_id": input.item_id,
        "login_count": new_count,
    }
    await ctx.enqueue({"topic": "user.changed", "data": payload})

# Attach change_handler to the OnUserChange step.
user_change_step.handle(change_handler)

# Step 4: List all users
# Declares the "ListUsers" step with an HTTP GET /users trigger.
list_users = step({
    "name": "ListUsers",
    "triggers": [http("GET", "/users")],
})

async def list_handler(request: ApiRequest, ctx: FlowContext) -> ApiResponse:
    """Return every value in the "users" scope together with a count."""
    users = await ctx.state.list("users")

    ctx.logger.info(f"Retrieved {len(users)} users")

    payload = {"users": users, "count": len(users)}
    return ApiResponse(status=200, body=payload)

# Attach list_handler to the ListUsers step.
list_users.handle(list_handler)

Build docs developers (and LLMs) love