Skip to main content

Workflows

Motia enables you to build complex workflows by orchestrating multiple steps through events, state, and streams. Workflows can execute in parallel, sequentially, or conditionally based on your business logic.

Event-Driven Orchestration

Steps communicate through events using the enqueue function:
import type { Handlers, StepConfig } from 'motia'
import { z } from 'zod'

const orderSchema = z.object({
  email: z.string(),
  productId: z.string(),
  quantity: z.number(),
})

export const config = {
  name: 'CreateOrder',
  triggers: [{ type: 'http', method: 'POST', path: '/orders', bodySchema: orderSchema }],
  enqueues: ['order.process', 'notification.send'],
  flows: ['order-workflow'],
} as const satisfies StepConfig

export const handler: Handlers<typeof config> = async ({ request }, { enqueue, state, logger }) => {
  const order = {
    id: crypto.randomUUID(),
    ...request.body,
    status: 'created',
    createdAt: new Date().toISOString(),
  }

  await state.set('orders', order.id, order)
  logger.info('Order created', { orderId: order.id })

  // Trigger downstream steps
  await enqueue({
    topic: 'order.process',
    data: { orderId: order.id },
  })

  await enqueue({
    topic: 'notification.send',
    data: {
      email: order.email,
      template: 'order-confirmation',
      data: order,
    },
  })

  return {
    status: 201,
    body: { order },
  }
}

Sequential Workflows

Chain steps together for sequential processing:
// Step 1: Validate Payment
export const validatePaymentConfig = {
  name: 'ValidatePayment',
  triggers: [{ type: 'queue', topic: 'order.process' }],
  enqueues: ['inventory.reserve'],
  flows: ['order-workflow'],
} as const satisfies StepConfig

export const validatePaymentHandler: Handlers<typeof validatePaymentConfig> = async (
  input,
  { enqueue, state, logger }
) => {
  const { orderId } = input
  const order = await state.get('orders', orderId)

  logger.info('Validating payment', { orderId })

  // Simulate payment validation
  const paymentValid = true // Call payment provider

  if (paymentValid) {
    await state.update('orders', orderId, [
      { type: 'set', path: 'paymentStatus', value: 'validated' },
    ])

    // Continue to next step
    await enqueue({
      topic: 'inventory.reserve',
      data: { orderId },
    })
  } else {
    await state.update('orders', orderId, [
      { type: 'set', path: 'status', value: 'payment-failed' },
    ])
  }
}

// Step 2: Reserve Inventory
export const reserveInventoryConfig = {
  name: 'ReserveInventory',
  triggers: [{ type: 'queue', topic: 'inventory.reserve' }],
  enqueues: ['shipping.create'],
  flows: ['order-workflow'],
} as const satisfies StepConfig

export const reserveInventoryHandler: Handlers<typeof reserveInventoryConfig> = async (
  input,
  { enqueue, state, logger }
) => {
  const { orderId } = input
  const order = await state.get('orders', orderId)

  logger.info('Reserving inventory', { orderId })

  // Reserve inventory
  const reserved = true // Check inventory system

  if (reserved) {
    await state.update('orders', orderId, [
      { type: 'set', path: 'inventoryStatus', value: 'reserved' },
    ])

    // Continue to shipping
    await enqueue({
      topic: 'shipping.create',
      data: { orderId },
    })
  }
}

// Step 3: Create Shipment
export const createShipmentConfig = {
  name: 'CreateShipment',
  triggers: [{ type: 'queue', topic: 'shipping.create' }],
  flows: ['order-workflow'],
} as const satisfies StepConfig

export const createShipmentHandler: Handlers<typeof createShipmentConfig> = async (
  input,
  { state, logger }
) => {
  const { orderId } = input

  logger.info('Creating shipment', { orderId })

  await state.update('orders', orderId, [
    { type: 'set', path: 'status', value: 'shipped' },
    { type: 'set', path: 'shippedAt', value: new Date().toISOString() },
  ])

  logger.info('Order workflow completed', { orderId })
}

Parallel Execution

Execute multiple steps in parallel for better performance:
import type { Handlers, StepConfig } from 'motia'

export const config = {
  name: 'ParallelDataGathering',
  triggers: [{ type: 'http', method: 'POST', path: '/analyze/:userId' }],
  enqueues: ['fetch.profile', 'fetch.activity', 'fetch.preferences'],
  flows: ['data-aggregation'],
} as const satisfies StepConfig

export const handler: Handlers<typeof config> = async ({ request }, { enqueue, state }) => {
  const { userId } = request.params
  const analysisId = crypto.randomUUID()

  // Initialize aggregation state
  await state.set('analysis', analysisId, {
    userId,
    status: 'gathering',
    results: {},
    startedAt: Date.now(),
  })

  // Trigger parallel data fetching
  await Promise.all([
    enqueue({
      topic: 'fetch.profile',
      data: { analysisId, userId },
    }),
    enqueue({
      topic: 'fetch.activity',
      data: { analysisId, userId },
    }),
    enqueue({
      topic: 'fetch.preferences',
      data: { analysisId, userId },
    }),
  ])

  return {
    status: 202,
    body: { analysisId, status: 'gathering' },
  }
}

Parallel Merge Pattern

import type { Handlers, StepConfig } from 'motia'

// Individual data fetcher
export const fetchProfileConfig = {
  name: 'FetchProfile',
  triggers: [{ type: 'queue', topic: 'fetch.profile' }],
  enqueues: ['analysis.aggregate'],
  flows: ['data-aggregation'],
} as const satisfies StepConfig

export const fetchProfileHandler: Handlers<typeof fetchProfileConfig> = async (
  input,
  { enqueue, state }
) => {
  const { analysisId, userId } = input

  // Fetch profile data
  const profile = { name: 'John Doe', age: 30 } // From database

  // Store partial result
  await state.update('analysis', analysisId, [
    { type: 'set', path: 'results.profile', value: profile },
    { type: 'increment', path: 'completedTasks', by: 1 },
  ])

  // Notify aggregator
  await enqueue({
    topic: 'analysis.aggregate',
    data: { analysisId },
  })
}

// Aggregation step (triggered by state)
export const aggregateConfig = {
  name: 'AggregateResults',
  triggers: [
    {
      type: 'state',
      scope: 'analysis',
      condition: (input) => {
        return (
          input.event.type === 'update' &&
          input.event.data.completedTasks === 3
        )
      },
    },
  ],
  flows: ['data-aggregation'],
} as const satisfies StepConfig

export const aggregateHandler: Handlers<typeof aggregateConfig> = async (input, { state, logger }) => {
  const analysis = input.event.data
  const duration = Date.now() - analysis.startedAt

  logger.info('All parallel tasks completed', {
    analysisId: input.id,
    duration,
    results: analysis.results,
  })

  await state.update('analysis', input.id, [
    { type: 'set', path: 'status', value: 'completed' },
    { type: 'set', path: 'completedAt', value: Date.now() },
  ])
}

Conditional Workflows

Route workflows based on conditions:
import type { Handlers, StepConfig } from 'motia'

export const config = {
  name: 'ProcessTransaction',
  triggers: [{ type: 'queue', topic: 'transaction.created' }],
  enqueues: ['fraud.check', 'payment.process', 'transaction.reject'],
  flows: ['payment-workflow'],
} as const satisfies StepConfig

export const handler: Handlers<typeof config> = async (input, { enqueue, state, logger }) => {
  const { transactionId, amount, userId } = input

  logger.info('Processing transaction', { transactionId, amount })

  // High-value transactions require fraud check
  if (amount > 10000) {
    logger.info('High-value transaction, initiating fraud check', { transactionId })
    
    await state.set('transactions', transactionId, {
      id: transactionId,
      status: 'fraud-check',
      amount,
      userId,
    })

    await enqueue({
      topic: 'fraud.check',
      data: { transactionId, amount, userId },
    })
  } else {
    // Direct to payment processing
    logger.info('Standard transaction, processing payment', { transactionId })
    
    await state.set('transactions', transactionId, {
      id: transactionId,
      status: 'processing',
      amount,
      userId,
    })

    await enqueue({
      topic: 'payment.process',
      data: { transactionId },
    })
  }
}

Multi-Path Routing

export const handler: Handlers<typeof config> = async (input, { enqueue, logger }) => {
  const { orderType, priority, value } = input

  switch (orderType) {
    case 'express':
      await enqueue({ topic: 'express.fulfillment', data: input })
      break

    case 'standard':
      if (priority === 'high' || value > 1000) {
        await enqueue({ topic: 'priority.queue', data: input })
      } else {
        await enqueue({ topic: 'standard.queue', data: input })
      }
      break

    case 'backorder':
      await enqueue({ topic: 'inventory.notify', data: input })
      break

    default:
      logger.warn('Unknown order type', { orderType })
      await enqueue({ topic: 'manual.review', data: input })
  }
}

Multi-Trigger Steps

Create versatile steps that respond to multiple trigger types:
import type { Handlers, StepConfig } from 'motia'
import { z } from 'zod'

const orderSchema = z.object({
  amount: z.number(),
  description: z.string(),
})

export const config = {
  name: 'ProcessOrder',
  triggers: [
    // API trigger for manual orders
    {
      type: 'http',
      method: 'POST',
      path: '/orders/manual',
      bodySchema: orderSchema,
    },
    // Queue trigger for automated orders
    {
      type: 'queue',
      topic: 'order.created',
      input: orderSchema,
    },
    // Cron trigger for batch processing
    {
      type: 'cron',
      expression: '0 2 * * *', // Daily at 2 AM
    },
  ],
  enqueues: ['order.processed'],
  flows: ['order-workflow'],
} as const satisfies StepConfig

export const handler: Handlers<typeof config> = async (_, ctx) => {
  const orderId = crypto.randomUUID()

  return ctx.match({
    http: async ({ request }) => {
      const order = {
        id: orderId,
        ...request.body,
        source: 'manual-api',
        createdAt: new Date().toISOString(),
      }

      await ctx.state.set('orders', orderId, order)
      await ctx.enqueue({ topic: 'order.processed', data: { orderId } })

      return {
        status: 200,
        body: { message: 'Order processed', orderId },
      }
    },

    queue: async (queueInput) => {
      const order = {
        id: orderId,
        ...queueInput,
        source: 'event',
        createdAt: new Date().toISOString(),
      }

      await ctx.state.set('orders', orderId, order)
      await ctx.enqueue({ topic: 'order.processed', data: { orderId } })
    },

    cron: async () => {
      ctx.logger.info('Processing batch orders')

      const pendingOrders = await ctx.state.list<{ id: string }>('pending-orders')

      for (const order of pendingOrders) {
        await ctx.enqueue({
          topic: 'order.processed',
          data: { orderId: order.id },
        })
      }

      ctx.logger.info('Batch processing complete', { count: pendingOrders.length })
    },
  })
}

Long-Running Workflows

Handle workflows that span hours or days:
import type { Handlers, StepConfig } from 'motia'

// Start long-running process
export const startConfig = {
  name: 'StartApprovalProcess',
  triggers: [{ type: 'http', method: 'POST', path: '/approvals' }],
  enqueues: ['approval.reminder'],
  flows: ['approval-workflow'],
} as const satisfies StepConfig

export const startHandler: Handlers<typeof startConfig> = async (
  { request },
  { enqueue, state }
) => {
  const approvalId = crypto.randomUUID()

  await state.set('approvals', approvalId, {
    id: approvalId,
    status: 'pending',
    requestedAt: new Date().toISOString(),
    ...request.body,
  })

  // Schedule reminder for 24 hours later
  await enqueue({
    topic: 'approval.reminder',
    data: { approvalId },
    delay: 24 * 60 * 60 * 1000, // 24 hours
  })

  return { status: 202, body: { approvalId, status: 'pending' } }
}

// Reminder step (triggered after delay)
export const reminderConfig = {
  name: 'ApprovalReminder',
  triggers: [{ type: 'queue', topic: 'approval.reminder' }],
  enqueues: ['notification.send'],
  flows: ['approval-workflow'],
} as const satisfies StepConfig

export const reminderHandler: Handlers<typeof reminderConfig> = async (input, { state, enqueue }) => {
  const { approvalId } = input
  const approval = await state.get('approvals', approvalId)

  if (approval?.status === 'pending') {
    await enqueue({
      topic: 'notification.send',
      data: {
        email: approval.approver,
        template: 'approval-reminder',
        data: approval,
      },
    })
  }
}

Error Handling & Retries

Motia automatically retries failed steps (configurable in config.yaml):
export const handler: Handlers<typeof config> = async (input, { logger, enqueue }) => {
  try {
    // Attempt external API call
    const result = await fetch('https://api.example.com/process', {
      method: 'POST',
      body: JSON.stringify(input),
    })

    if (!result.ok) {
      throw new Error(`API error: ${result.status}`)
    }

    logger.info('Processing successful')
  } catch (error) {
    logger.error('Processing failed, will retry', { error })
    
    // Throw to trigger automatic retry
    throw error
  }
}

Dead Letter Queue

export const config = {
  name: 'ProcessWithDLQ',
  triggers: [{ type: 'queue', topic: 'data.process' }],
  enqueues: ['data.failed'],
  flows: ['data-processing'],
} as const satisfies StepConfig

export const handler: Handlers<typeof config> = async (input, { logger, enqueue, state }) => {
  try {
    // Process data
    await processData(input)
  } catch (error) {
    const retryCount = input.retryCount || 0

    if (retryCount >= 3) {
      // Move to dead letter queue after max retries
      logger.error('Max retries exceeded, sending to DLQ', { error, input })
      
      await state.set('failed-jobs', crypto.randomUUID(), {
        input,
        error: String(error),
        failedAt: new Date().toISOString(),
      })

      await enqueue({
        topic: 'data.failed',
        data: { input, error: String(error) },
      })
    } else {
      // Retry with incremented count
      throw error
    }
  }
}

Workflow Monitoring

Track workflow progress in real-time:
export const handler: Handlers<typeof config> = async ({ request }, { state, streams }) => {
  const workflowId = crypto.randomUUID()

  // Initialize workflow state
  await state.set('workflows', workflowId, {
    id: workflowId,
    status: 'running',
    steps: {
      step1: 'pending',
      step2: 'pending',
      step3: 'pending',
    },
    startedAt: Date.now(),
  })

  // Also set in stream for real-time updates
  await streams.workflows.set('active', workflowId, {
    id: workflowId,
    status: 'running',
    progress: 0,
  })

  return { status: 202, body: { workflowId } }
}

// Update progress as steps complete
export const stepHandler: Handlers<typeof stepConfig> = async (input, { state, streams }) => {
  const { workflowId, stepName } = input

  // Update state
  await state.update('workflows', workflowId, [
    { type: 'set', path: `steps.${stepName}`, value: 'completed' },
  ])

  // Update stream for real-time dashboard
  await streams.workflows.update('active', workflowId, [
    { type: 'increment', path: 'progress', by: 33 },
  ])
}

Best Practices

1. Design for Idempotency

export const handler: Handlers<typeof config> = async (input, { state }) => {
  const { orderId } = input

  // Check if already processed
  const existing = await state.get('processed', orderId)
  if (existing) {
    logger.info('Already processed, skipping', { orderId })
    return
  }

  // Process order
  await processOrder(orderId)

  // Mark as processed
  await state.set('processed', orderId, { processedAt: Date.now() })
}

2. Use Meaningful Topic Names

// Good: Clear, hierarchical
await enqueue({ topic: 'user.created', data })
await enqueue({ topic: 'order.payment.validated', data })
await enqueue({ topic: 'inventory.stock.low', data })

// Avoid: Vague, flat
await enqueue({ topic: 'process', data })
await enqueue({ topic: 'data', data })

3. Document Workflow Flows

export const config = {
  name: 'ProcessOrder',
  description: 'Main order processing step. Validates payment, reserves inventory, creates shipment.',
  flows: ['order-workflow'],
  enqueues: ['payment.validate', 'inventory.reserve'],
  // ...
}

Next Steps

Build docs developers (and LLMs) love