Skip to main content

Overview

This tutorial demonstrates how to build a message queue system using Motia’s pub/sub pattern. You’ll create an order processing system that accepts orders via HTTP, processes them asynchronously, and sends notifications. What you’ll learn:
  • Pub/sub messaging patterns
  • Queue-triggered Steps
  • Chaining Steps with enqueue
  • Event-driven architecture
  • Error handling in async workflows
  • Processing pipelines

Prerequisites

Before starting, make sure you have:
  • Node.js version 19 or higher
  • Completed the Hello World tutorial
  • Understanding of asynchronous processing

Use Case: Order Processing System

We’ll build a system that:
  1. Accepts orders via HTTP API
  2. Validates and processes orders in the background
  3. Updates inventory
  4. Sends confirmation notifications
  5. Handles errors gracefully

Project Setup

1

Create project

mkdir order-processing
cd order-processing
npm init -y
2

Install dependencies

npm install motia zod
npm install -D typescript @types/node
Update package.json:
{
  "type": "module",
  "scripts": {
    "dev": "iii",
    "build": "motia build"
  }
}
3

Create structure

mkdir -p steps/orders

Building the System

Step 1: Define Data Types

Create steps/orders/types.ts:
steps/orders/types.ts
import { z } from 'zod'

export const orderItemSchema = z.object({
  productId: z.string(),
  quantity: z.number().positive(),
  price: z.number().positive(),
})

export const orderSchema = z.object({
  orderId: z.string(),
  customerId: z.string(),
  items: z.array(orderItemSchema),
  total: z.number(),
  status: z.enum(['pending', 'processing', 'completed', 'failed']),
  createdAt: z.string(),
  processedAt: z.string().optional(),
})

export const processOrderEventSchema = z.object({
  orderId: z.string(),
  customerId: z.string(),
  items: z.array(orderItemSchema),
  total: z.number(),
  timestamp: z.string(),
})

export const notificationEventSchema = z.object({
  orderId: z.string(),
  customerId: z.string(),
  status: z.string(),
  message: z.string(),
})

export type Order = z.infer<typeof orderSchema>
export type OrderItem = z.infer<typeof orderItemSchema>
export type ProcessOrderEvent = z.infer<typeof processOrderEventSchema>
export type NotificationEvent = z.infer<typeof notificationEventSchema>

Step 2: Create Order API

Create steps/orders/create-order.step.ts:
steps/orders/create-order.step.ts
import { type Handlers, type StepConfig } from 'motia'
import { z } from 'zod'
import { orderItemSchema, orderSchema } from './types'

export const config = {
  name: 'CreateOrder',
  description: 'Accepts new orders and enqueues them for processing',
  flows: ['order-processing'],
  triggers: [
    {
      type: 'http',
      method: 'POST',
      path: '/orders',
      bodySchema: z.object({
        customerId: z.string(),
        items: z.array(orderItemSchema).min(1),
      }),
      responseSchema: {
        200: orderSchema,
        400: z.object({ error: z.string() }),
      },
    },
  ],
  enqueues: ['process-order'],
} as const satisfies StepConfig

export const handler: Handlers<typeof config> = async (
  { request },
  { enqueue, logger, state }
) => {
  const { customerId, items } = request.body || {}

  logger.info('Received new order', { customerId, itemCount: items?.length })

  // Calculate total
  const total = items.reduce((sum, item) => sum + item.price * item.quantity, 0)

  // Generate order ID
  const orderId = `order-${Date.now()}-${Math.random().toString(36).substring(2, 9)}`
  const timestamp = new Date().toISOString()

  // Create order record
  const order = {
    orderId,
    customerId,
    items,
    total,
    status: 'pending' as const,
    createdAt: timestamp,
  }

  // Store order in state
  await state.set('orders', orderId, order)

  // Enqueue for background processing
  await enqueue({
    topic: 'process-order',
    data: {
      orderId,
      customerId,
      items,
      total,
      timestamp,
    },
  })

  logger.info('Order created and enqueued', { orderId })

  return {
    status: 200,
    body: order,
  }
}
Key concepts:
  • Immediate response: API returns immediately while processing happens in background
  • State storage: Order is persisted before enqueueing
  • Enqueue: Publishes event to process-order topic

Step 3: Process Orders

Create steps/orders/process-order.step.ts:
steps/orders/process-order.step.ts
import { type Handlers, type StepConfig } from 'motia'
import { processOrderEventSchema } from './types'

export const config = {
  name: 'ProcessOrder',
  description: 'Processes orders in the background',
  flows: ['order-processing'],
  triggers: [
    {
      type: 'queue',
      topic: 'process-order',
      input: processOrderEventSchema,
    },
  ],
  enqueues: ['update-inventory', 'send-notification'],
} as const satisfies StepConfig

export const handler: Handlers<typeof config> = async (
  input,
  { enqueue, logger, state }
) => {
  const { orderId, customerId, items, total } = input

  logger.info('Processing order', { orderId, customerId })

  try {
    // Simulate order validation
    await new Promise((resolve) => setTimeout(resolve, 100))

    // Update order status to processing
    await state.update('orders', orderId, [
      { type: 'set', path: 'status', value: 'processing' },
    ])

    // Enqueue inventory update
    await enqueue({
      topic: 'update-inventory',
      data: {
        orderId,
        items,
        timestamp: new Date().toISOString(),
      },
    })

    // Update order status to completed
    await state.update('orders', orderId, [
      { type: 'set', path: 'status', value: 'completed' },
      { type: 'set', path: 'processedAt', value: new Date().toISOString() },
    ])

    // Enqueue notification
    await enqueue({
      topic: 'send-notification',
      data: {
        orderId,
        customerId,
        status: 'completed',
        message: `Your order ${orderId} has been processed successfully. Total: $${total}`,
      },
    })

    logger.info('Order processed successfully', { orderId })
  } catch (error) {
    logger.error('Order processing failed', { orderId, error })

    // Update order status to failed
    await state.update('orders', orderId, [
      { type: 'set', path: 'status', value: 'failed' },
    ])

    // Send failure notification
    await enqueue({
      topic: 'send-notification',
      data: {
        orderId,
        customerId,
        status: 'failed',
        message: `Order ${orderId} processing failed. Please contact support.`,
      },
    })
  }
}
Key concepts:
  • Chained events: Enqueues multiple follow-up tasks
  • Error handling: Catches failures and triggers error workflow
  • State updates: Tracks order progress
  • Fan-out pattern: Single event triggers multiple downstream events

Step 4: Update Inventory

Create steps/orders/update-inventory.step.ts:
steps/orders/update-inventory.step.ts
import { type Handlers, type StepConfig } from 'motia'
import { z } from 'zod'
import { orderItemSchema } from './types'

const inventoryEventSchema = z.object({
  orderId: z.string(),
  items: z.array(orderItemSchema),
  timestamp: z.string(),
})

export const config = {
  name: 'UpdateInventory',
  description: 'Updates inventory based on order items',
  flows: ['order-processing'],
  triggers: [
    {
      type: 'queue',
      topic: 'update-inventory',
      input: inventoryEventSchema,
    },
  ],
  enqueues: [],
} as const satisfies StepConfig

export const handler: Handlers<typeof config> = async (
  input,
  { logger, state }
) => {
  const { orderId, items } = input

  logger.info('Updating inventory', { orderId, itemCount: items.length })

  for (const item of items) {
    // Get current inventory
    const inventory = await state.get('inventory', item.productId)
    const currentStock = inventory?.stock || 0

    // Decrement stock
    const newStock = Math.max(0, currentStock - item.quantity)

    await state.set('inventory', item.productId, {
      productId: item.productId,
      stock: newStock,
      lastUpdated: new Date().toISOString(),
    })

    logger.info('Inventory updated', {
      productId: item.productId,
      previousStock: currentStock,
      newStock,
      quantity: item.quantity,
    })
  }

  logger.info('Inventory update completed', { orderId })
}

Step 5: Send Notifications

Create steps/orders/send-notification.step.ts:
steps/orders/send-notification.step.ts
import { type Handlers, type StepConfig } from 'motia'
import { notificationEventSchema } from './types'

export const config = {
  name: 'SendNotification',
  description: 'Sends order notifications to customers',
  flows: ['order-processing'],
  triggers: [
    {
      type: 'queue',
      topic: 'send-notification',
      input: notificationEventSchema,
    },
  ],
  enqueues: [],
} as const satisfies StepConfig

export const handler: Handlers<typeof config> = async (
  input,
  { logger, state }
) => {
  const { orderId, customerId, status, message } = input

  logger.info('Sending notification', { orderId, customerId, status })

  // In production, integrate with email/SMS service
  // For demo, just log and store in state

  const notification = {
    id: `notif-${Date.now()}`,
    orderId,
    customerId,
    status,
    message,
    sentAt: new Date().toISOString(),
  }

  await state.set('notifications', notification.id, notification)

  logger.info('Notification sent', { 
    notificationId: notification.id, 
    orderId, 
    customerId 
  })

  // Simulate email/SMS sending
  console.log(`\n📧 NOTIFICATION to customer ${customerId}:`, message, '\n')
}

Step 6: Get Order Status

Create steps/orders/get-order.step.ts:
steps/orders/get-order.step.ts
import { type Handlers, type StepConfig } from 'motia'
import { z } from 'zod'
import { orderSchema } from './types'

export const config = {
  name: 'GetOrder',
  description: 'Retrieves order status',
  flows: ['order-processing'],
  triggers: [
    {
      type: 'http',
      method: 'GET',
      path: '/orders/:orderId',
      responseSchema: {
        200: orderSchema,
        404: z.object({ error: z.string() }),
      },
    },
  ],
  enqueues: [],
} as const satisfies StepConfig

export const handler: Handlers<typeof config> = async (
  { request },
  { logger, state }
) => {
  const { orderId } = request.pathParams || {}

  logger.info('Fetching order', { orderId })

  const order = await state.get('orders', orderId)

  if (!order) {
    return {
      status: 404,
      body: { error: 'Order not found' },
    }
  }

  return {
    status: 200,
    body: order,
  }
}

Running the Application

1

Start the server

npm run dev
2

Create an order

curl -X POST http://localhost:3000/orders \
  -H "Content-Type: application/json" \
  -d '{
    "customerId": "customer-123",
    "items": [
      {"productId": "prod-1", "quantity": 2, "price": 19.99},
      {"productId": "prod-2", "quantity": 1, "price": 49.99}
    ]
  }'
Response:
{
  "orderId": "order-1709568000000-abc123",
  "customerId": "customer-123",
  "items": [...],
  "total": 89.97,
  "status": "pending",
  "createdAt": "2026-03-04T10:00:00.000Z"
}
3

Check order status

Watch the logs to see the processing pipeline:
[CreateOrder] Order created and enqueued
[ProcessOrder] Processing order
[UpdateInventory] Updating inventory
[SendNotification] Sending notification
📧 NOTIFICATION: Your order has been processed successfully
Query the order:
curl http://localhost:3000/orders/order-1709568000000-abc123

Flow Diagram

HTTP POST /orders

  CreateOrder (API)

  [enqueue: process-order]

  ProcessOrder (Queue)

       ├─→ [enqueue: update-inventory] → UpdateInventory
       └─→ [enqueue: send-notification] → SendNotification

Advanced Patterns

Parallel Processing

Modify process-order.step.ts to run inventory and notification in parallel:
// Enqueue both in parallel
await Promise.all([
  enqueue({ topic: 'update-inventory', data: inventoryData }),
  enqueue({ topic: 'send-notification', data: notificationData }),
])

Retry Logic

Add retry configuration to queue triggers:
triggers: [
  {
    type: 'queue',
    topic: 'process-order',
    input: processOrderEventSchema,
    retry: {
      maxAttempts: 3,
      backoff: 'exponential',
    },
  },
]

Dead Letter Queue

Handle permanently failed events:
triggers: [
  {
    type: 'queue',
    topic: 'process-order',
    input: processOrderEventSchema,
    deadLetterQueue: 'failed-orders',
  },
]

Next Steps

Scheduled Tasks

Add periodic jobs to your system

Queue Reference

Learn advanced queue features

Background Jobs Guide

Best practices for background processing

Workflows Guide

Advanced workflow patterns

Build docs developers (and LLMs) love