Queue triggers allow you to process messages from topics, enabling asynchronous workflows, event-driven architectures, and background job processing.
Basic usage
import { step, queue } from 'motia'
// Step config: run this step's handler for every message on the 'orders' topic.
const ordersTrigger = queue('orders')

export const config = step({
  name: 'process-order',
  triggers: [ordersTrigger],
})
// Queue handler: receives one 'orders' message per invocation.
export const handler = async (input, ctx) => {
  ctx.logger.info('Processing order', { orderId: input.id })
  // Hand the payload off to the domain logic.
  await processOrder(input)
}
from motia import step, queue
# Step config: run this step's handler for every message on the 'orders' topic.
orders_trigger = queue('orders')

config = step(
    name='process-order',
    triggers=[orders_trigger],
)
async def handler(input, ctx):
    """Consume one message from the 'orders' topic and process it."""
    ctx.logger.info('Processing order', {'orderId': input['id']})
    # Hand the payload off to the domain logic.
    await process_order(input)
Schema validation
Define input schemas to validate incoming messages:
import { step, queue } from 'motia'
import { z } from 'zod'
// Schema for a single line item within an order.
const orderItemSchema = z.object({
  productId: z.string(),
  quantity: z.number(),
})

// Schema used to validate incoming 'orders' messages at runtime.
const orderSchema = z.object({
  id: z.string(),
  customerId: z.string(),
  items: z.array(orderItemSchema),
  total: z.number(),
})
// Validate each 'orders' message against orderSchema before invoking the handler.
export const config = step({
  name: 'process-order',
  triggers: [queue('orders', { input: orderSchema })],
})
// Log a summary of the validated order; `input` matches orderSchema.
export const handler = async (input, ctx) => {
  ctx.logger.info('Processing order', {
    orderId: input.id,
    itemCount: input.items.length,
    total: input.total,
  })
}
from motia import step, queue
# JSON Schema for one line item within an order.
_item_schema = {
    'type': 'object',
    'properties': {
        'productId': {'type': 'string'},
        'quantity': {'type': 'number'},
    },
}

# JSON Schema used to validate incoming 'orders' messages at runtime.
order_schema = {
    'type': 'object',
    'properties': {
        'id': {'type': 'string'},
        'customerId': {'type': 'string'},
        'items': {'type': 'array', 'items': _item_schema},
        'total': {'type': 'number'},
    },
    'required': ['id', 'customerId', 'items', 'total'],
}
# Validate each 'orders' message against order_schema before invoking the handler.
config = step(
    name='process-order',
    triggers=[queue('orders', input=order_schema)],
)
async def handler(input, ctx):
    """Log a summary of the validated order; ``input`` matches order_schema."""
    order_id = input['id']
    summary = {
        'orderId': order_id,
        'itemCount': len(input['items']),
        'total': input['total'],
    }
    ctx.logger.info('Processing order', summary)
Enqueuing messages
Send messages to queues from handlers:
import { step, http, queue } from 'motia'
// HTTP-triggered step; `enqueues` declares which topics the handler may publish to.
const createOrderRoute = http('POST', '/orders')

export const config = step({
  name: 'create-order',
  triggers: [createOrderRoute],
  enqueues: ['orders'],
})
// Accept an order over HTTP and queue it for asynchronous processing.
export const handler = async (input, ctx) => {
  await ctx.enqueue({
    topic: 'orders',
    data: input.request.body,
  })
  // 202: accepted for processing, not yet completed.
  return {
    status: 202,
    body: { message: 'Order queued for processing' },
  }
}
from motia import step, http, queue
# HTTP-triggered step; `enqueues` declares which topics the handler may publish to.
create_order_route = http('POST', '/orders')

config = step(
    name='create-order',
    triggers=[create_order_route],
    enqueues=['orders'],
)
async def handler(input, ctx):
    """Accept an order over HTTP and queue it for asynchronous processing."""
    await ctx.enqueue({
        'topic': 'orders',
        'data': input['request']['body'],
    })
    # 202: accepted for processing, not yet completed.
    return {
        'status': 202,
        'body': {'message': 'Order queued for processing'},
    }
Infrastructure configuration
Configure queue behavior and retry policies:
import { step, queue } from 'motia'
export const config = step({
  name: 'critical-task',
  triggers: [
    queue('critical-tasks', {
      infrastructure: {
        // FIFO delivery; up to 5 redeliveries; message hidden 300 s after receipt.
        queue: { type: 'fifo', maxRetries: 5, visibilityTimeout: 300, delaySeconds: 0 },
        // Runtime sizing: 2048 MB RAM, 1024 CPU units, 5-minute timeout.
        handler: { ram: 2048, cpu: 1024, timeout: 300 },
      },
    }),
  ],
})
from motia import step, queue
config = step(
    name='critical-task',
    triggers=[
        queue(
            'critical-tasks',
            infrastructure={
                # FIFO delivery; up to 5 redeliveries; message hidden 300 s after receipt.
                'queue': {'type': 'fifo', 'maxRetries': 5, 'visibilityTimeout': 300, 'delaySeconds': 0},
                # Runtime sizing: 2048 MB RAM, 1024 CPU units, 5-minute timeout.
                'handler': {'ram': 2048, 'cpu': 1024, 'timeout': 300},
            },
        ),
    ],
)
Message group IDs
Use message group IDs for FIFO queues to ensure ordered processing:
// Enqueue the order keyed by customer so a FIFO queue preserves per-customer order.
export const handler = async (input, ctx) => {
  const body = input.request.body
  await ctx.enqueue({
    topic: 'orders',
    data: body.order,
    // Messages sharing a group ID are delivered in order on FIFO queues.
    messageGroupId: body.customerId,
  })
  return {
    status: 202,
    body: { queued: true },
  }
}
async def handler(input, ctx):
    """Enqueue the order keyed by customer for ordered (FIFO) processing."""
    body = input['request']['body']
    await ctx.enqueue({
        'topic': 'orders',
        'data': body['order'],
        # Messages sharing a group ID are delivered in order on FIFO queues.
        'messageGroupId': body['customerId'],
    })
    return {'status': 202, 'body': {'queued': True}}
Conditional triggers
Use conditions to selectively process messages:
import { step, queue } from 'motia'
// Only invoke the handler for messages whose priority is 'high'.
const isHighPriority = (input, ctx) => input.priority === 'high'

export const config = step({
  name: 'process-priority-orders',
  // queue(topic, options, condition) — options deliberately omitted here.
  triggers: [queue('orders', undefined, isHighPriority)],
})
// Runs only for messages that passed the high-priority condition.
export const handler = async (input, ctx) => {
  ctx.logger.info('Processing high priority order', { orderId: input.id })
}
from motia import step, queue
def priority_condition(input, ctx):
    """Trigger condition: process only messages marked high priority."""
    priority = input.get('priority')
    return priority == 'high'
# Only messages for which priority_condition returns True reach the handler.
config = step(
    name='process-priority-orders',
    triggers=[queue('orders', condition=priority_condition)],
)
async def handler(input, ctx):
    """Runs only for messages that passed priority_condition."""
    order_id = input['id']
    ctx.logger.info('Processing high priority order', {'orderId': order_id})
Error handling
Handle failures with automatic retries:
// Log failures with context, then rethrow so the queue retries per maxRetries.
export const handler = async (input, ctx) => {
  try {
    await processTask(input)
  } catch (err) {
    ctx.logger.error('Task processing failed', {
      error: err.message,
      taskId: input.id,
    })
    // Rethrowing marks the message as failed; the queue redelivers it.
    throw err
  }
}
async def handler(input, ctx):
    """Log failures with context, then re-raise so the queue retries per maxRetries."""
    try:
        await process_task(input)
    except Exception as exc:
        ctx.logger.error('Task processing failed', {
            'error': str(exc),
            'taskId': input['id'],
        })
        # Re-raising marks the message as failed; the queue redelivers it.
        raise
Configuration options
The queue topic name to subscribe to
Schema for validating incoming messages. Provides type safety and runtime validation
Queue and handler configuration options
infrastructure.queue.type
'fifo' | 'standard'
Default: "standard"
Queue type:
fifo: First-in-first-out with guaranteed ordering
standard: Best-effort ordering with higher throughput
infrastructure.queue.maxRetries
Maximum number of retry attempts for failed messages
infrastructure.queue.visibilityTimeout
Visibility timeout in seconds. Time a message is hidden after being received
infrastructure.queue.delaySeconds
Delay in seconds before messages become available
infrastructure.handler.ram
Memory allocation in MB for the handler function
infrastructure.handler.cpu
CPU allocation in CPU units for the handler function
infrastructure.handler.timeout
Handler timeout in seconds
Optional function (input, ctx) => boolean to conditionally process messages
Use cases
Background job processing
Process long-running tasks asynchronously:
import { step, queue } from 'motia'
import { z } from 'zod'
// Allowed output containers and target resolutions.
const videoFormats = ['mp4', 'webm', 'avi'] as const
const videoQualities = ['720p', '1080p', '4k'] as const

// Shape of a transcode request message.
const videoSchema = z.object({
  videoId: z.string(),
  format: z.enum(videoFormats),
  quality: z.enum(videoQualities),
})
// Transcoding is heavy: give the handler 4096 MB RAM and a 15-minute timeout.
export const config = step({
  name: 'transcode-video',
  triggers: [
    queue('video-transcode', {
      input: videoSchema,
      infrastructure: { handler: { ram: 4096, timeout: 900 } },
    }),
  ],
})
// Transcode the requested video; `input` has been validated against videoSchema.
export const handler = async (input, ctx) => {
  const { videoId } = input
  ctx.logger.info('Starting transcode', { videoId, format: input.format, quality: input.quality })
  await transcodeVideo(videoId, input.format, input.quality)
  ctx.logger.info('Transcode complete', { videoId })
}
Event-driven workflow
Chain multiple steps with queues:
import { step, queue } from 'motia'
// Chained step: consumes 'order-completed' and fans out to 'send-email'.
const orderCompleted = queue('order-completed')

export const config = step({
  name: 'notify-user',
  triggers: [orderCompleted],
  enqueues: ['send-email'],
})
// On order completion, queue a confirmation email for the customer.
export const handler = async (input, ctx) => {
  const message = {
    to: input.customerId,
    subject: 'Order complete',
    template: 'order-confirmation',
    orderId: input.orderId,
  }
  await ctx.enqueue({ topic: 'send-email', data: message })
}
Batch processing
Process items in batches:
import { step, queue } from 'motia'
// In-memory buffer shared across invocations within one process.
// NOTE(review): contents are lost on restart, and a trailing partial batch
// (fewer than BATCH_SIZE items) is never flushed — confirm this trade-off is
// intended for this example.
const batch: Item[] = []
const BATCH_SIZE = 100
// Subscribe to the 'items' topic; each message is one item to batch.
const itemsTrigger = queue('items')

export const config = step({
  name: 'batch-processor',
  triggers: [itemsTrigger],
})
// Buffer each incoming item; flush once BATCH_SIZE items have accumulated.
export const handler = async (input, ctx) => {
  batch.push(input)
  if (batch.length < BATCH_SIZE) return
  await processBatch(batch)
  // Reset the shared buffer in place so the next batch reuses it.
  batch.length = 0
}