Adapters provide pluggable backend implementations for iii modules. You can create custom adapters to integrate with proprietary systems, cloud services, or specialized data stores.
Adapter Architecture
Each module can have multiple adapter implementations:
QueueModule
├── RedisQueueAdapter (default)
├── RabbitMQQueueAdapter
└── YourCustomAdapter (new!)
Adapters are selected via configuration:
modules:
- class: modules::queue::QueueModule
config:
adapter:
class: my::CustomQueueAdapter
config:
connection_string: "..."
Creating a Custom Adapter
Step 1: Define the Adapter Trait
First, define the interface your adapter must implement.
use async_trait::async_trait;
use serde_json::Value;
#[async_trait]
pub trait CustomQueueAdapter: Send + Sync + 'static {
async fn enqueue(&self, topic: &str, event_data: Value);
async fn subscribe(&self, topic: &str, id: &str, function_id: &str);
async fn unsubscribe(&self, topic: &str, id: &str);
}
Reference the built-in QueueAdapter trait at src/modules/queue/mod.rs:20 for a complete example.
Step 2: Create Adapter Registration
Define how adapters are registered with the inventory system.
use iii::modules::registry::AdapterFuture;
use std::sync::Arc;
type CustomQueueAdapterFuture = AdapterFuture<dyn CustomQueueAdapter>;
pub struct CustomQueueAdapterRegistration {
pub class: &'static str,
pub factory: fn(Arc<Engine>, Option<Value>) -> CustomQueueAdapterFuture,
}
impl AdapterRegistrationEntry<dyn CustomQueueAdapter>
for CustomQueueAdapterRegistration
{
fn class(&self) -> &'static str {
self.class
}
fn factory(&self) -> fn(Arc<Engine>, Option<Value>) -> CustomQueueAdapterFuture {
self.factory
}
}
inventory::collect!(CustomQueueAdapterRegistration);
Step 3: Implement the Adapter
Create your adapter implementation.
use std::collections::HashMap;
use tokio::sync::RwLock as TokioRwLock;
pub struct InMemoryQueueAdapter {
subscribers: Arc<TokioRwLock<HashMap<String, Vec<(String, String)>>>>,
engine: Arc<Engine>,
}
impl InMemoryQueueAdapter {
pub async fn new(config: Option<Value>, engine: Arc<Engine>) -> anyhow::Result<Self> {
Ok(Self {
subscribers: Arc::new(TokioRwLock::new(HashMap::new())),
engine,
})
}
}
#[async_trait]
impl CustomQueueAdapter for InMemoryQueueAdapter {
async fn enqueue(&self, topic: &str, event_data: Value) {
let subscribers = self.subscribers.read().await;
if let Some(subs) = subscribers.get(topic) {
let mut invokes = vec![];
for (_id, function_id) in subs {
let invoke = self.engine.call(function_id, event_data.clone());
invokes.push(invoke);
}
futures::future::join_all(invokes).await;
}
}
async fn subscribe(&self, topic: &str, id: &str, function_id: &str) {
self.subscribers
.write()
.await
.entry(topic.to_string())
.or_default()
.push((id.to_string(), function_id.to_string()));
}
async fn unsubscribe(&self, topic: &str, id: &str) {
if let Some(subs) = self.subscribers.write().await.get_mut(topic) {
subs.retain(|(sub_id, _)| sub_id != id);
}
}
}
Step 4: Register with the Macro
Create factory functions and register adapters.
fn make_inmemory_adapter(
engine: Arc<Engine>,
config: Option<Value>
) -> CustomQueueAdapterFuture {
Box::pin(async move {
Ok(Arc::new(InMemoryQueueAdapter::new(config, engine).await?)
as Arc<dyn CustomQueueAdapter>)
})
}
iii::register_adapter!(
<CustomQueueAdapterRegistration>
"my::InMemoryQueueAdapter",
make_inmemory_adapter
);
Step 5: Create Module Config
Define configuration structure for your module.
use serde::Deserialize;
use iii::modules::module::AdapterEntry;
#[derive(Debug, Clone, Deserialize, Default)]
#[serde(deny_unknown_fields)]
pub struct CustomQueueModuleConfig {
#[serde(default)]
pub adapter: Option<AdapterEntry>,
}
Step 6: Implement the Module
Create the module that uses your adapter.
use iii::modules::module::{ConfigurableModule, Module};
use iii::modules::config::EngineBuilder;
#[derive(Clone)]
pub struct CustomQueueModule {
adapter: Arc<dyn CustomQueueAdapter>,
engine: Arc<Engine>,
_config: CustomQueueModuleConfig,
}
#[async_trait]
impl Module for CustomQueueModule {
fn name(&self) -> &'static str {
"CustomQueueModule"
}
async fn create(engine: Arc<Engine>, config: Option<Value>) -> anyhow::Result<Box<dyn Module>> {
Self::create_with_adapters(engine, config).await
}
async fn initialize(&self) -> anyhow::Result<()> {
tracing::info!("Initializing CustomQueueModule");
// Register module functions here
self.engine.register_function(
RegisterFunctionRequest {
function_id: "custom_emit".to_string(),
description: Some("Emit to custom queue".to_string()),
request_format: Some(serde_json::json!({
"topic": { "type": "string" },
"data": { "type": "object" }
})),
response_format: None,
metadata: None,
},
Box::new(self.clone()),
);
Ok(())
}
}
#[async_trait]
impl ConfigurableModule for CustomQueueModule {
type Config = CustomQueueModuleConfig;
type Adapter = dyn CustomQueueAdapter;
type AdapterRegistration = CustomQueueAdapterRegistration;
const DEFAULT_ADAPTER_CLASS: &'static str = "my::InMemoryQueueAdapter";
async fn registry() -> &'static RwLock<HashMap<String, AdapterFactory<Self::Adapter>>> {
static REGISTRY: Lazy<RwLock<HashMap<String, AdapterFactory<dyn CustomQueueAdapter>>>> =
Lazy::new(|| RwLock::new(CustomQueueModule::build_registry()));
®ISTRY
}
fn build(engine: Arc<Engine>, config: Self::Config, adapter: Arc<Self::Adapter>) -> Self {
Self {
engine,
_config: config,
adapter,
}
}
fn adapter_class_from_config(config: &Self::Config) -> Option<String> {
config.adapter.as_ref().map(|a| a.class.clone())
}
fn adapter_config_from_config(config: &Self::Config) -> Option<Value> {
config.adapter.as_ref().and_then(|a| a.config.clone())
}
}
Step 7: Use the Module
Programmatically:
#[tokio::main]
async fn main() -> anyhow::Result<()> {
iii::logging::init_log("config.yaml");
EngineBuilder::new()
.register_module::<CustomQueueModule>("my::CustomQueueModule")
.add_module(
"my::CustomQueueModule",
Some(serde_json::json!({
"adapter": {
"class": "my::InMemoryQueueAdapter",
"config": {}
}
})),
)
.address("127.0.0.1:49134")
.build()
.await?;
Ok(())
}
Via config.yaml:
modules:
- class: my::CustomQueueModule
config:
adapter:
class: my::InMemoryQueueAdapter
config: {}
Advanced Pattern: Decorator Adapters
You can create adapters that wrap other adapters (decorator pattern).
pub struct LoggingQueueAdapter {
inner: Arc<dyn CustomQueueAdapter>,
}
impl LoggingQueueAdapter {
pub async fn new(config: Option<Value>, engine: Arc<Engine>) -> anyhow::Result<Self> {
// Get the inner adapter class from config
let inner_adapter_class = config
.as_ref()
.and_then(|v| v.get("inner_adapter"))
.and_then(|v| v.as_str())
.unwrap_or("my::InMemoryQueueAdapter");
let inner_adapter = match inner_adapter_class {
"my::InMemoryQueueAdapter" => {
Arc::new(InMemoryQueueAdapter::new(None, engine).await?)
as Arc<dyn CustomQueueAdapter>
}
_ => anyhow::bail!("Unknown inner adapter: {}", inner_adapter_class),
};
Ok(Self { inner: inner_adapter })
}
}
#[async_trait]
impl CustomQueueAdapter for LoggingQueueAdapter {
async fn enqueue(&self, topic: &str, event_data: Value) {
tracing::info!(
topic = %topic,
event_data = %event_data,
"LoggingQueueAdapter: Enqueuing message"
);
self.inner.enqueue(topic, event_data).await;
}
async fn subscribe(&self, topic: &str, id: &str, function_id: &str) {
tracing::info!(
topic = %topic,
id = %id,
function_id = %function_id,
"LoggingQueueAdapter: Subscribing"
);
self.inner.subscribe(topic, id, function_id).await;
}
async fn unsubscribe(&self, topic: &str, id: &str) {
tracing::info!(
topic = %topic,
id = %id,
"LoggingQueueAdapter: Unsubscribing"
);
self.inner.unsubscribe(topic, id).await;
}
}
Configuration:
modules:
- class: my::CustomQueueModule
config:
adapter:
class: my::LoggingQueueAdapter
config:
inner_adapter: my::InMemoryQueueAdapter
Built-in Adapter Examples
QueueAdapter Trait
Reference: src/modules/queue/mod.rs:20
#[async_trait]
pub trait QueueAdapter: Send + Sync + 'static {
async fn enqueue(
&self,
topic: &str,
data: Value,
traceparent: Option<String>,
baggage: Option<String>,
);
async fn subscribe(
&self,
topic: &str,
id: &str,
function_id: &str,
condition_function_id: Option<String>,
queue_config: Option<SubscriberQueueConfig>,
);
async fn unsubscribe(&self, topic: &str, id: &str);
async fn redrive_dlq(&self, topic: &str) -> anyhow::Result<u64>;
async fn dlq_count(&self, topic: &str) -> anyhow::Result<u64>;
}
Features:
- Distributed tracing support (traceparent, baggage)
- Dead letter queue (DLQ) handling
- Conditional subscriptions
- Per-subscriber queue configuration
Best Practices
Error Handling
- Return
anyhow::Result from constructors
- Log errors with structured context
- Implement graceful degradation for transient failures
Concurrency
- Use
async_trait for async trait methods
- Prefer
tokio::sync::RwLock for shared state
- Use
Arc for shared adapter instances
Configuration
- Use
serde::Deserialize for config structs
- Provide sensible defaults
- Validate configuration in constructors
- Support environment variable expansion
Testing
- Create mock adapters for testing
- Test adapter registration
- Test configuration parsing
- Test concurrent access patterns
- Minimize lock contention
- Batch operations when possible
- Use connection pooling for external services
- Implement circuit breakers for reliability
Complete Example
See the full working example at examples/custom_queue_adapter.rs in the iii source code. This example demonstrates:
- Custom adapter trait definition
- Two adapter implementations (InMemory, Logging)
- Decorator pattern for composable adapters
- Module integration with EngineBuilder
- YAML configuration support
Run the example:
cargo run --example custom_queue_adapter
Common Use Cases
Cloud Queue Services
- AWS SQS/SNS adapter
- Google Cloud Pub/Sub adapter
- Azure Service Bus adapter
Message Brokers
- Kafka adapter
- NATS adapter
- Pulsar adapter
Custom Storage
- PostgreSQL-backed queue
- MongoDB change streams
- Custom database adapters
Hybrid Approaches
- Multi-cloud queue adapter (failover)
- Tiered storage (hot/cold data)
- Caching adapters (Redis → S3)