Skip to main content
The StreamAdapter trait defines the interface for real-time stream storage and synchronization in the iii framework. Adapters handle CRUD operations, event broadcasting, and WebSocket subscriptions.

Location

use iii::modules::stream::adapters::StreamAdapter;
Defined in src/modules/stream/adapters/mod.rs

Trait Definition

/// Interface for real-time stream storage and synchronization.
///
/// Implementations provide item CRUD, event broadcasting, and connection
/// subscription management. Every method is async and reports failure
/// through `anyhow::Result`.
#[async_trait]
pub trait StreamAdapter: Send + Sync {
    /// Sets or creates the item `item_id` in `group_id` of `stream_name`
    /// and broadcasts a create/update event.
    ///
    /// The returned `SetResult` indicates whether the item was created or updated.
    async fn set(
        &self,
        stream_name: &str,
        group_id: &str,
        item_id: &str,
        data: Value,
    ) -> anyhow::Result<SetResult>;

    /// Retrieves a single item from the stream.
    ///
    /// Returns `Ok(Some(value))` if the item is found and `Ok(None)` otherwise.
    async fn get(
        &self,
        stream_name: &str,
        group_id: &str,
        item_id: &str,
    ) -> anyhow::Result<Option<Value>>;

    /// Deletes an item from the stream and broadcasts a delete event.
    async fn delete(
        &self,
        stream_name: &str,
        group_id: &str,
        item_id: &str,
    ) -> anyhow::Result<DeleteResult>;

    /// Retrieves all items in the group `group_id` of `stream_name`.
    async fn get_group(
        &self,
        stream_name: &str,
        group_id: &str
    ) -> anyhow::Result<Vec<Value>>;

    /// Lists all group IDs within `stream_name`.
    async fn list_groups(
        &self,
        stream_name: &str
    ) -> anyhow::Result<Vec<String>>;

    /// Lists all available streams with their metadata.
    async fn list_all_stream(&self) -> anyhow::Result<Vec<StreamMetadata>>;

    /// Broadcasts `message` to all subscribed connections.
    async fn emit_event(
        &self,
        message: StreamWrapperMessage
    ) -> anyhow::Result<()>;

    /// Registers `connection` for stream events under the unique connection `id`.
    async fn subscribe(
        &self,
        id: String,
        connection: Arc<dyn StreamConnection>,
    ) -> anyhow::Result<()>;

    /// Removes the connection subscription registered under `id`.
    async fn unsubscribe(&self, id: String) -> anyhow::Result<()>;

    /// Starts watching for stream events (typically from Redis pub/sub).
    async fn watch_events(&self) -> anyhow::Result<()>;

    /// Cleans up adapter resources during shutdown.
    async fn destroy(&self) -> anyhow::Result<()>;

    /// Atomically updates an item by applying `ops`, a list of JSON update
    /// operations, and returns the updated value.
    async fn update(
        &self,
        stream_name: &str,
        group_id: &str,
        item_id: &str,
        ops: Vec<UpdateOp>,
    ) -> anyhow::Result<UpdateResult>;
}

Required Methods

set

async fn set(
    &self,
    stream_name: &str,
    group_id: &str,
    item_id: &str,
    data: Value,
) -> anyhow::Result<SetResult>
Creates the item if it does not exist, or overwrites it if it does, and broadcasts a create/update event.
stream_name
&str
required
The stream identifier
group_id
&str
required
The group/collection within the stream
item_id
&str
required
The unique item identifier
data
Value
required
The item data as a JSON value
return
anyhow::Result<SetResult>
Result indicating whether the item was created or updated

get

async fn get(
    &self,
    stream_name: &str,
    group_id: &str,
    item_id: &str,
) -> anyhow::Result<Option<Value>>
Retrieves a single item from the stream.
stream_name
&str
required
The stream identifier
group_id
&str
required
The group/collection within the stream
item_id
&str
required
The unique item identifier
return
anyhow::Result<Option<Value>>
The item data if found, None otherwise

delete

async fn delete(
    &self,
    stream_name: &str,
    group_id: &str,
    item_id: &str,
) -> anyhow::Result<DeleteResult>
Deletes an item from the stream and broadcasts a delete event.
stream_name
&str
required
The stream identifier
group_id
&str
required
The group/collection within the stream
item_id
&str
required
The unique item identifier
return
anyhow::Result<DeleteResult>
Result indicating whether the item was deleted or was not found (errors are reported through the `Err` variant)

get_group

async fn get_group(
    &self,
    stream_name: &str,
    group_id: &str
) -> anyhow::Result<Vec<Value>>
Retrieves all items in a group.
stream_name
&str
required
The stream identifier
group_id
&str
required
The group/collection within the stream
return
anyhow::Result<Vec<Value>>
Vector of all items in the group

list_groups

async fn list_groups(
    &self,
    stream_name: &str
) -> anyhow::Result<Vec<String>>
Lists all group IDs within a stream.
stream_name
&str
required
The stream identifier
return
anyhow::Result<Vec<String>>
Vector of group identifiers

list_all_stream

async fn list_all_stream(&self) -> anyhow::Result<Vec<StreamMetadata>>
Lists all available streams with their metadata.
return
anyhow::Result<Vec<StreamMetadata>>
Vector of stream metadata objects

emit_event

async fn emit_event(
    &self,
    message: StreamWrapperMessage
) -> anyhow::Result<()>
Broadcasts an event to all subscribed connections.
message
StreamWrapperMessage
required
The event message to broadcast
return
anyhow::Result<()>
Returns Ok(()) on success or an error

subscribe

async fn subscribe(
    &self,
    id: String,
    connection: Arc<dyn StreamConnection>,
) -> anyhow::Result<()>
Registers a WebSocket connection for stream events.
id
String
required
Unique connection identifier
connection
Arc<dyn StreamConnection>
required
The WebSocket connection handler
return
anyhow::Result<()>
Returns Ok(()) on success or an error

unsubscribe

async fn unsubscribe(&self, id: String) -> anyhow::Result<()>
Removes a WebSocket connection subscription.
id
String
required
Connection identifier to remove
return
anyhow::Result<()>
Returns Ok(()) on success or an error

watch_events

async fn watch_events(&self) -> anyhow::Result<()>
Starts watching for stream events (typically from Redis pub/sub).
return
anyhow::Result<()>
Returns Ok(()) on success or an error

destroy

async fn destroy(&self) -> anyhow::Result<()>
Cleans up adapter resources during shutdown.
return
anyhow::Result<()>
Returns Ok(()) on success or an error

update

async fn update(
    &self,
    stream_name: &str,
    group_id: &str,
    item_id: &str,
    ops: Vec<UpdateOp>,
) -> anyhow::Result<UpdateResult>
Atomically updates an item using JSON operations.
stream_name
&str
required
The stream identifier
group_id
&str
required
The group/collection within the stream
item_id
&str
required
The unique item identifier
ops
Vec<UpdateOp>
required
Vector of atomic update operations to apply
return
anyhow::Result<UpdateResult>
The updated value or an error

Implementation Example

use std::{collections::HashMap, sync::Arc};
use async_trait::async_trait;
use redis::{Client, AsyncCommands, aio::ConnectionManager};
use tokio::sync::{Mutex, RwLock};
use serde_json::Value;
use iii::{
    modules::stream::{
        adapters::{StreamAdapter, StreamConnection},
        StreamMetadata, StreamWrapperMessage,
    },
};
use iii_sdk::{
    UpdateOp, UpdateResult,
    types::{DeleteResult, SetResult},
};

/// Example Redis-backed `StreamAdapter` implementation.
pub struct RedisAdapter {
    /// Redis connection manager behind an async mutex; the item commands in
    /// `set`, `get`, and `delete` lock it before talking to Redis.
    publisher: Arc<Mutex<ConnectionManager>>,
    /// The Redis client the manager was created from.
    /// NOTE(review): presumably used to open the pub/sub connection for
    /// `watch_events` — that code is not shown in this example; confirm.
    subscriber: Arc<Client>,
    /// Registered connections, keyed by the id passed to `subscribe` /
    /// `unsubscribe`.
    connections: Arc<RwLock<HashMap<String, Arc<dyn StreamConnection>>>>,
}

impl RedisAdapter {
    /// Opens a Redis client for `redis_url`, obtains a connection manager
    /// from it, and returns an adapter with an empty connection registry.
    ///
    /// Fails if the URL cannot be opened as a client or the connection
    /// manager cannot be created.
    pub async fn new(redis_url: String) -> anyhow::Result<Self> {
        let redis_client = Client::open(redis_url.as_str())?;

        // The manager only borrows the client, so it must be built before
        // the client is moved into its own `Arc`.
        let publisher = Arc::new(Mutex::new(redis_client.get_connection_manager().await?));
        let subscriber = Arc::new(redis_client);
        let connections = Arc::new(RwLock::new(HashMap::new()));

        Ok(Self {
            publisher,
            subscriber,
            connections,
        })
    }
}

#[async_trait]
impl StreamAdapter for RedisAdapter {
    async fn set(
        &self,
        stream_name: &str,
        group_id: &str,
        item_id: &str,
        data: Value,
    ) -> anyhow::Result<SetResult> {
        let mut conn = self.publisher.lock().await;
        let key = format!("stream:{}:{}", stream_name, group_id);
        
        let exists: bool = conn.hexists(&key, item_id).await?;
        conn.hset(&key, item_id, data.to_string()).await?;
        
        Ok(if exists { SetResult::Updated } else { SetResult::Created })
    }

    async fn get(
        &self,
        stream_name: &str,
        group_id: &str,
        item_id: &str,
    ) -> anyhow::Result<Option<Value>> {
        let mut conn = self.publisher.lock().await;
        let key = format!("stream:{}:{}", stream_name, group_id);
        
        let result: Option<String> = conn.hget(&key, item_id).await?;
        Ok(result.and_then(|s| serde_json::from_str(&s).ok()))
    }

    async fn delete(
        &self,
        stream_name: &str,
        group_id: &str,
        item_id: &str,
    ) -> anyhow::Result<DeleteResult> {
        let mut conn = self.publisher.lock().await;
        let key = format!("stream:{}:{}", stream_name, group_id);
        
        let deleted: i32 = conn.hdel(&key, item_id).await?;
        Ok(if deleted > 0 { DeleteResult::Deleted } else { DeleteResult::NotFound })
    }

    async fn get_group(
        &self,
        stream_name: &str,
        group_id: &str,
    ) -> anyhow::Result<Vec<Value>> {
        // Implementation...
        Ok(vec![])
    }

    async fn list_groups(&self, stream_name: &str) -> anyhow::Result<Vec<String>> {
        // Implementation...
        Ok(vec![])
    }

    async fn list_all_stream(&self) -> anyhow::Result<Vec<StreamMetadata>> {
        // Implementation...
        Ok(vec![])
    }

    async fn emit_event(&self, message: StreamWrapperMessage) -> anyhow::Result<()> {
        // Implementation...
        Ok(())
    }

    async fn subscribe(
        &self,
        id: String,
        connection: Arc<dyn StreamConnection>,
    ) -> anyhow::Result<()> {
        let mut connections = self.connections.write().await;
        connections.insert(id, connection);
        Ok(())
    }

    async fn unsubscribe(&self, id: String) -> anyhow::Result<()> {
        let mut connections = self.connections.write().await;
        connections.remove(&id);
        Ok(())
    }

    async fn watch_events(&self) -> anyhow::Result<()> {
        // Implementation...
        Ok(())
    }

    async fn destroy(&self) -> anyhow::Result<()> {
        Ok(())
    }

    async fn update(
        &self,
        stream_name: &str,
        group_id: &str,
        item_id: &str,
        ops: Vec<UpdateOp>,
    ) -> anyhow::Result<UpdateResult> {
        // Implementation with atomic operations...
        Ok(UpdateResult { value: Value::Null })
    }
}

StreamMetadata

/// Metadata describing a stream and its groups.
pub struct StreamMetadata {
    /// Identifier of the stream.
    /// NOTE(review): presumably the same value callers pass as `stream_name` — confirm.
    pub id: String,
    /// Groups belonging to the stream.
    /// NOTE(review): presumably the group ids that `list_groups` returns — confirm.
    pub groups: Vec<String>,
}
Metadata describing a stream and its groups.

StreamWrapperMessage

/// Wrapper for stream events broadcast to WebSocket connections.
pub struct StreamWrapperMessage {
    /// Kind of event being carried.
    /// NOTE(review): the set of valid values is not shown here — confirm.
    pub event_type: String,
    /// Time of the event.
    /// NOTE(review): presumably a Unix timestamp; the unit (seconds vs. milliseconds) is not shown — confirm.
    pub timestamp: i64,
    /// Name of the stream the event belongs to.
    pub stream_name: String,
    /// Group within the stream the event belongs to.
    pub group_id: String,
    /// Optional identifier.
    /// NOTE(review): presumably the affected item id, absent for group-level events — confirm.
    pub id: Option<String>,
    /// The outbound message payload delivered to connections.
    pub event: StreamOutboundMessage,
}
Wrapper for stream events broadcast to WebSocket connections.

StreamConnection

/// Trait for WebSocket connection handlers that receive stream events.
#[async_trait]
pub trait StreamConnection: Subscriber + Send + Sync {
    /// Performs connection cleanup.
    /// NOTE(review): presumably invoked when the connection is closed or unsubscribed — confirm.
    async fn cleanup(&self);
    /// Handles one stream event message for this connection, returning an
    /// error if it could not be processed.
    async fn handle_stream_message(&self, msg: &StreamWrapperMessage) -> anyhow::Result<()>;
}
Trait for WebSocket connection handlers that receive stream events.

UpdateOp

From the iii_sdk crate - represents atomic JSON operations:
  • Set a field
  • Delete a field
  • Append to array
  • Remove from array
  • Increment/decrement numbers

Registration Example

use std::sync::Arc;

use iii::modules::stream::{adapters::StreamAdapter, StreamCoreModule};

// Register a custom stream adapter under the name "my_stream".
// `MyStreamAdapter` stands in for your own `StreamAdapter` implementation.
StreamCoreModule::add_adapter("my_stream", |engine, config| async move {
    let adapter = MyStreamAdapter::new(engine, config).await?;
    // The factory must hand back a trait object, so coerce the concrete
    // adapter to `Arc<dyn StreamAdapter>`.
    Ok(Arc::new(adapter) as Arc<dyn StreamAdapter>)
}).await?;

See Also

Build docs developers (and LLMs) love