Triggers define how and when a step executes. Motia supports five trigger types: HTTP, queue, cron, state, and stream. Each step can have multiple triggers of different types.

HTTP triggers

HTTP triggers expose your step as a REST API endpoint.
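import { step, http } from 'motia'
import { z } from 'zod'

// authMiddleware, rateLimitMiddleware, and createUser are application code
// defined elsewhere in your project (not shown here).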

const createUserSchema = z.object({
  email: z.string().email(),
  name: z.string()
})

export default step({
  name: 'create-user',
  triggers: [
    http('POST', '/users', {
      bodySchema: createUserSchema,
      responseSchema: {
        200: z.object({ id: z.string(), email: z.string() }),
        400: z.object({ error: z.string() })
      },
      queryParams: [{ name: 'source', description: 'User source' }],
      middleware: [authMiddleware, rateLimitMiddleware]
    })
  ]
}, async (input, ctx) => {
  const { email, name } = input.request.body
  const source = input.request.queryParams.source
  
  const user = await createUser(email, name)
  
  return {
    status: 200,
    headers: { 'Content-Type': 'application/json' },
    body: { id: user.id, email: user.email }
  }
})

HTTP methods

Supported methods: GET, POST, PUT, DELETE, PATCH, OPTIONS, HEAD
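If a method name arrives as an arbitrary string (for example from configuration), it can help to narrow it to the supported set before building a trigger. The following is a minimal sketch; `SUPPORTED_METHODS`, `SupportedMethod`, and `isSupportedMethod` are hypothetical names for illustration and are not exported by Motia.

```typescript
// Hypothetical helper, not part of Motia: the list mirrors the methods named above.
const SUPPORTED_METHODS = ['GET', 'POST', 'PUT', 'DELETE', 'PATCH', 'OPTIONS', 'HEAD'] as const

type SupportedMethod = (typeof SUPPORTED_METHODS)[number]

// Type guard: exact, case-sensitive match against the supported list.
function isSupportedMethod(value: string): value is SupportedMethod {
  return (SUPPORTED_METHODS as readonly string[]).indexOf(value) !== -1
}

const patchIsSupported = isSupportedMethod('PATCH') // true
const traceIsSupported = isSupportedMethod('TRACE') // false
```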

Path parameters

Use :param syntax for dynamic path segments:
http('GET', '/users/:userId/posts/:postId')
Read the captured values in the handler from input.request.pathParams:
const { userId, postId } = input.request.pathParams
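To make the mapping from a :param pattern to pathParams concrete, here is a minimal, self-contained sketch of segment matching. It is not Motia's router; `matchPathPattern` is a hypothetical helper written only to illustrate the idea.

```typescript
// Illustrative only: a hypothetical matcher, not Motia's actual routing code.
function matchPathPattern(pattern: string, path: string): Record<string, string> | null {
  const patternParts = pattern.split('/').filter((part) => part.length > 0)
  const pathParts = path.split('/').filter((part) => part.length > 0)
  if (patternParts.length !== pathParts.length) return null

  const params: Record<string, string> = {}
  const matched = patternParts.every((expected, index) => {
    const actual = pathParts[index] ?? ''
    if (expected.charAt(0) === ':') {
      // Dynamic segment: store the value under the name after the colon.
      params[expected.slice(1)] = decodeURIComponent(actual)
      return true
    }
    // Static segment: must match exactly.
    return expected === actual
  })
  return matched ? params : null
}

const matchedParams = matchPathPattern('/users/:userId/posts/:postId', '/users/42/posts/7')
// matchedParams is { userId: '42', postId: '7' }
```

Note that every captured value is a string, which matches the Record<string, string> type of pathParams; convert to numbers explicitly where needed.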

Request object

interface MotiaHttpArgs<TBody> {
  request: {
    pathParams: Record<string, string>
    queryParams: Record<string, string | string[]>
    body: TBody
    headers: Record<string, string | string[]>
    method: string
    requestBody: ChannelReader // For streaming
  }
  response: {
    status: (code: number) => void
    headers: (headers: Record<string, string>) => void
    stream: WritableStream
    close: () => void
  }
}
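Because queryParams and headers are typed as string | string[], handlers usually need to normalize a value before using it. A small sketch of one way to do that follows; `firstValue` and the sample data are hypothetical and not part of Motia.

```typescript
// Hypothetical helper: collapse a string | string[] value to a single string.
function firstValue(value: string | string[] | undefined): string | undefined {
  return Array.isArray(value) ? value[0] : value
}

// Sample data shaped like input.request.queryParams.
const sampleQuery: Record<string, string | string[]> = {
  source: ['newsletter', 'referral'],
  page: '2'
}

const sourceParam = firstValue(sampleQuery['source'])   // 'newsletter'
const pageParam = firstValue(sampleQuery['page'])       // '2'
const missingParam = firstValue(sampleQuery['missing']) // undefined
```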

Middleware

Middleware functions run before the handler. Each one either returns a response, which short-circuits the request, or calls next() to continue:
import type { ApiMiddleware } from 'motia'

const authMiddleware: ApiMiddleware = async (req, ctx, next) => {
  const token = req.request.headers.authorization
  
  if (!token) {
    return { status: 401, body: { error: 'Unauthorized' } }
  }
  
  // Continue to next middleware or handler
  return next()
}
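The next() pattern can be easier to reason about with a tiny model of a middleware chain. The sketch below is an assumption about how such a chain is typically composed, not Motia's implementation: the `Sketch*` types are simplified stand-ins (the real ApiMiddleware also receives ctx) and `composeMiddleware` is a hypothetical helper.

```typescript
// Simplified stand-in types for illustration only.
type SketchRequest = { headers: Record<string, string | undefined> }
type SketchResponse = { status: number; body: unknown }
type SketchHandler = (req: SketchRequest) => Promise<SketchResponse>
type SketchMiddleware = (
  req: SketchRequest,
  next: () => Promise<SketchResponse>
) => Promise<SketchResponse>

// Wrap a handler so each middleware can return early or call next().
function composeMiddleware(middleware: SketchMiddleware[], handler: SketchHandler): SketchHandler {
  return (req) => {
    const run = (index: number): Promise<SketchResponse> => {
      const current = middleware[index]
      // Past the end of the list: invoke the real handler.
      if (current === undefined) return handler(req)
      return current(req, () => run(index + 1))
    }
    return run(0)
  }
}

const requireAuth: SketchMiddleware = async (req, next) => {
  if (!req.headers['authorization']) {
    return { status: 401, body: { error: 'Unauthorized' } }
  }
  return next()
}

const guardedHandler = composeMiddleware([requireAuth], async () => ({
  status: 200,
  body: { ok: true }
}))
```

In this model a request without an authorization header never reaches the handler, while one with the header falls through to it.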

Queue triggers

Queue triggers process messages from topics, enabling asynchronous workflows.
import { step, queue } from 'motia'
import { z } from 'zod'

const orderSchema = z.object({
  orderId: z.string(),
  amount: z.number(),
  items: z.array(z.string())
})

export default step({
  name: 'process-order',
  triggers: [
    queue('orders', {
      input: orderSchema,
      infrastructure: {
        queue: {
          type: 'fifo',
          maxRetries: 5,
          visibilityTimeout: 60,
          delaySeconds: 0
        }
      }
    })
  ],
  enqueues: ['order-confirmation', 'inventory-update']
}, async (orderData, ctx) => {
  // orderData is typed as z.infer<typeof orderSchema>
  await processOrder(orderData)
  
  await ctx.enqueue({
    topic: 'order-confirmation',
    data: { orderId: orderData.orderId, status: 'processed' }
  })
})

Queue configuration

type: 'fifo' | 'standard'
  • fifo: First-in-first-out order, exactly-once delivery
  • standard: Best-effort order, at-least-once delivery (default)
maxRetries: number
  • Number of retry attempts before moving to dead letter queue (default: 3)
visibilityTimeout: number
  • Seconds a message is hidden after being received (default: 30)
delaySeconds: number
  • Delay before message becomes available (default: 0)
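The defaults listed above can be read as a base object that your configuration overrides field by field. A minimal sketch of that reading follows; `QueueSettings`, `QUEUE_DEFAULTS`, and `resolveQueueSettings` are hypothetical names, and how Motia merges options internally is not confirmed here.

```typescript
// Hypothetical type mirroring the four options documented above.
type QueueSettings = {
  type: 'fifo' | 'standard'
  maxRetries: number
  visibilityTimeout: number
  delaySeconds: number
}

// Default values as documented above.
const QUEUE_DEFAULTS: QueueSettings = {
  type: 'standard',
  maxRetries: 3,
  visibilityTimeout: 30,
  delaySeconds: 0
}

// Any field left out of overrides keeps its documented default.
function resolveQueueSettings(overrides: Partial<QueueSettings> = {}): QueueSettings {
  return { ...QUEUE_DEFAULTS, ...overrides }
}

const fifoSettings = resolveQueueSettings({ type: 'fifo', maxRetries: 5, visibilityTimeout: 60 })
// fifoSettings is { type: 'fifo', maxRetries: 5, visibilityTimeout: 60, delaySeconds: 0 }
```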

Message group IDs

For FIFO queues, set messageGroupId on each message to ensure ordering among messages that share the same group:
await ctx.enqueue({
  topic: 'orders',
  data: orderData,
  messageGroupId: `customer-${customerId}`
})
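One way to picture message groups is as independent ordered lanes. The sketch below assumes the semantics common to FIFO queues (order is preserved within a group, with no guarantee across groups); whether Motia's queue behaves exactly this way is an assumption. `GroupedMessage` and `partitionByGroup` are hypothetical names.

```typescript
// Hypothetical message shape for illustration.
type GroupedMessage<T> = { data: T; messageGroupId: string }

// Split messages into per-group lanes, keeping enqueue order inside each lane.
function partitionByGroup<T>(messages: GroupedMessage<T>[]): Record<string, T[]> {
  const lanes: Record<string, T[]> = {}
  for (const message of messages) {
    const lane = lanes[message.messageGroupId] ?? []
    lane.push(message.data)
    lanes[message.messageGroupId] = lane
  }
  return lanes
}

const orderLanes = partitionByGroup([
  { data: 'order-1', messageGroupId: 'customer-a' },
  { data: 'order-2', messageGroupId: 'customer-b' },
  { data: 'order-3', messageGroupId: 'customer-a' }
])
// orderLanes is { 'customer-a': ['order-1', 'order-3'], 'customer-b': ['order-2'] }
```

Keying the group on the customer, as in the example above, keeps one customer's orders sequential without serializing the whole queue.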

Cron triggers

Cron triggers run on a schedule using cron expressions.
import { step, cron } from 'motia'

export default step({
  name: 'daily-report',
  triggers: [
    cron('0 9 * * *') // Every day at 9 AM
  ],
  enqueues: ['report-generated']
}, async (_, ctx) => {
  ctx.logger.info('Generating daily report')
  
  const report = await generateReport()
  
  await ctx.enqueue({
    topic: 'report-generated',
    data: report
  })
})

Cron expression format

┌───────────── minute (0 - 59)
│ ┌───────────── hour (0 - 23)
│ │ ┌───────────── day of month (1 - 31)
│ │ │ ┌───────────── month (1 - 12)
│ │ │ │ ┌───────────── day of week (0 - 6) (Sunday to Saturday)
│ │ │ │ │
* * * * *
Common examples:
  • 0 * * * * - Every hour
  • */15 * * * * - Every 15 minutes
  • 0 0 * * 0 - Every Sunday at midnight
  • 0 9-17 * * 1-5 - Every hour from 9 AM to 5 PM, Monday to Friday
Cron triggers do not receive input data, so the handler's first parameter is undefined (named _ in the example above).
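To check how a five-field expression reads, a small matcher can be handy. The sketch below is illustrative and is not Motia's scheduler: `fieldMatches` and `cronMatches` are hypothetical helpers that support only *, lists, ranges, and /step values, and they evaluate against the local time of the given Date.

```typescript
// Does one cron field (e.g. "*/15", "9-17", "1,3,5") accept the given value?
function fieldMatches(field: string, value: number, min: number, max: number): boolean {
  return field.split(',').some((part) => {
    const [rangePart = '*', stepPart] = part.split('/')
    const step = stepPart === undefined ? 1 : Number(stepPart)
    let start = min
    let end = max
    if (rangePart !== '*') {
      const [from = '', to] = rangePart.split('-')
      start = Number(from)
      end = to === undefined ? start : Number(to)
    }
    if (!Number.isInteger(step) || step < 1) return false
    if (!Number.isInteger(start) || !Number.isInteger(end)) return false
    return value >= start && value <= end && (value - start) % step === 0
  })
}

// Simplification: all five fields must match. Some cron implementations instead
// treat day-of-month and day-of-week as alternatives when both are restricted.
function cronMatches(expression: string, date: Date): boolean {
  const fields = expression.trim().split(/\s+/)
  if (fields.length !== 5) throw new Error(`Expected 5 fields, got ${fields.length}`)
  const [minute = '', hour = '', dayOfMonth = '', month = '', dayOfWeek = ''] = fields
  return (
    fieldMatches(minute, date.getMinutes(), 0, 59) &&
    fieldMatches(hour, date.getHours(), 0, 23) &&
    fieldMatches(dayOfMonth, date.getDate(), 1, 31) &&
    fieldMatches(month, date.getMonth() + 1, 1, 12) &&
    fieldMatches(dayOfWeek, date.getDay(), 0, 6)
  )
}

// Monday 8 January 2024, 17:00 local time.
const mondayAtFive = new Date(2024, 0, 8, 17, 0)
const inBusinessHours = cronMatches('0 9-17 * * 1-5', mondayAtFive) // true
```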

State triggers

State triggers fire when state values change, enabling reactive workflows.
import { step, state } from 'motia'

export default step({
  name: 'on-user-status-change',
  triggers: [
    state((input, ctx) => {
      // Condition: only trigger for user status changes
      return input.group_id.startsWith('user:') && 
             input.old_value?.status !== input.new_value?.status
    })
  ]
}, async (input, ctx) => {
  const { group_id, item_id, old_value, new_value } = input
  
  ctx.logger.info(`User ${item_id} status changed`, {
    from: old_value?.status,
    to: new_value?.status
  })
  
  if (new_value?.status === 'active') {
    await sendWelcomeEmail(item_id)
  }
})

State trigger input

interface StateTriggerInput<T> {
  type: 'state'
  group_id: string
  item_id: string
  old_value?: T
  new_value?: T
}
Use state triggers to build reactive systems that respond to data changes automatically.
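Since the condition passed to state() is an ordinary function of its input, it can be pulled out and checked in isolation. The sketch below redeclares the input shape locally so it is self-contained; `StateChangeInput`, `UserRecord`, and `userStatusChanged` are hypothetical names, and the real condition also receives ctx.

```typescript
// Local copy of the input shape shown above.
interface StateChangeInput<T> {
  type: 'state'
  group_id: string
  item_id: string
  old_value?: T
  new_value?: T
}

type UserRecord = { status: string }

// Same logic as the inline condition in the example above, as a named function.
function userStatusChanged(input: StateChangeInput<UserRecord>): boolean {
  return (
    input.group_id.startsWith('user:') &&
    input.old_value?.status !== input.new_value?.status
  )
}

const activated = userStatusChanged({
  type: 'state',
  group_id: 'user:accounts',
  item_id: 'u-1',
  old_value: { status: 'pending' },
  new_value: { status: 'active' }
}) // true
```

Because old_value is optional, a newly created item (no old_value) with a status also counts as a change under this predicate.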

Stream triggers

Stream triggers fire on create, update, or delete events in a stream.
import { step, stream } from 'motia'

export default step({
  name: 'on-document-change',
  triggers: [
    stream('documents', {
      groupId: 'workspace-123',
      condition: (input, ctx) => {
        // Only trigger for PDF documents
        return input.event.data.type === 'pdf'
      }
    })
  ]
}, async (input, ctx) => {
  const { event, groupId, id } = input
  
  switch (event.type) {
    case 'create':
      await indexDocument(event.data)
      break
    case 'update':
      await reindexDocument(event.data)
      break
    case 'delete':
      await removeFromIndex(id)
      break
  }
})

Stream trigger input

interface StreamTriggerInput<T> {
  type: 'stream'
  timestamp: number
  streamName: string
  groupId: string
  id: string
  event: StreamEvent<T>
}

type StreamEvent<T> =
  | { type: 'create'; data: T }
  | { type: 'update'; data: T }
  | { type: 'delete'; data: T }
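Because StreamEvent is a discriminated union on type, TypeScript can verify that a handler covers every event kind. A minimal sketch of an exhaustive switch follows; the union is redeclared locally as `DocStreamEvent`, and `describeStreamEvent` is a hypothetical function.

```typescript
// Local copy of the event union shown above.
type DocStreamEvent<T> =
  | { type: 'create'; data: T }
  | { type: 'update'; data: T }
  | { type: 'delete'; data: T }

// Map each event kind to the indexing action it should cause.
function describeStreamEvent<T>(event: DocStreamEvent<T>): string {
  switch (event.type) {
    case 'create':
      return 'index'
    case 'update':
      return 'reindex'
    case 'delete':
      return 'remove'
    default: {
      // If a new event kind is added to the union, this assignment stops compiling.
      const unreachable: never = event
      throw new Error(`Unhandled event: ${JSON.stringify(unreachable)}`)
    }
  }
}

const createAction = describeStreamEvent({ type: 'create', data: { title: 'Spec' } }) // 'index'
```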

Filtering streams

You can filter by group and/or item:
// Listen to all events in a specific group
stream('documents', { groupId: 'workspace-123' })

// Listen to a specific item
stream('documents', { groupId: 'workspace-123', itemId: 'doc-456' })

// Listen to all events (no filter)
stream('documents')
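The three filter forms above can be modeled as a single predicate in which an omitted field matches everything. This is an assumed reading of the filter semantics, not Motia's code; `StreamFilter`, `StreamItemRef`, and `matchesStreamFilter` are hypothetical names.

```typescript
// Hypothetical shapes: the filter options and the identifying fields of an event.
type StreamFilter = { groupId?: string; itemId?: string }
type StreamItemRef = { groupId: string; id: string }

// Assumed semantics: a field left out of the filter places no restriction.
function matchesStreamFilter(filter: StreamFilter, item: StreamItemRef): boolean {
  if (filter.groupId !== undefined && filter.groupId !== item.groupId) return false
  if (filter.itemId !== undefined && filter.itemId !== item.id) return false
  return true
}

const sampleItem: StreamItemRef = { groupId: 'workspace-123', id: 'doc-456' }

const matchesGroup = matchesStreamFilter({ groupId: 'workspace-123' }, sampleItem) // true
const matchesOtherItem = matchesStreamFilter(
  { groupId: 'workspace-123', itemId: 'doc-999' },
  sampleItem
) // false
const matchesUnfiltered = matchesStreamFilter({}, sampleItem) // true
```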

Conditional triggers

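All triggers support conditional execution: the step runs only when the condition function returns true. For queue triggers, the condition is the third argument: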
import { step, queue } from 'motia'

export default step({
  name: 'process-priority-orders',
  triggers: [
    queue('orders', undefined, (data, ctx) => {
      // Only process orders over $1000
      return data.amount > 1000
    })
  ]
}, async (data, ctx) => {
  await processPriorityOrder(data)
})
Condition functions run for every trigger event. Keep them fast and side-effect free.
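One way to keep conditions fast and side-effect free is to build them from small pure predicates. The sketch below is illustrative: `Condition`, `allOf`, and the order predicates are hypothetical, and the real condition signature also receives ctx.

```typescript
// A condition here is a pure function of the event data (ctx omitted for brevity).
type Condition<T> = (data: T) => boolean

// Combine predicates; the result is true only if every predicate is true.
function allOf<T>(...conditions: Condition<T>[]): Condition<T> {
  return (data) => conditions.every((condition) => condition(data))
}

type OrderData = { amount: number; items: string[] }

const overThreshold: Condition<OrderData> = (order) => order.amount > 1000
const hasItems: Condition<OrderData> = (order) => order.items.length > 0

// No I/O, no mutation: safe to evaluate on every event.
const isPriorityOrder = allOf(overThreshold, hasItems)

const largeOrderIsPriority = isPriorityOrder({ amount: 2500, items: ['laptop'] }) // true
const smallOrderIsPriority = isPriorityOrder({ amount: 40, items: ['cable'] })    // false
```

Anything that needs a network call or a database read is better placed in the handler itself, where it runs only for events that pass the condition.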

Next steps

  • Context API: Learn about FlowContext features
  • Handlers: Handler patterns and best practices
  • State management: Working with state in handlers
  • Streams: Real-time data with streams
