Overview
The StateManager provides a key-value store for persisting data across step executions. State is organized by groups (scopes) and items (keys), enabling efficient querying and management.
Access the state manager through the context object in step handlers:
async (input, ctx) => {
await ctx.state.set('orders', orderId, orderData)
const order = await ctx.state.get('orders', orderId)
}
Type signature
/**
 * Key-value state store exposed to step handlers as `ctx.state`.
 * Values are organized by group (scope) and item key.
 */
interface InternalStateManager {
/** Retrieves a value; resolves to null if not found. */
get<T>(groupId: string, key: string): Promise<T | null>
/** Stores or updates a value; resolves to the stored value and metadata, or null on failure. */
set<T>(groupId: string, key: string, value: T): Promise<StreamSetResult<T> | null>
/** Applies JSON Patch operations to a value; resolves to the updated value and metadata, or null on failure. */
update<T>(groupId: string, key: string, ops: UpdateOp[]): Promise<StreamSetResult<T> | null>
/** Removes a value; resolves to the deleted value, or null if not found. */
delete<T>(groupId: string, key: string): Promise<T | null>
/** Retrieves all items in a group. */
list<T>(groupId: string): Promise<T[]>
/** Deletes all items in a group; resolves when all items are deleted. */
clear(groupId: string): Promise<void>
}
Methods
get()
Retrieve a value from state.
groupId: State group identifier (e.g., 'orders', 'users')
key: Item key within the group
Returns: The stored value, or null if not found
Example
/** Shape of the order records used in the 'orders' group examples. */
interface Order {
  id: string
  status: string
  amount: number
}

// Look the order up by id; answer 404 when state holds nothing under that key.
async (input, ctx) => {
  const found = await ctx.state.get<Order>('orders', input.orderId)
  return found
    ? { status: 200, body: found }
    : { status: 404, body: { error: 'Order not found' } }
}
set()
Store or update a value in state.
groupId: State group identifier
key: Item key within the group
value: Value to store (must be JSON-serializable)
return
Promise<StreamSetResult<T> | null>
Result containing the stored value and metadata, or null on failure
/**
 * Result of a successful write: the stored value plus metadata.
 * NOTE(review): the exact semantics of `version` and `timestamp` (units,
 * monotonicity) are not specified on this page — confirm before relying on them.
 */
type StreamSetResult<T> = {
value: T
version: number
timestamp: number
}
Example
// Build a new pending order, persist it under its own id, and log the
// version reported by the store (absent if the write returned null).
async (input, ctx) => {
  const { orderId, amount } = input
  const order = { id: orderId, status: 'pending', amount, createdAt: Date.now() }

  const saved = await ctx.state.set('orders', order.id, order)
  ctx.logger.info('Order created', { orderId: order.id, version: saved?.version })

  return { status: 201, body: order }
}
update()
Update a value using JSON Patch operations.
groupId: State group identifier
key: Item key within the group
ops: Array of JSON Patch operations
return
Promise<StreamSetResult<T> | null>
Result containing the updated value and metadata, or null on failure
/**
 * A single JSON Patch operation accepted by `update()`.
 * `path` (and `from`, for move/copy) locate the target within the stored value.
 */
type UpdateOp =
| { op: 'add', path: string, value: any }
| { op: 'remove', path: string }
| { op: 'replace', path: string, value: any }
| { op: 'move', from: string, path: string }
| { op: 'copy', from: string, path: string }
| { op: 'test', path: string, value: any }
Example
// Mark an order as shipped and record its tracking details in one patch.
async (input, ctx) => {
  const result = await ctx.state.update('orders', input.orderId, [
    // `status` already exists on the order, so `replace` is appropriate.
    { op: 'replace', path: '/status', value: 'shipped' },
    // These fields are new on the order. JSON Patch `replace` requires the
    // target to exist, whereas `add` creates the member (or overwrites it).
    { op: 'add', path: '/trackingNumber', value: input.trackingNumber },
    { op: 'add', path: '/shippedAt', value: Date.now() },
  ])

  // update() resolves to null on failure.
  if (!result) {
    return { status: 404, body: { error: 'Order not found' } }
  }

  ctx.logger.info('Order updated', {
    orderId: input.orderId,
    version: result.version,
  })
  return { status: 200, body: result.value }
}
delete()
Remove a value from state.
groupId: State group identifier
key: Item key within the group
Returns: The deleted value, or null if not found
Example
// Remove an order and echo the removed record back; 404 if nothing was stored.
async (input, ctx) => {
  const removed = await ctx.state.delete('orders', input.orderId)
  if (removed) {
    ctx.logger.info('Order deleted', { orderId: input.orderId })
    return { status: 200, body: { deleted: true, order: removed } }
  }
  return { status: 404, body: { error: 'Order not found' } }
}
list()
Retrieve all items in a group.
groupId: State group identifier
Returns: Array of all items in the group
Example
// Return every order in the group and log how many are still pending.
async (input, ctx) => {
  const orders = await ctx.state.list<Order>('orders')

  let pendingCount = 0
  for (const { status } of orders) {
    if (status === 'pending') pendingCount += 1
  }

  ctx.logger.info('Orders retrieved', { total: orders.length, pending: pendingCount })

  return { status: 200, body: { orders, count: orders.length } }
}
clear()
Delete all items in a group.
groupId: State group identifier
Returns: A promise that resolves when all items are deleted
Example
// Wipe the whole 'orders' group, logging before and after the call.
async (input, ctx) => {
  const group = 'orders'

  ctx.logger.info('Clearing all orders')
  await ctx.state.clear(group)
  ctx.logger.info('All orders cleared')

  const body = { message: 'All orders cleared' }
  return { status: 200, body }
}
Usage patterns
CRUD operations
Implement complete CRUD functionality:
import { randomUUID } from 'node:crypto'
import { step, http } from 'motia'
import { z } from 'zod'

/** Request body accepted when creating or replacing an order. */
const orderSchema = z.object({
  amount: z.number(),
  items: z.array(z.string()),
})

/**
 * Single step serving create, read, replace and delete for the 'orders'
 * state group, dispatching on the HTTP method of the incoming request.
 */
export const { config, handler } = step(
  {
    name: 'OrderAPI',
    triggers: [
      http('POST', '/orders', { bodySchema: orderSchema }),
      http('GET', '/orders/:orderId'),
      http('PUT', '/orders/:orderId', { bodySchema: orderSchema }),
      http('DELETE', '/orders/:orderId'),
    ],
  },
  async (input, ctx) => {
    const { method, pathParams, body } = input.request

    if (method === 'POST') {
      // The POST route has no :orderId segment, so mint a fresh id here.
      const orderId = pathParams.orderId || randomUUID()
      const order = { id: orderId, ...body, status: 'pending' }
      await ctx.state.set('orders', orderId, order)
      return { status: 201, body: order }
    }

    // Every remaining route carries :orderId in its path.
    const orderId = pathParams.orderId

    if (method === 'GET') {
      const order = await ctx.state.get('orders', orderId)
      if (!order) return { status: 404, body: { error: 'Not found' } }
      return { status: 200, body: order }
    }

    if (method === 'PUT') {
      const order = { id: orderId, ...body }
      await ctx.state.set('orders', orderId, order)
      return { status: 200, body: order }
    }

    if (method === 'DELETE') {
      const deleted = await ctx.state.delete('orders', orderId)
      if (!deleted) return { status: 404, body: { error: 'Not found' } }
      return { status: 200, body: { deleted: true } }
    }

    // Answer explicitly instead of falling through with `undefined`.
    return { status: 405, body: { error: 'Method not allowed' } }
  }
)
State-triggered workflows
React to state changes:
import { step, state, queue } from 'motia'
import type { StateTriggerInput } from 'motia'
/** Shape of the order records watched by this step. */
interface Order {
  id: string
  status: string
  email: string
}

/**
 * Fires when an item in the 'orders' group changes status and enqueues a
 * 'notification' message for the order's email address.
 */
export const { config, handler } = step(
  {
    name: 'OnOrderStatusChange',
    triggers: [
      // Only react to 'orders' items whose status differs before and after.
      state((input: StateTriggerInput<Order>) => {
        return input.group_id === 'orders' &&
          input.old_value?.status !== input.new_value?.status
      }),
    ],
    enqueues: ['notification'],
  },
  async (input, ctx) => {
    const { item_id, old_value, new_value } = input

    ctx.logger.info('Order status changed', {
      orderId: item_id,
      from: old_value?.status,
      to: new_value?.status,
    })

    // The trigger condition treats new_value as possibly absent (`?.`), in
    // which case there is no email or status to notify about.
    // NOTE(review): presumably this happens when the item is deleted — confirm.
    if (!new_value) {
      return
    }

    await ctx.enqueue({
      topic: 'notification',
      data: {
        email: new_value.email,
        templateId: 'order-status-changed',
        orderId: item_id,
        status: new_value.status,
      },
    })
  }
)
Batch processing
Process all items in a group:
import { step, cron } from 'motia'
/** Shape of the order records inspected by the audit. */
interface Order {
  id: string
  status: string
  shipDate: string
  complete: boolean
}

/**
 * Cron-driven audit: walks every order in the 'orders' group and enqueues an
 * 'order-expired' notification for each incomplete order past its ship date.
 */
export const { config, handler } = step(
  {
    name: 'AuditOrders',
    triggers: [cron('0 0 * * *')], // Daily at midnight
    enqueues: ['notification'],
  },
  async (_input, ctx) => {
    const orders = await ctx.state.list<Order>('orders')
    const now = new Date()

    // Select the overdue, incomplete orders first, then notify in list order.
    const expired = orders.filter(
      (candidate) => !candidate.complete && now > new Date(candidate.shipDate)
    )

    for (const { id, shipDate } of expired) {
      ctx.logger.warn('Order expired', { orderId: id, shipDate })
      await ctx.enqueue({
        topic: 'notification',
        data: { templateId: 'order-expired', orderId: id },
      })
    }

    ctx.logger.info('Audit complete', {
      total: orders.length,
      expired: expired.length,
    })
  }
)
Atomic updates
Use JSON Patch for safe concurrent updates:
// Increment a counter without losing concurrent updates.
async (input, ctx) => {
  // JSON Patch has no increment operation, so the current count must be read
  // first. The `test` op then turns the patch into a compare-and-swap: it only
  // applies if /count still holds the value that was read.
  const current = await ctx.state.get<{ count: number }>('counters', 'page-views')
  if (!current) {
    return { status: 404, body: { error: 'Counter not found' } }
  }

  const result = await ctx.state.update('counters', 'page-views', [
    { op: 'test', path: '/count', value: current.count },
    { op: 'replace', path: '/count', value: current.count + 1 },
    { op: 'replace', path: '/lastUpdated', value: Date.now() },
  ])

  // update() resolves to null on failure.
  // NOTE(review): assumes a failed `test` op surfaces as null rather than a
  // thrown error — confirm.
  if (!result) {
    return { status: 409, body: { error: 'Counter changed concurrently; retry' } }
  }
  return { status: 200, body: result.value }
}
Best practices
-
Use typed access: Always specify the type parameter for type-safe operations:
const order = await ctx.state.get<Order>('orders', orderId)
-
Handle null returns: Check for null when getting or deleting:
const order = await ctx.state.get('orders', orderId)
if (!order) {
return { status: 404, body: { error: 'Not found' } }
}
-
Use meaningful group names: Organize state by entity type:
await ctx.state.set('orders', orderId, order)
await ctx.state.set('users', userId, user)
await ctx.state.set('sessions', sessionId, session)
-
Prefer update() for modifications: Use JSON Patch operations instead of get + set:
// Good: atomic update
await ctx.state.update('orders', orderId, [
{ op: 'replace', path: '/status', value: 'shipped' },
])
// Avoid: race condition possible
const order = await ctx.state.get('orders', orderId)
order.status = 'shipped'
await ctx.state.set('orders', orderId, order)
-
Log state operations: Include state operations in logs for debugging:
ctx.logger.info('Updating order status', { orderId })
await ctx.state.update('orders', orderId, ops)
ctx.logger.info('Order status updated')