Handlers are async functions that execute when a step is triggered. They receive input data and a context object, then perform business logic and optionally return a response.
Handler signature
Every handler follows this signature:
/**
 * Signature shared by every step handler.
 *
 * @typeParam TInput - Payload type wrapped by the trigger input.
 * @typeParam TEnqueueData - Type argument forwarded to `FlowContext`.
 * @param input - Trigger-specific input wrapping `TInput`.
 * @param ctx - Flow context, parameterised by the enqueue data type and the
 *   trigger input type.
 * @returns A promise that resolves to an `ApiResponse` or to nothing.
 */
type StepHandler<TInput, TEnqueueData> = (
  input: TriggerInput<TInput>,
  ctx: FlowContext<TEnqueueData, TriggerInput<TInput>>,
) => Promise<ApiResponse | void>
from typing import Awaitable, Callable, Any
from motia import FlowContext
# Type alias for a step handler: a callable that takes the trigger input
# (typed as Any) and a FlowContext, and returns an awaitable of Any.
StepHandler = Callable[[Any, FlowContext], Awaitable[Any]]
Parameters
input : The trigger-specific input data (HTTP request, queue message, etc.)
ctx : The FlowContext providing runtime capabilities
Return value
HTTP triggers : Must return an ApiResponse object with status, body, and optional headers
Queue/Cron triggers : Return void (nothing) or throw for retry
State/Stream triggers : Can return any value
Basic handler
import { step , http } from 'motia'
/** Step that answers `GET /hello` with a fixed greeting. */
export default step(
  { name: 'hello-world', triggers: [http('GET', '/hello')] },
  async (input, ctx) => {
    // Record the invocation before building the response.
    ctx.logger.info('Hello handler invoked')

    const body = { message: 'Hello, World!' }
    return { status: 200, body }
  },
)
from motia import step, http
async def handler(input, ctx):
    """Handle ``GET /hello``: log the invocation and return a greeting.

    Args:
        input: Trigger input for the request (not read here).
        ctx: Flow context; only ``ctx.logger`` is used.

    Returns:
        A response dict with ``status`` 200 and a ``body`` holding the message.
    """
    ctx.logger.info('Hello handler invoked')
    return {
        'status': 200,
        'body': {'message': 'Hello, World!'}
    }


# ``async def`` is a statement, not an expression, so the handler is defined
# first and handed to ``step`` by name.
default = step({
    'name': 'hello-world',
    'triggers': [http('GET', '/hello')]
}, handler)
Pattern matching
When a step has multiple trigger types, use ctx.match() for type-safe handling:
import { step , http , queue , state } from 'motia'
import type { FlowContext } from 'motia'
import { z } from 'zod'
/** Zod schema for the user payload: a string `userId` and an email-formatted `email`. */
const userSchema = z.object({
  userId: z.string(),
  email: z.string().email(),
})
/**
 * Step with three triggers: HTTP `POST /notify`, the `notifications` queue,
 * and a state trigger. Every branch calls `sendNotification`; only the HTTP
 * branch returns a response.
 */
export default step(
  {
    name: 'notify-user',
    triggers: [
      http('POST', '/notify', { bodySchema: userSchema }),
      queue('notifications', { input: userSchema }),
      state(),
    ],
    enqueues: ['email-queue'],
  },
  async (input, ctx) =>
    ctx.match({
      // HTTP: the user is the request body; reply after the notification is enqueued.
      http: async (req) => {
        const { body: user } = req.request
        await sendNotification(user, ctx)
        return { status: 200, body: { sent: true, userId: user.userId } }
      },

      // Queue: the message payload is the user itself.
      queue: async (user) => {
        await sendNotification(user, ctx)
      },

      // State: notify only when the new value is marked active.
      state: async (stateInput) => {
        const next = stateInput.new_value
        if (next?.status !== 'active') return
        await sendNotification(next, ctx)
      },
    }),
)
/**
 * Enqueues a notification email for `user` on the `email-queue` topic.
 *
 * Shared by all trigger branches so the enqueue payload is built in one place.
 *
 * @param user - Payload matching `userSchema`; its `email` and `userId` are used.
 * @param ctx - Flow context whose `enqueue` method is called.
 * @returns A promise that settles once `ctx.enqueue` has been awaited.
 */
async function sendNotification(
  user: z.infer<typeof userSchema>,
  ctx: FlowContext,
): Promise<void> {
  await ctx.enqueue({
    topic: 'email-queue',
    data: {
      to: user.email,
      template: 'notification',
      userId: user.userId,
    },
  })
}
from motia import step, http, queue, state
async def handler(input, ctx):
    """Route the invocation to the branch for the trigger that fired.

    Args:
        input: Trigger input (not read directly here).
        ctx: Flow context; used for ``match`` and passed on to
            ``send_notification``.

    Returns:
        The awaited result of ``ctx.match``.
    """
    async def handle_http(req):
        # The user is the request body; reply once it has been enqueued.
        user = req.request.body
        await send_notification(user, ctx)
        return {
            'status': 200,
            'body': {'sent': True, 'userId': user['userId']}
        }

    async def handle_queue(user):
        await send_notification(user, ctx)

    async def handle_state(state_input):
        # A ``.get`` default only applies when the key is missing. If
        # ``new_value`` is present but None, ``or {}`` keeps the status
        # lookup from raising AttributeError.
        new_value = state_input.get('new_value') or {}
        if new_value.get('status') == 'active':
            await send_notification(new_value, ctx)

    return await ctx.match({
        'http': handle_http,
        'queue': handle_queue,
        'state': handle_state
    })


# ``async def`` is a statement, not an expression, so the handler is defined
# first and handed to ``step`` by name.
default = step({
    'name': 'notify-user',
    'triggers': [
        http('POST', '/notify'),
        queue('notifications'),
        state()
    ],
    'enqueues': ['email-queue']
}, handler)
async def send_notification(user, ctx):
    """Enqueue a notification email for ``user`` on the ``email-queue`` topic.

    Args:
        user: Mapping with ``email`` and ``userId`` keys.
        ctx: Object exposing an awaitable ``enqueue(message)`` method.
    """
    await ctx.enqueue({
        'topic': 'email-queue',
        'data': {
            'to': user['email'],
            'template': 'notification',
            'userId': user['userId']
        }
    })
Extract shared logic into separate functions to avoid duplication across trigger handlers.
Type guards
Alternatively, use type guards with ctx.is for conditional logic:
/**
 * Step triggered by HTTP `POST /process` or the `process-queue` queue.
 * The payload is processed the same way for both; only the HTTP path
 * returns a response, while the queue path logs the result.
 */
export default step(
  {
    name: 'flexible-handler',
    triggers: [http('POST', '/process'), queue('process-queue')],
  },
  async (input, ctx) => {
    // Read and process the payload without caring which trigger fired.
    const result = await processData(ctx.getData())

    // Non-HTTP invocations have no response to build; queue runs just log.
    if (!ctx.is.http(input)) {
      if (ctx.is.queue(input)) {
        ctx.logger.info('Queue processing complete', { result })
      }
      return
    }

    return {
      status: 200,
      body: result,
      headers: { 'X-Request-Id': ctx.traceId },
    }
  },
)
async def handler(input, ctx):
    """Process the payload, then respond according to the trigger type.

    Args:
        input: Trigger input (not read directly; data comes from
            ``ctx.get_data()``).
        ctx: Flow context providing ``get_data``, ``is_api``, ``is_queue``,
            ``trace_id`` and ``logger``.

    Returns:
        A response dict when ``ctx.is_api()`` is true, otherwise ``None``.
    """
    # Extract data regardless of trigger type
    data = ctx.get_data()
    # Process the data
    result = await process_data(data)
    # Respond differently based on trigger
    if ctx.is_api():
        return {
            'status': 200,
            'body': result,
            'headers': {'X-Request-Id': ctx.trace_id}
        }
    if ctx.is_queue():
        ctx.logger.info('Queue processing complete', extra={'result': result})


# ``async def`` is a statement, not an expression, so the handler is defined
# first and handed to ``step`` by name.
default = step({
    'name': 'flexible-handler',
    'triggers': [
        http('POST', '/process'),
        queue('process-queue')
    ]
}, handler)
Error handling
Errors in handlers trigger automatic retries based on infrastructure configuration.
Transient errors
Let transient errors (network issues, timeouts) throw for automatic retry:
/**
 * Queue step that POSTs each `api-calls` message to an external API.
 *
 * Failures are thrown rather than handled here; the step is configured with
 * `maxRetries: 5` and `visibilityTimeout: 60`.
 *
 * @throws Error when the response status is not OK, or whatever `fetch` rejects with.
 */
export default step({
  name: 'call-api',
  triggers: [queue('api-calls')],
  infrastructure: {
    queue: { maxRetries: 5, visibilityTimeout: 60 },
  },
}, async (data, ctx) => {
  // Will retry up to 5 times on network errors
  const response = await fetch('https://api.example.com/process', {
    method: 'POST',
    // A string body defaults to text/plain, so declare the JSON payload explicitly.
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify(data),
  })

  if (!response.ok) {
    // No padding inside the template, so the message has no trailing space.
    throw new Error(`API error: ${response.status}`)
  }

  ctx.logger.info('API call successful')
})
Permanent errors
Handle permanent errors (validation, business logic) explicitly:
import { step , http } from 'motia'
import { z } from 'zod'
/** Zod schema for an order: a string `orderId` and a strictly positive `amount`. */
const orderSchema = z.object({
  orderId: z.string(),
  amount: z.number().positive(),
})
/**
 * HTTP step for `POST /orders`.
 *
 * Rejects orders over the amount limit with a 400, returns 201 with the
 * result of `processOrder` on success, and treats an `INVALID_PRODUCT` error
 * as permanent: it is logged, forwarded to the `order-failed` topic and
 * answered with a 400. Any other error is rethrown unchanged.
 */
export default step({
  name: 'create-order',
  triggers: [http('POST', '/orders', { bodySchema: orderSchema })],
  enqueues: ['order-failed'],
}, async (input, ctx) => {
  const order = input.request.body

  // Validate business rules
  if (order.amount > 10000) {
    ctx.logger.warn('Order exceeds limit', { order })
    return {
      status: 400,
      body: { error: 'Order amount exceeds maximum limit' },
    }
  }

  try {
    const result = await processOrder(order)
    return { status: 201, body: result }
  } catch (error: unknown) {
    // A thrown value can be anything (including null), so inspect it
    // defensively instead of dereferencing `error.code` directly.
    const details =
      typeof error === 'object' && error !== null
        ? (error as { code?: unknown; message?: unknown })
        : undefined

    // Check if error is permanent
    if (details?.code === 'INVALID_PRODUCT') {
      const message =
        typeof details.message === 'string' ? details.message : String(error)
      ctx.logger.error('Invalid product in order', { order, error })
      // Don't retry, send to error queue
      await ctx.enqueue({
        topic: 'order-failed',
        data: { order, error: message },
      })
      return {
        status: 400,
        body: { error: 'Invalid product' },
      }
    }

    // Transient error, let it retry
    throw error
  }
})
from motia import step, http
async def handler(input, ctx):
    """Create an order, separating permanent failures from retryable ones.

    Args:
        input: HTTP trigger input; the order is read from
            ``input.request.body``.
        ctx: Flow context providing ``logger`` and ``enqueue``.

    Returns:
        A response dict: 400 when the amount exceeds the limit or the error
        code is ``INVALID_PRODUCT``, 201 with the ``process_order`` result on
        success.

    Raises:
        Exception: Any error from ``process_order`` whose ``code`` is not
            ``INVALID_PRODUCT`` is re-raised unchanged.
    """
    order = input.request.body
    # Validate business rules
    if order['amount'] > 10000:
        ctx.logger.warning('Order exceeds limit', extra={'order': order})
        return {
            'status': 400,
            'body': {'error': 'Order amount exceeds maximum limit'}
        }
    try:
        result = await process_order(order)
        return {'status': 201, 'body': result}
    except Exception as error:
        # Check if error is permanent
        if getattr(error, 'code', None) == 'INVALID_PRODUCT':
            ctx.logger.error('Invalid product in order',
                             extra={'order': order, 'error': str(error)})
            # Don't retry, send to error queue
            await ctx.enqueue({
                'topic': 'order-failed',
                'data': {'order': order, 'error': str(error)}
            })
            return {
                'status': 400,
                'body': {'error': 'Invalid product'}
            }
        # Transient error, let it retry
        raise


# ``async def`` is a statement, not an expression, so the handler is defined
# first and handed to ``step`` by name.
default = step({
    'name': 'create-order',
    'triggers': [http('POST', '/orders')],
    'enqueues': ['order-failed']
}, handler)
Don’t catch and ignore errors that should trigger retries. Let them propagate.
Idempotency
Design handlers to be safely retryable:
import { step , queue } from 'motia'
/**
 * Queue step that charges a payment from the `payments` queue.
 *
 * Progress is tracked in state under `payment-processing`, keyed by an
 * idempotency key built from the order id and timestamp. A payment already
 * marked `completed` is skipped. On failure the state entry is deleted and
 * the error is rethrown.
 */
export default step({
  name: 'charge-payment',
  triggers: [queue('payments')],
}, async (payment, ctx) => {
  // No padding inside the template: the key is exactly
  // `payment-<orderId>-<timestamp>`, with no embedded or trailing spaces.
  const idempotencyKey = `payment-${payment.orderId}-${payment.timestamp}`

  // Check if already processed
  const existing = await ctx.state.get('payment-processing', idempotencyKey)
  if (existing?.status === 'completed') {
    ctx.logger.info('Payment already processed', { idempotencyKey })
    return
  }

  // Mark as processing
  await ctx.state.set('payment-processing', idempotencyKey, {
    status: 'processing',
    startedAt: Date.now(),
  })

  try {
    // Process payment with idempotency key
    const result = await chargeCard(payment, { idempotencyKey })

    // Mark as completed
    await ctx.state.set('payment-processing', idempotencyKey, {
      status: 'completed',
      result,
      completedAt: Date.now(),
    })
    ctx.logger.info('Payment processed', { result })
  } catch (error) {
    // Clear processing state on error for retry
    await ctx.state.delete('payment-processing', idempotencyKey)
    throw error
  }
})
from motia import step, queue
from time import time
async def handler(payment, ctx):
    """Charge a payment, skipping it when it is already marked completed.

    Progress is tracked in state under ``payment-processing``, keyed by an
    idempotency key built from the order id and timestamp.

    Args:
        payment: Queue message with ``orderId`` and ``timestamp`` keys.
        ctx: Flow context providing ``state`` and ``logger``.

    Raises:
        Exception: Any error raised while charging or recording the result is
            re-raised after the state entry has been deleted.
    """
    # No padding inside the f-string: the key is exactly
    # payment-<orderId>-<timestamp>.
    idempotency_key = f"payment-{payment['orderId']}-{payment['timestamp']}"
    # Check if already processed
    existing = await ctx.state.get('payment-processing', idempotency_key)
    if existing and existing.get('status') == 'completed':
        ctx.logger.info('Payment already processed',
                        extra={'idempotencyKey': idempotency_key})
        return
    # Mark as processing
    await ctx.state.set('payment-processing', idempotency_key, {
        'status': 'processing',
        'startedAt': int(time() * 1000)
    })
    try:
        # Process payment with idempotency key
        result = await charge_card(payment, idempotency_key=idempotency_key)
        # Mark as completed
        await ctx.state.set('payment-processing', idempotency_key, {
            'status': 'completed',
            'result': result,
            'completedAt': int(time() * 1000)
        })
        ctx.logger.info('Payment processed', extra={'result': result})
    except Exception:
        # Clear processing state on error for retry
        await ctx.state.delete('payment-processing', idempotency_key)
        raise


# ``async def`` is a statement, not an expression, so the handler is defined
# first and handed to ``step`` by name.
default = step({
    'name': 'charge-payment',
    'triggers': [queue('payments')]
}, handler)
Streaming responses
For HTTP triggers, stream large responses:
import { step , http } from 'motia'
/**
 * HTTP step for `GET /stream` that writes ten newline-terminated JSON chunks,
 * pausing 100 ms between them, then closes the response.
 */
export default step({
  name: 'stream-data',
  triggers: [http('GET', '/stream')],
}, async (input, ctx) => {
  const { response } = input

  try {
    // Set response headers
    // NOTE(review): the body is newline-delimited JSON; confirm whether
    // 'application/x-ndjson' would be the better Content-Type here.
    await response.status(200)
    await response.headers({ 'Content-Type': 'application/json' })

    // Stream data chunks
    for (let i = 0; i < 10; i++) {
      // No padding inside the literals: each chunk is one JSON object
      // followed by a bare newline.
      const chunk = JSON.stringify({ index: i, data: `chunk-${i}` })
      response.stream.write(chunk + '\n')
      await new Promise(resolve => setTimeout(resolve, 100))
    }
  } finally {
    // Close the stream even if a write fails, so the response is not left open.
    response.close()
  }
})
from motia import step, http
import asyncio
import json
async def handler(input, ctx):
    """Stream ten newline-terminated JSON chunks, then close the response.

    Args:
        input: HTTP trigger input; the response object is read from
            ``input.response``.
        ctx: Flow context (not used here).
    """
    response = input.response
    try:
        # Set response headers
        await response.status(200)
        await response.headers({'Content-Type': 'application/json'})
        # Stream data chunks
        for i in range(10):
            # No padding inside the literals: each chunk is one JSON object
            # followed by a bare newline.
            chunk = json.dumps({'index': i, 'data': f'chunk-{i}'})
            await response.writer.send_message_async(chunk + '\n')
            await asyncio.sleep(0.1)
    finally:
        # Close the stream even if a write fails, so the response is not
        # left open.
        response.close()


# ``async def`` is a statement, not an expression, so the handler is defined
# first and handed to ``step`` by name.
default = step({
    'name': 'stream-data',
    'triggers': [http('GET', '/stream')]
}, handler)
Best practices
Keep handlers focused: Each handler should do one thing well. Use multiple steps connected by queues for complex workflows.
Use shared functions: Extract common logic into separate functions that multiple trigger handlers can use.
Log contextually: Include relevant data in log messages. The trace ID is automatically included.
Validate early: Check business rules at the start of handlers and return errors immediately.
Handle errors appropriately: Let transient errors retry. Handle permanent errors explicitly.
Design for retries: Use idempotency keys and state to ensure handlers can safely retry.
Next steps
Context API: Explore all FlowContext capabilities
State management: Learn state operations in depth
Streams: Work with real-time streams
Triggers: Understand all trigger types