Workflows in Motia are built by connecting Steps through queue events. Each Step processes its input and enqueues events that trigger the next Step, so a workflow is a chain of event-driven handlers.

Simple workflow pattern

The most basic workflow connects an API endpoint to a background worker:
// Step 1: API receives request
import type { Handlers, StepConfig } from 'motia'

export const config = {
  name: 'SendMessage',
  triggers: [
    {
      type: 'http',
      method: 'POST',
      path: '/messages',
    }
  ],
  enqueues: ['message.sent']
} as const satisfies StepConfig

export const handler: Handlers<typeof config> = async (req, { enqueue }) => {
  await enqueue({
    topic: 'message.sent',
    data: { text: req.body.text }
  })
  return { status: 200, body: { ok: true } }
}

// Step 2: Worker processes message (a separate Step file)
import type { Handlers, StepConfig } from 'motia'

export const config = {
  name: 'ProcessMessage',
  triggers: [
    {
      type: 'queue',
      topic: 'message.sent',
    }
  ],
} as const satisfies StepConfig

export const handler: Handlers<typeof config> = async (input, { logger }) => {
  logger.info('Processing message', input)
}
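
To make the relationship between the two Steps concrete, the routing can be modeled with a tiny in-memory bus. This is only an illustrative sketch, not how Motia is implemented: `InMemoryBus`, `subscribe`, and this `enqueue` are hypothetical names, and a real queue adds persistence, retries, and asynchronous delivery.

```typescript
// Hypothetical in-memory stand-in for a queue; not part of Motia.
type QueueEvent = { topic: string; data: unknown }
type Subscriber = (data: unknown) => void

class InMemoryBus {
  private subscribers = new Map<string, Subscriber[]>()

  // Register a handler for a topic, like a Step with a queue trigger.
  subscribe(topic: string, handler: Subscriber): void {
    const existing = this.subscribers.get(topic) ?? []
    this.subscribers.set(topic, [...existing, handler])
  }

  // Deliver an event to every handler subscribed to its topic.
  // Returns the number of handlers that received it.
  enqueue(event: QueueEvent): number {
    const handlers = this.subscribers.get(event.topic) ?? []
    for (const handler of handlers) handler(event.data)
    return handlers.length
  }
}

const bus = new InMemoryBus()
const processed: unknown[] = []

// Plays the role of ProcessMessage: it reacts to 'message.sent'.
bus.subscribe('message.sent', (data) => processed.push(data))

// Plays the role of SendMessage: it only knows the topic, not the consumer.
const delivered = bus.enqueue({ topic: 'message.sent', data: { text: 'hello' } })
```

The producer never references the consumer directly. The topic name is the only contract between them, which is why the `enqueues` entry in one Step and the `topic` of the queue trigger in the other must match.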

Multi-step workflow

Build complex workflows by chaining multiple Steps:

Step 1: Create the order

Start with an API endpoint that creates an order:
// steps/create-order.step.ts
import type { Handlers, StepConfig } from 'motia'
import { z } from 'zod'

export const config = {
  name: 'CreateOrder',
  triggers: [
    {
      type: 'http',
      method: 'POST',
      path: '/orders',
      bodySchema: z.object({
        pet: z.object({ name: z.string(), photoUrl: z.string() }),
        foodOrder: z.object({ quantity: z.number() }),
      }),
    },
  ],
  enqueues: ['order.created'],
  flows: ['order-workflow'],
} as const satisfies StepConfig

export const handler: Handlers<typeof config> = async (request, { enqueue, logger, traceId }) => {
  const { pet, foodOrder } = request.body
  
  logger.info('Creating order', { pet, foodOrder, traceId })
  
  // createPet is an application-specific helper (not shown here)
  const petRecord = await createPet(pet)
  
  await enqueue({
    topic: 'order.created',
    data: {
      petId: petRecord.id,
      quantity: foodOrder.quantity,
      email: 'customer@example.com', // placeholder; the original address was obfuscated on the page
    },
  })
  
  return { status: 200, body: petRecord }
}

Step 2: Process the order

Process the order and create a shipping record:
// steps/process-order.step.ts
import { queue } from 'motia'
import type { Handlers, StepConfig } from 'motia'
import { z } from 'zod'

const orderSchema = z.object({
  petId: z.string(),
  quantity: z.number(),
  email: z.string(),
})

export const config = {
  name: 'ProcessOrder',
  triggers: [queue('order.created', { input: orderSchema })],
  enqueues: ['order.processed'],
  flows: ['order-workflow'],
} as const satisfies StepConfig

export const handler: Handlers<typeof config> = async (input, { logger, state, enqueue, traceId }) => {
  logger.info('Processing order', { input, traceId })
  
  // createOrder is an application-specific helper (not shown here)
  const order = await createOrder({
    ...input,
    shipDate: new Date().toISOString(),
    status: 'placed',
  })
  
  await state.set('orders', order.id, order)
  
  await enqueue({
    topic: 'order.processed',
    data: {
      orderId: order.id,
      email: input.email,
      status: order.status,
    },
  })
}

Step 3: Send notification

Notify the customer that their order was processed:
// steps/send-notification.step.ts
import { queue, jsonSchema } from 'motia'
import type { Handlers, StepConfig } from 'motia'
import { z } from 'zod'

export const config = {
  name: 'SendNotification',
  triggers: [
    queue('order.processed', {
      input: jsonSchema(
        z.object({
          orderId: z.string(),
          email: z.string(),
          status: z.string(),
        }),
      ),
    }),
  ],
  enqueues: [],
  flows: ['order-workflow'],
} as const satisfies StepConfig

export const handler: Handlers<typeof config> = async (input, { logger, traceId }) => {
  logger.info('Sending notification', {
    orderId: input.orderId,
    // Mask the local part of the address in logs, keeping its first two characters
    email: input.email.replace(/(?<=.{2}).(?=.*@)/g, '*'),
    traceId,
  })
  
  // Send email notification
  await emailService.send({
    to: input.email,
    template: 'order-confirmation',
    data: {
      orderId: input.orderId,
      status: input.status,
    },
  })
}

Conditional workflows

Use trigger conditions to create branching logic:
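// ApiRequest is assumed to be exported from 'motia' alongside the other types
import type { ApiRequest, StepConfig, TriggerCondition } from 'motia'
import { z } from 'zod'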

const isHighValue: TriggerCondition<{ amount: number }> = (input) => {
  return input.amount > 1000
}

const isVerifiedUser: TriggerCondition<ApiRequest<{ user: { verified: boolean } }>> = (input, ctx) => {
  if (ctx.trigger.type !== 'http') return false
  return input.body.user.verified === true
}

export const config = {
  name: 'ProcessOrder',
  triggers: [
    {
      type: 'queue',
      topic: 'order.created',
      input: z.object({ amount: z.number() }),
      condition: isHighValue, // Only process high-value orders
    },
    {
      type: 'http',
      method: 'POST',
      path: '/orders/manual',
      condition: isVerifiedUser, // Only allow verified users
    },
  ],
  enqueues: ['order.processed'],
} as const satisfies StepConfig
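
Because a trigger condition is a plain predicate, it can be exercised without running a workflow. The sketch below assumes a simplified local `Condition` type in place of Motia's `TriggerCondition` (the second `ctx` argument is omitted), and `matchingBranches` is a hypothetical helper that reports which branches an input would take.

```typescript
// Simplified local stand-in for a trigger condition (assumed shape, illustration only).
type Condition<T> = (input: T) => boolean

type Order = { amount: number }

// Same rule as isHighValue above: strictly greater than 1000.
const isHighValue: Condition<Order> = (input) => input.amount > 1000

// Hypothetical helper: returns the names of the branches whose condition passes.
function matchingBranches<T>(input: T, branches: Record<string, Condition<T>>): string[] {
  return Object.entries(branches)
    .filter(([, condition]) => condition(input))
    .map(([name]) => name)
}

const branches = {
  highValue: isHighValue,
  standard: (order: Order) => !isHighValue(order),
}

const large = matchingBranches({ amount: 2500 }, branches)
const boundary = matchingBranches({ amount: 1000 }, branches)
```

Checking the boundary is worthwhile here: an order of exactly 1000 does not count as high value because the comparison is strict.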

Multi-trigger workflows

Handle different trigger types with pattern matching:
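import { http, queue } from 'motia' // assumes `http` is exported next to `queue`
import type { Handlers, StepConfig } from 'motia'

// orderSchema is the Zod schema defined in the ProcessOrder Step above
export const config = {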
  name: 'MultiTriggerOrder',
  triggers: [
    queue('order.created', { input: orderSchema }),
    http('POST', '/orders/manual', { bodySchema: orderSchema }),
    {
      type: 'cron',
      expression: '0 */6 * * *', // Every 6 hours
    },
  ],
  enqueues: ['order.processed'],
} as const satisfies StepConfig

export const handler: Handlers<typeof config> = async (_, ctx) => {
  return ctx.match({
    http: async ({ request }) => {
      ctx.logger.info('Processing manual order', { body: request.body })
      
      const order = await processOrder(request.body)
      
      await ctx.enqueue({
        topic: 'order.processed',
        data: { orderId: order.id, source: 'manual' },
      })
      
      return { status: 200, body: { order } }
    },
    
    queue: async (input) => {
      ctx.logger.info('Processing queued order', { input })
      
      const order = await processOrder(input)
      
      await ctx.enqueue({
        topic: 'order.processed',
        data: { orderId: order.id, source: 'queue' },
      })
    },
    
    cron: async () => {
      ctx.logger.info('Processing batch orders')
      
      const pendingOrders = await ctx.state.list('pending-orders')
      
      for (const order of pendingOrders) {
        await ctx.enqueue({
          topic: 'order.created',
          data: order,
        })
      }
    },
  })
}
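
The dispatch performed by `ctx.match` can be pictured as a lookup over a discriminated union. The following is a minimal sketch of that pattern under assumed types: `Trigger`, `Matchers`, and this standalone `match` function are hypothetical and do not reproduce Motia's actual signatures.

```typescript
// Hypothetical trigger shapes, discriminated by `type`.
type Trigger =
  | { type: 'http'; body: { orderId: string } }
  | { type: 'queue'; data: { orderId: string } }
  | { type: 'cron' }

// One handler per trigger type; each receives the narrowed variant.
type Matchers<R> = {
  [K in Trigger['type']]: (trigger: Extract<Trigger, { type: K }>) => R
}

// Route the trigger to the handler registered for its type.
function match<R>(trigger: Trigger, matchers: Matchers<R>): R {
  switch (trigger.type) {
    case 'http':
      return matchers.http(trigger)
    case 'queue':
      return matchers.queue(trigger)
    case 'cron':
      return matchers.cron(trigger)
  }
}

const describeSource = (trigger: Trigger): string =>
  match(trigger, {
    http: (t) => `manual:${t.body.orderId}`,
    queue: (t) => `queue:${t.data.orderId}`,
    cron: () => 'batch',
  })
```

Requiring a handler for every trigger type means that adding a new variant to `Trigger` produces a compile-time error until the new case is handled.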

Parallel workflows

Enqueue multiple events to process work in parallel:
export const handler: Handlers<typeof config> = async (input, { enqueue, logger }) => {
  logger.info('Starting parallel workflow')
  
  // Enqueue multiple tasks to run in parallel
  await Promise.all([
    enqueue({ topic: 'process-payment', data: { orderId: input.orderId } }),
    enqueue({ topic: 'check-inventory', data: { items: input.items } }),
    enqueue({ topic: 'send-confirmation', data: { email: input.email } }),
  ])
  
  logger.info('All tasks enqueued')
}
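
Note that `Promise.all` rejects as soon as any one `enqueue` call fails, so the handler above would throw without reporting which events were accepted. Where partial failure matters, `Promise.allSettled` is one alternative. The sketch below uses a hypothetical `enqueue` stub that fails for a single topic; it is not Motia's `enqueue`.

```typescript
type QueueEvent = { topic: string; data: unknown }

// Hypothetical stub: rejects for one topic to simulate a queue failure.
async function enqueue(event: QueueEvent): Promise<void> {
  if (event.topic === 'check-inventory') {
    throw new Error('queue unavailable')
  }
}

// Attempt every enqueue and return the topics that failed.
async function enqueueAll(events: QueueEvent[]): Promise<string[]> {
  const results = await Promise.allSettled(events.map((event) => enqueue(event)))
  return results.flatMap((result, index) =>
    result.status === 'rejected' ? [events[index].topic] : [],
  )
}

const failedTopics: Promise<string[]> = enqueueAll([
  { topic: 'process-payment', data: { orderId: 'o-1' } },
  { topic: 'check-inventory', data: { items: ['kibble'] } },
  { topic: 'send-confirmation', data: { email: 'customer@example.com' } },
])
```

With the list of failed topics in hand, a Step can log them, retry only those events, or enqueue a compensating event.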

State-driven workflows

Use state triggers to react to state changes:
export const config = {
  name: 'OnOrderStateChange',
  triggers: [
    {
      type: 'state',
      namespace: 'orders',
      event: 'set',
    },
  ],
  enqueues: ['order.status.changed'],
} as const satisfies StepConfig

export const handler: Handlers<typeof config> = async (input, { logger, enqueue }) => {
  const { key, new_value, old_value } = input
  
  if (old_value?.status !== new_value?.status) {
    logger.info('Order status changed', {
      orderId: key,
      oldStatus: old_value?.status,
      newStatus: new_value?.status,
    })
    
    await enqueue({
      topic: 'order.status.changed',
      data: {
        orderId: key,
        status: new_value?.status,
      },
    })
  }
}
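
The comparison inside this handler can be pulled out into a pure function, which makes the change-detection rule easy to test. This is a sketch under assumptions: `StateChange` mirrors only the fields used above, and `toStatusChangedEvent` is a hypothetical helper.

```typescript
type OrderState = { status: string }

// Assumed shape of a state-change payload, mirroring the fields used above.
type StateChange = {
  key: string
  old_value?: OrderState
  new_value?: OrderState
}

// Returns the event to enqueue, or null when there is no new status to report.
function toStatusChangedEvent(change: StateChange) {
  const oldStatus = change.old_value?.status
  const newStatus = change.new_value?.status
  if (newStatus === undefined || oldStatus === newStatus) return null
  return {
    topic: 'order.status.changed' as const,
    data: { orderId: change.key, status: newStatus },
  }
}

const firstWrite = toStatusChangedEvent({ key: 'o-1', new_value: { status: 'placed' } })
const unchanged = toStatusChangedEvent({
  key: 'o-1',
  old_value: { status: 'placed' },
  new_value: { status: 'placed' },
})
```

Returning `null` for an unchanged status keeps the Step from emitting duplicate events when other fields of the order are updated.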

Workflow observability

Track workflows across Steps using traceId:
export const handler: Handlers<typeof config> = async (input, { logger, traceId, enqueue }) => {
  logger.info('Step started', { traceId, input })
  
  // The traceId is automatically propagated to enqueued events
  await enqueue({
    topic: 'next-step',
    data: input,
  })
  
  logger.info('Step completed', { traceId })
}
View the complete workflow trace in the iii Console dashboard.
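
One way to picture trace propagation is an envelope that carries the `traceId` next to each event. The sketch below is a hypothetical model, not Motia's internals: `Envelope`, `createContext`, and the `outbox` array are invented for illustration.

```typescript
type Envelope = { traceId: string; topic: string; data: unknown }

// Hypothetical context factory: every event enqueued through it inherits the traceId.
function createContext(traceId: string, outbox: Envelope[]) {
  return {
    traceId,
    enqueue(event: { topic: string; data: unknown }): void {
      outbox.push({ traceId, ...event })
    },
  }
}

const outbox: Envelope[] = []

// The first Step starts the trace.
const first = createContext('trace-123', outbox)
first.enqueue({ topic: 'next-step', data: { orderId: 'o-1' } })

// The next Step builds its context from the envelope it received.
const second = createContext(outbox[0].traceId, outbox)
second.enqueue({ topic: 'final-step', data: { orderId: 'o-1' } })

const traceIds = outbox.map((envelope) => envelope.traceId)
```

Because handlers never set the identifier themselves, every log line and event in the chain shares one `traceId` without any extra code in the Steps.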

Error handling in workflows

Handle errors at each Step:
export const handler: Handlers<typeof config> = async (input, { logger, state, enqueue }) => {
  try {
    const result = await processOrder(input)
    
    await state.set('orders', result.id, {
      ...result,
      status: 'completed',
    })
    
    await enqueue({
      topic: 'order.completed',
      data: result,
    })
    
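  } catch (error) {
    // `error` is `unknown` in strict TypeScript, so narrow it before reading `.message`
    const message = error instanceof Error ? error.message : String(error)

    logger.error('Order processing failed', { error, input })
    
    await state.set('orders', input.orderId, {
      status: 'failed',
      error: message,
    })
    
    await enqueue({
      topic: 'order.failed',
      data: { orderId: input.orderId, error: message },
    })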
  }
}
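
A caught value is not guaranteed to be an `Error` instance, so failure events are easier to build from a normalized message. The helpers below are a minimal sketch; `toErrorMessage` and `toFailureEvent` are hypothetical names, and the `order.failed` payload shape is taken from the handler above.

```typescript
// Normalize any thrown value into a message string.
function toErrorMessage(error: unknown): string {
  if (error instanceof Error) return error.message
  return typeof error === 'string' ? error : String(error)
}

// Build the payload for the 'order.failed' topic from any thrown value.
function toFailureEvent(orderId: string, error: unknown) {
  return {
    topic: 'order.failed' as const,
    data: { orderId, error: toErrorMessage(error) },
  }
}

const fromError = toFailureEvent('o-1', new Error('payment declined'))
const fromString = toFailureEvent('o-2', 'timeout')
```

Keeping the payload to a plain string also avoids enqueueing an `Error` object, which typically loses its message when serialized to JSON.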

Queue triggers

Learn about queue trigger configuration

Background jobs

Process work asynchronously

State triggers

React to state changes

Context API

Access workflow context
