
Overview

Background jobs let you process work asynchronously without blocking API responses. Motia provides a robust queue system with automatic retries, dead letter queues, and distributed processing.

Basic queue pattern

The most common pattern is to enqueue work from an API endpoint and process it in a separate step.
1. Create an API endpoint that enqueues work

steps/api.step.ts
import type { Handlers, StepConfig } from 'motia'
import { z } from 'zod'

export const config = {
  name: 'CreateOrderAPI',
  flows: ['orders'],
  triggers: [
    {
      type: 'http',
      method: 'POST',
      path: '/orders',
      bodySchema: z.object({
        email: z.string().email(),
        quantity: z.number(),
        petId: z.string(),
      }),
    },
  ],
  enqueues: ['process-food-order'],
} as const satisfies StepConfig

export const handler: Handlers<typeof config> = async (
  request,
  { logger, enqueue }
) => {
  logger.info('Order received', { body: request.body })

  // Enqueue the order for background processing
  await enqueue({
    topic: 'process-food-order',
    data: {
      ...request.body,
      orderId: `order-${Date.now()}`,
    },
  })

  return {
    status: 202, // Accepted for processing
    body: { 
      message: 'Order received and is being processed',
      status: 'pending',
    },
  }
}
2. Create a queue consumer step

steps/process-order.step.ts
import { queue, step } from 'motia'
import { z } from 'zod'

const orderSchema = z.object({
  email: z.string(),
  quantity: z.number(),
  petId: z.string(),
  orderId: z.string(),
})

export const stepConfig = {
  name: 'ProcessFoodOrder',
  description: 'Process food orders in the background',
  flows: ['orders'],
  triggers: [
    queue('process-food-order', { input: orderSchema }),
  ],
  enqueues: ['notification'],
}

export const { config, handler } = step(stepConfig, async (_input, ctx) => {
  const data = ctx.getData()

  ctx.logger.info('Processing order', { orderId: data.orderId })

  // Simulate order processing
  const order = {
    id: data.orderId,
    email: data.email,
    quantity: data.quantity,
    petId: data.petId,
    status: 'placed',
    shipDate: new Date().toISOString(),
  }

  // Persist to state
  await ctx.state.set('orders', order.id, order)

  // Notify customer
  await ctx.enqueue({
    topic: 'notification',
    data: {
      email: data.email,
      templateId: 'order-confirmation',
      templateData: order,
    },
  })

  ctx.logger.info('Order processed successfully', { orderId: order.id })
})
3. Test the flow

curl -X POST http://localhost:3000/orders \
  -H "Content-Type: application/json" \
  -d '{
    "email": "[email protected]",
    "quantity": 2,
    "petId": "pet-123"
  }'
The API returns immediately with a 202 status, while the order is processed asynchronously in the background.

Scheduled jobs with Cron

Run jobs on a schedule using cron expressions:
steps/periodic-cleanup.step.ts
import type { Handlers, StepConfig } from 'motia'

export const config = {
  name: 'PeriodicCleanup',
  description: 'Clean up old orders every day at midnight',
  triggers: [
    {
      type: 'cron',
      expression: '0 0 * * *', // Daily at midnight
    },
  ],
  flows: ['maintenance'],
} as const satisfies StepConfig

export const handler: Handlers<typeof config> = async (_, { logger, state }) => {
  logger.info('Running periodic cleanup')

  const thirtyDaysAgo = Date.now() - (30 * 24 * 60 * 60 * 1000)
  const oldOrders = await state.list<{ id: string; createdAt: string }>('orders')

  let deletedCount = 0

  for (const order of oldOrders) {
    const orderDate = new Date(order.createdAt).getTime()
    
    if (orderDate < thirtyDaysAgo) {
      await state.delete('orders', order.id)
      deletedCount++
    }
  }

  logger.info('Cleanup complete', { deletedCount })
}
Cron expressions use standard format: minute hour day month weekday
  • * * * * * - Every minute
  • 0 * * * * - Every hour
  • 0 0 * * * - Daily at midnight
  • 0 9 * * 1 - Every Monday at 9 AM

Processing batch jobs

Process multiple items from a queue in a scheduled batch:
steps/batch-processor.step.ts
import type { Handlers, StepConfig } from 'motia'

export const config = {
  name: 'BatchOrderProcessor',
  description: 'Process pending orders in batches',
  triggers: [
    {
      type: 'cron',
      expression: '*/5 * * * *', // Every 5 minutes
    },
  ],
  enqueues: ['order.processed'],
  flows: ['batch-processing'],
} as const satisfies StepConfig

export const handler: Handlers<typeof config> = async (_, ctx) => {
  ctx.logger.info('Starting batch processing')

  // Get all pending orders
  const pendingOrders = await ctx.state.list<{
    id: string
    amount: number
    status: string
  }>('pending-orders')

  ctx.logger.info('Found pending orders', { count: pendingOrders.length })

  // Process each order
  for (const order of pendingOrders) {
    try {
      // Enqueue individual processing
      await ctx.enqueue({
        topic: 'order.processed',
        data: {
          orderId: order.id,
          amount: order.amount,
          source: 'batch',
        },
      })

      // Remove from pending queue
      await ctx.state.delete('pending-orders', order.id)
    } catch (error) {
      ctx.logger.error('Failed to process order', {
        orderId: order.id,
        error,
      })
    }
  }

  ctx.logger.info('Batch processing complete', {
    processed: pendingOrders.length,
  })
}

Retry logic

Motia automatically retries failed queue processing. You can also implement custom retry logic:
export const { config, handler } = step(stepConfig, async (_input, ctx) => {
  const data = ctx.getData()
  const maxRetries = 3

  // Get retry count from state
  const retryKey = `retry:${data.orderId}`
  const retryCount = await ctx.state.get<number>('retries', retryKey) || 0

  try {
    await processOrder(data) // your order-fulfillment logic (not shown)
    
    // Success - clean up retry state
    await ctx.state.delete('retries', retryKey)
    
  } catch (error) {
    ctx.logger.error('Order processing failed', {
      orderId: data.orderId,
      retryCount,
      error,
    })

    if (retryCount < maxRetries) {
      // Increment retry count
      await ctx.state.set('retries', retryKey, retryCount + 1)
      
      // Re-enqueue with exponential backoff. Await the delay so the
      // handler doesn't return before the enqueue runs — an unawaited
      // setTimeout may never fire once this invocation completes.
      const delayMs = Math.pow(2, retryCount) * 1000
      await new Promise((resolve) => setTimeout(resolve, delayMs))

      await ctx.enqueue({
        topic: 'process-food-order',
        data,
      })
      
    } else {
      // Max retries reached - send to dead letter queue
      await ctx.enqueue({
        topic: 'failed-orders',
        data: {
          ...data,
          error: error instanceof Error ? error.message : String(error),
          failedAt: new Date().toISOString(),
        },
      })
    }
  }
})

Dead letter queue

Handle permanently failed jobs with a dead letter queue:
steps/handle-failed-orders.step.ts
import { queue, step } from 'motia'
import { z } from 'zod'

const failedOrderSchema = z.object({
  orderId: z.string(),
  email: z.string(),
  error: z.string(),
  failedAt: z.string(),
})

export const stepConfig = {
  name: 'HandleFailedOrders',
  description: 'Process orders that failed after max retries',
  flows: ['error-handling'],
  triggers: [
    queue('failed-orders', { input: failedOrderSchema }),
  ],
  enqueues: ['alert-admin'],
}

export const { config, handler } = step(stepConfig, async (_input, ctx) => {
  const data = ctx.getData()

  ctx.logger.error('Order permanently failed', {
    orderId: data.orderId,
    error: data.error,
  })

  // Store in failed orders for manual review
  await ctx.state.set('failed-orders', data.orderId, {
    ...data,
    needsManualReview: true,
  })

  // Alert administrators
  await ctx.enqueue({
    topic: 'alert-admin',
    data: {
      type: 'order-failure',
      orderId: data.orderId,
      customerEmail: data.email,
      error: data.error,
    },
  })
})

Queue priorities

Route high-value items to a dedicated consumer using conditional triggers:
import type { StepConfig, TriggerCondition } from 'motia'
import { z } from 'zod'

const orderSchema = z.object({
  amount: z.number(),
})

const isHighPriority: TriggerCondition<{ 
  amount: number 
}> = (input) => {
  if (!input) return false
  return input.amount > 1000
}

export const config = {
  name: 'ProcessHighPriorityOrders',
  triggers: [
    {
      type: 'queue',
      topic: 'order.created',
      input: orderSchema,
      condition: isHighPriority,
    },
  ],
} as const satisfies StepConfig

Best practices

Keep jobs idempotent

Design jobs to produce the same result if run multiple times. Use unique IDs to track processed items.
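The pattern can be sketched with an in-memory store standing in for Motia's state API; `processOnce` and the `Map` here are illustrative names, not part of Motia:

```typescript
// Idempotency sketch: run a job at most once per unique ID.
// The Map stands in for durable state (e.g. ctx.state in Motia).
const processed = new Map<string, boolean>()

async function processOnce(
  orderId: string,
  work: () => Promise<void>
): Promise<boolean> {
  // Already handled? Skip the side effects on redelivery.
  if (processed.get(orderId)) return false

  await work()

  // Mark as done only after the work succeeds, so a crash mid-way
  // lets a retry run the job again rather than losing it.
  processed.set(orderId, true)
  return true
}
```

Because the ID is recorded only after success, a redelivered message is either skipped (already done) or safely retried (never completed).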

Use appropriate timeouts

Set realistic timeouts for long-running jobs. Break very long jobs into smaller chunks.
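One way to break a long job into chunks is to partition its items and enqueue each batch separately; this `chunk` helper is a generic sketch, not a Motia API:

```typescript
// Chunking sketch: split a large work list into fixed-size batches,
// each small enough to finish within a single job's timeout.
function chunk<T>(items: T[], size: number): T[][] {
  const batches: T[][] = []
  for (let i = 0; i < items.length; i += size) {
    batches.push(items.slice(i, i + size))
  }
  return batches
}
```

Each batch can then be enqueued as its own job (e.g. via `ctx.enqueue`), so a failure or timeout affects one batch instead of the whole run.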

Monitor queue depth

Track how many items are waiting in queues. Scale up processing if queues grow too large.
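A simple scaling heuristic can be sketched as a pure function over sampled queue depths; the sampling source and threshold are assumptions, not a Motia feature:

```typescript
// Queue-depth sketch: scale up when the backlog is both large and
// not shrinking. `samples` are periodic depth readings, oldest first.
function needsMoreWorkers(samples: number[], threshold: number): boolean {
  if (samples.length === 0) return false
  const latest = samples[samples.length - 1]
  // Treat a single sample as "not shrinking".
  const notShrinking = samples.length < 2 || latest >= samples[0]
  return latest > threshold && notShrinking
}
```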

Implement dead letter queues

Always have a fallback for permanently failed jobs. Alert humans when manual intervention is needed.

Next steps

Real-time Streaming

Add WebSocket support for real-time updates

AI Integration

Build AI-powered workflows with LLMs
