
Overview

Trigger factory functions create typed trigger configurations for steps. Each function returns a trigger object that defines when and how a step should execute.

http()

Create an HTTP API trigger for REST endpoints.

Signature

function http<TOptions extends ApiOptions>(
  method: ApiRouteMethod,
  path: string,
  options?: TOptions,
  condition?: TriggerCondition
): ApiTrigger

Parameters

method
ApiRouteMethod
required
HTTP method: 'GET', 'POST', 'PUT', 'DELETE', 'PATCH', 'OPTIONS', or 'HEAD'
path
string
required
URL path pattern. Supports path parameters with :paramName syntax (e.g., /users/:id)
options
ApiOptions
condition
TriggerCondition
Optional function to conditionally execute the trigger

Example

import { step, http } from 'motia'
import { z } from 'zod'

const petSchema = z.object({
  name: z.string(),
  photoUrl: z.string().url(),
})

export const { config, handler } = step(
  {
    name: 'CreatePet',
    triggers: [
      http('POST', '/pets', {
        bodySchema: petSchema,
        responseSchema: {
          200: z.object({ id: z.string(), name: z.string() }),
          400: z.object({ error: z.string() }),
        },
      }),
    ],
  },
  async (request, ctx) => {
    const { name, photoUrl } = request.body
    const pet = await createPet({ name, photoUrl })
    
    return {
      status: 200,
      body: { id: pet.id, name: pet.name },
    }
  }
)

Path parameters

http('GET', '/users/:userId/posts/:postId', {
  responseSchema: {
    200: z.object({ userId: z.string(), postId: z.string() }),
  },
})

// In handler:
async (request, ctx) => {
  const { userId, postId } = request.pathParams
}
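
To make the :paramName syntax concrete, here is a minimal sketch of how such a pattern can be matched against a request path. It is not Motia's router: matchPath and PathParams are hypothetical names used only for illustration, and the real matching rules (trailing slashes, wildcards, encoding) may differ.

```typescript
// Illustrative only: a tiny matcher for ':param' path patterns.
// matchPath and PathParams are hypothetical, not part of Motia.
type PathParams = Record<string, string>

function matchPath(pattern: string, path: string): PathParams | null {
  const patternParts = pattern.split('/').filter(Boolean)
  const pathParts = path.split('/').filter(Boolean)

  // A pattern only matches a path with the same number of segments.
  if (patternParts.length !== pathParts.length) return null

  const params: PathParams = {}
  const matched = patternParts.every((expected, i) => {
    const actual = pathParts[i] ?? ''
    if (expected.startsWith(':')) {
      // ':userId' captures the segment under the key 'userId'.
      params[expected.slice(1)] = decodeURIComponent(actual)
      return true
    }
    // Literal segments must match exactly.
    return expected === actual
  })

  return matched ? params : null
}

const postParams = matchPath('/users/:userId/posts/:postId', '/users/42/posts/7')
console.log(postParams) // { userId: '42', postId: '7' }
```

Captured values are always strings, which is why the response schema above declares userId and postId as z.string().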

queue()

Create a queue trigger for asynchronous event processing.

Signature

function queue<TOptions extends QueueOptions>(
  topic: string,
  options?: TOptions,
  condition?: TriggerCondition
): QueueTrigger

Parameters

topic
string
required
Queue topic name to subscribe to
options
QueueOptions
condition
TriggerCondition
Optional function to conditionally process messages

Example

import { step, queue } from 'motia'
import { z } from 'zod'

const orderSchema = z.object({
  email: z.string().email(),
  quantity: z.number().positive(),
  petId: z.string(),
})

export const { config, handler } = step(
  {
    name: 'ProcessOrder',
    triggers: [
      queue('process-food-order', {
        input: orderSchema,
        infrastructure: {
          handler: {
            ram: 512,
            timeout: 30,
          },
          queue: {
            type: 'fifo',
            maxRetries: 3,
            visibilityTimeout: 60,
          },
        },
      }),
    ],
    enqueues: ['notification'],
  },
  async (input, ctx) => {
    ctx.logger.info('Processing order', { input })
    
    const order = await processOrder(input)
    await ctx.state.set('orders', order.id, order)
    
    await ctx.enqueue({
      topic: 'notification',
      data: { email: input.email, orderId: order.id },
    })
  }
)

cron()

Create a scheduled trigger using cron expressions.

Signature

function cron(
  expression: string,
  condition?: TriggerCondition
): CronTrigger

Parameters

expression
string
required
Cron expression defining the schedule. Supports standard five-field cron syntax, plus an optional leading seconds field.
condition
TriggerCondition
Optional function to conditionally execute scheduled tasks

Example

import { step, cron } from 'motia'

export const { config, handler } = step(
  {
    name: 'DailyCleanup',
    description: 'Clean up old orders daily at midnight',
    triggers: [
      cron('0 0 * * *'), // Every day at 00:00
    ],
  },
  async (_input, ctx) => {
    ctx.logger.info('Running daily cleanup')
    
    const orders = await ctx.state.list('orders')
    const oldOrders = orders.filter(isExpired)
    
    for (const order of oldOrders) {
      await ctx.state.delete('orders', order.id)
    }
    
    ctx.logger.info(`Cleaned up ${oldOrders.length} old orders`)
  }
)

Cron expression format

┌────────────── second (0-59, optional)
│ ┌──────────── minute (0-59)
│ │ ┌────────── hour (0-23)
│ │ │ ┌──────── day of month (1-31)
│ │ │ │ ┌────── month (1-12)
│ │ │ │ │ ┌──── day of week (0-6, Sunday=0)
│ │ │ │ │ │
* * * * * *

Common patterns:
  • 0 0 * * * - Daily at midnight
  • 0 */6 * * * - Every 6 hours
  • 0 0 * * 0 - Weekly on Sunday at midnight
  • 0 0 1 * * - Monthly on the 1st at midnight
  • */30 * * * * * - Every 30 seconds
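
The field layout above can be sketched as a small helper that labels each field and treats a sixth leading field as seconds. This is only an illustration of the format: splitCron and CronFields are hypothetical names, and the sketch does not validate ranges or reproduce how Motia parses expressions.

```typescript
// Illustrative sketch: label the fields of a cron expression.
// splitCron and CronFields are hypothetical, not part of Motia.
interface CronFields {
  second: string | undefined
  minute: string
  hour: string
  dayOfMonth: string
  month: string
  dayOfWeek: string
}

function splitCron(expression: string): CronFields {
  const parts = expression.trim().split(/\s+/)
  if (parts.length !== 5 && parts.length !== 6) {
    throw new Error(`Expected 5 or 6 fields, got ${parts.length}`)
  }
  // With six fields, the first one is the optional seconds field.
  const second = parts.length === 6 ? parts[0] : undefined
  // The last five fields are always minute through day of week.
  const [minute, hour, dayOfMonth, month, dayOfWeek] = parts.slice(-5)
  return { second, minute, hour, dayOfMonth, month, dayOfWeek }
}

console.log(splitCron('0 */6 * * *').hour) // */6
console.log(splitCron('*/30 * * * * *').second) // */30
```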

state()

Create a trigger that fires on state changes.

Signature

function state(
  condition?: TriggerCondition
): StateTrigger

Parameters

condition
TriggerCondition
Function to filter which state changes trigger execution. Receives StateTriggerInput with group_id, item_id, old_value, and new_value

Example

import { step, state } from 'motia'
import type { StateTriggerInput } from 'motia'

interface Order {
  id: string
  status: string
  email: string
}

export const { config, handler } = step(
  {
    name: 'OnOrderShipped',
    triggers: [
      state((input: StateTriggerInput<Order>) => {
        return (
          input.group_id === 'orders' &&
          input.old_value?.status !== 'shipped' &&
          input.new_value?.status === 'shipped'
        )
      }),
    ],
    enqueues: ['notification'],
  },
  async (input, ctx) => {
    ctx.logger.info('Order shipped', {
      orderId: input.item_id,
      order: input.new_value,
    })
    
    await ctx.enqueue({
      topic: 'notification',
      data: {
        email: input.new_value.email,
        templateId: 'order-shipped',
        orderId: input.item_id,
      },
    })
  }
)
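
Because the condition is an ordinary predicate, its transition logic can be checked in isolation. The sketch below uses a local stand-in for StateTriggerInput with the four fields listed above; the StateChange type, the optionality of old_value and new_value, and the becameShipped name are assumptions made for illustration.

```typescript
// Local stand-in for the StateTriggerInput fields described above.
// Field optionality is an assumption made for this sketch.
interface StateChange<T> {
  group_id: string
  item_id: string
  old_value?: T
  new_value?: T
}

interface Order {
  id: string
  status: string
  email: string
}

// Passes only on the transition into 'shipped', not on later updates.
function becameShipped(change: StateChange<Order>): boolean {
  return (
    change.group_id === 'orders' &&
    change.old_value?.status !== 'shipped' &&
    change.new_value?.status === 'shipped'
  )
}

const paidOrder: Order = { id: 'o1', status: 'paid', email: 'a@example.com' }
const shippedOrder: Order = { ...paidOrder, status: 'shipped' }

// paid -> shipped: the transition we care about
console.log(becameShipped({ group_id: 'orders', item_id: 'o1', old_value: paidOrder, new_value: shippedOrder })) // true
// shipped -> shipped: already shipped, so no repeat notification
console.log(becameShipped({ group_id: 'orders', item_id: 'o1', old_value: shippedOrder, new_value: shippedOrder })) // false
// same transition in another group is ignored
console.log(becameShipped({ group_id: 'users', item_id: 'o1', old_value: paidOrder, new_value: shippedOrder })) // false
```

Comparing old_value and new_value, rather than checking new_value alone, is what keeps the step from firing again on every later write to an already shipped order.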

stream()

Create a trigger for real-time stream events.

Signature

function stream(
  streamName: string,
  options?: StreamOptions | TriggerCondition
): StreamTrigger

Parameters

streamName
string
required
Name of the stream to subscribe to
options
StreamOptions | TriggerCondition
Either a condition function or an options object. The options object in the example below sets groupId and condition.

Example

import { step, stream } from 'motia'
import type { StreamTriggerInput } from 'motia'

interface MergeStatus {
  startedAt: number
  totalSteps: number
  completedSteps: number
}

export const { config, handler } = step(
  {
    name: 'OnMergeComplete',
    description: 'Trigger when all parallel steps complete',
    triggers: [
      stream('parallelMerge', {
        groupId: 'merge-groups',
        condition: (input: StreamTriggerInput<MergeStatus>) => {
          return (
            input.event.type === 'update' &&
            input.event.data.completedSteps === input.event.data.totalSteps
          )
        },
      }),
    ],
  },
  async (input, ctx) => {
    const { data } = input.event
    const duration = Date.now() - data.startedAt
    
    ctx.logger.info('Parallel merge complete', {
      totalSteps: data.totalSteps,
      duration,
    })
  }
)
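
Since the second argument of stream() may be either a function or an options object, one plausible way to handle both forms is a typeof check. The sketch below is an assumption about how such an argument could be normalized, not Motia's implementation: StreamCondition, StreamOpts, and normalizeStreamArg are hypothetical stand-ins, and treating a bare function as the condition field is a guess based on the example above.

```typescript
// Sketch: accept either a condition function or an options object.
// All names here are hypothetical stand-ins, not Motia exports.
type StreamCondition = (input: unknown) => boolean | Promise<boolean>

interface StreamOpts {
  groupId?: string
  condition?: StreamCondition
}

function normalizeStreamArg(arg?: StreamOpts | StreamCondition): StreamOpts {
  if (arg === undefined) return {}
  // Assumption: a bare function is shorthand for { condition: fn }.
  if (typeof arg === 'function') return { condition: arg }
  return arg
}

const alwaysTrue: StreamCondition = () => true

console.log(normalizeStreamArg(alwaysTrue).condition === alwaysTrue) // true
console.log(normalizeStreamArg({ groupId: 'merge-groups' }).groupId) // merge-groups
```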

Trigger conditions

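Every trigger function accepts an optional condition that decides whether the trigger fires. http(), queue(), cron(), and state() take it as a parameter; stream() takes it either directly as its second argument or as the condition field of its options object. A condition receives the trigger input and the flow context, and returns a boolean or a promise of one:

type TriggerCondition<TInput = unknown> = (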
  input: TriggerInput<TInput>,
  ctx: FlowContext<never, TriggerInput<TInput>>
) => boolean | Promise<boolean>

Example with condition

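import { step, queue } from 'motia'
import { z } from 'zod'

// Illustrative schema for the queued order; the condition below reads `priority`
const orderSchema = z.object({
  email: z.string().email(),
  quantity: z.number().positive(),
  petId: z.string(),
  priority: z.string(),
})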

export const { config, handler } = step(
  {
    name: 'ProcessHighPriorityOrders',
    triggers: [
      queue(
        'orders',
        { input: orderSchema },
        (input, ctx) => {
          // Only process high-priority orders
          return input.priority === 'high'
        }
      ),
    ],
  },
  async (input, ctx) => {
    await processOrder(input)
  }
)
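
Because a condition may return either a boolean or a promise, several checks can be combined into one by awaiting each in turn. The sketch below uses a simplified stand-in for TriggerCondition that omits the ctx argument; Condition, allOf, and OrderInput are hypothetical names, not Motia exports.

```typescript
// Simplified stand-in for TriggerCondition: the ctx argument is omitted here.
type Condition<T> = (input: T) => boolean | Promise<boolean>

// allOf is a hypothetical helper: it passes only when every condition passes.
function allOf<T>(...conditions: Condition<T>[]): Condition<T> {
  return async (input: T) => {
    for (const condition of conditions) {
      // await works for both plain booleans and promises
      if (!(await condition(input))) return false
    }
    return true
  }
}

interface OrderInput {
  priority: string
  quantity: number
}

// One synchronous and one asynchronous condition
const isHighPriority: Condition<OrderInput> = (order) => order.priority === 'high'
const isBulk: Condition<OrderInput> = async (order) => order.quantity >= 10

const highPriorityBulk = allOf(isHighPriority, isBulk)
```

The combined function has the same shape as its parts, so it could be passed wherever a single condition is expected. It also short-circuits, so later conditions are not evaluated once one fails.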
