The QueueAdapter trait defines the interface for queue implementations in the iii framework. Adapters handle message enqueueing, topic subscriptions, and dead-letter queue management.
Location
use iii::modules::queue::QueueAdapter;
Defined in src/modules/queue/mod.rs
Trait Definition
#[async_trait::async_trait]
pub trait QueueAdapter: Send + Sync + 'static {
async fn enqueue(
&self,
topic: &str,
data: Value,
traceparent: Option<String>,
baggage: Option<String>,
);
async fn subscribe(
&self,
topic: &str,
id: &str,
function_id: &str,
condition_function_id: Option<String>,
queue_config: Option<SubscriberQueueConfig>,
);
async fn unsubscribe(&self, topic: &str, id: &str);
async fn redrive_dlq(&self, topic: &str) -> anyhow::Result<u64>;
async fn dlq_count(&self, topic: &str) -> anyhow::Result<u64>;
}
Required Methods
enqueue
async fn enqueue(
&self,
topic: &str,
data: Value,
traceparent: Option<String>,
baggage: Option<String>,
)
Enqueues a message to a topic with optional OpenTelemetry tracing context.
The topic name to publish the message to
The message payload as a JSON value
W3C traceparent header for distributed tracing
W3C baggage header for trace context propagation
subscribe
async fn subscribe(
&self,
topic: &str,
id: &str,
function_id: &str,
condition_function_id: Option<String>,
queue_config: Option<SubscriberQueueConfig>,
)
Subscribes to a topic with a handler function and optional filtering.
The topic name to subscribe to
Unique identifier for this subscription
The ID of the function to invoke when messages arrive
Optional filter function ID to conditionally process messages
queue_config
Option<SubscriberQueueConfig>
Optional queue configuration (retries, concurrency, timeouts, etc.)
unsubscribe
async fn unsubscribe(&self, topic: &str, id: &str)
Removes a subscription from a topic.
The topic name to unsubscribe from
The subscription ID to remove
redrive_dlq
async fn redrive_dlq(&self, topic: &str) -> anyhow::Result<u64>
Redrives (retries) all messages from the dead-letter queue for a topic.
The topic whose dead-letter queue to redrive
The number of messages redriven, or an error
dlq_count
async fn dlq_count(&self, topic: &str) -> anyhow::Result<u64>
Returns the number of messages in the dead-letter queue for a topic.
The topic whose dead-letter queue to count
The message count, or an error
Implementation Example
use std::sync::Arc;
use async_trait::async_trait;
use redis::{Client, AsyncCommands, aio::ConnectionManager};
use serde_json::Value;
use tokio::sync::{Mutex, RwLock};
use iii::{
engine::Engine,
modules::queue::{QueueAdapter, SubscriberQueueConfig},
};
pub struct RedisAdapter {
publisher: Arc<Mutex<ConnectionManager>>,
subscriber: Arc<Client>,
subscriptions: Arc<RwLock<HashMap<String, SubscriptionInfo>>>,
engine: Arc<Engine>,
}
impl RedisAdapter {
pub async fn new(redis_url: String, engine: Arc<Engine>) -> anyhow::Result<Self> {
let client = Client::open(redis_url.as_str())?;
let manager = client.get_connection_manager().await?;
Ok(Self {
publisher: Arc::new(Mutex::new(manager)),
subscriber: Arc::new(client),
subscriptions: Arc::new(RwLock::new(HashMap::new())),
engine,
})
}
}
#[async_trait]
impl QueueAdapter for RedisAdapter {
async fn enqueue(
&self,
topic: &str,
data: Value,
traceparent: Option<String>,
baggage: Option<String>,
) {
let topic = topic.to_string();
let publisher = Arc::clone(&self.publisher);
tokio::spawn(async move {
let mut conn = publisher.lock().await;
let message = serde_json::json!({
"data": data,
"traceparent": traceparent,
"baggage": baggage,
});
let _: Result<(), _> = conn.publish(topic, message.to_string()).await;
});
}
async fn subscribe(
&self,
topic: &str,
id: &str,
function_id: &str,
condition_function_id: Option<String>,
queue_config: Option<SubscriberQueueConfig>,
) {
// Implementation details...
}
async fn unsubscribe(&self, topic: &str, id: &str) {
// Implementation details...
}
async fn redrive_dlq(&self, topic: &str) -> anyhow::Result<u64> {
// Implementation details...
Ok(0)
}
async fn dlq_count(&self, topic: &str) -> anyhow::Result<u64> {
// Implementation details...
Ok(0)
}
}
SubscriberQueueConfig
pub struct SubscriberQueueConfig {
pub queue_mode: Option<String>,
pub max_retries: Option<u32>,
pub concurrency: Option<u32>,
pub visibility_timeout: Option<u64>,
pub delay_seconds: Option<u64>,
pub backoff_type: Option<String>,
pub backoff_delay_ms: Option<u64>,
}
Configuration for queue subscriber behavior:
queue_mode - Queue type (e.g., “fifo”, “standard”)
max_retries - Maximum retry attempts before moving to DLQ
concurrency - Number of concurrent message processors
visibility_timeout - Time (ms) before message becomes visible again
delay_seconds - Initial delay before processing
backoff_type - Retry backoff strategy (e.g., “exponential”)
backoff_delay_ms - Base delay between retries
Registration Example
use iii::modules::queue::QueueCoreModule;
// Register a custom queue adapter
QueueCoreModule::add_adapter("my_queue", |engine, config| async move {
let adapter = MyQueueAdapter::new(engine, config).await?;
Ok(Arc::new(adapter) as Arc<dyn QueueAdapter>)
}).await?;
See Also