Skip to main content
Create custom modules to extend iii with new functionality. Modules can have multiple adapter implementations for different backends.

Module Architecture

A module consists of:
  1. Module Implementation - Implements the Module trait
  2. Adapter Trait - Defines the adapter interface
  3. Adapter Implementations - Concrete adapter implementations
  4. Configuration - Module and adapter configuration
  5. Functions - Functions exposed to the engine
  6. Triggers - Optional trigger types for events

Quick Start: Custom Queue Module

Let’s build a custom queue module with multiple adapters. This example is based on examples/custom_queue_adapter.rs.

1. Define the Adapter Trait

Define what all adapters must implement:
use async_trait::async_trait;
use serde_json::Value;

/// Backend interface implemented by every custom queue adapter.
///
/// Adapters are shared as `Arc<dyn CustomQueueAdapter>`, hence the
/// `Send + Sync + 'static` bounds.
#[async_trait]
pub trait CustomQueueAdapter: Send + Sync + 'static {
    /// Publishes `event_data` to `topic`.
    async fn enqueue(&self, topic: &str, event_data: Value);
    /// Adds subscription `id` on `topic`, targeting the function `function_id`.
    async fn subscribe(&self, topic: &str, id: &str, function_id: &str);
    /// Removes subscription `id` from `topic`.
    async fn unsubscribe(&self, topic: &str, id: &str);
}

2. Create Adapter Implementations

Implement your adapters:
use std::{collections::HashMap, sync::Arc};
use tokio::sync::RwLock;

/// Queue adapter that keeps its subscription table in process memory.
pub struct InMemoryQueueAdapter {
    /// Topic name -> list of `(subscription id, function id)` pairs.
    subscribers: Arc<RwLock<HashMap<String, Vec<(String, String)>>>>,
    /// Engine used to call the subscribed functions.
    engine: Arc<Engine>,
}

impl InMemoryQueueAdapter {
    /// Builds an adapter with an empty subscription table.
    ///
    /// `_config` is accepted but not read; this adapter has no settings.
    pub async fn new(_config: Option<Value>, engine: Arc<Engine>) -> anyhow::Result<Self> {
        let subscribers = Arc::new(RwLock::new(HashMap::new()));
        Ok(Self { subscribers, engine })
    }
}

#[async_trait]
impl CustomQueueAdapter for InMemoryQueueAdapter {
    /// Calls every function subscribed to `topic` with a clone of `event_data`.
    ///
    /// Functions are called one at a time, in subscription order, and the
    /// result of each call is discarded. A topic without subscribers is a no-op.
    async fn enqueue(&self, topic: &str, event_data: Value) {
        // Snapshot the target function ids first. The read guard is a temporary
        // that is released at the end of this statement, so no lock is held while
        // the engine runs handlers. Holding it across `engine.call` would make a
        // handler that subscribes or unsubscribes on this adapter wait forever
        // for the write lock.
        let targets: Vec<String> = self
            .subscribers
            .read()
            .await
            .get(topic)
            .map(|subs| subs.iter().map(|(_id, function_id)| function_id.clone()).collect())
            .unwrap_or_default();

        for function_id in &targets {
            let _ = self.engine.call(function_id, event_data.clone()).await;
        }
    }

    /// Appends `(id, function_id)` to the subscriber list of `topic`,
    /// creating the list if the topic has not been seen before.
    async fn subscribe(&self, topic: &str, id: &str, function_id: &str) {
        self.subscribers
            .write()
            .await
            .entry(topic.to_string())
            .or_default()
            .push((id.to_string(), function_id.to_string()));
    }

    /// Removes every subscription on `topic` whose id equals `id`.
    /// Unknown topics are ignored.
    async fn unsubscribe(&self, topic: &str, id: &str) {
        if let Some(subs) = self.subscribers.write().await.get_mut(topic) {
            subs.retain(|(sub_id, _)| sub_id != id);
        }
    }
}

3. Register Adapters

Create the adapter registration type, then a factory function for each adapter, and register each factory under a class name:
use iii::modules::registry::AdapterFuture;

/// Registration record pairing an adapter class name with its factory.
pub struct CustomQueueAdapterRegistration {
    /// Class name the adapter is registered under (e.g. `"my::InMemoryQueueAdapter"`).
    pub class: &'static str,
    /// Factory taking the engine and the optional adapter config.
    pub factory: fn(Arc<Engine>, Option<Value>) -> AdapterFuture<dyn CustomQueueAdapter>,
}

/// Exposes the record's fields through the `AdapterRegistrationEntry` interface.
impl AdapterRegistrationEntry<dyn CustomQueueAdapter> for CustomQueueAdapterRegistration {
    /// Returns the registered class name.
    fn class(&self) -> &'static str {
        self.class
    }
    
    /// Returns the factory function stored in this record.
    fn factory(&self) -> fn(Arc<Engine>, Option<Value>) -> AdapterFuture<dyn CustomQueueAdapter> {
        self.factory
    }
}

// Declares `CustomQueueAdapterRegistration` as a type collected by the `inventory` crate.
// NOTE(review): presumably this is what `iii::register_adapter!` submits to — confirm.
inventory::collect!(CustomQueueAdapterRegistration);

/// Factory for the in-memory adapter.
///
/// Returns a boxed future that constructs an `InMemoryQueueAdapter` and yields
/// it as `Arc<dyn CustomQueueAdapter>`; a constructor error is propagated.
fn make_inmemory_adapter(
    engine: Arc<Engine>,
    config: Option<Value>
) -> AdapterFuture<dyn CustomQueueAdapter> {
    Box::pin(async move {
        let adapter: Arc<dyn CustomQueueAdapter> =
            Arc::new(InMemoryQueueAdapter::new(config, engine).await?);
        Ok(adapter)
    })
}

// Register `make_inmemory_adapter` under the class name "my::InMemoryQueueAdapter".
// This is the name used in the adapter `class` setting of the module config.
iii::register_adapter!(
    <CustomQueueAdapterRegistration>
    "my::InMemoryQueueAdapter",
    make_inmemory_adapter
);

4. Define Module Configuration

use serde::Deserialize;
use iii::modules::module::AdapterEntry;

/// Configuration accepted by the custom queue module.
///
/// `deny_unknown_fields` makes deserialization fail on any key other than `adapter`.
#[derive(Debug, Clone, Deserialize, Default)]
#[serde(deny_unknown_fields)]
pub struct CustomQueueModuleConfig {
    /// Optional adapter selection; `None` when the key is omitted.
    #[serde(default)]
    pub adapter: Option<AdapterEntry>,
}

5. Implement the Module

use iii::modules::module::{ConfigurableModule, Module};

/// The module itself: an adapter plus the engine it registers with.
///
/// `Clone` is derived because the module boxes clones of itself when it
/// registers as a function handler.
#[derive(Clone)]
pub struct CustomQueueModule {
    /// Backend that events are forwarded to.
    adapter: Arc<dyn CustomQueueAdapter>,
    /// Engine the module registers its function with.
    engine: Arc<Engine>,
    /// Parsed module config; stored but not read by the code shown here.
    _config: CustomQueueModuleConfig,
}

#[async_trait]
impl Module for CustomQueueModule {
    /// Name reported for this module.
    fn name(&self) -> &'static str {
        "CustomQueueModule"
    }

    /// Creates the module by delegating to `create_with_adapters`.
    async fn create(
        engine: Arc<Engine>,
        config: Option<Value>
    ) -> anyhow::Result<Box<dyn Module>> {
        Self::create_with_adapters(engine, config).await
    }

    /// Intentionally empty: the function is registered in `initialize`.
    fn register_functions(&self, _engine: Arc<Engine>) {}

    /// Logs start-up and registers the `custom_emit` function, with a clone
    /// of this module as its handler.
    async fn initialize(&self) -> anyhow::Result<()> {
        tracing::info!("Initializing CustomQueueModule");

        // Describe the function first, then hand it to the engine.
        let request = RegisterFunctionRequest {
            function_id: String::from("custom_emit"),
            description: Some(String::from("Emit to custom queue")),
            request_format: None,
            response_format: None,
            metadata: None,
        };
        self.engine.register_function(request, Box::new(self.clone()));

        Ok(())
    }
}

6. Implement ConfigurableModule

use std::collections::HashMap;
use std::sync::RwLock;
use once_cell::sync::Lazy;

#[async_trait]
impl ConfigurableModule for CustomQueueModule {
    type Config = CustomQueueModuleConfig;
    type Adapter = dyn CustomQueueAdapter;
    type AdapterRegistration = CustomQueueAdapterRegistration;
    const DEFAULT_ADAPTER_CLASS: &'static str = "my::InMemoryQueueAdapter";

    /// Returns the static adapter-factory table, filled from
    /// `build_registry()` on first access.
    ///
    /// `RwLock` here is `std::sync::RwLock` (imported for this step), not the
    /// `tokio::sync::RwLock` used inside the in-memory adapter.
    async fn registry() -> &'static RwLock<HashMap<String, AdapterFactory<Self::Adapter>>> {
        static REGISTRY: Lazy<RwLock<HashMap<String, AdapterFactory<dyn CustomQueueAdapter>>>> =
            Lazy::new(|| RwLock::new(CustomQueueModule::build_registry()));
        &*REGISTRY
    }

    /// Assembles the module from its already-resolved parts.
    fn build(
        engine: Arc<Engine>,
        config: Self::Config,
        adapter: Arc<Self::Adapter>
    ) -> Self {
        Self {
            adapter,
            engine,
            _config: config,
        }
    }

    /// Returns the adapter class named in `config`, if an adapter entry is present.
    fn adapter_class_from_config(config: &Self::Config) -> Option<String> {
        let entry = config.adapter.as_ref()?;
        Some(entry.class.clone())
    }

    /// Returns the adapter-specific config from `config`, if any was given.
    fn adapter_config_from_config(config: &Self::Config) -> Option<Value> {
        let entry = config.adapter.as_ref()?;
        entry.config.clone()
    }
}

7. Implement Function Handler

use iii::function::{FunctionHandler, FunctionResult};
use uuid::Uuid;

impl FunctionHandler for CustomQueueModule {
    /// Handles a call by enqueuing `input["data"]` on `input["topic"]`.
    ///
    /// Fails with code `topic_not_set` when `topic` is missing, is not a
    /// string, or is empty. A missing `data` is enqueued as JSON `null`.
    /// Returns `Success(None)` once the adapter's `enqueue` has completed.
    fn handle_function(
        &self,
        _invocation_id: Option<Uuid>,
        _function_id: String,
        input: Value,
    ) -> Pin<Box<dyn Future<Output = FunctionResult<Option<Value>, ErrorBody>> + Send + 'static>>
    {
        // Clone the adapter handle so the returned future does not borrow `self`.
        let adapter = self.adapter.clone();
        Box::pin(async move {
            let topic = match input.get("topic").and_then(|v| v.as_str()) {
                Some(name) if !name.is_empty() => name,
                _ => {
                    return FunctionResult::Failure(ErrorBody {
                        code: "topic_not_set".into(),
                        message: "Topic is not set".into(),
                    });
                }
            };
            let payload = input.get("data").cloned().unwrap_or(Value::Null);

            adapter.enqueue(topic, payload).await;
            FunctionResult::Success(None)
        })
    }
}

8. Register and Use the Module

use iii::modules::config::EngineBuilder;

/// Entry point: initializes logging, registers the custom module type and
/// builds an engine with one instance of it bound to `127.0.0.1:49134`.
#[tokio::main]
async fn main() -> anyhow::Result<()> {
    iii::logging::init_log("config.yaml");

    // Module config selecting the in-memory adapter by its class name.
    let module_config = serde_json::json!({
        "adapter": {
            "class": "my::InMemoryQueueAdapter"
        }
    });

    // NOTE(review): the value produced by `build()` is dropped and `main` returns
    // straight away. If the engine needs an explicit serve/run call to keep the
    // process alive, add it here — confirm against the `EngineBuilder` API.
    EngineBuilder::new()
        .register_module::<CustomQueueModule>("my::CustomQueueModule")
        .add_module("my::CustomQueueModule", Some(module_config))
        .address("127.0.0.1:49134")
        .build()
        .await?;

    Ok(())
}

Configuration in config.yaml

Add your module to the configuration:
config.yaml
modules:
  - class: my::CustomQueueModule
    config:
      adapter:
        class: my::InMemoryQueueAdapter
        # Or use a different adapter (replace the class above with the lines below):
        # class: my::LoggingQueueAdapter
        # config:
        #   inner_adapter: my::InMemoryQueueAdapter

Adding Multiple Adapters

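Add more adapters by implementing the same trait and registering each one under its own class name. The example below is a logging adapter that wraps another adapter: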
/// Adapter that logs each operation and then forwards it to a wrapped adapter.
pub struct LoggingQueueAdapter {
    /// The adapter every call is delegated to.
    inner: Arc<dyn CustomQueueAdapter>,
}

impl LoggingQueueAdapter {
    /// Builds a logging adapter around the adapter named by `config["inner_adapter"]`.
    ///
    /// The name defaults to `"my::InMemoryQueueAdapter"` when the config or key is
    /// missing, or when the value is not a string. The in-memory adapter is the
    /// only supported inner class.
    ///
    /// # Errors
    ///
    /// Returns an error for any other class name, or if constructing the inner
    /// adapter fails.
    pub async fn new(config: Option<Value>, engine: Arc<Engine>) -> anyhow::Result<Self> {
        let inner_class = config
            .as_ref()
            .and_then(|cfg| cfg.get("inner_adapter"))
            .and_then(|name| name.as_str())
            .unwrap_or("my::InMemoryQueueAdapter");

        // Reject unsupported classes before constructing anything.
        if inner_class != "my::InMemoryQueueAdapter" {
            return Err(anyhow::anyhow!("Unknown inner adapter: {}", inner_class));
        }

        let inner: Arc<dyn CustomQueueAdapter> =
            Arc::new(InMemoryQueueAdapter::new(None, engine).await?);
        Ok(Self { inner })
    }
}

/// Each method logs the call at `info` level, then delegates to `inner`.
#[async_trait]
impl CustomQueueAdapter for LoggingQueueAdapter {
    /// Logs the topic and payload, then forwards the event to the inner adapter.
    async fn enqueue(&self, topic: &str, event_data: Value) {
        tracing::info!("Enqueuing to {}: {:?}", topic, event_data);
        self.inner.enqueue(topic, event_data).await;
    }
    
    /// Logs the function id and topic, then forwards the subscription.
    async fn subscribe(&self, topic: &str, id: &str, function_id: &str) {
        tracing::info!("Subscribing {} to {}", function_id, topic);
        self.inner.subscribe(topic, id, function_id).await;
    }
    
    /// Logs the subscription id and topic, then forwards the removal.
    async fn unsubscribe(&self, topic: &str, id: &str) {
        tracing::info!("Unsubscribing {} from {}", id, topic);
        self.inner.unsubscribe(topic, id).await;
    }
}

/// Factory for the logging adapter.
///
/// Returns a boxed future that constructs a `LoggingQueueAdapter` from `config`
/// and yields it as `Arc<dyn CustomQueueAdapter>`; a constructor error is
/// propagated.
fn make_logging_adapter(
    engine: Arc<Engine>,
    config: Option<Value>
) -> AdapterFuture<dyn CustomQueueAdapter> {
    Box::pin(async move {
        let adapter: Arc<dyn CustomQueueAdapter> =
            Arc::new(LoggingQueueAdapter::new(config, engine).await?);
        Ok(adapter)
    })
}

// Register `make_logging_adapter` under the class name "my::LoggingQueueAdapter".
iii::register_adapter!(
    <CustomQueueAdapterRegistration>
    "my::LoggingQueueAdapter",
    make_logging_adapter
);

Adding Triggers (Optional)

Implement trigger support:
use iii::trigger::{TriggerRegistrator, TriggerType, Trigger};

impl TriggerRegistrator for CustomQueueModule {
    /// Subscribes the trigger's function to the topic named in its config.
    ///
    /// A trigger whose `topic` is missing, not a string, or empty is accepted
    /// without creating a subscription. Always resolves to `Ok(())`.
    fn register_trigger(
        &self,
        trigger: Trigger,
    ) -> Pin<Box<dyn Future<Output = Result<(), anyhow::Error>> + Send + '_>> {
        // Copy the topic out before `trigger` is moved into the future.
        let topic = trigger_topic(&trigger).to_string();
        let adapter = self.adapter.clone();

        Box::pin(async move {
            if topic.is_empty() {
                return Ok(());
            }
            adapter.subscribe(&topic, &trigger.id, &trigger.function_id).await;
            Ok(())
        })
    }

    /// Removes the trigger's subscription from the topic named in its config.
    ///
    /// With no usable `topic`, the adapter is asked to unsubscribe from `""`.
    /// Always resolves to `Ok(())`.
    fn unregister_trigger(
        &self,
        trigger: Trigger,
    ) -> Pin<Box<dyn Future<Output = Result<(), anyhow::Error>> + Send + '_>> {
        let adapter = self.adapter.clone();

        Box::pin(async move {
            adapter.unsubscribe(trigger_topic(&trigger), &trigger.id).await;
            Ok(())
        })
    }
}

/// Reads the `"topic"` string from a trigger's config, or `""` when it is
/// missing or not a string.
fn trigger_topic(trigger: &Trigger) -> &str {
    trigger
        .config
        .get("topic")
        .and_then(|v| v.as_str())
        .unwrap_or("")
}

// Register trigger type in initialize()
impl Module for CustomQueueModule {
    async fn initialize(&self) -> anyhow::Result<()> {
        // ... existing code ...
        
        self.engine.register_trigger_type(TriggerType {
            id: "custom_queue".to_string(),
            _description: "Custom queue trigger".to_string(),
            registrator: Box::new(self.clone()),
            worker_id: None,
        }).await;
        
        Ok(())
    }
}

Best Practices

  1. Trait-based Design: Define clear adapter traits
  2. Error Handling: Return anyhow::Result from fallible operations and propagate errors with ?
  3. Async: All I/O operations should be async
  4. Configuration: Support flexible adapter configuration
  5. Testing: Write tests for each adapter
  6. Documentation: Document your module’s functions and configuration
  7. Logging: Use tracing for observability

Example: Database Module

/// Adapter interface for a database module, following the same pattern as the queue example.
#[async_trait]
pub trait DatabaseAdapter: Send + Sync + 'static {
    /// Runs `sql` and returns a JSON value on success.
    async fn query(&self, sql: &str) -> anyhow::Result<Value>;
    /// Runs `sql` and returns a `u64` on success.
    // NOTE(review): presumably the number of affected rows — confirm.
    async fn execute(&self, sql: &str) -> anyhow::Result<u64>;
}

pub struct PostgresAdapter { /* ... */ }
pub struct SQLiteAdapter { /* ... */ }

impl DatabaseAdapter for PostgresAdapter { /* ... */ }
impl DatabaseAdapter for SQLiteAdapter { /* ... */ }

Source Code Reference

  • Full example: examples/custom_queue_adapter.rs
  • Module trait: src/modules/module.rs:30
  • ConfigurableModule: src/modules/module.rs:76
  • Adapter registration: src/modules/registry.rs

Build docs developers (and LLMs) love