The Queue module provides asynchronous message processing using a publish/subscribe pattern: functions subscribe to topics and handle the messages published to them.
Configuration
Configure the Queue module in config.yaml:
modules:
  - class: modules::queue::QueueModule
    config:
      adapter:
        class: modules::queue::RedisAdapter
        config:
          redis_url: redis://localhost:6379
Available Adapters
Redis (Recommended)
Production-ready distributed queue using Redis:
adapter:
  class: modules::queue::RedisAdapter
  config:
    redis_url: redis://localhost:6379
Builtin (Development)
In-memory queue for development:
adapter:
  class: modules::queue::BuiltinQueueAdapter
RabbitMQ
Enterprise message broker:
adapter:
  class: modules::queue::RabbitMQAdapter
  config:
    amqp_url: amqp://localhost:5672
Publishing Messages
Use the queue.enqueue function to publish messages:
// Publish a message to a topic
await client.call('queue.enqueue', {
  topic: 'user.created',
  data: {
    userId: 123,
    email: 'user@example.com',
  },
});
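The publish call above takes a free-form topic string and payload, so a thin typed wrapper can catch mismatches at compile time. The following is a minimal sketch under stated assumptions: `EventMap`, `QueueClient`, `buildEnqueueArgs`, and `publish` are hypothetical names that are not part of the Queue module, the payload field types are guesses based on the examples in this page, and the client is assumed to expose only the `call(name, args)` method shown above.

```typescript
// Hypothetical map from topic name to payload type; extend it per application.
// Field types are assumptions inferred from the examples on this page.
interface EventMap {
  'user.created': { userId: number; email: string };
  'order.placed': { orderId: number };
}

// Assumed minimal shape of the client used in the publish example.
interface QueueClient {
  call(name: string, args: unknown): Promise<unknown>;
}

// Build the argument object passed to queue.enqueue for a known topic.
function buildEnqueueArgs<T extends keyof EventMap>(topic: T, data: EventMap[T]) {
  return { topic, data };
}

// Publish a typed message through any client that exposes call().
async function publish<T extends keyof EventMap>(
  client: QueueClient,
  topic: T,
  data: EventMap[T],
): Promise<void> {
  await client.call('queue.enqueue', buildEnqueueArgs(topic, data));
}
```

With this in place, `publish(client, 'user.created', { userId: 123, email: 'user@example.com' })` type-checks, while a misspelled topic or a missing field is rejected by the compiler.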
Subscribing to Topics
Define queue triggers to subscribe to topics:
export default iii({
  triggers: {
    'on-user-created': {
      type: 'queue',
      config: {
        topic: 'user.created',
      },
    },
    'on-order-placed': {
      type: 'queue',
      config: {
        topic: 'order.placed',
      },
    },
  },
});

export async function onUserCreated(data: any) {
  console.log('New user:', data.userId);
  // Send welcome email
  await sendEmail(data.email, 'Welcome!');
}

export async function onOrderPlaced(data: any) {
  console.log('New order:', data.orderId);
  // Process payment
  await processPayment(data);
}
Messages published to the queue include:
{
  topic: string;        // Topic name
  data: any;            // Message payload
  traceparent?: string; // Distributed tracing header
  baggage?: string;     // Additional trace context
}
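When a message envelope arrives from an untrusted or loosely typed source, a runtime check against the shape above can be useful. This is a minimal sketch: `QueueMessage` and `isQueueMessage` are hypothetical names, and `data` is typed as `unknown` here (rather than `any`) so that callers have to narrow it before use.

```typescript
// Envelope shape as described above; the interface name is an assumption.
interface QueueMessage {
  topic: string;
  data: unknown;
  traceparent?: string;
  baggage?: string;
}

// Runtime check that an unknown value has the envelope shape.
function isQueueMessage(value: unknown): value is QueueMessage {
  if (typeof value !== 'object' || value === null) return false;
  const v = value as Record<string, unknown>;
  // Optional fields may be absent, but must be strings when present.
  const optionalString = (x: unknown): boolean =>
    x === undefined || typeof x === 'string';
  return (
    typeof v.topic === 'string' &&
    'data' in v &&
    optionalString(v.traceparent) &&
    optionalString(v.baggage)
  );
}
```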
Queue Configuration Options
Advanced queue configuration per subscriber:
export default iii({
  triggers: {
    'process-order': {
      type: 'queue',
      config: {
        topic: 'orders',
        metadata: {
          infrastructure: {
            queue: {
              max_retries: 3,
              retry_delay_ms: 1000,
              dead_letter_topic: 'orders.failed',
            },
          },
        },
      },
    },
  },
});
- max_retries: Maximum retry attempts for failed messages
- retry_delay_ms: Delay between retries in milliseconds
- dead_letter_topic: Topic for messages that exceed max retries
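To see how these three options interact, the sketch below models a single message's delivery timeline. It is an illustration, not the module's implementation, and it rests on two assumptions the text above does not confirm: that the first delivery is not counted against `max_retries`, and that the delay between attempts is fixed rather than exponential. `QueueRetryConfig`, `DeliveryOutcome`, and `simulateDelivery` are hypothetical names.

```typescript
interface QueueRetryConfig {
  max_retries: number;
  retry_delay_ms: number;
  dead_letter_topic?: string;
}

interface DeliveryOutcome {
  attempts: number;              // total handler invocations
  totalDelayMs: number;          // time spent waiting between attempts
  deadLetteredTo: string | null; // topic the message ended up on, if any
}

// Assumption: one initial attempt plus up to max_retries retries,
// with a fixed retry_delay_ms wait before each retry.
function simulateDelivery(
  succeedsOnAttempt: number | null, // 1-based attempt that succeeds; null = never
  cfg: QueueRetryConfig,
): DeliveryOutcome {
  const maxAttempts = 1 + cfg.max_retries;
  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    if (attempt === succeedsOnAttempt) {
      return {
        attempts: attempt,
        totalDelayMs: (attempt - 1) * cfg.retry_delay_ms,
        deadLetteredTo: null,
      };
    }
  }
  // Every attempt failed: the message goes to the dead letter topic, if configured.
  return {
    attempts: maxAttempts,
    totalDelayMs: cfg.max_retries * cfg.retry_delay_ms,
    deadLetteredTo: cfg.dead_letter_topic ?? null,
  };
}
```

Under these assumptions, the configuration above (`max_retries: 3`, `retry_delay_ms: 1000`) gives a message that never succeeds four handler invocations and three seconds of accumulated delay before it lands on `orders.failed`.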
Dead Letter Queue
Messages that still fail after max_retries are published to the configured dead_letter_topic. Subscribe to that topic to handle them:
export default iii({
  triggers: {
    'handle-failed-orders': {
      type: 'queue',
      config: {
        topic: 'orders.failed',
      },
    },
  },
});

export async function handleFailedOrders(data: any) {
  // Log failed order for manual review
  await logError('Order processing failed', data);
  await notifyAdmin(data);
}
DLQ Management Functions
The Queue module provides functions to manage the dead letter queue:
// Get DLQ message count
const count = await client.call('queue.dlq_count', {
  topic: 'orders',
});

// Redrive DLQ messages (retry all failed messages)
const redriven = await client.call('queue.redrive_dlq', {
  topic: 'orders',
});
console.log(`Redriven ${redriven} messages`);
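The two functions above can be combined into a periodic DLQ check. The sketch below is one possible policy, not a recommendation from the module: small backlogs are redriven automatically and large ones are escalated. `decideDlqAction`, `checkDlq`, `QueueClient`, and the threshold value are hypothetical, and the sketch assumes `queue.dlq_count` resolves to a plain number.

```typescript
// Assumed minimal shape of the client used in the examples above.
interface QueueClient {
  call(name: string, args: unknown): Promise<unknown>;
}

type DlqAction = 'none' | 'redrive' | 'alert';

// Hypothetical policy: redrive small backlogs, alert on large ones.
function decideDlqAction(count: number, alertThreshold: number): DlqAction {
  if (count <= 0) return 'none';
  return count >= alertThreshold ? 'alert' : 'redrive';
}

// Check one topic's DLQ and redrive it when the policy says so.
async function checkDlq(
  client: QueueClient,
  topic: string,
  alertThreshold = 100,
): Promise<DlqAction> {
  const raw = await client.call('queue.dlq_count', { topic });
  // Assumption: the count comes back as a number.
  if (typeof raw !== 'number') {
    throw new Error(`Unexpected dlq_count result for ${topic}`);
  }
  const action = decideDlqAction(raw, alertThreshold);
  if (action === 'redrive') {
    await client.call('queue.redrive_dlq', { topic });
  }
  return action;
}
```

Keeping the decision in a pure function makes the policy easy to unit test without a running queue; the caller decides how to surface an `'alert'` result.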
Conditional Processing
Use conditions to filter messages before processing:
export default iii({
  triggers: {
    'high-priority-orders': {
      type: 'queue',
      config: {
        topic: 'orders',
      },
      condition: async (data) => {
        return data.priority === 'high';
      },
    },
  },
});
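Because the condition runs before the handler, it may see payloads that lack the expected fields. Extracting it into a named predicate makes it testable on its own. This is a minimal sketch; `isHighPriority` is a hypothetical helper, and the payload is treated as `unknown` rather than trusted.

```typescript
// Defensive version of the condition: the payload shape is not guaranteed.
function isHighPriority(data: unknown): boolean {
  return (
    typeof data === 'object' &&
    data !== null &&
    (data as { priority?: unknown }).priority === 'high'
  );
}

// Async wrapper with the same shape as the inline condition in the trigger above.
const condition = async (data: unknown): Promise<boolean> => isHighPriority(data);
```

The wrapper can then be passed as the trigger's `condition` in place of the inline arrow function.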
Distributed Tracing
The Queue module automatically propagates OpenTelemetry trace context:
// Publisher automatically adds trace context
await client.call('queue.enqueue', {
  topic: 'notifications',
  data: { message: 'Hello' },
});
// traceparent and baggage are added automatically
Subscribers receive the trace context and continue the distributed trace:
export async function handleNotification(data: any) {
  // This function's span is linked to the publisher's trace
  console.log('Processing notification:', data);
}
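When debugging message flows, it can help to inspect the `traceparent` value carried with a message. The sketch below parses the header according to the W3C Trace Context layout (`version-traceid-parentid-flags`, lowercase hex). It is a standalone helper for inspection only; `TraceParent` and `parseTraceparent` are hypothetical names, and nothing here reflects how the Queue module itself reads the header.

```typescript
interface TraceParent {
  version: string;
  traceId: string;  // 32 lowercase hex characters
  parentId: string; // 16 lowercase hex characters
  sampled: boolean; // least significant bit of the trace flags
}

// Accepts only the four-field layout defined by W3C Trace Context.
const TRACEPARENT_RE = /^([0-9a-f]{2})-([0-9a-f]{32})-([0-9a-f]{16})-([0-9a-f]{2})$/;

function parseTraceparent(header: string): TraceParent | null {
  const m = TRACEPARENT_RE.exec(header.trim());
  if (m === null) return null;
  const [, version, traceId, parentId, flags] = m;
  // Version ff and all-zero IDs are invalid per the specification.
  if (version === 'ff') return null;
  if (/^0+$/.test(traceId) || /^0+$/.test(parentId)) return null;
  return {
    version,
    traceId,
    parentId,
    sampled: (parseInt(flags, 16) & 1) === 1,
  };
}
```

Logging the parsed `traceId` in a subscriber makes it straightforward to search for the matching publisher span in a tracing backend.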
Example: Email Queue
export default iii({
  triggers: {
    'send-email': {
      type: 'queue',
      config: {
        topic: 'emails.send',
        metadata: {
          infrastructure: {
            queue: {
              max_retries: 5,
              retry_delay_ms: 2000,
              dead_letter_topic: 'emails.failed',
            },
          },
        },
      },
    },
    'log-failed-emails': {
      type: 'queue',
      config: { topic: 'emails.failed' },
    },
  },
});

export async function sendEmail(data: any) {
  const { to, subject, body } = data;
  try {
    await emailService.send({ to, subject, body });
    console.log(`Email sent to ${to}`);
  } catch (error) {
    console.error('Email failed:', error);
    throw error; // Will retry up to max_retries
  }
}

export async function logFailedEmails(data: any) {
  // Log to external service for manual retry
  await errorTracker.log('Email delivery failed', data);
}
Publishing from Functions
export async function createUser(input: any) {
  // Create user in database
  const user = await db.users.create(input);

  // Publish event for async processing
  await client.call('queue.enqueue', {
    topic: 'user.created',
    data: { userId: user.id, email: user.email },
  });

  return user;
}
Topic Naming Conventions
Recommended topic naming patterns:
- Entity.Action: user.created, order.placed
- Namespace.Entity.Action: app.user.created
- Hierarchy: notifications.email.send
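The naming patterns above can be enforced with a small check at publish time or in tests. The sketch below is one possible reading of the convention, not a rule the Queue module applies: it requires two or more lowercase, dot-separated segments. `TOPIC_RE` and `isConventionalTopic` are hypothetical names. Note that single-segment topics such as `orders`, which appear in other examples on this page, would not pass this stricter check.

```typescript
// Hypothetical convention check: two or more lowercase segments separated by dots.
// Each segment starts with a letter and may contain digits, hyphens, or underscores.
const TOPIC_RE = /^[a-z][a-z0-9_-]*(\.[a-z][a-z0-9_-]*)+$/;

function isConventionalTopic(topic: string): boolean {
  return TOPIC_RE.test(topic);
}
```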
Best Practices
- Use the Redis adapter for production workloads
- Configure an appropriate max_retries value for your use case
- Monitor the DLQ for failing messages
- Use distributed tracing to debug message flows
Source Code Reference
- Module: src/modules/queue/queue.rs:34
- Enqueue function: src/modules/queue/queue.rs:49
- Trigger registration: src/modules/queue/queue.rs:81
- Queue adapter trait: src/modules/queue/mod.rs:20