The StreamAdapter trait defines the interface for real-time stream storage and synchronization in the iii framework. Adapters handle CRUD operations, event broadcasting, and WebSocket subscriptions.
Location
use iii::modules::stream::adapters::StreamAdapter;
Defined in src/modules/stream/adapters/mod.rs
Trait Definition
/// Interface for real-time stream storage and synchronization.
///
/// Adapters handle item CRUD operations, event broadcasting, and WebSocket
/// subscriptions. Every method is async and reports failure through
/// `anyhow::Result`.
#[async_trait]
pub trait StreamAdapter: Send + Sync {
/// Sets or creates an item in the stream and broadcasts a create/update event.
///
/// Returns a result indicating whether the item was created or updated.
async fn set(
&self,
stream_name: &str,
group_id: &str,
item_id: &str,
data: Value,
) -> anyhow::Result<SetResult>;
/// Retrieves a single item from the stream.
///
/// Returns the item data if found, `None` otherwise.
async fn get(
&self,
stream_name: &str,
group_id: &str,
item_id: &str,
) -> anyhow::Result<Option<Value>>;
/// Deletes an item from the stream and broadcasts a delete event.
async fn delete(
&self,
stream_name: &str,
group_id: &str,
item_id: &str,
) -> anyhow::Result<DeleteResult>;
/// Retrieves all items in a group.
async fn get_group(
&self,
stream_name: &str,
group_id: &str
) -> anyhow::Result<Vec<Value>>;
/// Lists all group IDs within a stream.
async fn list_groups(
&self,
stream_name: &str
) -> anyhow::Result<Vec<String>>;
/// Lists all available streams with their metadata.
async fn list_all_stream(&self) -> anyhow::Result<Vec<StreamMetadata>>;
/// Broadcasts an event to all subscribed connections.
async fn emit_event(
&self,
message: StreamWrapperMessage
) -> anyhow::Result<()>;
/// Registers a WebSocket connection for stream events.
///
/// `id` is the unique connection identifier; `connection` is the handler
/// that will receive events.
async fn subscribe(
&self,
id: String,
connection: Arc<dyn StreamConnection>,
) -> anyhow::Result<()>;
/// Removes the WebSocket connection subscription registered under `id`.
async fn unsubscribe(&self, id: String) -> anyhow::Result<()>;
/// Starts watching for stream events (typically from Redis pub/sub).
async fn watch_events(&self) -> anyhow::Result<()>;
/// Cleans up adapter resources during shutdown.
async fn destroy(&self) -> anyhow::Result<()>;
/// Atomically updates an item by applying `ops`, a list of JSON update
/// operations, and returns the updated value.
async fn update(
&self,
stream_name: &str,
group_id: &str,
item_id: &str,
ops: Vec<UpdateOp>,
) -> anyhow::Result<UpdateResult>;
}
Required Methods
set
async fn set(
&self,
stream_name: &str,
group_id: &str,
item_id: &str,
data: Value,
) -> anyhow::Result<SetResult>
Sets or creates an item in the stream and broadcasts a create/update event.
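stream_name
&str
required
The name of the stream
group_id
&str
required
The group/collection within the stream
item_id
&str
required
The unique item identifier
data
Value
required
The item data as a JSON value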
return
anyhow::Result<SetResult>
Result indicating whether item was created or updated
get
async fn get(
&self,
stream_name: &str,
group_id: &str,
item_id: &str,
) -> anyhow::Result<Option<Value>>
Retrieves a single item from the stream.
stream_name
&str
required
The name of the stream
group_id
&str
required
The group/collection within the stream
item_id
&str
required
The unique item identifier
return
anyhow::Result<Option<Value>>
The item data if found, None otherwise
delete
async fn delete(
&self,
stream_name: &str,
group_id: &str,
item_id: &str,
) -> anyhow::Result<DeleteResult>
Deletes an item from the stream and broadcasts a delete event.
stream_name
&str
required
The name of the stream
group_id
&str
required
The group/collection within the stream
item_id
&str
required
The unique item identifier
return
anyhow::Result<DeleteResult>
Result indicating success or failure
get_group
async fn get_group(
&self,
stream_name: &str,
group_id: &str
) -> anyhow::Result<Vec<Value>>
Retrieves all items in a group.
stream_name
&str
required
The name of the stream
group_id
&str
required
The group/collection within the stream
return
anyhow::Result<Vec<Value>>
Vector of all items in the group
list_groups
async fn list_groups(
&self,
stream_name: &str
) -> anyhow::Result<Vec<String>>
Lists all group IDs within a stream.
stream_name
&str
required
The name of the stream
return
anyhow::Result<Vec<String>>
Vector of group identifiers
list_all_stream
async fn list_all_stream(&self) -> anyhow::Result<Vec<StreamMetadata>>
Lists all available streams with their metadata.
return
anyhow::Result<Vec<StreamMetadata>>
Vector of stream metadata objects
emit_event
async fn emit_event(
&self,
message: StreamWrapperMessage
) -> anyhow::Result<()>
Broadcasts an event to all subscribed connections.
message
StreamWrapperMessage
required
The event message to broadcast
Returns Ok(()) on success or an error
subscribe
async fn subscribe(
&self,
id: String,
connection: Arc<dyn StreamConnection>,
) -> anyhow::Result<()>
Registers a WebSocket connection for stream events.
id
String
required
Unique connection identifier
connection
Arc<dyn StreamConnection>
required
The WebSocket connection handler
Returns Ok(()) on success or an error
unsubscribe
async fn unsubscribe(&self, id: String) -> anyhow::Result<()>
Removes a WebSocket connection subscription.
id
String
required
Connection identifier to remove
Returns Ok(()) on success or an error
watch_events
async fn watch_events(&self) -> anyhow::Result<()>
Starts watching for stream events (typically from Redis pub/sub).
Returns Ok(()) on success or an error
destroy
async fn destroy(&self) -> anyhow::Result<()>
Cleans up adapter resources during shutdown.
Returns Ok(()) on success or an error
update
async fn update(
&self,
stream_name: &str,
group_id: &str,
item_id: &str,
ops: Vec<UpdateOp>,
) -> anyhow::Result<UpdateResult>
Atomically updates an item using JSON operations.
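stream_name
&str
required
The name of the stream
group_id
&str
required
The group/collection within the stream
item_id
&str
required
The unique item identifier
ops
Vec<UpdateOp>
required
Vector of atomic update operations to apply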
return
anyhow::Result<UpdateResult>
The updated value or an error
Implementation Example
use std::{collections::HashMap, sync::Arc};
use async_trait::async_trait;
use redis::{Client, AsyncCommands, aio::ConnectionManager};
use tokio::sync::{Mutex, RwLock};
use serde_json::Value;
use iii::{
modules::stream::{
adapters::{StreamAdapter, StreamConnection},
StreamMetadata, StreamWrapperMessage,
},
};
use iii_sdk::{
UpdateOp, UpdateResult,
types::{DeleteResult, SetResult},
};
/// Example `StreamAdapter` implementation backed by Redis.
pub struct RedisAdapter {
/// Connection manager behind an async mutex; the example's hash
/// reads and writes all go through this handle.
publisher: Arc<Mutex<ConnectionManager>>,
/// The Redis client the manager was created from.
/// NOTE(review): presumably kept for opening pub/sub subscriptions in
/// `watch_events`; this example never uses it, so confirm.
subscriber: Arc<Client>,
/// Registered stream connections, keyed by connection ID.
connections: Arc<RwLock<HashMap<String, Arc<dyn StreamConnection>>>>,
}
impl RedisAdapter {
    /// Connects to the Redis server at `redis_url` and builds an adapter
    /// with an empty connection registry.
    ///
    /// # Errors
    ///
    /// Fails if the URL cannot be parsed into a client or the connection
    /// manager cannot be established.
    pub async fn new(redis_url: String) -> anyhow::Result<Self> {
        let subscriber = Arc::new(Client::open(redis_url.as_str())?);
        let publisher = Arc::new(Mutex::new(subscriber.get_connection_manager().await?));
        let connections = Arc::new(RwLock::new(HashMap::new()));

        Ok(Self {
            publisher,
            subscriber,
            connections,
        })
    }
}
/// Redis-backed example implementation of `StreamAdapter`.
///
/// Each group is stored as one Redis hash under the key
/// `stream:{stream_name}:{group_id}`, with one field per `item_id` whose
/// value is the item serialized as JSON text.
///
/// NOTE: to stay short, this example does not publish events from `set` or
/// `delete`, and several methods are stubs marked `// Implementation...`.
#[async_trait]
impl StreamAdapter for RedisAdapter {
    /// Writes `data` under `item_id` and reports whether the field was
    /// newly created or overwritten.
    async fn set(
        &self,
        stream_name: &str,
        group_id: &str,
        item_id: &str,
        data: Value,
    ) -> anyhow::Result<SetResult> {
        let key = format!("stream:{}:{}", stream_name, group_id);
        let mut conn = self.publisher.lock().await;
        // HSET replies with the number of fields it newly added, so a single
        // command both stores the value and tells us created vs. updated.
        // A separate HEXISTS beforehand would race with other writers.
        let added: i64 = conn.hset(&key, item_id, data.to_string()).await?;
        Ok(if added > 0 { SetResult::Created } else { SetResult::Updated })
    }

    /// Reads one item; returns `Ok(None)` when the field does not exist.
    ///
    /// # Errors
    ///
    /// Fails on a Redis error or when the stored text is not valid JSON.
    async fn get(
        &self,
        stream_name: &str,
        group_id: &str,
        item_id: &str,
    ) -> anyhow::Result<Option<Value>> {
        let key = format!("stream:{}:{}", stream_name, group_id);
        let mut conn = self.publisher.lock().await;
        let raw: Option<String> = conn.hget(&key, item_id).await?;
        // Surface corrupt JSON as an error rather than disguising it as
        // "item not found".
        Ok(raw
            .map(|text| serde_json::from_str::<Value>(&text))
            .transpose()?)
    }

    /// Removes one item and reports whether anything was actually deleted.
    async fn delete(
        &self,
        stream_name: &str,
        group_id: &str,
        item_id: &str,
    ) -> anyhow::Result<DeleteResult> {
        let key = format!("stream:{}:{}", stream_name, group_id);
        let mut conn = self.publisher.lock().await;
        // HDEL replies with the number of fields it removed.
        let deleted: i32 = conn.hdel(&key, item_id).await?;
        Ok(if deleted > 0 { DeleteResult::Deleted } else { DeleteResult::NotFound })
    }

    /// Stub: returns an empty list.
    async fn get_group(
        &self,
        stream_name: &str,
        group_id: &str,
    ) -> anyhow::Result<Vec<Value>> {
        // Implementation...
        Ok(vec![])
    }

    /// Stub: returns an empty list.
    async fn list_groups(&self, stream_name: &str) -> anyhow::Result<Vec<String>> {
        // Implementation...
        Ok(vec![])
    }

    /// Stub: returns an empty list.
    async fn list_all_stream(&self) -> anyhow::Result<Vec<StreamMetadata>> {
        // Implementation...
        Ok(vec![])
    }

    /// Stub: does nothing.
    async fn emit_event(&self, message: StreamWrapperMessage) -> anyhow::Result<()> {
        // Implementation...
        Ok(())
    }

    /// Registers `connection` under `id`, replacing any connection already
    /// stored under the same ID.
    async fn subscribe(
        &self,
        id: String,
        connection: Arc<dyn StreamConnection>,
    ) -> anyhow::Result<()> {
        let mut connections = self.connections.write().await;
        connections.insert(id, connection);
        Ok(())
    }

    /// Drops the connection registered under `id`; unknown IDs are ignored.
    async fn unsubscribe(&self, id: String) -> anyhow::Result<()> {
        let mut connections = self.connections.write().await;
        connections.remove(&id);
        Ok(())
    }

    /// Stub: does nothing.
    async fn watch_events(&self) -> anyhow::Result<()> {
        // Implementation...
        Ok(())
    }

    /// Nothing to release explicitly in this example.
    async fn destroy(&self) -> anyhow::Result<()> {
        Ok(())
    }

    /// Stub: applies nothing and returns a null value.
    async fn update(
        &self,
        stream_name: &str,
        group_id: &str,
        item_id: &str,
        ops: Vec<UpdateOp>,
    ) -> anyhow::Result<UpdateResult> {
        // Implementation with atomic operations...
        Ok(UpdateResult { value: Value::Null })
    }
}
/// Metadata describing a stream and its groups.
pub struct StreamMetadata {
/// Identifier of the stream.
/// NOTE(review): presumably the same string passed as `stream_name`; confirm.
pub id: String,
/// The groups belonging to the stream (presumably their group IDs; confirm).
pub groups: Vec<String>,
}
Metadata describing a stream and its groups.
StreamWrapperMessage
/// Wrapper for stream events broadcast to WebSocket connections.
pub struct StreamWrapperMessage {
/// Kind of event. NOTE(review): the set of valid strings is not shown here.
pub event_type: String,
/// Event timestamp. NOTE(review): unit and epoch are not shown here; confirm.
pub timestamp: i64,
/// Name of the stream the event relates to.
pub stream_name: String,
/// Group within the stream the event relates to.
pub group_id: String,
/// Optional identifier. NOTE(review): presumably the affected item's ID; confirm.
pub id: Option<String>,
/// The outbound message carried by this wrapper.
pub event: StreamOutboundMessage,
}
Wrapper for stream events broadcast to WebSocket connections.
StreamConnection
/// Trait for WebSocket connection handlers that receive stream events.
#[async_trait]
pub trait StreamConnection: Subscriber + Send + Sync {
/// Cleans up the connection.
/// NOTE(review): the call site is not shown here; presumably invoked when
/// the connection closes.
async fn cleanup(&self);
/// Handles one stream event delivered to this connection.
async fn handle_stream_message(&self, msg: &StreamWrapperMessage) -> anyhow::Result<()>;
}
Trait for WebSocket connection handlers that receive stream events.
UpdateOp
From the iii_sdk crate - represents atomic JSON operations:
- Set a field
- Delete a field
- Append to array
- Remove from array
- Increment/decrement numbers
Registration Example
use std::sync::Arc;

use iii::modules::stream::{adapters::StreamAdapter, StreamCoreModule};

// Register a custom stream adapter under the name "my_stream".
// The factory closure receives the engine and the adapter config and must
// resolve to an `Arc<dyn StreamAdapter>`. `MyStreamAdapter` stands for your
// own type implementing `StreamAdapter`.
StreamCoreModule::add_adapter("my_stream", |engine, config| async move {
    let adapter = MyStreamAdapter::new(engine, config).await?;
    Ok(Arc::new(adapter) as Arc<dyn StreamAdapter>)
}).await?;
See Also