Skip to main content
The QueueAdapter trait defines the interface for queue implementations in the iii framework. Adapters handle message enqueueing, topic subscriptions, and dead-letter queue management.

Location

use iii::modules::queue::QueueAdapter;
Defined in src/modules/queue/mod.rs

Trait Definition

/// Interface for queue implementations: message enqueueing, topic
/// subscriptions, and dead-letter queue (DLQ) management.
#[async_trait::async_trait]
pub trait QueueAdapter: Send + Sync + 'static {
    /// Enqueues a message to a topic with optional tracing context.
    ///
    /// * `topic` - topic name to publish the message to.
    /// * `data` - message payload as a JSON value.
    /// * `traceparent` - optional W3C traceparent header for distributed tracing.
    /// * `baggage` - optional W3C baggage header for trace context propagation.
    async fn enqueue(
        &self,
        topic: &str,
        data: Value,
        traceparent: Option<String>,
        baggage: Option<String>,
    );
    
    /// Subscribes to a topic with a handler function and optional filtering.
    ///
    /// * `topic` - topic name to subscribe to.
    /// * `id` - unique identifier for this subscription.
    /// * `function_id` - ID of the function to invoke when messages arrive.
    /// * `condition_function_id` - optional filter function ID used to
    ///   conditionally process messages.
    /// * `queue_config` - optional queue configuration (retries, concurrency,
    ///   timeouts, etc.).
    async fn subscribe(
        &self,
        topic: &str,
        id: &str,
        function_id: &str,
        condition_function_id: Option<String>,
        queue_config: Option<SubscriberQueueConfig>,
    );
    
    /// Removes the subscription identified by `id` from `topic`.
    async fn unsubscribe(&self, topic: &str, id: &str);
    
    /// Redrives (retries) all messages from the dead-letter queue for `topic`.
    ///
    /// Returns the number of messages redriven, or an error.
    async fn redrive_dlq(&self, topic: &str) -> anyhow::Result<u64>;
    
    /// Returns the number of messages in the dead-letter queue for `topic`,
    /// or an error.
    async fn dlq_count(&self, topic: &str) -> anyhow::Result<u64>;
}

Required Methods

enqueue

async fn enqueue(
    &self,
    topic: &str,
    data: Value,
    traceparent: Option<String>,
    baggage: Option<String>,
)
Enqueues a message to a topic with optional OpenTelemetry tracing context.
topic
&str
required
The topic name to publish the message to
data
Value
required
The message payload as a JSON value
traceparent
Option<String>
W3C traceparent header for distributed tracing
baggage
Option<String>
W3C baggage header for trace context propagation

subscribe

async fn subscribe(
    &self,
    topic: &str,
    id: &str,
    function_id: &str,
    condition_function_id: Option<String>,
    queue_config: Option<SubscriberQueueConfig>,
)
Subscribes to a topic with a handler function and optional filtering.
topic
&str
required
The topic name to subscribe to
id
&str
required
Unique identifier for this subscription
function_id
&str
required
The ID of the function to invoke when messages arrive
condition_function_id
Option<String>
Optional filter function ID to conditionally process messages
queue_config
Option<SubscriberQueueConfig>
Optional queue configuration (retries, concurrency, timeouts, etc.)

unsubscribe

async fn unsubscribe(&self, topic: &str, id: &str)
Removes a subscription from a topic.
topic
&str
required
The topic name to unsubscribe from
id
&str
required
The subscription ID to remove

redrive_dlq

async fn redrive_dlq(&self, topic: &str) -> anyhow::Result<u64>
Redrives (retries) all messages from the dead-letter queue for a topic.
topic
&str
required
The topic whose dead-letter queue to redrive
return
anyhow::Result<u64>
The number of messages redriven, or an error

dlq_count

async fn dlq_count(&self, topic: &str) -> anyhow::Result<u64>
Returns the number of messages in the dead-letter queue for a topic.
topic
&str
required
The topic whose dead-letter queue to count
return
anyhow::Result<u64>
The message count, or an error

Implementation Example

use std::collections::HashMap;
use std::sync::Arc;

use async_trait::async_trait;
use redis::{Client, AsyncCommands, aio::ConnectionManager};
use serde_json::Value;
use tokio::sync::{Mutex, RwLock};
use iii::{
    engine::Engine,
    modules::queue::{QueueAdapter, SubscriberQueueConfig},
};

/// Example Redis-backed implementation of `QueueAdapter`.
pub struct RedisAdapter {
    /// Connection manager that `enqueue` locks in order to publish messages.
    publisher: Arc<Mutex<ConnectionManager>>,
    /// Redis client the connection manager was obtained from.
    /// NOTE(review): presumably used to open subscriber connections; the
    /// `subscribe` body is elided in this example, so confirm.
    subscriber: Arc<Client>,
    /// Subscription records, starting out empty in `new`.
    /// NOTE(review): the meaning of the `String` key is not shown here; confirm.
    subscriptions: Arc<RwLock<HashMap<String, SubscriptionInfo>>>,
    /// Shared engine handle supplied by the caller of `new`.
    engine: Arc<Engine>,
}

impl RedisAdapter {
    /// Builds an adapter for the Redis instance at `redis_url`.
    ///
    /// Opens a client for the URL, obtains a connection manager from it for
    /// publishing, and starts with an empty subscription table.
    ///
    /// # Errors
    ///
    /// Returns an error if the client cannot be opened from `redis_url` or if
    /// the connection manager cannot be obtained.
    pub async fn new(redis_url: String, engine: Arc<Engine>) -> anyhow::Result<Self> {
        let subscriber = Arc::new(Client::open(redis_url.as_str())?);
        let publisher = Arc::new(Mutex::new(subscriber.get_connection_manager().await?));
        let subscriptions = Arc::new(RwLock::new(HashMap::new()));

        Ok(Self {
            publisher,
            subscriber,
            subscriptions,
            engine,
        })
    }
}

#[async_trait]
impl QueueAdapter for RedisAdapter {
    /// Publishes `data`, together with its optional trace context, to the
    /// Redis channel named `topic`.
    ///
    /// The publish runs on a spawned Tokio task, so this method returns
    /// without waiting for it. The trait signature has no return value, so a
    /// failed publish is reported on stderr instead of being dropped silently.
    async fn enqueue(
        &self,
        topic: &str,
        data: Value,
        traceparent: Option<String>,
        baggage: Option<String>,
    ) {
        let topic = topic.to_string();
        let publisher = Arc::clone(&self.publisher);

        tokio::spawn(async move {
            // Serialize before taking the lock so the shared connection is
            // held only for the publish itself.
            let payload = serde_json::json!({
                "data": data,
                "traceparent": traceparent,
                "baggage": baggage,
            })
            .to_string();

            let mut conn = publisher.lock().await;
            let outcome: Result<(), _> = conn.publish(topic.as_str(), payload).await;
            drop(conn);

            if let Err(err) = outcome {
                // Best-effort delivery: report the failure rather than panic.
                // Replace with the application's logger where one is available.
                eprintln!("failed to publish message to topic '{topic}': {err}");
            }
        });
    }

    /// Registers a subscription on `topic` (body elided in this example).
    async fn subscribe(
        &self,
        topic: &str,
        id: &str,
        function_id: &str,
        condition_function_id: Option<String>,
        queue_config: Option<SubscriberQueueConfig>,
    ) {
        // Implementation details...
    }

    /// Removes the subscription `id` from `topic` (body elided in this example).
    async fn unsubscribe(&self, topic: &str, id: &str) {
        // Implementation details...
    }

    /// Redrives the dead-letter queue for `topic`.
    ///
    /// Placeholder: this example always reports zero messages redriven.
    async fn redrive_dlq(&self, topic: &str) -> anyhow::Result<u64> {
        // Implementation details...
        Ok(0)
    }

    /// Counts the messages in the dead-letter queue for `topic`.
    ///
    /// Placeholder: this example always reports zero.
    async fn dlq_count(&self, topic: &str) -> anyhow::Result<u64> {
        // Implementation details...
        Ok(0)
    }
}

SubscriberQueueConfig

/// Configuration for queue subscriber behavior. Every field is optional.
pub struct SubscriberQueueConfig {
    /// Queue type (e.g., "fifo", "standard").
    pub queue_mode: Option<String>,
    /// Maximum retry attempts before a message is moved to the DLQ.
    pub max_retries: Option<u32>,
    /// Number of concurrent message processors.
    pub concurrency: Option<u32>,
    /// Time (ms) before a message becomes visible again.
    pub visibility_timeout: Option<u64>,
    /// Initial delay before processing.
    pub delay_seconds: Option<u64>,
    /// Retry backoff strategy (e.g., "exponential").
    pub backoff_type: Option<String>,
    /// Base delay between retries.
    pub backoff_delay_ms: Option<u64>,
}
Configuration for queue subscriber behavior:
  • queue_mode - Queue type (e.g., “fifo”, “standard”)
  • max_retries - Maximum retry attempts before moving to DLQ
  • concurrency - Number of concurrent message processors
  • visibility_timeout - Time (ms) before message becomes visible again
  • delay_seconds - Initial delay (seconds) before processing
  • backoff_type - Retry backoff strategy (e.g., “exponential”)
  • backoff_delay_ms - Base delay (ms) between retries

Registration Example

use std::sync::Arc;

use iii::modules::queue::{QueueAdapter, QueueCoreModule};

// Register a custom queue adapter under the name "my_queue".
// The factory closure receives the engine and the adapter config, builds the
// adapter asynchronously, and hands it back as a `QueueAdapter` trait object.
QueueCoreModule::add_adapter("my_queue", |engine, config| async move {
    let adapter = MyQueueAdapter::new(engine, config).await?;
    Ok(Arc::new(adapter) as Arc<dyn QueueAdapter>)
}).await?;

See Also

Build docs developers (and LLMs) love