State Management
Motia provides a unified, reactive key-value state store shared across all steps. Every state operation is automatically traced, and you can trigger steps based on state changes.
Core Operations
All steps have access to the state context for managing persistent data:
Get & Set
import type { Handlers, StepConfig } from 'motia'
export const config = {
name: 'ManageUser',
triggers: [{ type: 'http', method: 'POST', path: '/users' }],
flows: ['user-management'],
} as const satisfies StepConfig
export const handler: Handlers<typeof config> = async ({ request }, { state, logger }) => {
const { userId, name, email } = request.body
// Set state with scoped key-value pairs
await state.set('users', userId, {
id: userId,
name,
email,
createdAt: new Date().toISOString(),
})
logger.info('User created', { userId })
// Retrieve state
const user = await state.get('users', userId)
return {
status: 201,
body: { user },
}
}
List & Delete
export const handler: Handlers<typeof config> = async (_, { state }) => {
// List all items in a scope
const allUsers = await state.list<User>('users')
// Delete specific item
await state.delete('users', 'user-123')
// List all scopes/groups
const scopes = await state.listGroups()
return { status: 200, body: { users: allUsers, scopes } }
}
Atomic Operations with Update
The update() method provides atomic operations to prevent race conditions:
import type { Handlers, StepConfig } from 'motia'
import type { Order } from './types'
export const config = {
name: 'ProcessOrder',
triggers: [{ type: 'queue', topic: 'order.created' }],
flows: ['orders'],
} as const satisfies StepConfig
export const handler: Handlers<typeof config> = async ({ topic, data }, { state, logger }) => {
const { orderId, amount } = data
// Atomic increment - no race conditions
await state.update<Order>('orders', orderId, [
{ type: 'increment', path: 'itemCount', by: 1 },
{ type: 'increment', path: 'totalAmount', by: amount },
])
logger.info('Order updated atomically', { orderId })
}
Available Update Operations
import type { UpdateOp } from 'iii-sdk/stream'
// Each entry below is one operation; pass any combination of them to state.update()
const ops: UpdateOp[] = [
  // Set a value
  { type: 'set', path: 'status', value: 'completed' },
  // Merge objects (deep merge)
  { type: 'merge', path: 'metadata', value: { processed: true, timestamp: Date.now() } },
  // Increment numbers
  { type: 'increment', path: 'count', by: 1 },
  // Decrement numbers
  { type: 'decrement', path: 'stock', by: 5 },
  // Remove a field
  { type: 'remove', path: 'temporaryData' },
]
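To make the effect of each operation concrete, here is a minimal in-memory sketch of how the operations listed above could be applied to a stored record. It is not Motia's implementation: `UpdateOpSketch`, `StateRecord`, and `applyOps` are hypothetical local names, `path` is assumed to be a top-level field name, and treating a missing numeric field as 0 is an assumption rather than documented behavior.

```typescript
// Hypothetical local stand-in for the operation shapes listed above (not the SDK type).
type UpdateOpSketch =
  | { type: 'set'; path: string; value: unknown }
  | { type: 'merge'; path: string; value: Record<string, unknown> }
  | { type: 'increment'; path: string; by: number }
  | { type: 'decrement'; path: string; by: number }
  | { type: 'remove'; path: string }

type StateRecord = Record<string, unknown>

function isPlainObject(value: unknown): value is StateRecord {
  return typeof value === 'object' && value !== null && !Array.isArray(value)
}

// Recursively merges `source` into a copy of `target`.
function deepMerge(target: StateRecord, source: StateRecord): StateRecord {
  const merged: StateRecord = { ...target }
  for (const [key, value] of Object.entries(source)) {
    const existing = merged[key]
    merged[key] = isPlainObject(existing) && isPlainObject(value) ? deepMerge(existing, value) : value
  }
  return merged
}

// Assumption: a missing or non-numeric field counts as 0.
function toNumber(value: unknown): number {
  return typeof value === 'number' ? value : 0
}

// Applies the operations in order to a copy of `record`; the input is left untouched.
function applyOps(record: StateRecord, ops: UpdateOpSketch[]): StateRecord {
  const next: StateRecord = { ...record }
  for (const op of ops) {
    switch (op.type) {
      case 'set':
        next[op.path] = op.value
        break
      case 'merge': {
        const existing = next[op.path]
        next[op.path] = deepMerge(isPlainObject(existing) ? existing : {}, op.value)
        break
      }
      case 'increment':
        next[op.path] = toNumber(next[op.path]) + op.by
        break
      case 'decrement':
        next[op.path] = toNumber(next[op.path]) - op.by
        break
      case 'remove':
        delete next[op.path]
        break
    }
  }
  return next
}

const originalOrder: StateRecord = {
  status: 'placed',
  count: 1,
  stock: 10,
  metadata: { source: 'web' },
  temporaryData: 'scratch',
}

const updatedOrder = applyOps(originalOrder, [
  { type: 'set', path: 'status', value: 'completed' },
  { type: 'merge', path: 'metadata', value: { processed: true } },
  { type: 'increment', path: 'count', by: 1 },
  { type: 'decrement', path: 'stock', by: 5 },
  { type: 'remove', path: 'temporaryData' },
])
```

In this sketch `updatedOrder` ends up with `status: 'completed'`, `count: 2`, `stock: 5`, a `metadata` object holding both `source` and `processed`, and no `temporaryData` field.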
State Persistence Patterns
Order Processing with State Audit
import type { Handlers, StepConfig } from 'motia'
import type { Order } from './types'
export const config = {
name: 'ProcessFoodOrder',
triggers: [
{ type: 'queue', topic: 'process-food-order' },
{ type: 'http', method: 'POST', path: '/orders' },
],
enqueues: ['notification'],
flows: ['food-orders'],
} as const satisfies StepConfig
export const handler: Handlers<typeof config> = async (_, ctx) => {
const data = ctx.getData()
const order: Order = {
id: crypto.randomUUID(),
email: data.email,
quantity: data.quantity,
petId: data.petId,
shipDate: new Date().toISOString(),
status: 'placed',
complete: false,
}
// Persist order to state
await ctx.state.set('orders', order.id, order)
// Enqueue notification event
await ctx.enqueue({
topic: 'notification',
data: {
email: data.email,
templateId: 'new-order',
templateData: order,
},
})
return ctx.match({
http: async () => ({
status: 200,
body: { success: true, order },
}),
})
}
Scheduled State Audit
import type { Handlers, StepConfig } from 'motia'
import type { Order } from './types'
export const config = {
name: 'StateAuditJob',
description: 'Check for incomplete orders past ship date',
triggers: [
{
type: 'cron',
expression: '0 0/5 * * * * *', // Every 5 minutes
},
],
enqueues: ['notification'],
flows: ['audit'],
} as const satisfies StepConfig
export const handler: Handlers<typeof config> = async (_, { logger, state, enqueue }) => {
const orders = await state.list<Order>('orders')
// Capture the current time once so every order is compared against the same instant
const currentDate = new Date()
for (const order of orders) {
const shipDate = new Date(order.shipDate)
if (!order.complete && currentDate > shipDate) {
logger.warn('Order overdue', {
orderId: order.id,
shipDate: order.shipDate,
})
await enqueue({
topic: 'notification',
data: {
email: 'admin@example.com',
templateId: 'order-audit-warning',
templateData: {
orderId: order.id,
status: order.status,
shipDate: order.shipDate,
message: 'Order is not complete and ship date is past',
},
},
})
}
}
}
State Triggers
React to state changes automatically using state triggers:
import type { Handlers, StepConfig, StateTriggerInput } from 'motia'
export const config = {
name: 'OnUserCreated',
description: 'Triggered when a new user is created',
triggers: [
{
type: 'state',
scope: 'users',
condition: (input: StateTriggerInput) => {
return input.event.type === 'create'
},
},
],
enqueues: ['user.welcome'],
flows: ['user-lifecycle'],
} as const satisfies StepConfig
export const handler: Handlers<typeof config> = async (input, { enqueue, logger }) => {
const user = input.event.data
logger.info('New user detected via state trigger', { userId: user.id })
await enqueue({
topic: 'user.welcome',
data: {
userId: user.id,
email: user.email,
name: user.name,
},
})
}
State Trigger Conditions
import type { StateTriggerInput } from 'motia'
// Trigger on create events
const onCreate = (input: StateTriggerInput) => input.event.type === 'create'
// Trigger on updates
const onUpdate = (input: StateTriggerInput) => input.event.type === 'update'
// Trigger on delete events
const onDelete = (input: StateTriggerInput) => input.event.type === 'delete'
// Trigger on specific field changes
const onStatusChange = (input: StateTriggerInput) => {
return (
input.event.type === 'update' &&
input.event.data.status !== input.previous?.status
)
}
// Trigger on high-value updates
const onHighValueOrder = (input: StateTriggerInput) => {
return (
input.event.type === 'update' &&
input.event.data.totalAmount > 10000
)
}
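Because the conditions above are plain predicate functions, they can be composed and exercised against sample inputs without running a flow. The sketch below illustrates that idea under stated assumptions: `TriggerInputSketch` is a hypothetical local stand-in modeled only on the fields used above (`event.type`, `event.data`, `previous`), not the real `StateTriggerInput` type, and `allOf` is a helper introduced here for illustration.

```typescript
// Hypothetical stand-in for StateTriggerInput, limited to the fields used in this section.
interface TriggerInputSketch<T> {
  event: { type: 'create' | 'update' | 'delete'; data: T }
  previous?: T
}

interface OrderSketch {
  status: string
  totalAmount: number
}

type Condition<T> = (input: TriggerInputSketch<T>) => boolean

// Combines several conditions; the result passes only if every one of them passes.
function allOf<T>(...conditions: Condition<T>[]): Condition<T> {
  return (input) => conditions.every((condition) => condition(input))
}

const isUpdate: Condition<OrderSketch> = (input) => input.event.type === 'update'
const statusChanged: Condition<OrderSketch> = (input) =>
  input.event.data.status !== input.previous?.status
const isHighValue: Condition<OrderSketch> = (input) => input.event.data.totalAmount > 10000

// Fires only for updates that change the status of a high-value order.
const onHighValueStatusChange = allOf(isUpdate, statusChanged, isHighValue)

const sampleUpdate: TriggerInputSketch<OrderSketch> = {
  event: { type: 'update', data: { status: 'shipped', totalAmount: 12000 } },
  previous: { status: 'placed', totalAmount: 12000 },
}

const sampleCreate: TriggerInputSketch<OrderSketch> = {
  event: { type: 'create', data: { status: 'placed', totalAmount: 12000 } },
}

const sampleLowValueUpdate: TriggerInputSketch<OrderSketch> = {
  event: { type: 'update', data: { status: 'shipped', totalAmount: 50 } },
  previous: { status: 'placed', totalAmount: 50 },
}

const updateMatches = onHighValueStatusChange(sampleUpdate) // true
const createMatches = onHighValueStatusChange(sampleCreate) // false
const lowValueMatches = onHighValueStatusChange(sampleLowValueUpdate) // false
```

Keeping each condition small and naming it makes the trigger configuration easier to read and lets the predicates be covered by ordinary unit tests.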
Parallel Merge Pattern
Coordinate parallel operations using state:
import type { Handlers, StepConfig } from 'motia'
import { z } from 'zod'
const bodySchema = z.object({
totalSteps: z.number().default(3),
})
export const config = {
name: 'StartParallelMerge',
triggers: [{ type: 'http', method: 'POST', path: '/parallel-merge', bodySchema }],
enqueues: ['step.process'],
flows: ['parallel-merge'],
} as const satisfies StepConfig
export const handler: Handlers<typeof config> = async ({ request }, { state, enqueue }) => {
const { totalSteps } = request.body
const traceId = crypto.randomUUID()
// Initialize merge state
await state.set('merges', traceId, {
totalSteps,
startedAt: Date.now(),
completedSteps: 0,
})
// Enqueue parallel work
await Promise.all(
Array.from({ length: totalSteps }, (_, index) =>
enqueue({
topic: 'step.process',
data: { traceId, stepIndex: index },
})
)
)
return { status: 200, body: { traceId, totalSteps } }
}
Merge Completion Handler
import type { Handlers, StepConfig, StateTriggerInput } from 'motia'
interface MergeState {
totalSteps: number
completedSteps: number
startedAt: number
}
export const config = {
name: 'MergeComplete',
triggers: [
{
type: 'state',
scope: 'merges',
condition: (input: StateTriggerInput<MergeState>) => {
return (
input.event.type === 'update' &&
input.event.data.completedSteps === input.event.data.totalSteps
)
},
},
],
flows: ['parallel-merge'],
} as const satisfies StepConfig
export const handler: Handlers<typeof config> = async (input, { logger }) => {
const result = input.event.data
const duration = Date.now() - result.startedAt
logger.info('Parallel merge completed', {
totalSteps: result.totalSteps,
duration,
})
}
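The two steps above do not show the worker that handles each step.process message. Presumably it would finish its work and then bump completedSteps with an atomic increment via state.update(), which is what lets the MergeComplete condition eventually match. The following in-memory simulation sketches that flow end to end. Everything in it (`mergeStore`, `completeOneStep`, `completedMerges`) is a hypothetical stand-in rather than Motia API, and the trigger is evaluated inline instead of by the framework.

```typescript
interface MergeStateSketch {
  totalSteps: number
  completedSteps: number
  startedAt: number
}

// Stand-in for the 'merges' scope.
const mergeStore = new Map<string, MergeStateSketch>()
// Records which merges caused the completion condition to match.
const completedMerges: string[] = []

// Same predicate as the MergeComplete trigger condition (minus the event type check).
function isMergeComplete(data: MergeStateSketch): boolean {
  return data.completedSteps === data.totalSteps
}

// Stand-in for a worker calling state.update with
// { type: 'increment', path: 'completedSteps', by: 1 }, followed by trigger evaluation.
function completeOneStep(traceId: string): void {
  const current = mergeStore.get(traceId)
  if (!current) throw new Error(`Unknown merge: ${traceId}`)
  const next = { ...current, completedSteps: current.completedSteps + 1 }
  mergeStore.set(traceId, next)
  if (isMergeComplete(next)) completedMerges.push(traceId)
}

// Initialize the merge, then let three simulated workers report completion.
mergeStore.set('trace-1', { totalSteps: 3, completedSteps: 0, startedAt: Date.now() })
for (let stepIndex = 0; stepIndex < 3; stepIndex++) {
  completeOneStep('trace-1')
}
```

The completion condition matches exactly once, on the third increment. With a read-then-write counter instead of an atomic increment, concurrent workers could overwrite each other's counts and the condition might never match.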
Best Practices
1. Use Scopes for Organization
// Group related data by scope
await state.set('users', userId, userData)
await state.set('orders', orderId, orderData)
await state.set('sessions', sessionId, sessionData)
2. Atomic Updates Over Get-Modify-Set
// Bad: Race condition possible
const order = await state.get('orders', orderId)
order.count += 1
await state.set('orders', orderId, order)
// Good: Atomic operation
await state.update('orders', orderId, [
{ type: 'increment', path: 'count', by: 1 },
])
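The difference between the two approaches can be shown with a deterministic simulation. The sketch below uses a plain Map as a stand-in for the state store and interleaves two "workers" by hand; `orderCounts` and `incrementCount` are hypothetical names, and the single-step function only models what an atomic increment guarantees.

```typescript
// Stand-in for a state scope holding one counter per order.
const orderCounts = new Map<string, number>()

// Get-modify-set: both workers read before either one writes.
orderCounts.set('order-1', 0)
const seenByWorkerA = orderCounts.get('order-1') ?? 0
const seenByWorkerB = orderCounts.get('order-1') ?? 0
orderCounts.set('order-1', seenByWorkerA + 1)
orderCounts.set('order-1', seenByWorkerB + 1) // overwrites worker A's write
const afterGetModifySet = orderCounts.get('order-1') // 1: one increment was lost

// Atomic-style increment: read and write happen as one indivisible step.
function incrementCount(key: string, by: number): number {
  const next = (orderCounts.get(key) ?? 0) + by
  orderCounts.set(key, next)
  return next
}

orderCounts.set('order-2', 0)
incrementCount('order-2', 1)
incrementCount('order-2', 1)
const afterAtomicUpdate = orderCounts.get('order-2') // 2: both increments applied
```

Two increments leave the get-modify-set counter at 1 but the atomic counter at 2, which is the lost-update problem that update() is meant to avoid.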
3. Type Safety with TypeScript
interface User {
id: string
name: string
email: string
createdAt: string
}
// Type-safe state operations
const user = await state.get<User>('users', userId)
const allUsers = await state.list<User>('users')
4. Clear Temporary State
export const handler: Handlers<typeof config> = async (_, { state }) => {
// Clear all items in a scope
await state.clear('temporary-data')
}
Observability
All state operations are automatically traced and appear in the iii Console:
- State Operations: Every get, set, update, and delete is recorded
- Trace Correlation: State changes are linked to the originating trace
- State Inspector: View current state in real-time in the Console
- Audit Trail: Track state changes over time
State API Reference
interface StateManager {
// Retrieve a value
get<T>(scope: string, key: string): Promise<T | null>
// Store a value
set<T>(scope: string, key: string, value: T): Promise<StreamSetResult<T> | null>
// Atomic update operations
update<T>(scope: string, key: string, ops: UpdateOp[]): Promise<StreamSetResult<T> | null>
// Delete a value
delete<T>(scope: string, key: string): Promise<T | null>
// List all values in a scope
list<T>(scope: string): Promise<T[]>
// List all scopes
listGroups(): Promise<string[]>
// Clear all items in a scope
clear(scope: string): Promise<void>
}
Next Steps
- Learn about Streaming for real-time updates
- Explore Workflows for multi-step orchestration
- Check out Observability for state monitoring