This example demonstrates how to build production-grade background workers that process tasks asynchronously with:
  • Queue-based task distribution
  • Multiple worker instances
  • State-driven workflows
  • Cron-triggered maintenance

Use cases

  • Image processing pipelines
  • Email batch sending
  • Data import/export
  • Report generation
  • Cleanup and maintenance tasks

Architecture

This example processes messages through a pipeline:
  1. HTTP endpoint: Receives messages and enqueues them
  2. Queue worker: Processes messages in the background
  3. Cron job: Handles periodic cleanup
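
The enqueue-then-consume pattern behind this pipeline can be sketched without any framework. The class below is a minimal in-memory illustration, not the Motia or iii API: `InMemoryQueue` and `drain` are hypothetical names, and a real engine adds persistence, retries, and distribution across instances.

```typescript
// Hypothetical in-memory stand-in for a queue topic (illustration only).
class InMemoryQueue<T> {
  private pending: T[] = []

  // Producer side: roughly what the HTTP endpoint does when it enqueues.
  enqueue(data: T): void {
    this.pending.push(data)
  }

  // Consumer side: hand each pending message to the worker in FIFO order
  // and report how many messages were delivered.
  drain(worker: (data: T) => void): number {
    let delivered = 0
    while (this.pending.length > 0) {
      const data = this.pending.shift() as T
      worker(data)
      delivered++
    }
    return delivered
  }
}
```

The producer returns as soon as `enqueue` completes, which is why the HTTP endpoint in Step 1 can respond before any processing has happened.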

Step 1: Create the message endpoint

Create steps/send-message.step.ts to receive and enqueue messages:
steps/send-message.step.ts
import type { Handlers, StepConfig } from 'motia'
import { z } from 'zod'

export const config = {
  name: 'SendMessage',
  description: 'Receives messages and enqueues for background processing',
  triggers: [
    {
      type: 'http',
      method: 'POST',
      path: '/messages',
      bodySchema: z.object({
        text: z.string().min(1).max(500),
        priority: z.enum(['low', 'normal', 'high']).default('normal'),
        metadata: z.record(z.string()).optional(),
      }),
      responseSchema: {
        200: z.object({
          messageId: z.string(),
          status: z.string(),
        }),
        400: z.object({ error: z.string() }),
      },
    },
  ],
  enqueues: ['message.sent'],
} as const satisfies StepConfig

export const handler: Handlers<typeof config> = async (
  request,
  { enqueue, logger, state }
) => {
  const { text, priority, metadata } = request.body
  const messageId = `msg-${Date.now()}-${Math.random().toString(36).substring(7)}`

  logger.info('Received message', { messageId, priority })

  // Store the message record; messageId is included so the cleanup step can reference it
  await state.set('messages', messageId, {
    messageId,
    text,
    priority,
    metadata,
    status: 'pending',
    createdAt: new Date().toISOString(),
  })

  // Enqueue for processing
  await enqueue({
    topic: 'message.sent',
    data: { messageId, text, priority },
  })

  return {
    status: 200,
    body: { messageId, status: 'enqueued' },
  }
}
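
To show what the bodySchema above enforces, here is a dependency-free sketch of the same rules. In the actual step zod performs this validation; `validateMessageBody`, `ValidationResult`, and the error strings are hypothetical names chosen for this illustration.

```typescript
// Hand-written illustration of the bodySchema rules (the real step relies on zod).
type Priority = 'low' | 'normal' | 'high'

interface MessageBody {
  text: string
  priority: Priority
  metadata?: Record<string, string>
}

type ValidationResult =
  | { ok: true; value: MessageBody }
  | { ok: false; error: string }

const PRIORITIES: readonly Priority[] = ['low', 'normal', 'high']

function validateMessageBody(body: unknown): ValidationResult {
  if (typeof body !== 'object' || body === null) {
    return { ok: false, error: 'body must be an object' }
  }
  // A missing priority falls back to 'normal', like .default('normal')
  const { text, priority = 'normal', metadata } = body as Record<string, unknown>

  // text: z.string().min(1).max(500)
  if (typeof text !== 'string' || text.length < 1 || text.length > 500) {
    return { ok: false, error: 'text must be a string of 1 to 500 characters' }
  }
  // priority: z.enum(['low', 'normal', 'high'])
  if (!PRIORITIES.includes(priority as Priority)) {
    return { ok: false, error: 'priority must be low, normal, or high' }
  }
  const value: MessageBody = { text, priority: priority as Priority }

  // metadata: z.record(z.string()).optional()
  if (metadata !== undefined) {
    const isStringRecord =
      typeof metadata === 'object' &&
      metadata !== null &&
      !Array.isArray(metadata) &&
      Object.values(metadata).every((v) => typeof v === 'string')
    if (!isStringRecord) {
      return { ok: false, error: 'metadata must map strings to strings' }
    }
    value.metadata = metadata as Record<string, string>
  }
  return { ok: true, value }
}
```

For example, `validateMessageBody({ text: 'hi' })` succeeds with priority 'normal', while an empty text is rejected.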

Step 2: Create the background worker

Create steps/process-message.step.ts to process messages:
steps/process-message.step.ts
import type { Handlers, StepConfig } from 'motia'
import { z } from 'zod'

const inputSchema = z.object({
  messageId: z.string(),
  text: z.string(),
  priority: z.enum(['low', 'normal', 'high']),
})

export const config = {
  name: 'ProcessMessage',
  description: 'Background worker for message processing',
  triggers: [
    {
      type: 'queue',
      topic: 'message.sent',
      input: inputSchema,
    },
  ],
  enqueues: ['message.processed'],
} as const satisfies StepConfig

export const handler: Handlers<typeof config> = async (
  input,
  { logger, state, enqueue }
) => {
  const { messageId, text, priority } = input

  logger.info('Processing message', { messageId, priority })

  try {
    // Simulate processing work (e.g., API calls, transformations)
    const processedText = await processText(text)
    const processingTime = await simulateWork(priority)

    // Update state
    await state.update('messages', messageId, {
      status: 'processed',
      processedAt: new Date().toISOString(),
      processedText,
      processingTime,
    })

    logger.info('Message processed successfully', {
      messageId,
      processingTime,
    })

    // Enqueue next step
    await enqueue({
      topic: 'message.processed',
      data: { messageId, processedText },
    })
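  } catch (error) {
    // In strict TypeScript the caught value is `unknown`, so narrow it before reading .message
    const errorMessage = error instanceof Error ? error.message : String(error)

    logger.error('Message processing failed', {
      messageId,
      error: errorMessage,
    })

    // Update state with error
    await state.update('messages', messageId, {
      status: 'failed',
      error: errorMessage,
      failedAt: new Date().toISOString(),
    })

    throw error // Rethrow so the queue's retry mechanism can handle it
  }
}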

// Simulate text processing
async function processText(text: string): Promise<string> {
  // Example: uppercase, word count, etc.
  return text.toUpperCase()
}

// Simulate processing time based on priority
async function simulateWork(priority: string): Promise<number> {
  const delay = priority === 'high' ? 100 : priority === 'normal' ? 500 : 1000
  await new Promise((resolve) => setTimeout(resolve, delay))
  return delay
}

Worker features

  • Error handling: Catches errors, logs them, and records a failed status in state
  • Retry support: Rethrowing the error hands the message back to the queue's retry mechanism
  • Priority processing: Simulates different processing times per priority level
  • State tracking: Updates the message status from pending to processed or failed
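
The status values written by the two steps so far form a small lifecycle. The sketch below encodes it as a transition table; `MessageStatus`, `ALLOWED_TRANSITIONS`, and `canTransition` are hypothetical names, and the table assumes a failed message can be retried and then succeed or fail again.

```typescript
// Statuses written by SendMessage ('pending') and ProcessMessage ('processed' or 'failed').
type MessageStatus = 'pending' | 'processed' | 'failed'

// Assumed lifecycle: a retry re-runs the worker, so 'failed' can still lead to
// 'processed' or to another 'failed'. 'processed' is treated as final.
const ALLOWED_TRANSITIONS: Record<MessageStatus, readonly MessageStatus[]> = {
  pending: ['processed', 'failed'],
  failed: ['processed', 'failed'],
  processed: [],
}

function canTransition(from: MessageStatus, to: MessageStatus): boolean {
  return ALLOWED_TRANSITIONS[from].includes(to)
}
```

A guard like this could run before a state update to reject unexpected writes, for instance a late retry overwriting a record that is already processed.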

Step 3: Add periodic cleanup

Create steps/cleanup-messages.step.ts for maintenance:
steps/cleanup-messages.step.ts
import type { Handlers, StepConfig } from 'motia'

interface Message {
  messageId: string
  status: string
  createdAt: string
  processedAt?: string
}

export const config = {
  name: 'CleanupMessages',
  description: 'Clean up old processed messages',
  triggers: [
    {
      type: 'cron',
      expression: '0 0 2 * * * *', // Every day at 2 AM
    },
  ],
} as const satisfies StepConfig

export const handler: Handlers<typeof config> = async (
  _input,
  { logger, state }
) => {
  logger.info('Starting message cleanup')

  const messages = await state.list<Message>('messages')
  const cutoffDate = new Date(Date.now() - 30 * 24 * 60 * 60 * 1000) // 30 days ago

  let deletedCount = 0

  for (const message of messages) {
    const messageDate = new Date(message.processedAt || message.createdAt)

    if (
      message.status === 'processed' &&
      messageDate < cutoffDate
    ) {
      await state.delete('messages', message.messageId)
      deletedCount++
    }
  }

  logger.info('Cleanup completed', {
    totalMessages: messages.length,
    deletedCount,
  })
}
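
The selection rule inside the cleanup loop can be pulled out into a pure function, which makes it easy to test without a state store or a cron trigger. This is a sketch under the same assumptions as the handler above; `StoredMessage`, `selectExpired`, and `maxAgeDays` are hypothetical names.

```typescript
// Shape mirrors the Message interface used by the cleanup step.
interface StoredMessage {
  messageId: string
  status: string
  createdAt: string
  processedAt?: string
}

const DAY_MS = 24 * 60 * 60 * 1000

// Return the processed messages whose processedAt (or createdAt, when
// processedAt is missing) is older than maxAgeDays relative to `now`.
function selectExpired(
  messages: StoredMessage[],
  now: Date,
  maxAgeDays = 30
): StoredMessage[] {
  const cutoff = now.getTime() - maxAgeDays * DAY_MS
  return messages.filter(
    (m) =>
      m.status === 'processed' &&
      new Date(m.processedAt ?? m.createdAt).getTime() < cutoff
  )
}
```

Passing `now` as a parameter instead of calling Date.now() inside the function keeps the result deterministic in tests.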

Advanced: Multiple workers with state trigger

Create steps/notify-completion.step.ts to react to state changes:
steps/notify-completion.step.ts
import type { Handlers, StepConfig } from 'motia'

export const config = {
  name: 'NotifyCompletion',
  description: 'Send notification when message is processed',
  triggers: [
    {
      type: 'state',
      namespace: 'messages',
      filter: {
        status: 'processed',
      },
    },
  ],
} as const satisfies StepConfig

export const handler: Handlers<typeof config> = async (
  input,
  { logger }
) => {
  logger.info('Message completed', {
    messageId: input.key,
    data: input.value,
  })

  // Send notification (e.g., email, webhook, etc.)
  await sendNotification({
    type: 'message_completed',
    messageId: input.key,
    completedAt: input.value.processedAt,
  })
}

async function sendNotification(notification: Record<string, unknown>) {
  // Placeholder: replace with a call to your notification service
  console.log('Notification sent:', notification)
}
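
One way to picture the filter in the state trigger is as a shallow equality check against the stored value. The sketch below is an assumption about the matching semantics, not the engine's documented behavior; `matchesFilter` is a hypothetical helper.

```typescript
// Assumed semantics: every key in the filter must strictly equal the same key
// in the stored value. Keys that are not in the filter are ignored.
function matchesFilter(
  value: Record<string, unknown>,
  filter: Record<string, unknown>
): boolean {
  return Object.entries(filter).every(([key, expected]) => value[key] === expected)
}
```

Under this assumption, a record with status 'processed' matches the filter above regardless of its other fields, and a record with status 'failed' does not.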

Testing the worker

Send a message

curl -X POST http://localhost:3000/messages \
  -H "Content-Type: application/json" \
  -d '{
    "text": "Process this message",
    "priority": "high",
    "metadata": {
      "source": "api",
      "userId": "user-123"
    }
  }'
Response:
{
  "messageId": "msg-1234567890-abc123",
  "status": "enqueued"
}

Monitor processing

Check the iii Console to see:
  1. Message received by HTTP endpoint
  2. Message enqueued to message.sent topic
  3. Background worker processing the message
  4. State updated with processing results
  5. Notification sent via state trigger

Configuration: Queue strategies

Configure queue behavior in iii-config.yaml:
iii-config.yaml
queues:
  message.sent:
    max_retries: 3
    retry_delay: 5000
    concurrency: 5
    dead_letter_queue: message.failed

Queue options

  • max_retries: Number of retry attempts for failed messages
  • retry_delay: Delay between retries in milliseconds
  • concurrency: Number of concurrent workers
  • dead_letter_queue: Queue for permanently failed messages
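
How max_retries and dead_letter_queue interact can be illustrated with a small synchronous simulation. This is a sketch, not the engine's implementation: `deliverWithRetry`, `RetryOptions`, and `DeliveryOutcome` are hypothetical names, and it assumes max_retries counts attempts after the first one.

```typescript
interface RetryOptions {
  maxRetries: number // mirrors max_retries
}

type DeliveryOutcome =
  | { status: 'done'; attempts: number }
  | { status: 'dead-lettered'; attempts: number; lastError: string }

// Call the handler until it succeeds or the retry budget is used up.
function deliverWithRetry<T>(
  data: T,
  handler: (data: T) => void,
  options: RetryOptions
): DeliveryOutcome {
  // Assumption: one initial attempt plus maxRetries retries.
  const maxAttempts = options.maxRetries + 1
  let lastError = ''
  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    try {
      handler(data)
      return { status: 'done', attempts: attempt }
    } catch (error) {
      lastError = error instanceof Error ? error.message : String(error)
      // A real engine would wait retry_delay milliseconds before the next attempt.
    }
  }
  // Retry budget exhausted: the message would move to the dead letter queue.
  return { status: 'dead-lettered', attempts: maxAttempts, lastError }
}
```

Because a failed message may be delivered several times, handlers benefit from being idempotent, so that a retry does not repeat side effects that already succeeded.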

Scaling workers

The iii engine automatically distributes work across multiple instances. To scale:
# Start multiple instances
iii -c iii-config.yaml --instances 3
Each instance will process messages from the same queue, providing:
  • Load distribution
  • High availability
  • Fault tolerance

What you learned

  • Queue triggers: Process messages asynchronously with queues
  • Error handling: Handle failures and implement retry logic
  • State triggers: React to state changes automatically
  • Cron jobs: Schedule periodic maintenance tasks

Next steps

  • ChessArena example: See a full production app with background workers
  • Workflows guide: Build complex multi-step workflows
