Module Architecture
A module consists of:
- Module Implementation - Implements the `Module` trait
- Adapter Trait - Defines the adapter interface
- Adapter Implementations - Concrete adapter implementations
- Configuration - Module and adapter configuration
- Functions - Functions exposed to the engine
- Triggers - Optional trigger types for events
Quick Start: Custom Queue Module
Let’s build a custom queue module with multiple adapters. This example is based on `examples/custom_queue_adapter.rs`.
1. Define the Adapter Trait
Define what all adapters must implement:
use async_trait::async_trait;
use serde_json::Value;
/// Interface that every backend of the custom queue module implements.
///
/// All operations are asynchronous and take `&self`, so implementations that
/// need mutable state must use interior mutability.
#[async_trait]
pub trait CustomQueueAdapter: Send + Sync + 'static {
    /// Publishes `event_data` on `topic`.
    async fn enqueue(&self, topic: &str, event_data: Value);

    /// Adds a subscription identified by `id` that targets `function_id`
    /// for events on `topic`.
    async fn subscribe(&self, topic: &str, id: &str, function_id: &str);

    /// Removes the subscription identified by `id` from `topic`.
    async fn unsubscribe(&self, topic: &str, id: &str);
}
2. Create Adapter Implementations
Implement your adapters:
use std::{collections::HashMap, sync::Arc};
use tokio::sync::RwLock;
/// Queue adapter that keeps its subscription table in process memory.
pub struct InMemoryQueueAdapter {
    /// Topic name mapped to its `(subscription id, function id)` pairs.
    subscribers: Arc<RwLock<HashMap<String, Vec<(String, String)>>>>,
    /// Engine handle used to invoke subscribed functions.
    engine: Arc<Engine>,
}
impl InMemoryQueueAdapter {
    /// Creates an adapter with an empty subscription table.
    ///
    /// `_config` is accepted but never read; `engine` is stored for later
    /// function calls. This constructor always returns `Ok`.
    pub async fn new(_config: Option<Value>, engine: Arc<Engine>) -> anyhow::Result<Self> {
        let subscribers = Arc::new(RwLock::new(HashMap::new()));
        Ok(Self { subscribers, engine })
    }
}
#[async_trait]
impl CustomQueueAdapter for InMemoryQueueAdapter {
    /// Calls every function subscribed to `topic` with a clone of `event_data`.
    ///
    /// Functions are invoked sequentially in subscription order. The value
    /// returned by each call is discarded, so delivery continues regardless of
    /// individual outcomes. A topic with no subscribers is a no-op.
    async fn enqueue(&self, topic: &str, event_data: Value) {
        // Snapshot the target function ids and release the read guard before
        // awaiting the engine. Keeping the guard alive across `engine.call`
        // would deadlock as soon as a handler calls `subscribe` or
        // `unsubscribe`, because both wait for the write lock.
        let targets: Vec<String> = {
            let subscribers = self.subscribers.read().await;
            subscribers
                .get(topic)
                .map(|subs| {
                    subs.iter()
                        .map(|(_id, function_id)| function_id.clone())
                        .collect()
                })
                .unwrap_or_default()
        };

        for function_id in &targets {
            // Best-effort delivery: the call result is intentionally ignored.
            let _ = self.engine.call(function_id, event_data.clone()).await;
        }
    }

    /// Appends `(id, function_id)` to the subscriber list of `topic`,
    /// creating the list when the topic is new.
    async fn subscribe(&self, topic: &str, id: &str, function_id: &str) {
        self.subscribers
            .write()
            .await
            .entry(topic.to_string())
            .or_default()
            .push((id.to_string(), function_id.to_string()));
    }

    /// Removes every subscription on `topic` whose id equals `id`.
    ///
    /// Unknown topics and unknown ids are ignored.
    async fn unsubscribe(&self, topic: &str, id: &str) {
        let mut subscribers = self.subscribers.write().await;
        let now_empty = match subscribers.get_mut(topic) {
            Some(subs) => {
                subs.retain(|(sub_id, _)| sub_id != id);
                subs.is_empty()
            }
            None => false,
        };
        // Drop the topic entry once its last subscriber is gone so the map
        // does not accumulate empty lists for topics that are no longer used.
        if now_empty {
            subscribers.remove(topic);
        }
    }
}
3. Register Adapters
Create adapter registration and factories:
use iii::modules::registry::AdapterFuture;
/// Registration record that pairs an adapter class name with its factory.
pub struct CustomQueueAdapterRegistration {
    /// Class name the adapter is registered under.
    pub class: &'static str,
    /// Function that builds the adapter from an engine handle and an
    /// optional JSON configuration.
    pub factory: fn(Arc<Engine>, Option<Value>) -> AdapterFuture<dyn CustomQueueAdapter>,
}
impl AdapterRegistrationEntry<dyn CustomQueueAdapter> for CustomQueueAdapterRegistration {
    /// Returns the class name stored in this registration.
    fn class(&self) -> &'static str {
        self.class
    }

    /// Returns the factory function stored in this registration.
    fn factory(&self) -> fn(Arc<Engine>, Option<Value>) -> AdapterFuture<dyn CustomQueueAdapter> {
        self.factory
    }
}
// Declare `CustomQueueAdapterRegistration` as a type gathered by the
// `inventory` crate.
inventory::collect!(CustomQueueAdapterRegistration);
/// Factory for [`InMemoryQueueAdapter`].
///
/// Returns a boxed future that constructs the adapter and hands it back as an
/// `Arc<dyn CustomQueueAdapter>`; a constructor error is propagated.
fn make_inmemory_adapter(
    engine: Arc<Engine>,
    config: Option<Value>,
) -> AdapterFuture<dyn CustomQueueAdapter> {
    Box::pin(async move {
        let adapter = InMemoryQueueAdapter::new(config, engine).await?;
        let adapter: Arc<dyn CustomQueueAdapter> = Arc::new(adapter);
        Ok(adapter)
    })
}
// Register `make_inmemory_adapter` under the class name
// "my::InMemoryQueueAdapter" as a `CustomQueueAdapterRegistration`.
iii::register_adapter!(
<CustomQueueAdapterRegistration>
"my::InMemoryQueueAdapter",
make_inmemory_adapter
);
4. Define Module Configuration
use serde::Deserialize;
use iii::modules::module::AdapterEntry;
/// Configuration for [`CustomQueueModule`].
///
/// Deserialization rejects keys that are not declared here
/// (`deny_unknown_fields`).
#[derive(Debug, Clone, Deserialize, Default)]
#[serde(deny_unknown_fields)]
pub struct CustomQueueModuleConfig {
    /// Adapter selection; `None` when the key is absent.
    #[serde(default)]
    pub adapter: Option<AdapterEntry>,
}
5. Implement the Module
use iii::modules::module::{ConfigurableModule, Module};
/// Engine module that exposes a queue backed by a pluggable adapter.
///
/// Cloning is cheap: the adapter and the engine are shared through `Arc`.
#[derive(Clone)]
pub struct CustomQueueModule {
    /// Queue backend this module delegates to.
    adapter: Arc<dyn CustomQueueAdapter>,
    /// Engine handle used for registrations.
    engine: Arc<Engine>,
    /// Configuration the module was built with; stored but not read here.
    _config: CustomQueueModuleConfig,
}
#[async_trait]
impl Module for CustomQueueModule {
    /// Name of this module.
    fn name(&self) -> &'static str {
        "CustomQueueModule"
    }

    /// Builds the module by delegating to `create_with_adapters`.
    async fn create(
        engine: Arc<Engine>,
        config: Option<Value>,
    ) -> anyhow::Result<Box<dyn Module>> {
        Self::create_with_adapters(engine, config).await
    }

    /// Intentionally empty: the module's function is registered in
    /// [`initialize`](Self::initialize) instead.
    fn register_functions(&self, _engine: Arc<Engine>) {}

    /// Logs start-up and registers the `custom_emit` function with the
    /// engine, using a clone of this module as its handler.
    async fn initialize(&self) -> anyhow::Result<()> {
        tracing::info!("Initializing CustomQueueModule");

        let request = RegisterFunctionRequest {
            function_id: "custom_emit".to_string(),
            description: Some("Emit to custom queue".to_string()),
            request_format: None,
            response_format: None,
            metadata: None,
        };
        self.engine
            .register_function(request, Box::new(self.clone()));

        Ok(())
    }
}
6. Implement ConfigurableModule
use std::collections::HashMap;
use std::sync::RwLock;
use once_cell::sync::Lazy;
#[async_trait]
impl ConfigurableModule for CustomQueueModule {
    type Config = CustomQueueModuleConfig;
    type Adapter = dyn CustomQueueAdapter;
    type AdapterRegistration = CustomQueueAdapterRegistration;

    /// Adapter class used when the configuration does not name one.
    const DEFAULT_ADAPTER_CLASS: &'static str = "my::InMemoryQueueAdapter";

    /// Returns the process-wide adapter registry for this module.
    ///
    /// The registry is built on first access from `Self::build_registry()`
    /// and lives for the rest of the program.
    async fn registry() -> &'static RwLock<HashMap<String, AdapterFactory<Self::Adapter>>> {
        static REGISTRY: Lazy<RwLock<HashMap<String, AdapterFactory<dyn CustomQueueAdapter>>>> =
            Lazy::new(|| RwLock::new(CustomQueueModule::build_registry()));
        // Borrow the static; `Lazy` dereferences to the inner `RwLock`.
        &REGISTRY
    }

    /// Assembles the module from its already-resolved parts.
    fn build(
        engine: Arc<Engine>,
        config: Self::Config,
        adapter: Arc<Self::Adapter>,
    ) -> Self {
        Self {
            engine,
            _config: config,
            adapter,
        }
    }

    /// Returns the adapter class named in `config`, if any.
    fn adapter_class_from_config(config: &Self::Config) -> Option<String> {
        config.adapter.as_ref().map(|a| a.class.clone())
    }

    /// Returns the adapter-specific configuration from `config`, if any.
    fn adapter_config_from_config(config: &Self::Config) -> Option<Value> {
        config.adapter.as_ref().and_then(|a| a.config.clone())
    }
}
7. Implement Function Handler
use iii::function::{FunctionHandler, FunctionResult};
use uuid::Uuid;
impl FunctionHandler for CustomQueueModule {
fn handle_function(
&self,
_invocation_id: Option<Uuid>,
_function_id: String,
input: Value,
) -> Pin<Box<dyn Future<Output = FunctionResult<Option<Value>, ErrorBody>> + Send + 'static>>
{
let adapter = self.adapter.clone();
Box::pin(async move {
let topic = input.get("topic").and_then(|v| v.as_str()).unwrap_or("");
let data = input.get("data").cloned().unwrap_or(Value::Null);
if topic.is_empty() {
return FunctionResult::Failure(ErrorBody {
code: "topic_not_set".into(),
message: "Topic is not set".into(),
});
}
adapter.enqueue(topic, data).await;
FunctionResult::Success(None)
})
}
}
8. Register and Use the Module
use iii::modules::config::EngineBuilder;
/// Entry point: initializes logging, then builds an engine with the custom
/// queue module registered and configured to use the in-memory adapter.
#[tokio::main]
async fn main() -> anyhow::Result<()> {
    iii::logging::init_log("config.yaml");

    // Module configuration selecting the adapter by class name.
    let module_config = serde_json::json!({
        "adapter": {
            "class": "my::InMemoryQueueAdapter"
        }
    });

    EngineBuilder::new()
        .register_module::<CustomQueueModule>("my::CustomQueueModule")
        .add_module("my::CustomQueueModule", Some(module_config))
        .address("127.0.0.1:49134")
        .build()
        .await?;

    Ok(())
}
Configuration in config.yaml
Add your module to the configuration (`config.yaml`):
modules:
  - class: my::CustomQueueModule
    config:
      adapter:
        class: my::InMemoryQueueAdapter
        # Or use a different adapter:
        # class: my::LoggingQueueAdapter
        # config:
        #   inner_adapter: my::InMemoryQueueAdapter
Adding Multiple Adapters
Create different adapter implementations:
pub struct LoggingQueueAdapter {
inner: Arc<dyn CustomQueueAdapter>,
}
impl LoggingQueueAdapter {
pub async fn new(config: Option<Value>, engine: Arc<Engine>) -> anyhow::Result<Self> {
let inner_class = config
.as_ref()
.and_then(|v| v.get("inner_adapter"))
.and_then(|v| v.as_str())
.unwrap_or("my::InMemoryQueueAdapter");
let inner_adapter = match inner_class {
"my::InMemoryQueueAdapter" => {
Arc::new(InMemoryQueueAdapter::new(None, engine).await?)
as Arc<dyn CustomQueueAdapter>
}
_ => return Err(anyhow::anyhow!("Unknown inner adapter: {}", inner_class)),
};
Ok(Self { inner: inner_adapter })
}
}
/// Decorator implementation: each operation emits an `info` log line and then
/// forwards the call unchanged to the wrapped adapter.
#[async_trait]
impl CustomQueueAdapter for LoggingQueueAdapter {
    /// Logs the topic and payload, then forwards to the inner adapter.
    async fn enqueue(&self, topic: &str, event_data: Value) {
        tracing::info!("Enqueuing to {}: {:?}", topic, event_data);
        let delegate = &self.inner;
        delegate.enqueue(topic, event_data).await;
    }

    /// Logs the function id and topic, then forwards to the inner adapter.
    async fn subscribe(&self, topic: &str, id: &str, function_id: &str) {
        tracing::info!("Subscribing {} to {}", function_id, topic);
        let delegate = &self.inner;
        delegate.subscribe(topic, id, function_id).await;
    }

    /// Logs the subscription id and topic, then forwards to the inner adapter.
    async fn unsubscribe(&self, topic: &str, id: &str) {
        tracing::info!("Unsubscribing {} from {}", id, topic);
        let delegate = &self.inner;
        delegate.unsubscribe(topic, id).await;
    }
}
/// Factory for [`LoggingQueueAdapter`].
///
/// Returns a boxed future that constructs the adapter from `config` and hands
/// it back as an `Arc<dyn CustomQueueAdapter>`; a constructor error is
/// propagated.
fn make_logging_adapter(
    engine: Arc<Engine>,
    config: Option<Value>,
) -> AdapterFuture<dyn CustomQueueAdapter> {
    Box::pin(async move {
        let adapter = LoggingQueueAdapter::new(config, engine).await?;
        let adapter: Arc<dyn CustomQueueAdapter> = Arc::new(adapter);
        Ok(adapter)
    })
}
// Register `make_logging_adapter` under the class name
// "my::LoggingQueueAdapter" as a `CustomQueueAdapterRegistration`.
iii::register_adapter!(
<CustomQueueAdapterRegistration>
"my::LoggingQueueAdapter",
make_logging_adapter
);
Adding Triggers (Optional)
Implement trigger support:
use iii::trigger::{TriggerRegistrator, TriggerType, Trigger};
impl TriggerRegistrator for CustomQueueModule {
fn register_trigger(
&self,
trigger: Trigger,
) -> Pin<Box<dyn Future<Output = Result<(), anyhow::Error>> + Send + '_>> {
let topic = trigger.config
.get("topic")
.and_then(|v| v.as_str())
.unwrap_or("")
.to_string();
let adapter = self.adapter.clone();
Box::pin(async move {
if !topic.is_empty() {
adapter.subscribe(&topic, &trigger.id, &trigger.function_id).await;
}
Ok(())
})
}
fn unregister_trigger(
&self,
trigger: Trigger,
) -> Pin<Box<dyn Future<Output = Result<(), anyhow::Error>> + Send + '_>> {
let adapter = self.adapter.clone();
Box::pin(async move {
let topic = trigger.config
.get("topic")
.and_then(|v| v.as_str())
.unwrap_or("");
adapter.unsubscribe(topic, &trigger.id).await;
Ok(())
})
}
}
// Register the trigger type from within `initialize()`.
impl Module for CustomQueueModule {
    /// Registers the `custom_queue` trigger type with the engine, using a
    /// clone of this module as the registrator.
    async fn initialize(&self) -> anyhow::Result<()> {
        // ... existing code ...

        let trigger_type = TriggerType {
            id: "custom_queue".to_string(),
            _description: "Custom queue trigger".to_string(),
            registrator: Box::new(self.clone()),
            worker_id: None,
        };
        self.engine.register_trigger_type(trigger_type).await;

        Ok(())
    }
}
Best Practices
- Trait-based Design: Define clear adapter traits
- Error Handling: Use `anyhow::Result` for errors
- Async: All I/O operations should be async
- Configuration: Support flexible adapter configuration
- Testing: Write tests for each adapter
- Documentation: Document your module’s functions and configuration
- Logging: Use `tracing` for observability
Example: Database Module
/// Adapter interface for a database module.
#[async_trait]
pub trait DatabaseAdapter: Send + Sync + 'static {
    /// Runs `sql` and returns the result as a JSON value.
    async fn query(&self, sql: &str) -> anyhow::Result<Value>;

    /// Runs `sql` and returns a count (presumably the number of affected
    /// rows; confirm against the implementation).
    async fn execute(&self, sql: &str) -> anyhow::Result<u64>;
}

pub struct PostgresAdapter { /* ... */ }
pub struct SQLiteAdapter { /* ... */ }

// Implementations of an `#[async_trait]` trait must carry the attribute too;
// without it the `async fn` bodies do not match the trait's signatures.
#[async_trait]
impl DatabaseAdapter for PostgresAdapter { /* ... */ }

#[async_trait]
impl DatabaseAdapter for SQLiteAdapter { /* ... */ }
Source Code Reference
- Full example: `examples/custom_queue_adapter.rs`
- Module trait: `src/modules/module.rs:30`
- ConfigurableModule: `src/modules/module.rs:76`
- Adapter registration: `src/modules/registry.rs`