Overview
Trigger factory functions create typed trigger configurations for steps. Each function returns a trigger object that defines when and how a step should execute.
http()
Create an HTTP API trigger for REST endpoints.
Signature
function http < TOptions extends ApiOptions >(
method : ApiRouteMethod ,
path : string ,
options ?: TOptions ,
condition ?: TriggerCondition
) : ApiTrigger
Parameters
HTTP method: 'GET', 'POST', 'PUT', 'DELETE', 'PATCH', 'OPTIONS', or 'HEAD'
URL path pattern. Supports path parameters with :paramName syntax (e.g., /users/:id)
Zod schema or JSON Schema for request body validation
responseSchema
Record<number, StepSchemaInput>
Response schemas keyed by status code (e.g., { 200: schema, 404: errorSchema })
Array of query parameter definitions with name and description
Middleware functions to run before the handler
Optional function to conditionally execute the trigger
Example
import { step , http } from 'motia'
import { z } from 'zod'
const petSchema = z . object ({
name: z . string (),
photoUrl: z . string (). url (),
})
export const { config , handler } = step (
{
name: 'CreatePet' ,
triggers: [
http ( 'POST' , '/pets' , {
bodySchema: petSchema ,
responseSchema: {
200 : z . object ({ id: z . string (), name: z . string () }),
400 : z . object ({ error: z . string () }),
},
}),
],
},
async ( request , ctx ) => {
const { name , photoUrl } = request . body
const pet = await createPet ({ name , photoUrl })
return {
status: 200 ,
body: { id: pet . id , name: pet . name },
}
}
)
Path parameters
http ( 'GET' , '/users/:userId/posts/:postId' , {
responseSchema: {
200 : z . object ({ userId: z . string (), postId: z . string () }),
},
})
// In handler:
async ( request , ctx ) => {
const { userId , postId } = request . pathParams
}
queue()
Create a queue trigger for asynchronous event processing.
Signature
function queue < TOptions extends QueueOptions >(
topic : string ,
options ?: TOptions ,
condition ?: TriggerCondition
) : QueueTrigger
Parameters
Queue topic name to subscribe to
Zod schema or JSON Schema for queue message validation
Infrastructure configuration for handler and queue settings
ram: Memory in MB (e.g., 256, 512, 1024)
cpu: CPU units (optional)
timeout: Execution timeout in seconds
type: 'fifo' or 'standard'
maxRetries: Maximum retry attempts
visibilityTimeout: Message visibility timeout in seconds
delaySeconds: Delay before message becomes available
Optional function to conditionally process messages
Example
import { step , queue } from 'motia'
import { z } from 'zod'
const orderSchema = z . object ({
email: z . string (). email (),
quantity: z . number (). positive (),
petId: z . string (),
})
export const { config , handler } = step (
{
name: 'ProcessOrder' ,
triggers: [
queue ( 'process-food-order' , {
input: orderSchema ,
infrastructure: {
handler: {
ram: 512 ,
timeout: 30 ,
},
queue: {
type: 'fifo' ,
maxRetries: 3 ,
visibilityTimeout: 60 ,
},
},
}),
],
enqueues: [ 'notification' ],
},
async ( input , ctx ) => {
ctx . logger . info ( 'Processing order' , { input })
const order = await processOrder ( input )
await ctx . state . set ( 'orders' , order . id , order )
await ctx . enqueue ({
topic: 'notification' ,
data: { email: input . email , orderId: order . id },
})
}
)
cron()
Create a scheduled trigger using cron expressions.
Signature
function cron (
expression : string ,
condition ?: TriggerCondition
) : CronTrigger
Parameters
Cron expression defining the schedule. Supports standard cron syntax with optional seconds field
Optional function to conditionally execute scheduled tasks
Example
import { step , cron } from 'motia'
export const { config , handler } = step (
{
name: 'DailyCleanup' ,
description: 'Clean up old orders daily at midnight' ,
triggers: [
cron ( '0 0 * * *' ), // Every day at 00:00
],
},
async ( _input , ctx ) => {
ctx . logger . info ( 'Running daily cleanup' )
const orders = await ctx . state . list ( 'orders' )
const oldOrders = orders . filter ( isExpired )
for ( const order of oldOrders ) {
await ctx . state . delete ( 'orders' , order . id )
}
ctx . logger . info ( `Cleaned up ${ oldOrders . length } old orders` )
}
)
┌────────────── second (0-59, optional)
│ ┌──────────── minute (0-59)
│ │ ┌────────── hour (0-23)
│ │ │ ┌──────── day of month (1-31)
│ │ │ │ ┌────── month (1-12)
│ │ │ │ │ ┌──── day of week (0-6, Sunday=0)
│ │ │ │ │ │
* * * * * *
Common patterns:
0 0 * * * - Daily at midnight
0 */6 * * * - Every 6 hours
0 0 * * 0 - Weekly on Sunday at midnight
0 0 1 * * - Monthly on the 1st at midnight
*/30 * * * * * - Every 30 seconds
state()
Create a trigger that fires on state changes.
Signature
function state (
condition ?: TriggerCondition
) : StateTrigger
Parameters
Function to filter which state changes trigger execution. Receives StateTriggerInput with group_id, item_id, old_value, and new_value
Example
import { step , state } from 'motia'
import type { StateTriggerInput } from 'motia'
interface Order {
id : string
status : string
email : string
}
export const { config , handler } = step (
{
name: 'OnOrderShipped' ,
triggers: [
state (( input : StateTriggerInput < Order >) => {
return (
input . group_id === 'orders' &&
input . old_value ?. status !== 'shipped' &&
input . new_value ?. status === 'shipped'
)
}),
],
enqueues: [ 'notification' ],
},
async ( input , ctx ) => {
ctx . logger . info ( 'Order shipped' , {
orderId: input . item_id ,
order: input . new_value ,
})
await ctx . enqueue ({
topic: 'notification' ,
data: {
email: input . new_value . email ,
templateId: 'order-shipped' ,
orderId: input . item_id ,
},
})
}
)
stream()
Create a trigger for real-time stream events.
Signature
function stream (
streamName : string ,
options ?: StreamOptions | TriggerCondition
) : StreamTrigger
Parameters
Name of the stream to subscribe to
options
StreamOptions | TriggerCondition
Can be a condition function or an options object: Show StreamOptions properties
Filter events to a specific group
Filter events to a specific item
Function to filter which events trigger execution
Example
import { step , stream } from 'motia'
import type { StreamTriggerInput } from 'motia'
interface MergeStatus {
startedAt : number
totalSteps : number
completedSteps : number
}
export const { config , handler } = step (
{
name: 'OnMergeComplete' ,
description: 'Trigger when all parallel steps complete' ,
triggers: [
stream ( 'parallelMerge' , {
groupId: 'merge-groups' ,
condition : ( input : StreamTriggerInput < MergeStatus >) => {
return (
input . event . type === 'update' &&
input . event . data . completedSteps === input . event . data . totalSteps
)
},
}),
],
},
async ( input , ctx ) => {
const { data } = input . event
const duration = Date . now () - data . startedAt
ctx . logger . info ( 'Parallel merge complete' , {
totalSteps: data . totalSteps ,
duration ,
})
}
)
Trigger conditions
All trigger functions accept an optional condition parameter to filter when the trigger executes:
type TriggerCondition < TInput = unknown > = (
input : TriggerInput < TInput >,
ctx : FlowContext < never , TriggerInput < TInput >>
) => boolean | Promise < boolean >
Example with condition
import { step , queue } from 'motia'
export const { config , handler } = step (
{
name: 'ProcessHighPriorityOrders' ,
triggers: [
queue (
'orders' ,
{ input: orderSchema },
( input , ctx ) => {
// Only process high-priority orders
return input . priority === 'high'
}
),
],
},
async ( input , ctx ) => {
await processOrder ( input )
}
)