Skip to main content
Adapters provide pluggable backend implementations for iii modules. You can create custom adapters to integrate with proprietary systems, cloud services, or specialized data stores.

Adapter Architecture

Each module can have multiple adapter implementations:
QueueModule
├── RedisQueueAdapter (default)
├── RabbitMQQueueAdapter
└── YourCustomAdapter (new!)
Adapters are selected via configuration:
modules:
  - class: modules::queue::QueueModule
    config:
      adapter:
        class: my::CustomQueueAdapter
        config:
          connection_string: "..."

Creating a Custom Adapter

Step 1: Define the Adapter Trait

First, define the interface your adapter must implement.
use async_trait::async_trait;
use serde_json::Value;

#[async_trait]
pub trait CustomQueueAdapter: Send + Sync + 'static {
    async fn enqueue(&self, topic: &str, event_data: Value);
    async fn subscribe(&self, topic: &str, id: &str, function_id: &str);
    async fn unsubscribe(&self, topic: &str, id: &str);
}
Reference the built-in QueueAdapter trait at src/modules/queue/mod.rs:20 for a complete example.

Step 2: Create Adapter Registration

Define how adapters are registered with the inventory system.
use iii::modules::registry::AdapterFuture;
use std::sync::Arc;

type CustomQueueAdapterFuture = AdapterFuture<dyn CustomQueueAdapter>;

pub struct CustomQueueAdapterRegistration {
    pub class: &'static str,
    pub factory: fn(Arc<Engine>, Option<Value>) -> CustomQueueAdapterFuture,
}

impl AdapterRegistrationEntry<dyn CustomQueueAdapter> 
    for CustomQueueAdapterRegistration 
{
    fn class(&self) -> &'static str {
        self.class
    }

    fn factory(&self) -> fn(Arc<Engine>, Option<Value>) -> CustomQueueAdapterFuture {
        self.factory
    }
}

inventory::collect!(CustomQueueAdapterRegistration);

Step 3: Implement the Adapter

Create your adapter implementation.
use std::collections::HashMap;
use tokio::sync::RwLock as TokioRwLock;

pub struct InMemoryQueueAdapter {
    subscribers: Arc<TokioRwLock<HashMap<String, Vec<(String, String)>>>>,
    engine: Arc<Engine>,
}

impl InMemoryQueueAdapter {
    pub async fn new(config: Option<Value>, engine: Arc<Engine>) -> anyhow::Result<Self> {
        Ok(Self {
            subscribers: Arc::new(TokioRwLock::new(HashMap::new())),
            engine,
        })
    }
}

#[async_trait]
impl CustomQueueAdapter for InMemoryQueueAdapter {
    async fn enqueue(&self, topic: &str, event_data: Value) {
        let subscribers = self.subscribers.read().await;
        if let Some(subs) = subscribers.get(topic) {
            let mut invokes = vec![];
            for (_id, function_id) in subs {
                let invoke = self.engine.call(function_id, event_data.clone());
                invokes.push(invoke);
            }
            futures::future::join_all(invokes).await;
        }
    }

    async fn subscribe(&self, topic: &str, id: &str, function_id: &str) {
        self.subscribers
            .write()
            .await
            .entry(topic.to_string())
            .or_default()
            .push((id.to_string(), function_id.to_string()));
    }

    async fn unsubscribe(&self, topic: &str, id: &str) {
        if let Some(subs) = self.subscribers.write().await.get_mut(topic) {
            subs.retain(|(sub_id, _)| sub_id != id);
        }
    }
}

Step 4: Register with the Macro

Create factory functions and register adapters.
fn make_inmemory_adapter(
    engine: Arc<Engine>, 
    config: Option<Value>
) -> CustomQueueAdapterFuture {
    Box::pin(async move {
        Ok(Arc::new(InMemoryQueueAdapter::new(config, engine).await?)
            as Arc<dyn CustomQueueAdapter>)
    })
}

iii::register_adapter!(
    <CustomQueueAdapterRegistration> 
    "my::InMemoryQueueAdapter", 
    make_inmemory_adapter
);

Step 5: Create Module Config

Define configuration structure for your module.
use serde::Deserialize;
use iii::modules::module::AdapterEntry;

#[derive(Debug, Clone, Deserialize, Default)]
#[serde(deny_unknown_fields)]
pub struct CustomQueueModuleConfig {
    #[serde(default)]
    pub adapter: Option<AdapterEntry>,
}

Step 6: Implement the Module

Create the module that uses your adapter.
use iii::modules::module::{ConfigurableModule, Module};
use iii::modules::config::EngineBuilder;

#[derive(Clone)]
pub struct CustomQueueModule {
    adapter: Arc<dyn CustomQueueAdapter>,
    engine: Arc<Engine>,
    _config: CustomQueueModuleConfig,
}

#[async_trait]
impl Module for CustomQueueModule {
    fn name(&self) -> &'static str {
        "CustomQueueModule"
    }

    async fn create(engine: Arc<Engine>, config: Option<Value>) -> anyhow::Result<Box<dyn Module>> {
        Self::create_with_adapters(engine, config).await
    }

    async fn initialize(&self) -> anyhow::Result<()> {
        tracing::info!("Initializing CustomQueueModule");
        
        // Register module functions here
        self.engine.register_function(
            RegisterFunctionRequest {
                function_id: "custom_emit".to_string(),
                description: Some("Emit to custom queue".to_string()),
                request_format: Some(serde_json::json!({
                    "topic": { "type": "string" },
                    "data": { "type": "object" }
                })),
                response_format: None,
                metadata: None,
            },
            Box::new(self.clone()),
        );
        
        Ok(())
    }
}

#[async_trait]
impl ConfigurableModule for CustomQueueModule {
    type Config = CustomQueueModuleConfig;
    type Adapter = dyn CustomQueueAdapter;
    type AdapterRegistration = CustomQueueAdapterRegistration;
    const DEFAULT_ADAPTER_CLASS: &'static str = "my::InMemoryQueueAdapter";

    async fn registry() -> &'static RwLock<HashMap<String, AdapterFactory<Self::Adapter>>> {
        static REGISTRY: Lazy<RwLock<HashMap<String, AdapterFactory<dyn CustomQueueAdapter>>>> =
            Lazy::new(|| RwLock::new(CustomQueueModule::build_registry()));
        &REGISTRY
    }

    fn build(engine: Arc<Engine>, config: Self::Config, adapter: Arc<Self::Adapter>) -> Self {
        Self {
            engine,
            _config: config,
            adapter,
        }
    }

    fn adapter_class_from_config(config: &Self::Config) -> Option<String> {
        config.adapter.as_ref().map(|a| a.class.clone())
    }

    fn adapter_config_from_config(config: &Self::Config) -> Option<Value> {
        config.adapter.as_ref().and_then(|a| a.config.clone())
    }
}

Step 7: Use the Module

Programmatically:
#[tokio::main]
async fn main() -> anyhow::Result<()> {
    iii::logging::init_log("config.yaml");

    EngineBuilder::new()
        .register_module::<CustomQueueModule>("my::CustomQueueModule")
        .add_module(
            "my::CustomQueueModule",
            Some(serde_json::json!({
                "adapter": {
                    "class": "my::InMemoryQueueAdapter",
                    "config": {}
                }
            })),
        )
        .address("127.0.0.1:49134")
        .build()
        .await?;

    Ok(())
}
Via config.yaml:
modules:
  - class: my::CustomQueueModule
    config:
      adapter:
        class: my::InMemoryQueueAdapter
        config: {}

Advanced Pattern: Decorator Adapters

You can create adapters that wrap other adapters (decorator pattern).
pub struct LoggingQueueAdapter {
    inner: Arc<dyn CustomQueueAdapter>,
}

impl LoggingQueueAdapter {
    pub async fn new(config: Option<Value>, engine: Arc<Engine>) -> anyhow::Result<Self> {
        // Get the inner adapter class from config
        let inner_adapter_class = config
            .as_ref()
            .and_then(|v| v.get("inner_adapter"))
            .and_then(|v| v.as_str())
            .unwrap_or("my::InMemoryQueueAdapter");

        let inner_adapter = match inner_adapter_class {
            "my::InMemoryQueueAdapter" => {
                Arc::new(InMemoryQueueAdapter::new(None, engine).await?)
                    as Arc<dyn CustomQueueAdapter>
            }
            _ => anyhow::bail!("Unknown inner adapter: {}", inner_adapter_class),
        };

        Ok(Self { inner: inner_adapter })
    }
}

#[async_trait]
impl CustomQueueAdapter for LoggingQueueAdapter {
    async fn enqueue(&self, topic: &str, event_data: Value) {
        tracing::info!(
            topic = %topic,
            event_data = %event_data,
            "LoggingQueueAdapter: Enqueuing message"
        );
        self.inner.enqueue(topic, event_data).await;
    }

    async fn subscribe(&self, topic: &str, id: &str, function_id: &str) {
        tracing::info!(
            topic = %topic,
            id = %id,
            function_id = %function_id,
            "LoggingQueueAdapter: Subscribing"
        );
        self.inner.subscribe(topic, id, function_id).await;
    }

    async fn unsubscribe(&self, topic: &str, id: &str) {
        tracing::info!(
            topic = %topic,
            id = %id,
            "LoggingQueueAdapter: Unsubscribing"
        );
        self.inner.unsubscribe(topic, id).await;
    }
}
Configuration:
modules:
  - class: my::CustomQueueModule
    config:
      adapter:
        class: my::LoggingQueueAdapter
        config:
          inner_adapter: my::InMemoryQueueAdapter

Built-in Adapter Examples

QueueAdapter Trait

Reference: src/modules/queue/mod.rs:20
#[async_trait]
pub trait QueueAdapter: Send + Sync + 'static {
    async fn enqueue(
        &self,
        topic: &str,
        data: Value,
        traceparent: Option<String>,
        baggage: Option<String>,
    );
    
    async fn subscribe(
        &self,
        topic: &str,
        id: &str,
        function_id: &str,
        condition_function_id: Option<String>,
        queue_config: Option<SubscriberQueueConfig>,
    );
    
    async fn unsubscribe(&self, topic: &str, id: &str);
    async fn redrive_dlq(&self, topic: &str) -> anyhow::Result<u64>;
    async fn dlq_count(&self, topic: &str) -> anyhow::Result<u64>;
}
Features:
  • Distributed tracing support (traceparent, baggage)
  • Dead letter queue (DLQ) handling
  • Conditional subscriptions
  • Per-subscriber queue configuration

Best Practices

Error Handling

  • Return anyhow::Result from constructors
  • Log errors with structured context
  • Implement graceful degradation for transient failures

Concurrency

  • Use async_trait for async trait methods
  • Prefer tokio::sync::RwLock for shared state
  • Use Arc for shared adapter instances

Configuration

  • Use serde::Deserialize for config structs
  • Provide sensible defaults
  • Validate configuration in constructors
  • Support environment variable expansion

Testing

  • Create mock adapters for testing
  • Test adapter registration
  • Test configuration parsing
  • Test concurrent access patterns

Performance

  • Minimize lock contention
  • Batch operations when possible
  • Use connection pooling for external services
  • Implement circuit breakers for reliability

Complete Example

See the full working example at examples/custom_queue_adapter.rs in the iii source code. This example demonstrates:
  • Custom adapter trait definition
  • Two adapter implementations (InMemory, Logging)
  • Decorator pattern for composable adapters
  • Module integration with EngineBuilder
  • YAML configuration support
Run the example:
cargo run --example custom_queue_adapter

Common Use Cases

Cloud Queue Services

  • AWS SQS/SNS adapter
  • Google Cloud Pub/Sub adapter
  • Azure Service Bus adapter

Message Brokers

  • Kafka adapter
  • NATS adapter
  • Pulsar adapter

Custom Storage

  • PostgreSQL-backed queue
  • MongoDB change streams
  • Custom database adapters

Hybrid Approaches

  • Multi-cloud queue adapter (failover)
  • Tiered storage (hot/cold data)
  • Caching adapters (Redis → S3)

Build docs developers (and LLMs) love