Every step handler receives a FlowContext object as its second parameter. This context provides access to Motia’s runtime features: queues, state, streams, logging, and trigger information.
FlowContext interface
/**
 * Runtime context passed as the second argument to every step handler.
 * Exposes queueing, tracing, state, logging, streams, trigger metadata,
 * type guards, data extraction, and pattern matching.
 */
interface FlowContext<TEnqueueData, TInput> {
  /** Queue operations */
  enqueue: Enqueuer<TEnqueueData>
  /** Tracing */
  traceId: string
  /** State management */
  state: InternalStateManager
  /** Logging */
  logger: Logger
  /** Streams */
  streams: Streams
  /** Trigger information */
  trigger: TriggerInfo
  /** Type guards for narrowing the handler input by trigger type */
  is: {
    queue: (input: TInput) => input is QueueInput
    http: (input: TInput) => input is HttpInput
    cron: (input: TInput) => input is never
    state: (input: TInput) => input is StateTriggerInput
    stream: (input: TInput) => input is StreamTriggerInput
  }
  /** Data extraction, independent of which trigger fired */
  getData: () => ExtractDataPayload<TInput>
  /** Pattern matching over trigger types */
  match: <TResult>(handlers: MatchHandlers<TInput, TEnqueueData, TResult>) =>
    Promise<TResult | undefined>
}
from pydantic import BaseModel
from typing import Any, Awaitable, Callable, Generic, TypeVar
TEnqueueData = TypeVar('TEnqueueData')


class FlowContext(BaseModel, Generic[TEnqueueData]):
    """Runtime context passed as the second argument to every step handler."""

    # Queue operations
    enqueue: Callable[[Any], Awaitable[None]]
    # Tracing
    trace_id: str
    # State management
    state: Any  # InternalStateManager
    # Logging
    logger: Any  # Logger
    # Streams
    streams: dict[str, Stream[Any]]
    # Trigger information
    trigger: TriggerInfo
    # Input value
    input_value: Any

    # Type guards
    def is_queue(self) -> bool: ...
    def is_api(self) -> bool: ...
    def is_cron(self) -> bool: ...
    def is_state(self) -> bool: ...
    def is_stream(self) -> bool: ...

    # Data extraction
    def get_data(self) -> Any: ...

    # Pattern matching
    async def match(self, handlers: dict[str, Any]) -> Any: ...
Enqueue
Publish messages to queue topics defined in your step’s enqueues array.
import { step , http } from 'motia'
export default step({
  name: 'create-order',
  triggers: [http('POST', '/orders')],
  enqueues: ['order-created', 'send-email']
}, async (input, ctx) => {
  const order = await createOrder(input.request.body)

  // Enqueue to order-created topic
  await ctx.enqueue({
    topic: 'order-created',
    data: { orderId: order.id, amount: order.amount }
  })

  // Enqueue with a message group ID so all messages for the same
  // customer are delivered in FIFO order
  await ctx.enqueue({
    topic: 'send-email',
    data: { to: order.email, template: 'order-confirmation' },
    messageGroupId: `customer-${order.customerId}`
  })

  return { status: 201, body: order }
})
from motia import step, http
async def handler(input, ctx):
    """Create an order, then publish it to the 'order-created' and 'send-email' topics."""
    order = await create_order(input.request.body)

    # Enqueue to order-created topic
    await ctx.enqueue({
        'topic': 'order-created',
        'data': {'orderId': order['id'], 'amount': order['amount']}
    })

    # Enqueue with a message group ID so all messages for the same
    # customer are delivered in FIFO order
    await ctx.enqueue({
        'topic': 'send-email',
        'data': {'to': order['email'], 'template': 'order-confirmation'},
        'messageGroupId': f"customer-{order['customerId']}"
    })

    return {'status': 201, 'body': order}


# `async def` is a statement in Python, so the handler must be defined
# before being passed to step() (unlike the inline arrow function in TS).
default = step({
    'name': 'create-order',
    'triggers': [http('POST', '/orders')],
    'enqueues': ['order-created', 'send-email']
}, handler)
You can only enqueue to topics declared in the step’s enqueues configuration. Motia validates this at build time.
State
Access scoped key-value storage that persists across step executions.
import { step , http } from 'motia'
export default step({
  name: 'track-visits',
  triggers: [http('GET', '/page/:pageId')]
}, async (input, ctx) => {
  const { pageId } = input.request.pathParams
  const scope = 'page-visits'

  // Read the current count (null on first visit)
  const current = await ctx.state.get<{ count: number }>(scope, pageId)
  const count = (current?.count ?? 0) + 1

  // Persist the incremented count with a last-visit timestamp
  await ctx.state.set(scope, pageId, { count, lastVisit: Date.now() })

  return { status: 200, body: { pageId, visits: count } }
})
from motia import step, http
from time import time
async def handler(input, ctx):
    """Increment and return the visit counter stored in state for a page."""
    page_id = input.request.path_params['pageId']
    scope = 'page-visits'

    # Read the current count (None on first visit)
    current = await ctx.state.get(scope, page_id)
    count = (current.get('count', 0) if current else 0) + 1

    # Persist the incremented count; lastVisit is epoch milliseconds
    await ctx.state.set(scope, page_id, {
        'count': count,
        'lastVisit': int(time() * 1000)
    })

    return {'status': 200, 'body': {'pageId': page_id, 'visits': count}}


# Handler is defined as a statement above, then registered with step().
default = step({
    'name': 'track-visits',
    'triggers': [http('GET', '/page/{pageId}')]
}, handler)
State operations
get
(scope: string, key: string) => Promise<T | null>
Retrieve a value by scope and key
set
(scope: string, key: string, value: T) => Promise<StreamSetResult<T> | null>
Store a value with scope and key
update
(scope: string, key: string, ops: UpdateOp[]) => Promise<StreamSetResult<T> | null>
Apply partial updates using update operations:
{ type: 'set', path: 'field', value: newValue }
{ type: 'delete', path: 'field' }
{ type: 'increment', path: 'counter', value: 1 }
delete
(scope: string, key: string) => Promise<T | null>
Delete a value and return the previous value
list
(scope: string) => Promise<T[]>
List all values in a scope
clear
(scope: string) => Promise<void>
Delete all values in a scope
Use scopes to organize related data. For example, user-sessions, rate-limits, or feature-flags.
Streams
Access configured streams for real-time data operations.
import { step , http } from 'motia'
export default step({
  name: 'update-document',
  triggers: [http('PUT', '/documents/:id')]
}, async (input, ctx) => {
  const { id } = input.request.pathParams
  const updates = input.request.body

  // Streams declared in motia.config.ts are available on ctx.streams
  const documents = ctx.streams.documents

  // Apply partial updates to the document
  await documents.update('workspace-1', id, [
    { type: 'set', path: 'content', value: updates.content },
    { type: 'set', path: 'updatedAt', value: Date.now() }
  ])

  // Read back the updated document
  const doc = await documents.get('workspace-1', id)
  return { status: 200, body: doc }
})
from motia import step, http
from time import time
async def handler(input, ctx):
    """Apply partial updates to a document stream record and return the fresh copy."""
    doc_id = input.request.path_params['id']
    updates = input.request.body

    # Streams declared in motia.config.ts are available on ctx.streams
    documents = ctx.streams['documents']

    # Apply partial updates; updatedAt is epoch milliseconds
    await documents.update('workspace-1', doc_id, [
        {'type': 'set', 'path': 'content', 'value': updates['content']},
        {'type': 'set', 'path': 'updatedAt', 'value': int(time() * 1000)}
    ])

    # Read back the updated document
    doc = await documents.get('workspace-1', doc_id)
    return {'status': 200, 'body': doc}


# Handler is defined as a statement above, then registered with step().
default = step({
    'name': 'update-document',
    'triggers': [http('PUT', '/documents/{id}')]
}, handler)
Streams must be declared in motia.config.ts to be accessible via ctx.streams. See Streams for details.
Logger
Structured logging with automatic trace ID inclusion.
export default step({
  name: 'process-payment',
  triggers: [http('POST', '/payments')]
}, async (input, ctx) => {
  // Structured fields are attached to the log entry alongside the trace ID
  ctx.logger.info('Processing payment', {
    amount: input.request.body.amount,
    currency: input.request.body.currency
  })

  try {
    const result = await chargeCard(input.request.body)
    ctx.logger.info('Payment successful', { transactionId: result.id })
    return { status: 200, body: result }
  } catch (error) {
    ctx.logger.error('Payment failed', { error: error.message })
    return { status: 500, body: { error: 'Payment processing failed' } }
  }
})
async def handler(input, ctx):
    """Charge the card from the request body, logging the outcome either way."""
    # Structured fields go in `extra`, mirroring the TS logger's second argument
    ctx.logger.info('Processing payment', extra={
        'amount': input.request.body['amount'],
        'currency': input.request.body['currency']
    })

    try:
        result = await charge_card(input.request.body)
    except Exception as error:
        ctx.logger.error('Payment failed', extra={'error': str(error)})
        return {'status': 500, 'body': {'error': 'Payment processing failed'}}

    ctx.logger.info('Payment successful',
                    extra={'transactionId': result['id']})
    return {'status': 200, 'body': result}


# Handler is defined as a statement above, then registered with step().
default = step({
    'name': 'process-payment',
    'triggers': [http('POST', '/payments')]
}, handler)
Log levels
ctx.logger.debug() - Detailed debugging information
ctx.logger.info() - General informational messages
ctx.logger.warn() - Warning messages
ctx.logger.error() - Error messages
Trace ID
Every step execution has a unique trace ID for distributed tracing.
export default step(config, async (input, ctx) => {
  // Forward the trace ID so external services join the same trace
  await fetch('https://api.example.com/process', {
    headers: {
      'X-Trace-Id': ctx.traceId
    },
    body: JSON.stringify(data)
  })
})
Trigger info
Inspect which trigger fired the current execution.
/** Describes which trigger fired the current step execution. */
interface TriggerInfo {
  type: 'http' | 'queue' | 'cron' | 'state' | 'stream'
  index?: number // Index in the step's triggers array
  // HTTP specific
  path?: string
  method?: string
  // Queue specific
  topic?: string
  // Cron specific
  expression?: string
}
class TriggerInfo:
    """Describes which trigger fired the current step execution."""

    type: Literal['http', 'queue', 'cron', 'state', 'stream']
    index: int | None  # Index in the step's triggers array
    # HTTP specific
    path: str | None
    method: str | None
    # Queue specific
    topic: str | None
    # Cron specific
    expression: str | None
Example usage:
export default step(config, async (input, ctx) => {
  // Log which trigger fired this execution
  ctx.logger.info('Step triggered', {
    type: ctx.trigger.type,
    method: ctx.trigger.method,
    path: ctx.trigger.path
  })
})
Type guards
Check trigger type at runtime with type-safe guards.
export default step({
  name: 'multi-trigger-step',
  triggers: [
    http('POST', '/items'),
    queue('items')
  ]
}, async (input, ctx) => {
  if (ctx.is.http(input)) {
    // Narrowed: TypeScript knows input is MotiaHttpArgs here
    const body = input.request.body
    return { status: 200, body: { received: true } }
  }

  if (ctx.is.queue(input)) {
    // Narrowed: input is the queue message payload here
    await processQueueMessage(input)
  }
})
async def handler(input, ctx):
    """Handle the same step fired by either an HTTP request or a queue message."""
    if ctx.is_api():
        # Input is MotiaHttpArgs
        body = input.request.body
        return {'status': 200, 'body': {'received': True}}

    if ctx.is_queue():
        # Input is the queue message payload
        await process_queue_message(input)


# Handler is defined as a statement above, then registered with step().
default = step({
    'name': 'multi-trigger-step',
    'triggers': [
        http('POST', '/items'),
        queue('items')
    ]
}, handler)
Get data
Extract the data payload regardless of trigger type.
import { step , http , queue } from 'motia'
import { z } from 'zod'
// Shared schema: validates the HTTP body and types the queue message
const orderSchema = z.object({
  orderId: z.string(),
  amount: z.number()
})

export default step({
  name: 'process-order',
  triggers: [
    http('POST', '/orders', { bodySchema: orderSchema }),
    queue('orders', { input: orderSchema })
  ]
}, async (input, ctx) => {
  // Works for both the HTTP body and the queue message,
  // typed as z.infer<typeof orderSchema>
  const orderData = ctx.getData()
  await processOrder(orderData)

  if (ctx.is.http(input)) {
    return { status: 200, body: { success: true } }
  }
})
from motia import step, http, queue
async def handler(input, ctx):
    """Process an order arriving via HTTP or the 'orders' queue with one code path."""
    # get_data() extracts the payload from both the HTTP body and the queue message
    order_data = ctx.get_data()
    await process_order(order_data)

    # Only the HTTP trigger expects a response
    if ctx.is_api():
        return {'status': 200, 'body': {'success': True}}


# Handler is defined as a statement above, then registered with step().
default = step({
    'name': 'process-order',
    'triggers': [
        http('POST', '/orders'),
        queue('orders')
    ]
}, handler)
Use ctx.getData() when multiple triggers share the same data schema. This avoids duplicating data extraction logic.
Pattern matching
Handle different trigger types with type-safe pattern matching.
export default step({
  name: 'sync-user',
  triggers: [
    http('POST', '/users/:id/sync'),
    queue('user-sync'),
    cron('0 2 * * *') // Daily at 2 AM
  ]
}, async (input, ctx) => {
  // Dispatch on trigger type; `default` catches anything unhandled
  return ctx.match({
    http: async (req) => {
      const userId = req.request.pathParams.id
      await syncUser(userId)
      return { status: 200, body: { synced: userId } }
    },
    queue: async (data) => {
      await syncUser(data.userId)
    },
    cron: async () => {
      await syncAllUsers()
    },
    default: async (input) => {
      ctx.logger.warn('Unhandled trigger type')
    }
  })
})
async def handler(input, ctx):
    """Sync one user (HTTP/queue) or all users (cron) via ctx.match dispatch."""
    async def handle_http(req):
        user_id = req.request.path_params['id']
        await sync_user(user_id)
        return {'status': 200, 'body': {'synced': user_id}}

    async def handle_queue(data):
        await sync_user(data['userId'])

    async def handle_cron():
        await sync_all_users()

    # Dispatch on trigger type; match() raises if nothing matches
    # and no 'default' handler is supplied.
    return await ctx.match({
        'http': handle_http,
        'queue': handle_queue,
        'cron': handle_cron
    })


# Handler is defined as a statement above, then registered with step().
default = step({
    'name': 'sync-user',
    'triggers': [
        http('POST', '/users/{id}/sync'),
        queue('user-sync'),
        cron('0 2 * * *')  # Daily at 2 AM
    ]
}, handler)
ctx.match() throws if no handler matches and no default handler is provided.
Next steps
Handlers Learn handler patterns and best practices
State management Deep dive into state operations
Streams Working with real-time streams
Triggers Review available trigger types