Queue triggers let a step process messages published to queue topics, enabling asynchronous workflows, event-driven architectures, and background job processing.

Basic usage

import { step, queue } from 'motia'

export const config = step({
  name: 'process-order',
  triggers: [queue('orders')],
})

export const handler = async (input, ctx) => {
  const order = input
  
  ctx.logger.info('Processing order', { orderId: order.id })
  
  // Process the order
  await processOrder(order)
}

Schema validation

Define input schemas to validate incoming messages:
import { step, queue } from 'motia'
import { z } from 'zod'

const orderSchema = z.object({
  id: z.string(),
  customerId: z.string(),
  items: z.array(z.object({
    productId: z.string(),
    quantity: z.number(),
  })),
  total: z.number(),
})

export const config = step({
  name: 'process-order',
  triggers: [
    queue('orders', {
      input: orderSchema,
    }),
  ],
})

export const handler = async (input, ctx) => {
  // input is fully typed based on orderSchema
  const { id, customerId, items, total } = input
  
  ctx.logger.info('Processing order', {
    orderId: id,
    itemCount: items.length,
    total,
  })
}
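As the configuration options note, the input option accepts either a Zod schema or a JSON Schema. A sketch of the same order schema expressed as plain JSON Schema (assuming standard draft keywords, with the same validation behavior):

```typescript
import { step, queue } from 'motia'

// Equivalent order schema as a plain JSON Schema object instead of Zod
const orderJsonSchema = {
  type: 'object',
  properties: {
    id: { type: 'string' },
    customerId: { type: 'string' },
    items: {
      type: 'array',
      items: {
        type: 'object',
        properties: {
          productId: { type: 'string' },
          quantity: { type: 'number' },
        },
        required: ['productId', 'quantity'],
      },
    },
    total: { type: 'number' },
  },
  required: ['id', 'customerId', 'items', 'total'],
}

export const config = step({
  name: 'process-order',
  triggers: [
    queue('orders', {
      input: orderJsonSchema,
    }),
  ],
})
```

Note that, unlike Zod, a JSON Schema object does not by itself give you inferred TypeScript types for the handler input.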

Enqueuing messages

Send messages to queues from handlers:
import { step, http, queue } from 'motia'

export const config = step({
  name: 'create-order',
  triggers: [http('POST', '/orders')],
  enqueues: ['orders'],
})

export const handler = async (input, ctx) => {
  const order = input.request.body
  
  // Enqueue order for processing
  await ctx.enqueue({
    topic: 'orders',
    data: order,
  })
  
  return {
    status: 202,
    body: { message: 'Order queued for processing' },
  }
}

Infrastructure configuration

Configure queue behavior and retry policies:
import { step, queue } from 'motia'

export const config = step({
  name: 'critical-task',
  triggers: [
    queue('critical-tasks', {
      infrastructure: {
        queue: {
          type: 'fifo',
          maxRetries: 5,
          visibilityTimeout: 300,
          delaySeconds: 0,
        },
        handler: {
          ram: 2048,
          cpu: 1024,
          timeout: 300,
        },
      },
    }),
  ],
})

Message group IDs

Use message group IDs for FIFO queues to ensure ordered processing:
export const handler = async (input, ctx) => {
  const { customerId, order } = input.request.body
  
  // Enqueue with message group ID
  await ctx.enqueue({
    topic: 'orders',
    data: order,
    messageGroupId: customerId,
  })
  
  return {
    status: 202,
    body: { queued: true },
  }
}
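A messageGroupId only guarantees ordering when the consuming trigger uses a FIFO queue; with a standard queue the field has no ordering effect. A sketch of the matching consumer configuration (using the infrastructure options documented on this page):

```typescript
import { step, queue } from 'motia'

// Consumer side: the 'orders' queue must be FIFO for messageGroupId
// to guarantee per-customer ordering
export const config = step({
  name: 'process-order',
  triggers: [
    queue('orders', {
      infrastructure: {
        queue: { type: 'fifo' },
      },
    }),
  ],
})
```

With this pairing, messages for the same customerId are processed one at a time, in the order they were enqueued, while different customers can still be processed in parallel.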

Conditional triggers

Use conditions to selectively process messages:
import { step, queue } from 'motia'

export const config = step({
  name: 'process-priority-orders',
  triggers: [
    queue(
      'orders',
      undefined, // no input schema or options for this trigger
      (input, ctx) => {
        return input.priority === 'high'
      },
    ),
  ],
})

export const handler = async (input, ctx) => {
  ctx.logger.info('Processing high priority order', {
    orderId: input.id,
  })
}

Error handling

Handle failures with automatic retries:
export const handler = async (input, ctx) => {
  try {
    await processTask(input)
  } catch (error) {
    ctx.logger.error('Task processing failed', {
      error: error.message,
      taskId: input.id,
    })
    
    // Rethrow so the message is retried, up to the maxRetries config
    throw error
  }
}
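Because a rethrown error consumes a retry attempt, it can help to acknowledge permanent failures (such as malformed input) instead of retrying them. A hedged sketch; `ValidationError` and `processTask` here are hypothetical application code, not part of motia:

```typescript
// Hypothetical application error for input that can never succeed on retry
class ValidationError extends Error {}

// Transient errors (timeouts, throttling) are worth retrying;
// validation errors are not
function isRetryable(error: unknown): boolean {
  return !(error instanceof ValidationError)
}

// Stub standing in for the real task logic
async function processTask(input: { id: string }): Promise<void> {
  if (!input.id) throw new ValidationError('missing id')
}

export const handler = async (
  input: { id: string },
  ctx: { logger: { error: (msg: string, meta?: object) => void } },
) => {
  try {
    await processTask(input)
  } catch (error) {
    if (!isRetryable(error)) {
      // Acknowledge the message: retrying would fail the same way
      ctx.logger.error('Permanent failure, dropping message', { taskId: input.id })
      return
    }
    // Transient failure: rethrow so the queue retries up to maxRetries
    throw error
  }
}
```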

Configuration options

topic (string, required)
The queue topic name to subscribe to.

input (ZodSchema | JsonSchema)
Schema for validating incoming messages. Provides type safety and runtime validation.

infrastructure (InfrastructureConfig)
Queue and handler configuration options.

infrastructure.queue.type ('fifo' | 'standard', default: 'standard')
Queue type:
  • fifo: first-in-first-out with guaranteed ordering
  • standard: best-effort ordering with higher throughput

infrastructure.queue.maxRetries (number, default: 3)
Maximum number of retry attempts for failed messages.

infrastructure.queue.visibilityTimeout (number, default: 30)
Visibility timeout in seconds: how long a message stays hidden after being received.

infrastructure.queue.delaySeconds (number, default: 0)
Delay in seconds before newly enqueued messages become available.

infrastructure.handler.ram (number, default: 512)
Memory allocation in MB for the handler function.

infrastructure.handler.cpu (number, default: 256)
CPU allocation in CPU units for the handler function.

infrastructure.handler.timeout (number, default: 30)
Handler timeout in seconds.

condition (function)
Optional (input, ctx) => boolean function; when provided, only messages for which it returns true are processed.
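Putting the defaults together, a sketch of a trigger with every infrastructure option written out explicitly at its default value (equivalent to passing no infrastructure block at all):

```typescript
import { step, queue } from 'motia'

export const config = step({
  name: 'example-task',
  triggers: [
    queue('example-tasks', {
      infrastructure: {
        queue: {
          type: 'standard',      // best-effort ordering, higher throughput
          maxRetries: 3,         // retry attempts for failed messages
          visibilityTimeout: 30, // seconds a message stays hidden after receive
          delaySeconds: 0,       // no delivery delay
        },
        handler: {
          ram: 512,    // MB
          cpu: 256,    // CPU units
          timeout: 30, // seconds
        },
      },
    }),
  ],
})
```

As a rule of thumb, visibilityTimeout should exceed the handler timeout, so a message is not redelivered while a slow handler is still working on it.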

Use cases

Background job processing

Process long-running tasks asynchronously:
import { step, queue } from 'motia'
import { z } from 'zod'

const videoSchema = z.object({
  videoId: z.string(),
  format: z.enum(['mp4', 'webm', 'avi']),
  quality: z.enum(['720p', '1080p', '4k']),
})

export const config = step({
  name: 'transcode-video',
  triggers: [
    queue('video-transcode', {
      input: videoSchema,
      infrastructure: {
        handler: {
          ram: 4096,
          timeout: 900,
        },
      },
    }),
  ],
})

export const handler = async (input, ctx) => {
  const { videoId, format, quality } = input
  
  ctx.logger.info('Starting transcode', { videoId, format, quality })
  
  await transcodeVideo(videoId, format, quality)
  
  ctx.logger.info('Transcode complete', { videoId })
}

Event-driven workflow

Chain multiple steps with queues:
import { step, queue } from 'motia'

export const config = step({
  name: 'notify-user',
  triggers: [queue('order-completed')],
  enqueues: ['send-email'],
})

export const handler = async (input, ctx) => {
  const { orderId, customerId } = input
  
  // Queue email notification
  await ctx.enqueue({
    topic: 'send-email',
    data: {
      to: customerId,
      subject: 'Order complete',
      template: 'order-confirmation',
      orderId,
    },
  })
}

Batch processing

Process items in batches. Note that the batch below is in-memory, per-process state: a partial batch is lost if the process restarts, so use this pattern only where occasional loss or reprocessing is acceptable.
import { step, queue } from 'motia'

// Example message shape; adjust to your payload
type Item = Record<string, unknown>

// Module-level state: lives only as long as this process
const batch: Item[] = []
const BATCH_SIZE = 100

export const config = step({
  name: 'batch-processor',
  triggers: [queue('items')],
})

export const handler = async (input, ctx) => {
  batch.push(input)
  
  if (batch.length >= BATCH_SIZE) {
    await processBatch(batch)
    batch.length = 0
  }
}
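The batch above only flushes once BATCH_SIZE messages arrive in the same process, so a trickle of messages can sit unflushed indefinitely. A sketch of a small helper (hypothetical, not part of motia) that also flushes after a maximum wait:

```typescript
// Hypothetical helper: collects items and flushes either when the batch
// is full or after maxWaitMs has elapsed since the first pending item
class Batcher<T> {
  private items: T[] = []
  private timer: ReturnType<typeof setTimeout> | null = null

  constructor(
    private readonly size: number,
    private readonly maxWaitMs: number,
    private readonly flush: (items: T[]) => Promise<void>,
  ) {}

  async add(item: T): Promise<void> {
    this.items.push(item)
    if (this.items.length >= this.size) {
      // Batch is full: flush immediately
      await this.drain()
    } else if (!this.timer) {
      // First pending item: start the flush timer
      this.timer = setTimeout(() => void this.drain(), this.maxWaitMs)
    }
  }

  private async drain(): Promise<void> {
    if (this.timer) {
      clearTimeout(this.timer)
      this.timer = null
    }
    if (this.items.length === 0) return
    const toFlush = this.items
    this.items = []
    await this.flush(toFlush)
  }
}
```

In the handler you would then call batcher.add(input) instead of pushing onto the array directly; the same per-process caveat still applies.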
