Skip to main content

State Management

Motia provides a unified, reactive key-value state store shared across all steps. Every state operation is automatically traced, and you can trigger steps based on state changes.

Core Operations

All steps have access to the state context for managing persistent data:

Get & Set

import type { Handlers, StepConfig } from 'motia'

export const config = {
  name: 'ManageUser',
  triggers: [{ type: 'http', method: 'POST', path: '/users' }],
  flows: ['user-management'],
} as const satisfies StepConfig

export const handler: Handlers<typeof config> = async ({ request }, { state, logger }) => {
  const { userId, name, email } = request.body

  // Set state with scoped key-value pairs
  await state.set('users', userId, {
    id: userId,
    name,
    email,
    createdAt: new Date().toISOString(),
  })

  logger.info('User created', { userId })

  // Retrieve state
  const user = await state.get('users', userId)

  return {
    status: 201,
    body: { user },
  }
}

List & Delete

export const handler: Handlers<typeof config> = async (_, { state }) => {
  // List all items in a scope
  const allUsers = await state.list<User>('users')

  // Delete specific item
  await state.delete('users', 'user-123')

  // List all scopes/groups
  const scopes = await state.listGroups()

  return { status: 200, body: { users: allUsers, scopes } }
}

Atomic Operations with Update

The update() method provides atomic operations to prevent race conditions:
import type { Handlers, StepConfig } from 'motia'
import type { Order } from './types'

export const config = {
  name: 'ProcessOrder',
  triggers: [{ type: 'queue', topic: 'order.created' }],
  flows: ['orders'],
} as const satisfies StepConfig

export const handler: Handlers<typeof config> = async ({ topic, data }, { state, logger }) => {
  const { orderId, amount } = data

  // Atomic increment - no race conditions
  await state.update<Order>('orders', orderId, [
    { type: 'increment', path: 'itemCount', by: 1 },
    { type: 'increment', path: 'totalAmount', by: amount },
  ])

  logger.info('Order updated atomically', { orderId })
}

Available Update Operations

import type { UpdateOp } from 'iii-sdk/stream'

// Set a value
{ type: 'set', path: 'status', value: 'completed' }

// Merge objects (deep merge)
{ type: 'merge', path: 'metadata', value: { processed: true, timestamp: Date.now() } }

// Increment numbers
{ type: 'increment', path: 'count', by: 1 }

// Decrement numbers
{ type: 'decrement', path: 'stock', by: 5 }

// Remove a field
{ type: 'remove', path: 'temporaryData' }

State Persistence Patterns

Order Processing with State Audit

import type { Handlers, StepConfig } from 'motia'
import type { Order } from './types'

export const config = {
  name: 'ProcessFoodOrder',
  triggers: [
    { type: 'queue', topic: 'process-food-order' },
    { type: 'http', method: 'POST', path: '/orders' },
  ],
  enqueues: ['notification'],
  flows: ['food-orders'],
} as const satisfies StepConfig

export const handler: Handlers<typeof config> = async (_, ctx) => {
  const data = ctx.getData()

  const order: Order = {
    id: crypto.randomUUID(),
    email: data.email,
    quantity: data.quantity,
    petId: data.petId,
    shipDate: new Date().toISOString(),
    status: 'placed',
    complete: false,
  }

  // Persist order to state
  await ctx.state.set('orders', order.id, order)

  // Enqueue notification event
  await ctx.enqueue({
    topic: 'notification',
    data: {
      email: data.email,
      templateId: 'new-order',
      templateData: order,
    },
  })

  return ctx.match({
    http: async () => ({
      status: 200,
      body: { success: true, order },
    }),
  })
}

Scheduled State Audit

import type { Handlers, StepConfig } from 'motia'
import type { Order } from './types'

export const config = {
  name: 'StateAuditJob',
  description: 'Check for incomplete orders past ship date',
  triggers: [
    {
      type: 'cron',
      expression: '0 0/5 * * * * *', // Every 5 minutes
    },
  ],
  enqueues: ['notification'],
  flows: ['audit'],
} as const satisfies StepConfig

export const handler: Handlers<typeof config> = async (_, { logger, state, enqueue }) => {
  const orders = await state.list<Order>('orders')

  for (const order of orders) {
    const currentDate = new Date()
    const shipDate = new Date(order.shipDate)

    if (!order.complete && currentDate > shipDate) {
      logger.warn('Order overdue', {
        orderId: order.id,
        shipDate: order.shipDate,
      })

      await enqueue({
        topic: 'notification',
        data: {
          email: 'admin@example.com',
          templateId: 'order-audit-warning',
          templateData: {
            orderId: order.id,
            status: order.status,
            shipDate: order.shipDate,
            message: 'Order is not complete and ship date is past',
          },
        },
      })
    }
  }
}

State Triggers

React to state changes automatically using state triggers:
import type { Handlers, StepConfig, StateTriggerInput } from 'motia'

export const config = {
  name: 'OnUserCreated',
  description: 'Triggered when a new user is created',
  triggers: [
    {
      type: 'state',
      scope: 'users',
      condition: (input: StateTriggerInput) => {
        return input.event.type === 'create'
      },
    },
  ],
  enqueues: ['user.welcome'],
  flows: ['user-lifecycle'],
} as const satisfies StepConfig

export const handler: Handlers<typeof config> = async (input, { enqueue, logger }) => {
  const user = input.event.data

  logger.info('New user detected via state trigger', { userId: user.id })

  await enqueue({
    topic: 'user.welcome',
    data: {
      userId: user.id,
      email: user.email,
      name: user.name,
    },
  })
}

State Trigger Conditions

import type { StateTriggerInput } from 'motia'

// Trigger on create events
const onCreate = (input: StateTriggerInput) => input.event.type === 'create'

// Trigger on updates
const onUpdate = (input: StateTriggerInput) => input.event.type === 'update'

// Trigger on delete events
const onDelete = (input: StateTriggerInput) => input.event.type === 'delete'

// Trigger on specific field changes
const onStatusChange = (input: StateTriggerInput) => {
  return (
    input.event.type === 'update' &&
    input.event.data.status !== input.previous?.status
  )
}

// Trigger on high-value updates
const onHighValueOrder = (input: StateTriggerInput) => {
  return (
    input.event.type === 'update' &&
    input.event.data.totalAmount > 10000
  )
}

Parallel Merge Pattern

Coordinate parallel operations using state:
import type { Handlers, StepConfig } from 'motia'
import { z } from 'zod'

const bodySchema = z.object({
  totalSteps: z.number().default(3),
})

export const config = {
  name: 'StartParallelMerge',
  triggers: [{ type: 'http', method: 'POST', path: '/parallel-merge', bodySchema }],
  enqueues: ['step.process'],
  flows: ['parallel-merge'],
} as const satisfies StepConfig

export const handler: Handlers<typeof config> = async ({ request }, { state, enqueue }) => {
  const { totalSteps } = request.body
  const traceId = crypto.randomUUID()

  // Initialize merge state
  await state.set('merges', traceId, {
    totalSteps,
    startedAt: Date.now(),
    completedSteps: 0,
  })

  // Enqueue parallel work
  await Promise.all(
    Array.from({ length: totalSteps }, (_, index) =>
      enqueue({
        topic: 'step.process',
        data: { traceId, stepIndex: index },
      })
    )
  )

  return { status: 200, body: { traceId, totalSteps } }
}

Merge Completion Handler

import type { Handlers, StepConfig, StateTriggerInput } from 'motia'

interface MergeState {
  totalSteps: number
  completedSteps: number
  startedAt: number
}

export const config = {
  name: 'MergeComplete',
  triggers: [
    {
      type: 'state',
      scope: 'merges',
      condition: (input: StateTriggerInput<MergeState>) => {
        return (
          input.event.type === 'update' &&
          input.event.data.completedSteps === input.event.data.totalSteps
        )
      },
    },
  ],
  flows: ['parallel-merge'],
} as const satisfies StepConfig

export const handler: Handlers<typeof config> = async (input, { logger }) => {
  const result = input.event.data
  const duration = Date.now() - result.startedAt

  logger.info('Parallel merge completed', {
    totalSteps: result.totalSteps,
    duration,
  })
}

Best Practices

1. Use Scopes for Organization

// Group related data by scope
await state.set('users', userId, userData)
await state.set('orders', orderId, orderData)
await state.set('sessions', sessionId, sessionData)

2. Atomic Updates Over Get-Modify-Set

// Bad: Race condition possible
const order = await state.get('orders', orderId)
order.count += 1
await state.set('orders', orderId, order)

// Good: Atomic operation
await state.update('orders', orderId, [
  { type: 'increment', path: 'count', by: 1 },
])

3. Type Safety with TypeScript

interface User {
  id: string
  name: string
  email: string
  createdAt: string
}

// Type-safe state operations
const user = await state.get<User>('users', userId)
const allUsers = await state.list<User>('users')

4. Clear Temporary State

export const handler: Handlers<typeof config> = async (_, { state }) => {
  // Clear all items in a scope
  await state.clear('temporary-data')
}

Observability

All state operations are automatically traced and appear in the iii Console:
  • State Operations: Every get, set, update, delete is recorded
  • Trace Correlation: State changes are linked to the originating trace
  • State Inspector: View current state in real-time in the Console
  • Audit Trail: Track state changes over time

State API Reference

interface StateManager {
  // Retrieve a value
  get<T>(scope: string, key: string): Promise<T | null>

  // Store a value
  set<T>(scope: string, key: string, value: T): Promise<StreamSetResult<T> | null>

  // Atomic update operations
  update<T>(scope: string, key: string, ops: UpdateOp[]): Promise<StreamSetResult<T> | null>

  // Delete a value
  delete<T>(scope: string, key: string): Promise<T | null>

  // List all values in a scope
  list<T>(scope: string): Promise<T[]>

  // List all scopes
  listGroups(): Promise<string[]>

  // Clear all items in a scope
  clear(scope: string): Promise<void>
}

Next Steps

Build docs developers (and LLMs) love