The Stream module provides real-time WebSocket connections for bidirectional communication and reactive data streams. It suits live updates, chat applications, and collaborative features.

Configuration

Configure the Stream module in config.yaml:
config.yaml
modules:
  - class: modules::stream::StreamModule
    config:
      port: 3112
      host: 127.0.0.1
      adapter:
        class: modules::stream::adapters::RedisAdapter
        config:
          redis_url: redis://localhost:6379

Configuration Options

  • port (number, default: 3112): WebSocket server port
  • host (string, default: 127.0.0.1): Bind address for the WebSocket server
  • auth_function (string): Function ID for WebSocket authentication

Available Adapters

Stream Concepts

Streams organize data in a three-level hierarchy:
stream_name/group_id/item_id
  • stream_name: Top-level namespace (e.g., ‘users’, ‘todos’)
  • group_id: Collection within stream (e.g., user ID, project ID)
  • item_id: Individual item (e.g., document ID)
Example:
todos/user_123/item_456
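
The three-level path can be built and split with a pair of small helpers. This is a minimal sketch: `StreamPath`, `formatStreamPath`, and `parseStreamPath` are illustrative names that are not part of iii-sdk, and the code assumes that none of the three IDs contains a `/`.

```typescript
// Hypothetical helpers for illustration; not part of iii-sdk.
interface StreamPath {
  stream_name: string;
  group_id: string;
  item_id: string;
}

// Join the three levels into "stream_name/group_id/item_id".
function formatStreamPath(p: StreamPath): string {
  return [p.stream_name, p.group_id, p.item_id].join('/');
}

// Split a path back into its levels, rejecting anything that
// does not have exactly three non-empty segments.
function parseStreamPath(path: string): StreamPath {
  const parts = path.split('/');
  if (parts.length !== 3 || parts.some((s) => s.length === 0)) {
    throw new Error(`Expected stream_name/group_id/item_id, got "${path}"`);
  }
  const [stream_name, group_id, item_id] = parts as [string, string, string];
  return { stream_name, group_id, item_id };
}
```

With these helpers, `parseStreamPath('todos/user_123/item_456')` yields the stream `todos`, the group `user_123`, and the item `item_456`.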

Stream Functions

Set (Create/Update)

Create or update an item in a stream:
await client.call('stream.set', {
  stream_name: 'todos',
  group_id: 'user_123',
  item_id: 'todo_1',
  data: {
    title: 'Buy groceries',
    completed: false,
  },
});

Get

Retrieve an item from a stream:
const todo = await client.call('stream.get', {
  stream_name: 'todos',
  group_id: 'user_123',
  item_id: 'todo_1',
});

Delete

Delete an item from a stream:
await client.call('stream.delete', {
  stream_name: 'todos',
  group_id: 'user_123',
  item_id: 'todo_1',
});

Update (Atomic)

Atomically update an item with multiple operations:
import { UpdateOp } from 'iii-sdk';

await client.call('stream.update', {
  stream_name: 'todos',
  group_id: 'user_123',
  item_id: 'todo_1',
  ops: [
    UpdateOp.set('completed', true),
    UpdateOp.increment('version', 1),
  ],
});
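
To reason about what a list of operations does to an item, it can help to model it locally. The sketch below is not the iii-sdk `UpdateOp` API: `Op` and `applyOps` are hypothetical names, and the semantics (set overwrites a field, increment adds to a numeric field and treats a missing field as 0) are assumptions made for illustration only.

```typescript
// Local model only. These types are NOT the iii-sdk UpdateOp API.
type Op =
  | { kind: 'set'; field: string; value: unknown }
  | { kind: 'increment'; field: string; by: number };

// Apply the operations in order and return a new object.
function applyOps(item: Record<string, unknown>, ops: Op[]): Record<string, unknown> {
  const next: Record<string, unknown> = { ...item }; // never mutate the input
  for (const op of ops) {
    if (op.kind === 'set') {
      next[op.field] = op.value;
    } else {
      const current = next[op.field];
      // Assumption: a missing or non-numeric field counts as 0.
      next[op.field] = (typeof current === 'number' ? current : 0) + op.by;
    }
  }
  return next;
}
```

Applying a set of `completed` to `true` and an increment of `version` by 1 to `{ completed: false, version: 1 }` gives `{ completed: true, version: 2 }` in this model. The point of doing this server-side in one call is that no other writer can interleave between the two changes.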

List Items

List all items in a group:
const items = await client.call('stream.list', {
  stream_name: 'todos',
  group_id: 'user_123',
});

List Groups

List all groups in a stream:
const groups = await client.call('stream.list_groups', {
  stream_name: 'todos',
});

Send Custom Event

Send a custom event to stream subscribers:
await client.call('stream.send', {
  stream_name: 'notifications',
  group_id: 'user_123',
  event_type: 'notification',
  data: {
    message: 'You have a new message',
    timestamp: Date.now(),
  },
});

Stream Triggers

React to stream events in real time:
index.ts
export default iii({
  triggers: {
    'on-todo-change': {
      type: 'stream',
      config: {
        stream_name: 'todos',
        group_id: 'user_123',  // Optional: specific group
        item_id: 'todo_1',     // Optional: specific item
      },
    },
  },
});

export async function onTodoChange(event: any) {
  console.log('Todo changed:', event);
  // event.event_type is always 'stream'. The operation (create, update,
  // delete) is the key that is set on event.event.

  if (event.event.create) {
    console.log('New todo:', event.event.create.data);
  } else if (event.event.update) {
    console.log('Updated todo:', event.event.update.data);
  } else if (event.event.delete) {
    console.log('Deleted todo:', event.event.delete.data);
  }
}
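
The optional `group_id` and `item_id` in the trigger config narrow which events reach the handler. The sketch below models that narrowing as a plain predicate. It is not the engine's matching code: `TriggerFilter`, `EventKey`, and `matchesFilter` are hypothetical names, and the rule that an omitted field matches everything is an assumption based on the "Optional" comments in the config.

```typescript
// Hypothetical model of trigger filtering; not the engine's implementation.
interface TriggerFilter {
  stream_name: string;
  group_id?: string; // omitted: match every group (assumption)
  item_id?: string;  // omitted: match every item (assumption)
}

// The identifying fields of a stream event; the item ID is carried as `id`.
interface EventKey {
  stream_name: string;
  group_id: string;
  id: string;
}

function matchesFilter(filter: TriggerFilter, e: EventKey): boolean {
  if (filter.stream_name !== e.stream_name) return false;
  if (filter.group_id !== undefined && filter.group_id !== e.group_id) return false;
  if (filter.item_id !== undefined && filter.item_id !== e.id) return false;
  return true;
}
```

Under this model, a filter with only `stream_name: 'todos'` matches every todo event, while adding `group_id: 'user_123'` limits the handler to that user's group.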

Event Types

Stream triggers receive events with this format:
{
  event_type: 'stream';        // Always 'stream'
  stream_name: string;         // Stream name
  group_id: string;            // Group ID
  id: string;                  // Item ID
  timestamp: number;           // Unix timestamp
  event: {
    // Event data based on operation:
    
    // Create event
    create?: { data: any };
    
    // Update event
    update?: { data: any };
    
    // Delete event
    delete?: { data: any };
    
    // Custom event
    event?: {
      event_type: string;
      data: any;
    };
  };
}
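
Because the operation is encoded as whichever key is present on `event`, handlers usually start by working out which one they received. The sketch below transcribes the format above into a type and adds a small classifier. `StreamEvent`, `EventKind`, and `eventKind` are illustrative names, not SDK exports, and the code assumes that at most one of the four keys is set per event.

```typescript
// Type transcribed from the documented format; the names are illustrative.
interface StreamEvent {
  event_type: 'stream';
  stream_name: string;
  group_id: string;
  id: string;
  timestamp: number;
  event: {
    create?: { data: unknown };
    update?: { data: unknown };
    delete?: { data: unknown };
    event?: { event_type: string; data: unknown };
  };
}

type EventKind = 'create' | 'update' | 'delete' | 'custom' | 'unknown';

// Report which operation the event carries.
// Assumption: at most one of the four keys is set on `event`.
function eventKind(e: StreamEvent): EventKind {
  if (e.event.create) return 'create';
  if (e.event.update) return 'update';
  if (e.event.delete) return 'delete';
  if (e.event.event) return 'custom';
  return 'unknown';
}
```

A handler can then `switch` on the result of `eventKind(event)` instead of chaining property checks.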

WebSocket Client (Browser)

Connect to streams from the browser:
import { Client } from 'iii-sdk';

const client = new Client('ws://localhost:3112');

// Subscribe to a stream
const subscription = client.stream.subscribe(
  'todos',      // stream_name
  'user_123',   // group_id
  (event) => {
    console.log('Stream event:', event);
    
    if (event.event.create) {
      console.log('Item created:', event.event.create.data);
    }
    if (event.event.update) {
      console.log('Item updated:', event.event.update.data);
    }
    if (event.event.delete) {
      console.log('Item deleted:', event.event.delete.data);
    }
  }
);

// Unsubscribe
subscription.unsubscribe();

Authentication

Implement WebSocket authentication:
config.yaml
modules:
  - class: modules::stream::StreamModule
    config:
      port: 3112
      auth_function: auth.validate_stream_connection
Auth function:
index.ts
export async function validateStreamConnection(input: any) {
  const token = input.headers.authorization;
  
  if (!token) {
    throw new Error('Unauthorized');
  }
  
  const user = await validateToken(token);
  
  // Return auth context
  return {
    user_id: user.id,
    permissions: user.permissions,
  };
}
Access auth context in triggers:
export async function onTodoChange(event: any, context: any) {
  console.log('User ID:', context.user_id);
  console.log('Permissions:', context.permissions);
}
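
The auth function above calls `validateToken`, which this page does not define. The sketch below shows one possible shape for it. Everything here is hypothetical: the `User` type, the in-memory `TOKENS` table, and the `Bearer` prefix handling are stand-ins for whatever session store or token verification your application uses.

```typescript
// Hypothetical helper: the page calls validateToken but does not define it.
interface User {
  id: string;
  permissions: string[];
}

// Assumption: an in-memory table stands in for a real session store
// or signed-token verification.
const TOKENS = new Map<string, User>([
  ['secret-token-1', { id: 'user_123', permissions: ['todos:read', 'todos:write'] }],
]);

async function validateToken(header: string): Promise<User> {
  // Accept both "Bearer <token>" and a bare token.
  const token = header.replace(/^Bearer\s+/i, '').trim();
  const user = TOKENS.get(token);
  if (!user) {
    throw new Error('Unauthorized');
  }
  return user;
}
```

Rejecting by throwing keeps the helper consistent with the auth function, which also throws `Unauthorized` when the header is missing.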

Join/Leave Triggers

React to WebSocket connections:
export default iii({
  triggers: {
    'on-client-join': {
      type: 'stream_join',
      config: {
        stream_name: 'chat',
      },
    },
    'on-client-leave': {
      type: 'stream_leave',
      config: {
        stream_name: 'chat',
      },
    },
  },
});

export async function onClientJoin(event: any) {
  console.log('Client joined:', event.connection_id);
  // Notify other users
}

export async function onClientLeave(event: any) {
  console.log('Client left:', event.connection_id);
  // Cleanup resources
}
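
A common use of the join and leave handlers is tracking who is currently connected. The sketch below keeps that state in a small class. `Presence` is a hypothetical name, `connection_id` is the only event field this page shows, and the in-memory set is an assumption that only holds for a single process; a distributed deployment would need shared storage.

```typescript
// Hypothetical presence tracker keyed by the event's connection_id.
class Presence {
  private connections = new Set<string>();

  // Record a connection and return the number of connected clients.
  join(connectionId: string): number {
    this.connections.add(connectionId); // a repeated join is a no-op
    return this.connections.size;
  }

  // Forget a connection and return the number of connected clients.
  leave(connectionId: string): number {
    this.connections.delete(connectionId); // unknown IDs are ignored
    return this.connections.size;
  }

  has(connectionId: string): boolean {
    return this.connections.has(connectionId);
  }
}
```

`onClientJoin` would call `join(event.connection_id)` and `onClientLeave` would call `leave(event.connection_id)`, using the returned count to notify other users.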

Example: Real-time Todo App

index.ts
export default iii({
  triggers: {
    'on-todo-change': {
      type: 'stream',
      config: { stream_name: 'todos' },
    },
    'create-todo-http': {
      type: 'http',
      config: { path: '/todos', method: 'POST' },
    },
  },
});

// HTTP endpoint to create todo
export async function createTodoHttp(input: any) {
  const { userId, title } = input.body;
  const todoId = Date.now().toString();
  
  // This will trigger stream subscribers
  await client.call('stream.set', {
    stream_name: 'todos',
    group_id: userId,
    item_id: todoId,
    data: { title, completed: false },
  });
  
  return { id: todoId };
}

// Stream trigger for real-time updates
export async function onTodoChange(event: any) {
  // Automatically pushed to WebSocket clients
  console.log('Todo stream event:', event);
  
  // Additional processing (`analytics` stands in for your own tracking client)
  if (event.event.create) {
    await analytics.track('todo_created', event);
  }
}
Browser client:
import { Client } from 'iii-sdk';

const client = new Client('ws://localhost:3112');

// Subscribe to user's todos
client.stream.subscribe('todos', 'user_123', (event) => {
  // Real-time updates
  if (event.event.create) {
    addTodoToUI(event.event.create.data);
  }
  if (event.event.update) {
    updateTodoInUI(event.id, event.event.update.data);
  }
  if (event.event.delete) {
    removeTodoFromUI(event.id);
  }
});

// Create todo via HTTP
await fetch('http://localhost:3111/todos', {
  method: 'POST',
  headers: { 'Content-Type': 'application/json' },
  body: JSON.stringify({
    userId: 'user_123',
    title: 'New todo',
  }),
});
// WebSocket subscribers receive update immediately
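
The UI helpers in the browser client (`addTodoToUI`, `updateTodoInUI`, `removeTodoFromUI`) are left undefined on this page. One way to back them is a local cache that each event is folded into. The sketch below is a hypothetical reducer: `Todo`, `TodoEvent`, and `applyTodoEvent` are illustrative names, and it assumes that an update event carries the full item rather than a partial patch.

```typescript
// Hypothetical client-side cache; the event shape follows the "Event Types" format.
interface Todo {
  title: string;
  completed: boolean;
}

interface TodoEvent {
  id: string; // item ID
  event: {
    create?: { data: Todo };
    update?: { data: Todo };
    delete?: { data: Todo };
  };
}

// Fold one event into the cache and return a new Map, leaving the old one intact.
function applyTodoEvent(todos: Map<string, Todo>, e: TodoEvent): Map<string, Todo> {
  const next = new Map(todos);
  if (e.event.create) next.set(e.id, e.event.create.data);
  // Assumption: update events carry the full item, not a partial patch.
  if (e.event.update) next.set(e.id, e.event.update.data);
  if (e.event.delete) next.delete(e.id);
  return next;
}
```

Returning a fresh `Map` on every event makes the cache easy to plug into UI frameworks that detect changes by reference.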

Performance Tips

  • Use Redis adapter for production/distributed systems
  • Filter triggers by group_id or item_id for better performance
  • Use stream.send for lightweight notifications
  • Implement authentication to prevent unauthorized access

Source Code Reference

  • Module: src/modules/stream/stream.rs:56
  • Stream functions: src/modules/stream/stream.rs:380
  • Trigger invocation: src/modules/stream/stream.rs:249
  • WebSocket handler: src/modules/stream/socket.rs
