
Overview

Triggers define how and when a step executes. Motia supports 5 trigger types:
  1. HTTP - REST API endpoints
  2. Queue - Event-driven messaging
  3. Cron - Time-based scheduling
  4. State - State change reactions
  5. Stream - Real-time data stream events
A step can declare multiple triggers, so the same handler can be invoked in more than one way (for example, by an HTTP request and by a queue message).

HTTP Triggers

HTTP triggers expose steps as REST API endpoints.

Basic HTTP Trigger

import { http, type Handlers } from 'motia'
import { z } from 'zod'

export const config = {
  name: 'CreateTodo',
  triggers: [
    http('POST', '/todo', {
      bodySchema: z.object({
        description: z.string(),
        dueDate: z.string().optional(),
      }),
      responseSchema: {
        200: z.object({ id: z.string(), description: z.string() }),
        400: z.object({ error: z.string() }),
      },
    }),
  ],
}

export const handler: Handlers<typeof config> = async ({ request }, ctx) => {
  const { description, dueDate } = request.body
  
  // z.string() accepts an empty string, so reject it explicitly
  if (!description) {
    return { status: 400, body: { error: 'Description is required' } }
  }
  
  const todoId = `todo-${Date.now()}`
  
  return { status: 200, body: { id: todoId, description } }
}

HTTP Trigger Options

http(method: ApiRouteMethod, path: string, options?: {
  bodySchema?: ZodSchema | JsonSchema,
  responseSchema?: Record<number, Schema>,
  queryParams?: readonly QueryParam[],
  middleware?: readonly ApiMiddleware[],
})
Supported Methods: GET, POST, PUT, DELETE, PATCH, OPTIONS, HEAD
Path Parameters: Use :param syntax for dynamic routes:
http('GET', '/users/:userId/posts/:postId')

// Access in handler
const { userId, postId } = request.pathParams
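To make the :param syntax concrete, here is a minimal sketch of how such a pattern could be matched against a request path. `matchPath` is a hypothetical helper written for illustration only; it is not part of Motia, and Motia's actual router may differ (for example in how it treats trailing slashes or URL-encoded segments).

```typescript
// Hypothetical helper, not part of Motia: matches a ':param' route pattern
// against a concrete path and returns the extracted parameters, or null.
function matchPath(pattern: string, path: string): Record<string, string> | null {
  const patternParts = pattern.split('/').filter(Boolean)
  const pathParts = path.split('/').filter(Boolean)
  if (patternParts.length !== pathParts.length) return null

  const params: Record<string, string> = {}
  for (let i = 0; i < patternParts.length; i++) {
    const expected = patternParts[i]
    const actual = pathParts[i]
    if (expected.startsWith(':')) {
      // Dynamic segment: store the raw value under the parameter name.
      params[expected.slice(1)] = actual
    } else if (expected !== actual) {
      // Static segment that does not match.
      return null
    }
  }
  return params
}

const matched = matchPath('/users/:userId/posts/:postId', '/users/42/posts/7')
// matched is { userId: '42', postId: '7' }
```

Note that the extracted values are strings, so numeric IDs need explicit conversion in the handler.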
Query Parameters:
http('GET', '/search', {
  queryParams: [
    { name: 'q', description: 'Search query' },
    { name: 'limit', description: 'Results limit' },
  ],
})

// Access in handler
const { q, limit } = request.queryParams
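Query values usually arrive as strings. Assuming that also holds for request.queryParams (this page does not state the value type), a numeric parameter such as limit needs parsing and bounds checking before use. The sketch below uses a hypothetical `parseLimit` helper; the default of 20 and the cap of 100 are arbitrary illustration values.

```typescript
// Hypothetical helper, not part of Motia. Assumes query values arrive as
// strings (or arrays of strings when a parameter is repeated).
function parseLimit(raw: string | string[] | undefined, fallback = 20, max = 100): number {
  const value = Array.isArray(raw) ? raw[0] : raw
  if (value === undefined) return fallback
  const parsed = Number.parseInt(value, 10)
  // Fall back on non-numeric or non-positive input instead of failing.
  if (Number.isNaN(parsed) || parsed < 1) return fallback
  return Math.min(parsed, max)
}

const pageSize = parseLimit('500')
// pageSize is 100, because the requested value is capped at max
```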
Reference: motia-js/packages/motia/src/triggers.ts:29-37

Queue Triggers

Queue triggers subscribe to event topics for asynchronous, event-driven processing.

Basic Queue Trigger

import { queue, type Handlers } from 'motia'
import { z } from 'zod'

export const config = {
  name: 'ProcessOrder',
  triggers: [
    queue('order.created', {
      input: z.object({
        orderId: z.string(),
        amount: z.number(),
        items: z.array(z.string()),
      }),
    }),
  ],
  enqueues: ['order.processed'],
}

export const handler: Handlers<typeof config> = async (input, ctx) => {
  ctx.logger.info('Processing order', { orderId: input.orderId })
  
  // processOrderLogic is a placeholder for your own business logic
  await processOrderLogic(input)
  
  // Enqueue next event
  await ctx.enqueue({
    topic: 'order.processed',
    data: { orderId: input.orderId, status: 'completed' },
  })
}

Queue Configuration

queue(topic: string, options?: {
  input?: ZodSchema | JsonSchema,
  infrastructure?: {
    queue?: {
      type: 'fifo' | 'standard',
      maxRetries: number,
      visibilityTimeout: number,
      delaySeconds: number,
    },
  },
})
Queue Types:
  • Standard: High throughput, at-least-once delivery, best-effort ordering
  • FIFO: Exactly-once processing, strict ordering, lower throughput
Message Group ID: For FIFO queues, set messageGroupId when enqueuing to partition messages. Messages that share a group ID are processed in order:
await ctx.enqueue({
  topic: 'order.processing',
  data: { orderId: '123' },
  messageGroupId: 'customer-456', // Orders from same customer processed in order
})
Reference: motia-js/packages/motia/src/types.ts:194-201
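The effect of messageGroupId can be pictured as one ordered lane per group. The sketch below is a plain in-memory model, not Motia's queue implementation; `QueuedMessage` and `partitionByGroup` are names invented for this illustration.

```typescript
// Illustrative in-memory model only; Motia's queue backend is not shown here.
type QueuedMessage = { messageGroupId: string; data: { orderId: string } }

// Groups messages into per-group lanes while preserving arrival order in each lane.
function partitionByGroup(messages: QueuedMessage[]): Map<string, string[]> {
  const lanes = new Map<string, string[]>()
  for (const message of messages) {
    const lane = lanes.get(message.messageGroupId) ?? []
    lane.push(message.data.orderId)
    lanes.set(message.messageGroupId, lane)
  }
  return lanes
}

const orderLanes = partitionByGroup([
  { messageGroupId: 'customer-456', data: { orderId: '123' } },
  { messageGroupId: 'customer-789', data: { orderId: '124' } },
  { messageGroupId: 'customer-456', data: { orderId: '125' } },
])
// orderLanes.get('customer-456') is ['123', '125']: orders from the same customer
// keep their relative order, while 'customer-789' forms an independent lane.
```

Choosing a group ID with the right granularity matters: a single shared ID serializes everything, while a per-customer ID keeps ordering only where it is needed.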

Cron Triggers

Cron triggers execute steps on a schedule using cron expressions.

Basic Cron Trigger

import { cron, type Handlers } from 'motia'

export const config = {
  name: 'DailyReport',
  triggers: [
    cron('0 9 * * *'), // Every day at 9:00 AM
  ],
  enqueues: ['report.generated'],
}

export const handler: Handlers<typeof config> = async (_input, ctx) => {
  ctx.logger.info('Generating daily report')
  
  // generateReport is a placeholder for your own reporting logic
  const report = await generateReport()
  
  await ctx.enqueue({
    topic: 'report.generated',
    data: { reportId: report.id, timestamp: new Date().toISOString() },
  })
}

Cron Expression Format

Motia uses an extended cron syntax with optional seconds and year fields:
* * * * * * *
│ │ │ │ │ │ └─ Year (optional)
│ │ │ │ │ └─── Day of week (0-7, 0 and 7 = Sunday)
│ │ │ │ └───── Month (1-12)
│ │ │ └─────── Day of month (1-31)
│ │ └───────── Hour (0-23)
│ └─────────── Minute (0-59)
└───────────── Second (0-59, optional)
Common Patterns (standard five-field form, without seconds or year):
cron('* * * * *')        // Every minute
cron('0 * * * *')        // Every hour
cron('0 0 * * *')        // Daily at midnight
cron('0 0 * * 0')        // Weekly on Sunday
cron('0 0 1 * *')        // Monthly on 1st
cron('*/5 * * * *')      // Every 5 minutes
cron('0 9,17 * * 1-5')   // 9 AM and 5 PM, Monday-Friday
Reference: motia-js/playground/steps/cronExample/handlePeriodicJob.step.ts:8
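To check how a five-field pattern reads, the following sketch evaluates an expression against a given time. It is a simplified matcher written for illustration, not Motia's scheduler: it ignores the optional seconds and year fields, supports only `*`, `*/n`, ranges, and comma-separated lists, and treats `*/n` as "divisible by n", which is only accurate for fields that start at 0. `CronTime`, `fieldMatches`, and `cronMatches` are hypothetical names.

```typescript
// Simplified matcher for five-field expressions:
// minute, hour, day of month, month, day of week.
type CronTime = { minute: number; hour: number; dayOfMonth: number; month: number; dayOfWeek: number }

// Checks one field ('*', '*/n', 'a-b', a number, or a comma-separated mix).
function fieldMatches(field: string, value: number): boolean {
  return field.split(',').some((part) => {
    if (part === '*') return true
    if (part.startsWith('*/')) {
      const step = Number(part.slice(2))
      return step > 0 && value % step === 0
    }
    if (part.includes('-')) {
      const [start, end] = part.split('-').map(Number)
      return value >= start && value <= end
    }
    return Number(part) === value
  })
}

function cronMatches(expression: string, time: CronTime): boolean {
  const fields = expression.trim().split(/\s+/)
  if (fields.length !== 5) throw new Error('This sketch handles five-field expressions only')
  const [minute, hour, dayOfMonth, month, dayOfWeek] = fields
  // Both 0 and 7 denote Sunday, as in the field diagram above.
  const weekday = time.dayOfWeek % 7
  // Simplification: every field must match. Many cron implementations treat
  // day-of-month and day-of-week specially when both are restricted.
  return (
    fieldMatches(minute, time.minute) &&
    fieldMatches(hour, time.hour) &&
    fieldMatches(dayOfMonth, time.dayOfMonth) &&
    fieldMatches(month, time.month) &&
    (fieldMatches(dayOfWeek, weekday) || (weekday === 0 && fieldMatches(dayOfWeek, 7)))
  )
}

// Wednesday (3) at 17:00 matches "9 AM and 5 PM, Monday-Friday".
const firesOnWednesdayEvening = cronMatches('0 9,17 * * 1-5', {
  minute: 0, hour: 17, dayOfMonth: 3, month: 6, dayOfWeek: 3,
})
// firesOnWednesdayEvening is true
```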

State Triggers

State triggers react to changes in the state management system.

Basic State Trigger

import { state, type Handlers } from 'motia'

export const config = {
  name: 'OnUserStatusChange',
  triggers: [
    state({
      condition: (input, ctx) => input.group_id === 'users',
    }),
  ],
  enqueues: ['user.status.changed'],
}

export const handler: Handlers<typeof config> = async (input, ctx) => {
  ctx.logger.info('User state changed', {
    groupId: input.group_id,
    itemId: input.item_id,
    oldValue: input.old_value,
    newValue: input.new_value,
  })
  
  await ctx.enqueue({
    topic: 'user.status.changed',
    data: {
      userId: input.item_id,
      oldStatus: input.old_value?.status,
      newStatus: input.new_value?.status,
    },
  })
}

State Trigger Input

State triggers receive the following input structure:
type StateTriggerInput<T> = {
  type: 'state'
  group_id: string
  item_id: string
  old_value?: T
  new_value?: T
}
Reference: motia-js/packages/motia/src/types.ts:113-119, motia-py/playground/steps/state_example/on_state_change_step.py
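Because old_value and new_value are both optional, a condition or handler should cope with either being absent. The sketch below shows one way to detect a real status transition. It redeclares the input type locally so that it is self-contained; the `User` type and the `statusChanged` helper are hypothetical, and reading a missing old_value as "item created" or a missing new_value as "item deleted" is an assumption, not something this page states.

```typescript
// Local copy of the input shape documented above, so the sketch is self-contained.
type StateTriggerInput<T> = {
  type: 'state'
  group_id: string
  item_id: string
  old_value?: T
  new_value?: T
}

// Hypothetical value type for illustration.
type User = { status: 'active' | 'suspended' }

// Returns true only when the status differs between old and new values.
// A missing value on either side counts as a change.
function statusChanged(input: StateTriggerInput<User>): boolean {
  if (input.group_id !== 'users') return false
  return input.old_value?.status !== input.new_value?.status
}

const suspended = statusChanged({
  type: 'state',
  group_id: 'users',
  item_id: 'user-1',
  old_value: { status: 'active' },
  new_value: { status: 'suspended' },
})
// suspended is true
```

A function of this shape could serve as the body of a trigger condition, so that writes that leave the status unchanged never reach the handler.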

Stream Triggers

Stream triggers react to real-time events from data streams.

Basic Stream Trigger

import { stream, type Handlers } from 'motia'

export const config = {
  name: 'OnTodoStreamEvent',
  triggers: [
    stream('todo', {
      groupId: 'inbox',  // Optional: filter by group
      itemId: undefined,  // Optional: filter by specific item
    }),
  ],
  enqueues: ['todo.processed'],
}

export const handler: Handlers<typeof config> = async (input, ctx) => {
  ctx.logger.info('Todo stream event', {
    streamName: input.streamName,
    groupId: input.groupId,
    itemId: input.id,
    eventType: input.event.type,
  })
  
  if (input.event.type === 'create') {
    ctx.logger.info('New todo created', { id: input.id })
  } else if (input.event.type === 'update') {
    ctx.logger.info('Todo updated', { id: input.id })
  } else if (input.event.type === 'delete') {
    ctx.logger.info('Todo deleted', { id: input.id })
  }
  
  await ctx.enqueue({
    topic: 'todo.processed',
    data: { todoId: input.id, eventType: input.event.type },
  })
}

Stream Trigger Input

Stream triggers receive the following input structure:
type StreamTriggerInput<T> = {
  type: 'stream'
  timestamp: number
  streamName: string
  groupId: string
  id: string
  event: StreamEvent<T>
}

type StreamEvent<T> =
  | { type: 'create'; data: T }
  | { type: 'update'; data: T }
  | { type: 'delete'; data: T }
Reference: motia-js/packages/motia/src/types.ts:121-133, motia-py/playground/steps/stream_example/on_todo_event_step.py
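Since StreamEvent is a discriminated union on type, a switch statement with a never check lets the TypeScript compiler flag any event variant that is left unhandled. The sketch below redeclares the union locally; the `Todo` type and the `describeEvent` function are hypothetical.

```typescript
// Local copy of the event union documented above.
type StreamEvent<T> =
  | { type: 'create'; data: T }
  | { type: 'update'; data: T }
  | { type: 'delete'; data: T }

// Hypothetical payload type for illustration.
type Todo = { description: string }

function describeEvent(event: StreamEvent<Todo>): string {
  switch (event.type) {
    case 'create':
      return `created: ${event.data.description}`
    case 'update':
      return `updated: ${event.data.description}`
    case 'delete':
      return `deleted: ${event.data.description}`
    default: {
      // If a new variant is added to StreamEvent, this assignment stops compiling.
      const unreachable: never = event
      throw new Error(`Unhandled stream event: ${JSON.stringify(unreachable)}`)
    }
  }
}

const summary = describeEvent({ type: 'create', data: { description: 'Buy milk' } })
// summary is 'created: Buy milk'
```

Compared with an if/else chain, this form fails at compile time rather than silently ignoring a new event type.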

Trigger Conditions

Every trigger type accepts an optional condition function. The step is invoked only when the condition returns true:
queue('order.created', {
  condition: (input, ctx) => {
    // Only process high-value orders
    return input.amount > 1000
  },
})

http('POST', '/orders', {
  condition: (input, ctx) => {
    // Only process for verified users
    return input.body.user.verified === true
  },
})
Conditions can also be async, for example to look up state before deciding:
state({
  condition: async (input, ctx) => {
    const user = await ctx.state.get('users', input.item_id)
    return user?.role === 'admin'
  },
})
Reference: motia-js/playground/steps/multi-trigger-example.step.ts:4-16
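Conceptually, a condition acts as a gate in front of the handler. The sketch below models that gating in isolation; it is not Motia's internal dispatcher, and `Condition` and `dispatch` are names invented for this illustration. Awaiting the condition's result handles both the synchronous and the async forms shown above.

```typescript
// Illustrative model of condition gating; not Motia's internal dispatcher.
type Condition<I, C> = (input: I, ctx: C) => boolean | Promise<boolean>

// Runs the handler only if no condition is set or the condition resolves to true.
// Returns whether the handler ran.
async function dispatch<I, C>(
  input: I,
  ctx: C,
  handler: (input: I, ctx: C) => void | Promise<void>,
  condition?: Condition<I, C>,
): Promise<boolean> {
  // `await` on a plain boolean simply yields the boolean, so one code path
  // covers sync and async conditions.
  if (condition !== undefined && !(await condition(input, ctx))) return false
  await handler(input, ctx)
  return true
}
```

In this model, an input that fails the condition never reaches the handler, which is why conditions are a cheaper place to filter than an early return inside the handler body.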

Trigger Registration Protocol

The III Engine registers triggers over a WebSocket protocol. The relevant messages, as defined in the engine's Rust source, are:
Message::RegisterTrigger {
    id: String,
    trigger_type: String,
    function_id: String,
    config: Value,
}

Message::TriggerRegistrationResult {
    id: String,
    trigger_type: String,
    function_id: String,
    error: Option<ErrorBody>,
}
Reference: engine/src/protocol.rs:39-51
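For readers working on the TypeScript side, the two messages can be mirrored as types. This is a sketch under stated assumptions: the wire encoding and the shape of ErrorBody are not shown on this page, so the error is left opaque here, and the idea that a result is correlated with its request by matching id is an assumption based on both messages carrying the same fields. `registrationOutcome` and the sample values are hypothetical.

```typescript
// TypeScript mirrors of the Rust message fields above. ErrorBody is not
// documented here, so the error is typed as unknown.
type RegisterTrigger = {
  id: string
  trigger_type: string
  function_id: string
  config: unknown
}

type TriggerRegistrationResult = {
  id: string
  trigger_type: string
  function_id: string
  error: unknown
}

// Assumption: a result refers to the request with the same id, and an absent
// error (null or undefined) means the registration succeeded.
function registrationOutcome(
  request: RegisterTrigger,
  result: TriggerRegistrationResult,
): 'registered' | 'failed' | 'unrelated' {
  if (result.id !== request.id) return 'unrelated'
  return result.error === null || result.error === undefined ? 'registered' : 'failed'
}

// Hypothetical sample values; the real trigger_type strings and config layout may differ.
const request: RegisterTrigger = {
  id: 'trigger-1',
  trigger_type: 'cron',
  function_id: 'DailyReport',
  config: { expression: '0 9 * * *' },
}
const outcome = registrationOutcome(request, {
  id: 'trigger-1',
  trigger_type: 'cron',
  function_id: 'DailyReport',
  error: null,
})
// outcome is 'registered'
```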

Best Practices

  • Use HTTP for synchronous requests requiring immediate responses
  • Use Queue for asynchronous, decoupled event processing
  • Use Cron for scheduled tasks
  • Use State for reactive data change handling
  • Use Stream for real-time data synchronization
Idempotency: Queue triggers may retry on failure, so handlers must be idempotent: processing the same message more than once should be safe.
Conditions: Conditions filter invocations at the trigger level, so steps that would do nothing are never executed.
Error Handling:
  • HTTP triggers: Return appropriate status codes (400, 500, etc.)
  • Queue triggers: Throw errors to trigger retries and DLQ handling
  • Cron triggers: Log errors and consider alerting mechanisms
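The idempotency requirement for queue handlers can be met by recording which messages have already been processed. The sketch below uses an in-memory Set purely for illustration; a real handler would need a durable store (such as the state store) so that the check survives restarts. `handleOrderCreated`, `processedOrders`, and `chargesMade` are hypothetical names.

```typescript
// In-memory stand-in for a durable store of processed message keys.
const processedOrders = new Set<string>()
let chargesMade = 0

// Hypothetical handler body: safe to call repeatedly with the same message.
function handleOrderCreated(message: { orderId: string; amount: number }): 'processed' | 'skipped' {
  // A redelivered message is recognized by its orderId and ignored.
  if (processedOrders.has(message.orderId)) return 'skipped'
  chargesMade += 1 // the side effect that must not be repeated
  processedOrders.add(message.orderId)
  return 'processed'
}

const firstDelivery = handleOrderCreated({ orderId: '123', amount: 50 })
const redelivery = handleOrderCreated({ orderId: '123', amount: 50 })
// firstDelivery is 'processed', redelivery is 'skipped', and chargesMade is 1
```

The key design choice is the deduplication key: it should identify the logical operation (here the order ID), not the delivery attempt.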

Next Steps

Context API

Learn about ctx.enqueue(), ctx.state, and more

HTTP Triggers

Add middleware to HTTP triggers

State Management

Work with state triggers

Streams

Work with stream triggers
