The Stream module provides real-time WebSocket connections for bidirectional communication and reactive data streams. It is well suited to live updates, chat applications, and collaborative features.
Configuration
Configure the Stream module in config.yaml:
modules:
- class: modules::stream::StreamModule
config:
port: 3112
host: 127.0.0.1
adapter:
class: modules::stream::adapters::RedisAdapter
config:
redis_url: redis://localhost:6379
Configuration Options
host
string
default:"127.0.0.1"
Bind address for the WebSocket server
auth_function
string
Function ID for WebSocket authentication
Available Adapters
Redis (Recommended)
KvStore (Development)
Production-ready distributed streams:
adapter:
class: modules::stream::adapters::RedisAdapter
config:
redis_url: redis://localhost:6379
File-based or in-memory storage:
adapter:
class: modules::stream::adapters::KvStore
config:
store_method: file_based
file_path: ./data/stream_store
Stream Concepts
Streams organize data in a three-level hierarchy:
stream_name/group_id/item_id
- stream_name: Top-level namespace (e.g., ‘users’, ‘todos’)
- group_id: Collection within stream (e.g., user ID, project ID)
- item_id: Individual item (e.g., document ID)
Example: todos/user_123/todo_1 addresses the item todo_1 in the group user_123 of the todos stream.
Stream Functions
Set (Create/Update)
Create or update an item in a stream:
await client.call('stream.set', {
stream_name: 'todos',
group_id: 'user_123',
item_id: 'todo_1',
data: {
title: 'Buy groceries',
completed: false,
},
});
Get
Retrieve an item from a stream:
const todo = await client.call('stream.get', {
stream_name: 'todos',
group_id: 'user_123',
item_id: 'todo_1',
});
Delete
Delete an item from a stream:
await client.call('stream.delete', {
stream_name: 'todos',
group_id: 'user_123',
item_id: 'todo_1',
});
Update (Atomic)
Atomically update an item with multiple operations:
import { UpdateOp } from 'iii-sdk';
await client.call('stream.update', {
stream_name: 'todos',
group_id: 'user_123',
item_id: 'todo_1',
ops: [
UpdateOp.set('completed', true),
UpdateOp.increment('version', 1),
],
});
List Items
List all items in a group:
const items = await client.call('stream.list', {
stream_name: 'todos',
group_id: 'user_123',
});
List Groups
List all groups in a stream:
const groups = await client.call('stream.list_groups', {
stream_name: 'todos',
});
Send Custom Event
Send a custom event to stream subscribers:
await client.call('stream.send', {
stream_name: 'notifications',
group_id: 'user_123',
event_type: 'notification',
data: {
message: 'You have a new message',
timestamp: Date.now(),
},
});
Stream Triggers
React to stream events in real-time:
export default iii({
triggers: {
'on-todo-change': {
type: 'stream',
config: {
stream_name: 'todos',
group_id: 'user_123', // Optional: specific group
item_id: 'todo_1', // Optional: specific item
},
},
},
});
/**
 * Example handler for a `stream` trigger on the `todos` stream.
 *
 * Stream events always carry `event_type: 'stream'` at the top level; the
 * operation that occurred is identified by which key is present under
 * `event.event` (`create`, `update`, or `delete`), each holding a `data`
 * payload.
 *
 * @param event - Stream event in the documented trigger format.
 */
export async function onTodoChange(event: any) {
  console.log('Todo changed:', event);

  // Operations: create, update, delete (nested under event.event).
  // Optional chaining keeps the handler from crashing on a payload that
  // carries no `event` object.
  const change = event?.event;

  if (change?.create) {
    console.log('New todo:', change.create.data);
  } else if (change?.update) {
    console.log('Updated todo:', change.update.data);
  } else if (change?.delete) {
    console.log('Deleted todo:', change.delete.data);
  }
}
Event Types
Stream triggers receive events with this format:
{
event_type: 'stream'; // Always 'stream'
stream_name: string; // Stream name
group_id: string; // Group ID
id: string; // Item ID
timestamp: number; // Unix timestamp
event: {
// Event data based on operation:
// Create event
create?: { data: any };
// Update event
update?: { data: any };
// Delete event
delete?: { data: any };
// Custom event
event?: {
event_type: string;
data: any;
};
};
}
WebSocket Client (Browser)
Connect to streams from the browser:
import { Client } from 'iii-sdk';
const client = new Client('ws://localhost:3112');
// Subscribe to a stream
const subscription = client.stream.subscribe(
'todos', // stream_name
'user_123', // group_id
(event) => {
console.log('Stream event:', event);
if (event.event.create) {
console.log('Item created:', event.event.create.data);
}
if (event.event.update) {
console.log('Item updated:', event.event.update.data);
}
if (event.event.delete) {
console.log('Item deleted:', event.event.delete.data);
}
}
);
// Unsubscribe
subscription.unsubscribe();
Authentication
Implement WebSocket authentication:
modules:
- class: modules::stream::StreamModule
config:
port: 3112
auth_function: auth.validate_stream_connection
Auth function:
/**
 * Example auth function for WebSocket connections to the Stream module.
 *
 * Reads the token from `input.headers.authorization`, validates it with
 * `validateToken`, and returns the auth context for the connection.
 *
 * @param input - Connection request; only `headers.authorization` is read.
 * @returns Auth context containing `user_id` and `permissions`.
 * @throws Error with message 'Unauthorized' when the header is missing or
 *   the token does not resolve to a user.
 */
export async function validateStreamConnection(input: any) {
  // Optional chaining so a request without headers is rejected as
  // 'Unauthorized' instead of failing with a TypeError.
  const token = input?.headers?.authorization;
  if (!token) {
    throw new Error('Unauthorized');
  }

  const user = await validateToken(token);
  // Guard against a validator that signals failure by returning nothing;
  // a validator that throws still propagates its own error.
  if (!user) {
    throw new Error('Unauthorized');
  }

  // Return auth context
  return {
    user_id: user.id,
    permissions: user.permissions,
  };
}
Access auth context in triggers:
/**
 * Example trigger handler that logs the auth context passed as its second
 * argument.
 *
 * @param event - Stream event (unused here).
 * @param context - Auth context; `user_id` and `permissions` are read.
 */
export async function onTodoChange(event: any, context: any) {
  const { user_id: userId, permissions } = context;

  console.log('User ID:', userId);
  console.log('Permissions:', permissions);
}
Join/Leave Triggers
React to WebSocket connections:
export default iii({
triggers: {
'on-client-join': {
type: 'stream_join',
config: {
stream_name: 'chat',
},
},
'on-client-leave': {
type: 'stream_leave',
config: {
stream_name: 'chat',
},
},
},
});
/**
 * Example handler for a client joining a stream; logs the connection ID
 * carried on the event.
 *
 * @param event - Join event; only `connection_id` is read.
 */
export async function onClientJoin(event: any) {
  const { connection_id: connectionId } = event;
  console.log('Client joined:', connectionId);
  // Notify other users
}
/**
 * Example handler for a client leaving a stream; logs the connection ID
 * carried on the event.
 *
 * @param event - Leave event; only `connection_id` is read.
 */
export async function onClientLeave(event: any) {
  const { connection_id: connectionId } = event;
  console.log('Client left:', connectionId);
  // Cleanup resources
}
Example: Real-time Todo App
export default iii({
triggers: {
'on-todo-change': {
type: 'stream',
config: { stream_name: 'todos' },
},
'create-todo-http': {
type: 'http',
config: { path: '/todos', method: 'POST' },
},
},
});
// HTTP endpoint to create todo
/**
 * Example HTTP handler that creates a todo by writing it to the `todos`
 * stream via `stream.set`.
 *
 * The item ID is the current `Date.now()` value as a string; the todo is
 * stored under the group named by `userId` with `completed: false`.
 *
 * @param input - HTTP request; `body.userId` and `body.title` are read.
 * @returns Object with the new todo's `id`.
 */
export async function createTodoHttp(input: any) {
  const body = input.body;
  const todoId = String(Date.now());

  // This will trigger stream subscribers
  const payload = {
    stream_name: 'todos',
    group_id: body.userId,
    item_id: todoId,
    data: { title: body.title, completed: false },
  };
  await client.call('stream.set', payload);

  return { id: todoId };
}
// Stream trigger for real-time updates
/**
 * Example stream trigger handler for the todo app: logs every event and
 * records an analytics entry when the event is a create.
 *
 * @param event - Stream event; `event.event.create` selects the create case.
 */
export async function onTodoChange(event: any) {
  // Automatically pushed to WebSocket clients
  console.log('Todo stream event:', event);

  // Additional processing applies to create events only
  const created = event.event.create;
  if (!created) {
    return;
  }
  await analytics.track('todo_created', event);
}
Browser client:
const client = new Client('ws://localhost:3112');
// Subscribe to user's todos
client.stream.subscribe('todos', 'user_123', (event) => {
// Real-time updates
if (event.event.create) {
addTodoToUI(event.event.create.data);
}
if (event.event.update) {
updateTodoInUI(event.id, event.event.update.data);
}
if (event.event.delete) {
removeTodoFromUI(event.id);
}
});
// Create todo via HTTP
await fetch('http://localhost:3111/todos', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({
userId: 'user_123',
title: 'New todo',
}),
});
// WebSocket subscribers receive update immediately
Best Practices
- Use Redis adapter for production/distributed systems
- Filter triggers by group_id or item_id for better performance
- Use stream.send for lightweight notifications
- Implement authentication to prevent unauthorized access
Source Code Reference
- Module: src/modules/stream/stream.rs:56
- Stream functions: src/modules/stream/stream.rs:380
- Trigger invocation: src/modules/stream/stream.rs:249
- WebSocket handler: src/modules/stream/socket.rs