The Queue module provides asynchronous message processing based on a publish/subscribe pattern: publishers enqueue messages to a topic, and functions subscribed to that topic process them asynchronously.

Configuration

Configure the Queue module in config.yaml:
config.yaml
modules:
  - class: modules::queue::QueueModule
    config:
      adapter:
        class: modules::queue::RedisAdapter
        config:
          redis_url: redis://localhost:6379

Available Adapters

Publishing Messages

Use the queue.enqueue function to publish messages:
// Publish a message to a topic
await client.call('queue.enqueue', {
  topic: 'user.created',
  data: {
    userId: 123,
    email: 'user@example.com',
  },
});
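
One way to keep topic names and payload shapes consistent across publishers is a small typed helper that builds the argument for queue.enqueue. This is a sketch only and not part of the Queue module: TopicPayloads, EnqueueRequest, enqueueRequest, and the order.placed payload shape are hypothetical names assumed for illustration.

```typescript
// Hypothetical map from topic name to payload type (not provided by the Queue module).
interface TopicPayloads {
  'user.created': { userId: number; email: string };
  'order.placed': { orderId: number }; // assumed shape, for illustration only
}

// Shape of the argument passed to 'queue.enqueue' in the example above.
interface EnqueueRequest<K extends keyof TopicPayloads> {
  topic: K;
  data: TopicPayloads[K];
}

// Builds the payload; the compiler rejects unknown topics and mismatched data.
function enqueueRequest<K extends keyof TopicPayloads>(
  topic: K,
  data: TopicPayloads[K],
): EnqueueRequest<K> {
  return { topic, data };
}

const userCreatedRequest = enqueueRequest('user.created', {
  userId: 123,
  email: 'user@example.com',
});
// With a connected client: await client.call('queue.enqueue', userCreatedRequest);
```

The helper only constructs the object, so publishing still goes through client.call exactly as shown above.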

Subscribing to Topics

Define queue triggers to subscribe to topics:
index.ts
export default iii({
  triggers: {
    'on-user-created': {
      type: 'queue',
      config: {
        topic: 'user.created',
      },
    },
    'on-order-placed': {
      type: 'queue',
      config: {
        topic: 'order.placed',
      },
    },
  },
});

export async function onUserCreated(data: any) {
  console.log('New user:', data.userId);
  // Send welcome email
  await sendEmail(data.email, 'Welcome!');
}

export async function onOrderPlaced(data: any) {
  console.log('New order:', data.orderId);
  // Process payment
  await processPayment(data);
}

Message Format

Messages published to the queue include:
{
  topic: string;          // Topic name
  data: any;              // Message payload
  traceparent?: string;   // Distributed tracing header
  baggage?: string;       // Additional trace context
}
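
When inspecting raw messages, for example in tests or tooling, a runtime check of the shape above can be handy. The following is a minimal sketch: QueueMessage and isQueueMessage are hypothetical names, and the handlers on this page receive only the data payload, so this guard is assumed to apply to the full envelope rather than to handler arguments.

```typescript
// Message envelope as documented above; `data` is typed as unknown in this sketch.
interface QueueMessage {
  topic: string;
  data: unknown;
  traceparent?: string;
  baggage?: string;
}

// Runtime check that a value has the documented envelope shape.
function isQueueMessage(value: unknown): value is QueueMessage {
  if (typeof value !== 'object' || value === null) return false;
  const v = value as Record<string, unknown>;
  return (
    typeof v['topic'] === 'string' &&
    'data' in v &&
    (v['traceparent'] === undefined || typeof v['traceparent'] === 'string') &&
    (v['baggage'] === undefined || typeof v['baggage'] === 'string')
  );
}
```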

Queue Configuration Options

Each subscriber can tune retry and dead-letter behavior under config.metadata.infrastructure.queue:
export default iii({
  triggers: {
    'process-order': {
      type: 'queue',
      config: {
        topic: 'orders',
        metadata: {
          infrastructure: {
            queue: {
              max_retries: 3,
              retry_delay_ms: 1000,
              dead_letter_topic: 'orders.failed',
            },
          },
        },
      },
    },
  },
});

  • max_retries (number, default: 3): Maximum retry attempts for failed messages
  • retry_delay_ms (number, default: 1000): Delay between retries in milliseconds
  • dead_letter_topic (string): Topic for messages that exceed max retries
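
The three options can be read as a small decision rule applied after each failed delivery. The sketch below models that rule in isolation and is not the module's implementation. It assumes that max_retries counts retries after the initial attempt, that the delay is fixed rather than growing, and that a message is discarded when no dead_letter_topic is set. QueueRetryOptions, NextStep, and nextStepAfterFailure are hypothetical names.

```typescript
// Options mirroring the documented fields.
interface QueueRetryOptions {
  max_retries: number;
  retry_delay_ms: number;
  dead_letter_topic?: string;
}

type NextStep =
  | { action: 'retry'; delayMs: number }
  | { action: 'dead_letter'; topic: string }
  | { action: 'drop' };

// Decides what happens after a failed delivery.
// `retriesSoFar` is the number of retries already made (0 right after the first attempt fails).
// ASSUMPTION: max_retries excludes the initial attempt and the delay is constant.
function nextStepAfterFailure(retriesSoFar: number, options: QueueRetryOptions): NextStep {
  if (retriesSoFar < options.max_retries) {
    return { action: 'retry', delayMs: options.retry_delay_ms };
  }
  if (options.dead_letter_topic !== undefined) {
    return { action: 'dead_letter', topic: options.dead_letter_topic };
  }
  // ASSUMPTION: without a dead_letter_topic the message is discarded.
  return { action: 'drop' };
}

const retryOptions: QueueRetryOptions = {
  max_retries: 3,
  retry_delay_ms: 1000,
  dead_letter_topic: 'orders.failed',
};
const retrySteps = [0, 1, 2, 3].map((n) => nextStepAfterFailure(n, retryOptions).action);
// retrySteps: ['retry', 'retry', 'retry', 'dead_letter']
```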

Dead Letter Queue

Messages that still fail after max_retries attempts are published to the configured dead_letter_topic. Subscribe to that topic to handle them:
export default iii({
  triggers: {
    'handle-failed-orders': {
      type: 'queue',
      config: {
        topic: 'orders.failed',
      },
    },
  },
});

export async function handleFailedOrders(data: any) {
  // Log failed order for manual review
  await logError('Order processing failed', data);
  await notifyAdmin(data);
}

DLQ Management Functions

The Queue module provides functions to manage the dead letter queue:
// Get DLQ message count
const count = await client.call('queue.dlq_count', {
  topic: 'orders',
});

// Redrive DLQ messages (retry all failed messages)
const redriven = await client.call('queue.redrive_dlq', {
  topic: 'orders',
});
console.log(`Redriven ${redriven} messages`);

Conditional Processing

Add a condition to a trigger to filter messages before the handler runs. In this example, only messages whose priority is 'high' are processed:
export default iii({
  triggers: {
    'high-priority-orders': {
      type: 'queue',
      config: {
        topic: 'orders',
      },
      condition: async (data) => {
        return data.priority === 'high';
      },
    },
  },
});

Distributed Tracing

The Queue module automatically propagates OpenTelemetry trace context:
// Publisher automatically adds trace context
await client.call('queue.enqueue', {
  topic: 'notifications',
  data: { message: 'Hello' },
});
// traceparent and baggage are added automatically
Subscribers receive the trace context and continue the distributed trace:
export async function handleNotification(data: any) {
  // This function's span is linked to the publisher's trace
  console.log('Processing notification:', data);
}
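
For debugging, it can help to decode the traceparent value carried with a message. The sketch below parses the version 00 layout defined by the W3C Trace Context specification (version, trace id, parent id, flags). TraceParent and parseTraceparent are hypothetical helper names, not Queue module APIs, and the sample value in the comment is the example from the specification.

```typescript
// Parsed fields of a version 00 `traceparent` header.
interface TraceParent {
  traceId: string;  // 32 lowercase hex characters
  parentId: string; // 16 lowercase hex characters
  sampled: boolean; // lowest bit of the trace flags
}

const TRACEPARENT_PATTERN = /^00-([0-9a-f]{32})-([0-9a-f]{16})-([0-9a-f]{2})$/;

// Returns the parsed header, or null if it is malformed or not version 00.
function parseTraceparent(header: string): TraceParent | null {
  const match = TRACEPARENT_PATTERN.exec(header);
  if (match === null) return null;
  const [, traceId = '', parentId = '', flags = ''] = match;
  // The specification treats all-zero trace and parent ids as invalid.
  if (/^0+$/.test(traceId) || /^0+$/.test(parentId)) return null;
  return { traceId, parentId, sampled: (parseInt(flags, 16) & 0x01) === 1 };
}

// Example input: '00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01'
```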

Example: Email Queue

index.ts
export default iii({
  triggers: {
    'send-email': {
      type: 'queue',
      config: {
        topic: 'emails.send',
        metadata: {
          infrastructure: {
            queue: {
              max_retries: 5,
              retry_delay_ms: 2000,
              dead_letter_topic: 'emails.failed',
            },
          },
        },
      },
    },
    'log-failed-emails': {
      type: 'queue',
      config: { topic: 'emails.failed' },
    },
  },
});

export async function sendEmail(data: any) {
  const { to, subject, body } = data;
  
  try {
    await emailService.send({ to, subject, body });
    console.log(`Email sent to ${to}`);
  } catch (error) {
    console.error('Email failed:', error);
    throw error; // Will retry up to max_retries
  }
}

export async function logFailedEmails(data: any) {
  // Log to external service for manual retry
  await errorTracker.log('Email delivery failed', data);
}

Publishing from Functions

export async function createUser(input: any) {
  // Create user in database
  const user = await db.users.create(input);
  
  // Publish event for async processing
  await client.call('queue.enqueue', {
    topic: 'user.created',
    data: { userId: user.id, email: user.email },
  });
  
  return user;
}

Topic Naming Conventions

Recommended topic naming patterns:
  • Entity.Action: user.created, order.placed
  • Namespace.Entity.Action: app.user.created
  • Hierarchy: notifications.email.send
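
A naming convention is easier to keep when it is checked in code. The following is a minimal sketch of such a check, assuming lowercase, dot-separated names with at least two segments. The pattern and the helper names isConventionalTopic and topicSegments are illustrative assumptions, not rules enforced by the Queue module. Single-segment topics such as orders, used in examples on this page, would not pass it.

```typescript
// Lowercase, dot-separated segments, at least two of them, e.g. 'user.created'.
// ASSUMPTION: this character set is a convention chosen for this sketch only.
const TOPIC_PATTERN = /^[a-z][a-z0-9_-]*(\.[a-z][a-z0-9_-]*)+$/;

// True when the topic follows the assumed convention.
function isConventionalTopic(topic: string): boolean {
  return TOPIC_PATTERN.test(topic);
}

// Splits a topic into its segments, e.g. 'app.user.created' -> ['app', 'user', 'created'].
function topicSegments(topic: string): string[] {
  return topic.split('.');
}
```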

Performance Considerations

  • Use the Redis adapter for production workloads
  • Set max_retries and retry_delay_ms to match how your handlers fail and recover
  • Monitor the DLQ for failing messages
  • Use distributed tracing to debug message flows

Source Code Reference

  • Module: src/modules/queue/queue.rs:34
  • Enqueue function: src/modules/queue/queue.rs:49
  • Trigger registration: src/modules/queue/queue.rs:81
  • Queue adapter trait: src/modules/queue/mod.rs:20
