Zou includes a real-time event stream server using WebSockets for live updates. This powers features like collaborative playlist reviews, real-time notifications, and live data updates.

Overview

The event stream system consists of:
  1. WebSocket Server - Socket.IO server for real-time connections
  2. Redis Message Queue - Distributes events across multiple servers
  3. Event Handlers - Custom scripts triggered by events

Architecture

┌─────────────┐         ┌──────────────┐         ┌──────────────┐
│   Client    │◄───────►│  WebSocket   │◄───────►│    Redis     │
│  (Browser)  │         │    Server    │         │ Message Queue│
└─────────────┘         │  (port 5001) │         └──────────────┘
                        └──────────────┘                │
                               │                        │
                               └────────────────────────┘
                                     Event Bus

┌─────────────────────────────────────────────────────────────────┐
│  Event Handlers (Custom Scripts)                                │
│  - Triggered on specific events                                 │
│  - Located in EVENT_HANDLERS_FOLDER                             │
└─────────────────────────────────────────────────────────────────┘

Configuration

Event Stream Server

EVENT_STREAM_HOST (string, default: "localhost")
Host address for the WebSocket server.
  • localhost - Only accept local connections
  • 0.0.0.0 - Accept connections from any network interface
  • Specific IP - Bind to a specific network interface

EVENT_STREAM_PORT (string, default: "5001")
Port for the WebSocket server. The main API typically runs on port 5000.

EVENT_HANDLERS_FOLDER (string, default: "./event_handlers")
Folder containing custom event handler scripts. Handlers are Python scripts executed when specific events occur.
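
As a rough illustration of how these three settings resolve, the sketch below reads them from the environment and falls back to the documented defaults. The helper name load_event_stream_settings is hypothetical and is not part of Zou; only the variable names and default values come from this page.

```python
import os

# Defaults mirror the values documented above.
DEFAULTS = {
    "EVENT_STREAM_HOST": "localhost",
    "EVENT_STREAM_PORT": "5001",
    "EVENT_HANDLERS_FOLDER": "./event_handlers",
}


def load_event_stream_settings(env=None):
    """Return the three settings, using the documented defaults when unset.

    Hypothetical helper for illustration; Zou reads its own configuration.
    """
    env = os.environ if env is None else env
    settings = {name: env.get(name, default) for name, default in DEFAULTS.items()}
    # The port is documented as a string, so convert it before binding a socket.
    settings["EVENT_STREAM_PORT"] = int(settings["EVENT_STREAM_PORT"])
    return settings


if __name__ == "__main__":
    print(load_event_stream_settings({"EVENT_STREAM_HOST": "0.0.0.0"}))
```

Passing a plain dictionary instead of os.environ makes it easy to check a deployment's values without exporting anything in the shell.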

Redis Configuration

Redis is used as a message queue to distribute events across multiple Zou instances.

KV_HOST (string, default: "localhost")
Redis server host address.

KV_PORT (string, default: "6379")
Redis server port.

KV_PASSWORD (string, default: "None")
Redis password for authentication (if required).

The event stream uses Redis database index 2 (defined as KV_EVENTS_DB_INDEX in config).

Redis Databases

Zou uses multiple Redis databases for different purposes:
# From zou/app/config.py
AUTH_TOKEN_BLACKLIST_KV_INDEX = 0  # Blacklisted JWT tokens
MEMOIZE_DB_INDEX = 1               # Cache/memoization
KV_EVENTS_DB_INDEX = 2             # Event stream messages
KV_JOB_DB_INDEX = 3                # Background job queue

Starting the Event Stream Server

Standalone Mode

python zou/event_stream.py
Or with custom host/port:
export EVENT_STREAM_HOST="0.0.0.0"
export EVENT_STREAM_PORT="5001"
python zou/event_stream.py

Production with Gunicorn

gunicorn -k geventwebsocket.gunicorn.workers.GeventWebSocketWorker \
  -w 1 \
  -b 0.0.0.0:5001 \
  --timeout 3600 \
  zou.event_stream:app
Use a single worker (-w 1) for the event stream. Running multiple workers requires sticky sessions.

Docker Compose

version: '3.8'

services:
  redis:
    image: redis:7-alpine
    ports:
      - "6379:6379"
    volumes:
      - redis-data:/data

  zou-api:
    image: cgwire/zou
    ports:
      - "5000:5000"
    environment:
      KV_HOST: redis
      KV_PORT: 6379
    depends_on:
      - redis

  zou-events:
    image: cgwire/zou
    command: python zou/event_stream.py
    ports:
      - "5001:5001"
    environment:
      EVENT_STREAM_HOST: 0.0.0.0
      EVENT_STREAM_PORT: 5001
      KV_HOST: redis
      KV_PORT: 6379
    depends_on:
      - redis

volumes:
  redis-data:

WebSocket Events

The event stream server handles and emits the events below, which are defined in zou/event_stream.py.

Connection Events

Connect to event stream:
const socket = io('http://localhost:5001/events', {
  auth: {
    token: 'your-jwt-token'
  }
});
The server requires JWT authentication:
@socketio.on('connect', namespace='/events')
def connected(_):
    try:
        verify_jwt_in_request()
        server_stats["nb_connections"] += 1
    except Exception:
        disconnect()
        return False

Playlist Review Room Events

Open playlist:
socket.emit('preview-room:open-playlist', {
  playlist_id: 'abc-123'
});
Join review room:
socket.emit('preview-room:join', {
  playlist_id: 'abc-123',
  user_id: 'user-456',
  is_playing: false,
  current_frame: 0
});
Leave review room:
socket.emit('preview-room:leave', {
  playlist_id: 'abc-123'
});
Room updated:
socket.on('preview-room:room-updated', (data) => {
  console.log('Room state:', data);
  // data.people - List of users in room
  // data.is_playing - Playback state
  // data.current_frame - Current frame
  // data.current_entity_id - Current entity
});
People updated:
socket.on('preview-room:room-people-updated', (data) => {
  console.log('People in room:', data.people);
});

Annotation Events

Add annotation:
socket.emit('preview-room:add-annotation', {
  playlist_id: 'abc-123',
  annotation: { x: 100, y: 200, text: 'Fix this' }
});
Remove annotation:
socket.emit('preview-room:remove-annotation', {
  playlist_id: 'abc-123',
  annotation_id: 'annotation-789'
});
Update annotation:
socket.emit('preview-room:update-annotation', {
  playlist_id: 'abc-123',
  annotation_id: 'annotation-789',
  annotation: { x: 150, y: 250 }
});

Playback Control Events

Room updated (playback state):
socket.emit('preview-room:room-updated', {
  playlist_id: 'abc-123',
  is_playing: true,
  current_frame: 42,
  speed: 1.0,
  is_repeating: false
});
Change version:
socket.emit('preview-room:change-version', {
  playlist_id: 'abc-123',
  preview_file_id: 'preview-456'
});

Room State Management

The server maintains room state in memory:
rooms_data = {
  "playlist-id": {
    "playlist_id": "playlist-id",
    "user_id": "user-who-created-room",
    "people": ["user-1", "user-2"],
    "is_playing": True,
    "current_entity_id": "entity-id",
    "current_preview_file_id": "preview-id",
    "current_frame": 42,
    "is_repeating": False,
    "is_annotations_displayed": True,
    "is_zoom_enabled": False,
    "speed": 1.0,
    "comparing": {
      "enable": False,
      "mode": "sidebyside"
    }
  }
}
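
Because this state lives in process memory, it is lost on restart and is not shared between server processes. The sketch below shows one way such a dictionary could be maintained as users join and leave. The function names, the initial field values, and the choice to drop a room once it is empty are all assumptions for illustration, not Zou's actual implementation.

```python
def new_room(playlist_id, user_id):
    """Build a room entry with the same keys as the structure shown above.

    The initial values are assumptions chosen for the example.
    """
    return {
        "playlist_id": playlist_id,
        "user_id": user_id,  # user who created the room
        "people": [],
        "is_playing": False,
        "current_entity_id": None,
        "current_preview_file_id": None,
        "current_frame": 0,
        "is_repeating": False,
        "is_annotations_displayed": True,
        "is_zoom_enabled": False,
        "speed": 1.0,
        "comparing": {"enable": False, "mode": "sidebyside"},
    }


def join(rooms_data, playlist_id, user_id):
    """Add a user to a room, creating the room on first join."""
    room = rooms_data.setdefault(playlist_id, new_room(playlist_id, user_id))
    if user_id not in room["people"]:
        room["people"].append(user_id)
    return room


def leave(rooms_data, playlist_id, user_id):
    """Remove a user; delete the room when nobody is left (an assumption)."""
    room = rooms_data.get(playlist_id)
    if room is None:
        return None
    if user_id in room["people"]:
        room["people"].remove(user_id)
    if not room["people"]:
        del rooms_data[playlist_id]
        return None
    return room
```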

Server Statistics

Get server stats:
curl http://localhost:5001/stats
Response:
{
  "nb_connections": 42
}

Event Handlers

Create custom scripts that run when specific events occur.

Event Handler Structure

# event_handlers/on_task_created.py

def handle_event(data):
    """
    Called when a task is created.
    
    Args:
        data: Event data dictionary
    """
    task_id = data.get('task_id')
    print(f"New task created: {task_id}")
    
    # Custom logic here
    # - Send notifications
    # - Update external systems
    # - Trigger workflows

Available Events

Common events (from zou/app/services/events_service.py):
  • task:new - New task created
  • task:update - Task updated
  • task:assign - Task assigned
  • comment:new - New comment posted
  • preview-file:add-file - Preview file uploaded
  • person:update - Person updated
  • project:update - Project updated

Handler Naming Convention

Name handlers after events:
event_handlers/
├── on_task_new.py
├── on_task_update.py
├── on_comment_new.py
└── on_preview_file_add_file.py
Replace : with _ in event names. As the on_preview_file_add_file.py example shows, hyphens also become underscores.
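
The mapping from event name to file name can be sketched as a small function. Judging from the file names listed above, both : and - become _ and the result is prefixed with on_. The name handler_filename is hypothetical and not part of Zou.

```python
def handler_filename(event_name):
    """Map an event name such as 'task:new' to its handler file name."""
    # Both ':' and '-' are replaced, matching the file names listed above.
    slug = event_name.replace(":", "_").replace("-", "_")
    return f"on_{slug}.py"


if __name__ == "__main__":
    for name in ("task:new", "comment:new", "preview-file:add-file"):
        print(name, "->", handler_filename(name))
```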

Redis Configuration

Redis Connection URL

The connection URL is built from configuration in zou/app/utils/redis.py:
def get_redis_url(db_index):
    redis_host = config.KEY_VALUE_STORE["host"]
    redis_port = config.KEY_VALUE_STORE["port"]
    if config.KEY_VALUE_STORE["password"]:
        redis_password = f":{config.KEY_VALUE_STORE['password']}@"
    else:
        redis_password = ""
    return f"redis://{redis_password}{redis_host}:{redis_port}/{db_index}"
Example URLs:
redis://localhost:6379/2
redis://:password@localhost:6379/2
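
For experimenting outside Zou, the same URL format can be reproduced with a standalone function. This is a sketch: build_redis_url is a hypothetical name, and percent-encoding the password is an addition that the function shown above does not perform. It is included because characters such as @ or / in a password would otherwise produce an ambiguous URL.

```python
from urllib.parse import quote


def build_redis_url(host="localhost", port="6379", password=None, db_index=2):
    """Build a redis:// URL in the same shape as the examples above."""
    # Percent-encode the password (assumption: not done in the original helper).
    auth = f":{quote(password, safe='')}@" if password else ""
    return f"redis://{auth}{host}:{port}/{db_index}"


if __name__ == "__main__":
    print(build_redis_url())
    print(build_redis_url(password="password"))
```

The default db_index of 2 matches the event stream database (KV_EVENTS_DB_INDEX).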

Redis Setup

Install Redis:
# Ubuntu/Debian
sudo apt install redis-server

# macOS
brew install redis

# Docker
docker run -d --name redis -p 6379:6379 redis:7-alpine
Configure Redis (optional):
# /etc/redis/redis.conf

# Bind to all interfaces (default: localhost only)
bind 0.0.0.0

# Set password
requirepass your-secure-password

# Maximum memory
maxmemory 256mb
maxmemory-policy allkeys-lru

# Persistence (for cache, can disable)
save ""
appendonly no
Start Redis:
# Ubuntu/Debian
sudo systemctl start redis-server
sudo systemctl enable redis-server

# macOS
brew services start redis

# Docker
docker start redis

Clear Redis Cache

# Clear memory cache
zou clear-memory-cache

# Or directly with Redis
redis-cli -n 1 FLUSHDB  # Clear cache database
redis-cli -n 2 FLUSHDB  # Clear event stream database

Multiple Server Setup

For high availability, run multiple event stream servers with Redis:
┌─────────────┐     ┌──────────────┐
│   Client 1  │────►│ Event Server │
└─────────────┘     │      #1      │
                    └──────┬───────┘
┌─────────────┐           │
│   Client 2  │────►┌─────┴────────┐     ┌──────────────┐
└─────────────┘     │ Event Server │────►│    Redis     │
                    │      #2      │     │ Message Queue│
┌─────────────┐     └─────┬────────┘     └──────────────┘
│   Client 3  │────►      │
└─────────────┘     ┌─────┴────────┐
                    │ Event Server │
                    │      #3      │
                    └──────────────┘
Redis distributes events to all servers automatically.
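
The fan-out idea can be illustrated without Redis at all. In the sketch below an in-memory broker stands in for the Redis message queue, and each hypothetical EventServer forwards whatever the broker relays to its own connected clients. All class and method names are invented for the example; the real servers rely on Socket.IO and Redis for this.

```python
class InMemoryBroker:
    """Stand-in for the Redis message queue: relays each event to every subscriber."""

    def __init__(self):
        self.subscribers = []

    def subscribe(self, callback):
        self.subscribers.append(callback)

    def publish(self, event_name, data):
        for callback in self.subscribers:
            callback(event_name, data)


class EventServer:
    """Hypothetical event stream server with its own set of client inboxes."""

    def __init__(self, name, broker):
        self.name = name
        self.broker = broker
        self.clients = {}  # client id -> list of received (event, data) tuples
        broker.subscribe(self._on_broker_event)

    def connect(self, client_id):
        self.clients[client_id] = []

    def emit(self, event_name, data):
        # Publish through the broker instead of writing to local clients,
        # so clients attached to other servers receive the event as well.
        self.broker.publish(event_name, data)

    def _on_broker_event(self, event_name, data):
        for inbox in self.clients.values():
            inbox.append((event_name, data))
```

An event emitted on one server reaches clients on every server because each server subscribes to the same broker.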

Load Balancer Configuration

Nginx

upstream event_stream {
    ip_hash;  # Sticky sessions
    server event1.example.com:5001;
    server event2.example.com:5001;
    server event3.example.com:5001;
}

server {
    listen 443 ssl;
    server_name events.example.com;

    location /socket.io/ {
        proxy_pass http://event_stream;
        proxy_http_version 1.1;
        proxy_set_header Upgrade $http_upgrade;
        proxy_set_header Connection "upgrade";
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
        proxy_read_timeout 3600s;
        proxy_send_timeout 3600s;
    }
}
Use ip_hash or another sticky-session mechanism so that each client keeps connecting to the same server.
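
The principle behind IP-based stickiness is that the upstream is a deterministic function of the client address. The sketch below illustrates that idea only; it is not Nginx's actual hashing algorithm, and pick_upstream is a hypothetical name. The upstream list reuses the example hosts from the configuration above.

```python
import hashlib

UPSTREAMS = [
    "event1.example.com:5001",
    "event2.example.com:5001",
    "event3.example.com:5001",
]


def pick_upstream(client_ip, upstreams=UPSTREAMS):
    """Return the same upstream for the same client IP on every call."""
    digest = hashlib.sha256(client_ip.encode("utf-8")).digest()
    # Turn the first four bytes of the digest into an index into the list.
    index = int.from_bytes(digest[:4], "big") % len(upstreams)
    return upstreams[index]


if __name__ == "__main__":
    print(pick_upstream("203.0.113.7"))
```

Note that with a simple modulo scheme like this one, adding or removing an upstream remaps many clients at once.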

Monitoring

Connection Count

curl http://localhost:5001/stats

Redis Monitoring

# Monitor Redis commands
redis-cli -n 2 MONITOR

# Check memory usage
redis-cli INFO memory

# Check connection count
redis-cli CLIENT LIST

Logs

The event stream server logs connections and errors:
INFO:zou.event_stream:New websocket client connected
INFO:zou.event_stream:Websocket client disconnected
ERROR:zou.event_stream:Error in event handler

Troubleshooting

Clients Can’t Connect

Check that the server is running:
curl http://localhost:5001/
# Should return: {"name": "Zou Event stream"}
Check firewall:
sudo ufw allow 5001
Check host binding: Ensure EVENT_STREAM_HOST=0.0.0.0 for remote connections.
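
When curl is not available on the client machine, a plain TCP check can help separate network problems (firewall, host binding) from application problems. This is a minimal sketch using only the standard library; can_connect is a hypothetical helper, and the host and port in the usage lines are the defaults from this page.

```python
import socket


def can_connect(host, port, timeout=2.0):
    """Return True if a TCP connection to host:port can be opened."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and unreachable hosts.
        return False


if __name__ == "__main__":
    reachable = can_connect("localhost", 5001)
    print("event stream port reachable:", reachable)
```

A True result only shows that something is listening on the port; it does not confirm that JWT authentication or the Socket.IO handshake will succeed.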

Events Not Received

Check Redis connection:
redis-cli -h localhost -p 6379 PING
# Should return: PONG
Check JWT token: Ensure clients send a valid JWT token in the connection auth.
Check rooms: Users must join a room to receive room-specific events.

High Memory Usage

Clear Redis cache:
zou clear-memory-cache
Limit Redis memory:
# redis.conf
maxmemory 512mb
maxmemory-policy allkeys-lru

Disconnections

Increase timeouts:
# Nginx
proxy_read_timeout 7200s;
proxy_send_timeout 7200s;
# Gunicorn
gunicorn --timeout 7200 ...

Security

  1. JWT Authentication - All connections require a valid JWT token
  2. CORS - Configure allowed origins
  3. SSL/TLS - Use HTTPS/WSS in production
  4. Redis Password - Set KV_PASSWORD for Redis authentication
  5. Firewall - Restrict port 5001 to trusted networks

Performance Tips

  1. Use Redis - Required for multi-server setups
  2. Single Worker - The event stream needs only one worker
  3. Gevent - The server uses gevent for async I/O
  4. Sticky Sessions - Required for load balancing
  5. Monitor Connections - Watch connection count for capacity planning
