Triggers define how and when a step executes. Motia supports five trigger types: HTTP, queue, cron, state, and stream. Each step can have multiple triggers of different types.
HTTP triggers
HTTP triggers expose your step as a REST API endpoint.
TypeScript
JavaScript
Python
import { step , http } from 'motia'
import { z } from 'zod'
const createUserSchema = z . object ({
email: z . string (). email (),
name: z . string ()
})
export default step ({
name: 'create-user' ,
triggers: [
http ( 'POST' , '/users' , {
bodySchema: createUserSchema ,
responseSchema: {
200 : z . object ({ id: z . string (), email: z . string () }),
400 : z . object ({ error: z . string () })
},
queryParams: [{ name: 'source' , description: 'User source' }],
middleware: [ authMiddleware , rateLimitMiddleware ]
})
]
}, async ( input , ctx ) => {
const { email , name } = input . request . body
const source = input . request . queryParams . source
const user = await createUser ( email , name )
return {
status: 200 ,
headers: { 'Content-Type' : 'application/json' },
body: { id: user . id , email: user . email }
}
} )
import { step , http } from 'motia'
export default step ({
name: 'create-user' ,
triggers: [
http ( 'POST' , '/users' , {
queryParams: [{ name: 'source' , description: 'User source' }],
middleware: [ authMiddleware , rateLimitMiddleware ]
})
]
}, async ( input , ctx ) => {
const { email , name } = input . request . body
const source = input . request . queryParams . source
const user = await createUser ( email , name )
return {
status: 200 ,
headers: { 'Content-Type' : 'application/json' },
body: { id: user . id , email: user . email }
}
} )
from motia import step, http
default = step({
'name' : 'create-user' ,
'triggers' : [
http( 'POST' , '/users' ,
query_params = [{ 'name' : 'source' , 'description' : 'User source' }],
middleware = [auth_middleware, rate_limit_middleware]
)
]
}, async def handler( input , ctx):
email = input .request.body[ 'email' ]
name = input .request.body[ 'name' ]
source = input .request.query_params.get( 'source' )
user = await create_user(email, name)
return {
'status' : 200 ,
'headers' : { 'Content-Type' : 'application/json' },
'body' : { 'id' : user[ 'id' ], 'email' : user[ 'email' ]}
}
)
HTTP methods
Supported methods: GET, POST, PUT, DELETE, PATCH, OPTIONS, HEAD
Path parameters
Use :param syntax for dynamic path segments:
http ( 'GET' , '/users/:userId/posts/:postId' )
Access via input.request.pathParams:
const { userId , postId } = input . request . pathParams
Request object
interface MotiaHttpArgs < TBody > {
request : {
pathParams : Record < string , string >
queryParams : Record < string , string | string []>
body : TBody
headers : Record < string , string | string []>
method : string
requestBody : ChannelReader // For streaming
}
response : {
status : ( code : number ) => void
headers : ( headers : Record < string , string >) => void
stream : WritableStream
close : () => void
}
}
class MotiaHttpArgs :
request: MotiaHttpRequest
response: MotiaHttpResponse
# Backward-compatible properties
@ property
def path_params ( self ) -> dict[ str , str ]: ...
@ property
def query_params ( self ) -> dict[ str , str | list[ str ]]: ...
@ property
def body ( self ) -> Any: ...
Middleware
Middleware functions run before the handler:
import type { ApiMiddleware } from 'motia'
const authMiddleware : ApiMiddleware = async ( req , ctx , next ) => {
const token = req . request . headers . authorization
if ( ! token ) {
return { status: 401 , body: { error: 'Unauthorized' } }
}
// Continue to next middleware or handler
return next ()
}
async def auth_middleware ( req , ctx , next ):
token = req.request.headers.get( 'authorization' )
if not token:
return { 'status' : 401 , 'body' : { 'error' : 'Unauthorized' }}
# Continue to next middleware or handler
return await next ()
Queue triggers
Queue triggers process messages from topics, enabling asynchronous workflows.
import { step , queue } from 'motia'
import { z } from 'zod'
const orderSchema = z . object ({
orderId: z . string (),
amount: z . number (),
items: z . array ( z . string ())
})
export default step ({
name: 'process-order' ,
triggers: [
queue ( 'orders' , {
input: orderSchema ,
infrastructure: {
queue: {
type: 'fifo' ,
maxRetries: 5 ,
visibilityTimeout: 60 ,
delaySeconds: 0
}
}
})
] ,
enqueues: [ 'order-confirmation' , 'inventory-update' ]
}, async ( orderData , ctx ) => {
// orderData is typed as z.infer<typeof orderSchema>
await processOrder ( orderData )
await ctx . enqueue ({
topic: 'order-confirmation' ,
data: { orderId: orderData . orderId , status: 'processed' }
})
} )
from motia import step, queue
default = step({
'name' : 'process-order' ,
'triggers' : [
queue( 'orders' , infrastructure = {
'queue' : {
'type' : 'fifo' ,
'maxRetries' : 5 ,
'visibilityTimeout' : 60 ,
'delaySeconds' : 0
}
})
],
'enqueues' : [ 'order-confirmation' , 'inventory-update' ]
}, async def handler(order_data, ctx):
await process_order(order_data)
await ctx.enqueue({
'topic' : 'order-confirmation' ,
'data' : { 'orderId' : order_data[ 'orderId' ], 'status' : 'processed' }
})
)
Queue configuration
fifo : First-in-first-out order, exactly-once delivery
standard : Best-effort order, at-least-once delivery (default)
Number of retry attempts before moving to dead letter queue (default: 3)
Seconds a message is hidden after being received (default: 30)
Delay before message becomes available (default: 0)
Message group IDs
For FIFO queues, use messageGroupId to ensure ordering:
await ctx . enqueue ({
topic: 'orders' ,
data: orderData ,
messageGroupId: `customer- ${ customerId } `
})
Cron triggers
Cron triggers run on a schedule using cron expressions.
import { step , cron } from 'motia'
export default step ({
name: 'daily-report' ,
triggers: [
cron ( '0 9 * * *' ) // Every day at 9 AM
] ,
enqueues: [ 'report-generated' ]
}, async ( _ , ctx ) => {
ctx . logger . info ( 'Generating daily report' )
const report = await generateReport ()
await ctx . enqueue ({
topic: 'report-generated' ,
data: report
})
} )
from motia import step, cron
default = step({
'name' : 'daily-report' ,
'triggers' : [
cron( '0 9 * * *' ) # Every day at 9 AM
],
'enqueues' : [ 'report-generated' ]
}, async def handler(_, ctx):
ctx.logger.info( 'Generating daily report' )
report = await generate_report()
await ctx.enqueue({
'topic' : 'report-generated' ,
'data' : report
})
)
┌───────────── minute (0 - 59)
│ ┌───────────── hour (0 - 23)
│ │ ┌───────────── day of month (1 - 31)
│ │ │ ┌───────────── month (1 - 12)
│ │ │ │ ┌───────────── day of week (0 - 6) (Sunday to Saturday)
│ │ │ │ │
* * * * *
Common examples:
0 * * * * - Every hour
*/15 * * * * - Every 15 minutes
0 0 * * 0 - Every Sunday at midnight
0 9-17 * * 1-5 - Every hour from 9 AM to 5 PM, Monday to Friday
Cron triggers do not receive input data. The first parameter is undefined.
State triggers
State triggers fire when state values change, enabling reactive workflows.
import { step , state } from 'motia'
export default step ({
name: 'on-user-status-change' ,
triggers: [
state (( input , ctx ) => {
// Condition: only trigger for user status changes
return input . group_id . startsWith ( 'user:' ) &&
input . old_value ?. status !== input . new_value ?. status
})
]
}, async ( input , ctx ) => {
const { group_id , item_id , old_value , new_value } = input
ctx . logger . info ( `User ${ item_id } status changed` , {
from: old_value ?. status ,
to: new_value ?. status
})
if ( new_value ?. status === 'active' ) {
await sendWelcomeEmail ( item_id )
}
} )
from motia import step, state
def status_changed ( input , ctx ):
return ( input [ 'group_id' ].startswith( 'user:' ) and
input .get( 'old_value' , {}).get( 'status' ) !=
input .get( 'new_value' , {}).get( 'status' ))
default = step({
'name' : 'on-user-status-change' ,
'triggers' : [state( condition = status_changed)]
}, async def handler( input , ctx):
group_id = input [ 'group_id' ]
item_id = input [ 'item_id' ]
old_value = input .get( 'old_value' )
new_value = input .get( 'new_value' )
ctx.logger.info( f "User { item_id } status changed from "
f " { old_value.get( 'status' ) } to { new_value.get( 'status' ) } " )
if new_value.get( 'status' ) == 'active' :
await send_welcome_email(item_id)
)
interface StateTriggerInput < T > {
type : 'state'
group_id : string
item_id : string
old_value ?: T
new_value ?: T
}
class StateTriggerInput :
type : Literal[ 'state' ]
group_id: str
item_id: str
old_value: Any | None
new_value: Any | None
Use state triggers to build reactive systems that respond to data changes automatically.
Stream triggers
Stream triggers fire on create, update, or delete events in a stream.
import { step , stream } from 'motia'
export default step ({
name: 'on-document-change' ,
triggers: [
stream ( 'documents' , {
groupId: 'workspace-123' ,
condition : ( input , ctx ) => {
// Only trigger for PDF documents
return input . event . data . type === 'pdf'
}
})
]
}, async ( input , ctx ) => {
const { event , groupId , id } = input
switch ( event . type ) {
case 'create' :
await indexDocument ( event . data )
break
case 'update' :
await reindexDocument ( event . data )
break
case 'delete' :
await removeFromIndex ( id )
break
}
} )
from motia import step, stream
def is_pdf ( input , ctx ):
return input [ 'event' ][ 'data' ].get( 'type' ) == 'pdf'
default = step({
'name' : 'on-document-change' ,
'triggers' : [
stream( 'documents' ,
group_id = 'workspace-123' ,
condition = is_pdf)
]
}, async def handler( input , ctx):
event = input [ 'event' ]
group_id = input [ 'groupId' ]
item_id = input [ 'id' ]
if event[ 'type' ] == 'create' :
await index_document(event[ 'data' ])
elif event[ 'type' ] == 'update' :
await reindex_document(event[ 'data' ])
elif event[ 'type' ] == 'delete' :
await remove_from_index(item_id)
)
interface StreamTriggerInput < T > {
type : 'stream'
timestamp : number
streamName : string
groupId : string
id : string
event : StreamEvent < T >
}
type StreamEvent < T > =
| { type : 'create' ; data : T }
| { type : 'update' ; data : T }
| { type : 'delete' ; data : T }
class StreamTriggerInput :
type : Literal[ 'stream' ]
timestamp: int
stream_name: str
group_id: str
id : str
event: StreamEvent
class StreamEvent :
type : Literal[ 'create' , 'update' , 'delete' ]
data: Any
Filtering streams
You can filter by group and/or item:
// Listen to all events in a specific group
stream ( 'documents' , { groupId: 'workspace-123' })
// Listen to a specific item
stream ( 'documents' , { groupId: 'workspace-123' , itemId: 'doc-456' })
// Listen to all events (no filter)
stream ( 'documents' )
Conditional triggers
All triggers support conditional execution:
import { step , queue } from 'motia'
export default step ({
name: 'process-priority-orders' ,
triggers: [
queue ( 'orders' , undefined , ( data , ctx ) => {
// Only process orders over $1000
return data . amount > 1000
})
]
}, async ( data , ctx ) => {
await processPriorityOrder ( data )
} )
from motia import step, queue
def is_priority ( data , ctx ):
return data[ 'amount' ] > 1000
default = step({
'name' : 'process-priority-orders' ,
'triggers' : [queue( 'orders' , condition = is_priority)]
}, async def handler(data, ctx):
await process_priority_order(data)
)
Condition functions run for every trigger event. Keep them fast and side-effect free.
Next steps
Context API Learn about FlowContext features
Handlers Handler patterns and best practices
State management Working with state in handlers
Streams Real-time data with streams