Zou includes a real-time event stream server using WebSockets for live updates. This powers features like collaborative playlist reviews, real-time notifications, and live data updates.
Overview
The event stream system consists of:
- WebSocket Server - Socket.IO server for real-time connections
- Redis Message Queue - Distributes events across multiple servers
- Event Handlers - Custom scripts triggered by events
Architecture
┌─────────────┐         ┌──────────────┐         ┌──────────────┐
│   Client    │◄───────►│  WebSocket   │◄───────►│    Redis     │
│  (Browser)  │         │    Server    │         │ Message Queue│
└─────────────┘         │ (port 5001)  │         └──────────────┘
                        └──────────────┘                │
                               │                        │
                               └────────────────────────┘
                                       Event Bus
┌─────────────────────────────────────────────────────────────────┐
│  Event Handlers (Custom Scripts)                                │
│  - Triggered on specific events                                 │
│  - Located in EVENT_HANDLERS_FOLDER                             │
└─────────────────────────────────────────────────────────────────┘
Configuration
Event Stream Server
EVENT_STREAM_HOST
string
default:"localhost"
Host address for the WebSocket server.
- localhost - Only accept local connections
- 0.0.0.0 - Accept connections from any network interface
- Specific IP - Bind to a specific network interface
EVENT_STREAM_PORT
default:"5001"
Port for the WebSocket server. The main API typically runs on port 5000.
EVENT_HANDLERS_FOLDER
string
default:"./event_handlers"
Folder containing custom event handler scripts. Handlers are Python scripts executed when specific events occur.
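The settings above are read from environment variables. As a minimal sketch of how a deployment script might resolve them with the documented defaults (the function name `load_event_stream_settings` is hypothetical and is not part of Zou):

```python
import os

# Defaults taken from the settings documented above.
DEFAULTS = {
    "EVENT_STREAM_HOST": "localhost",
    "EVENT_STREAM_PORT": "5001",
    "EVENT_HANDLERS_FOLDER": "./event_handlers",
}


def load_event_stream_settings(environ=None):
    """Return the event stream settings, falling back to the documented defaults."""
    environ = os.environ if environ is None else environ
    settings = {name: environ.get(name, default) for name, default in DEFAULTS.items()}
    # Environment values are strings; convert the port to an integer once here.
    settings["EVENT_STREAM_PORT"] = int(settings["EVENT_STREAM_PORT"])
    return settings


if __name__ == "__main__":
    print(load_event_stream_settings())
```

Passing a plain dictionary instead of `os.environ` makes the lookup easy to check without touching the real environment.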
Redis Configuration
Redis is used as a message queue to distribute events across multiple Zou instances.
KV_HOST
string
default:"localhost"
Redis server host address.
KV_PASSWORD
string
Redis password for authentication (if required).
The event stream uses Redis database index 2 (defined as KV_EVENTS_DB_INDEX in config).
Redis Databases
Zou uses multiple Redis databases for different purposes:
# From zou/app/config.py
AUTH_TOKEN_BLACKLIST_KV_INDEX = 0 # Blacklisted JWT tokens
MEMOIZE_DB_INDEX = 1 # Cache/memoization
KV_EVENTS_DB_INDEX = 2 # Event stream messages
KV_JOB_DB_INDEX = 3 # Background job queue
Starting the Event Stream Server
Standalone Mode
python zou/event_stream.py
Or with custom host/port:
export EVENT_STREAM_HOST="0.0.0.0"
export EVENT_STREAM_PORT="5001"
python zou/event_stream.py
Production with Gunicorn
gunicorn -k geventwebsocket.gunicorn.workers.GeventWebSocketWorker \
    -w 1 \
    -b 0.0.0.0:5001 \
    --timeout 3600 \
    zou.event_stream:app
Use only one worker (-w 1) for the event stream. Running multiple workers requires sticky sessions.
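The same options can also be kept in a Gunicorn configuration file, which is plain Python. This is a sketch that mirrors the command line above; the file name `gunicorn.conf.py` is an assumption, and it would be loaded with `gunicorn -c gunicorn.conf.py zou.event_stream:app`.

```python
# gunicorn.conf.py (assumed file name, mirrors the command-line flags above)

# Address and port the event stream listens on (-b).
bind = "0.0.0.0:5001"

# A single worker (-w 1), as recommended for the event stream.
workers = 1

# WebSocket-capable gevent worker (-k).
worker_class = "geventwebsocket.gunicorn.workers.GeventWebSocketWorker"

# Long timeout in seconds so idle WebSocket connections are not killed (--timeout).
timeout = 3600
```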
Docker Compose
version: '3.8'
services:
  redis:
    image: redis:7-alpine
    ports:
      - "6379:6379"
    volumes:
      - redis-data:/data
  zou-api:
    image: cgwire/zou
    ports:
      - "5000:5000"
    environment:
      KV_HOST: redis
      KV_PORT: 6379
    depends_on:
      - redis
  zou-events:
    image: cgwire/zou
    command: python zou/event_stream.py
    ports:
      - "5001:5001"
    environment:
      EVENT_STREAM_HOST: 0.0.0.0
      EVENT_STREAM_PORT: 5001
      KV_HOST: redis
      KV_PORT: 6379
    depends_on:
      - redis
volumes:
  redis-data:
WebSocket Events
The event stream server emits various events, defined in zou/event_stream.py:
Connection Events
Connect to event stream:
const socket = io('http://localhost:5001/events', {
  auth: {
    token: 'your-jwt-token'
  }
});
The server requires JWT authentication:
@socketio.on('connect', namespace='/events')
def connected(_):
    try:
        verify_jwt_in_request()
        server_stats["nb_connections"] += 1
    except Exception:
        disconnect()
        return False
Playlist Review Room Events
Open playlist:
socket.emit('preview-room:open-playlist', {
  playlist_id: 'abc-123'
});
Join review room:
socket.emit('preview-room:join', {
  playlist_id: 'abc-123',
  user_id: 'user-456',
  is_playing: false,
  current_frame: 0
});
Leave review room:
socket.emit('preview-room:leave', {
  playlist_id: 'abc-123'
});
Room updated:
socket.on('preview-room:room-updated', (data) => {
  console.log('Room state:', data);
  // data.people - List of users in room
  // data.is_playing - Playback state
  // data.current_frame - Current frame
  // data.current_entity_id - Current entity
});
People updated:
socket.on('preview-room:room-people-updated', (data) => {
  console.log('People in room:', data.people);
});
Annotation Events
Add annotation:
socket.emit('preview-room:add-annotation', {
  playlist_id: 'abc-123',
  annotation: { x: 100, y: 200, text: 'Fix this' }
});
Remove annotation:
socket.emit('preview-room:remove-annotation', {
  playlist_id: 'abc-123',
  annotation_id: 'annotation-789'
});
Update annotation:
socket.emit('preview-room:update-annotation', {
  playlist_id: 'abc-123',
  annotation_id: 'annotation-789',
  annotation: { x: 150, y: 250 }
});
Playback Control Events
Room updated (playback state):
socket.emit('preview-room:room-updated', {
  playlist_id: 'abc-123',
  is_playing: true,
  current_frame: 42,
  speed: 1.0,
  is_repeating: false
});
Change version:
socket.emit('preview-room:change-version', {
  playlist_id: 'abc-123',
  preview_file_id: 'preview-456'
});
Room State Management
The server maintains room state in memory:
rooms_data = {
    "playlist-id": {
        "playlist_id": "playlist-id",
        "user_id": "user-who-created-room",
        "people": ["user-1", "user-2"],
        "is_playing": True,
        "current_entity_id": "entity-id",
        "current_preview_file_id": "preview-id",
        "current_frame": 42,
        "is_repeating": False,
        "is_annotations_displayed": True,
        "is_zoom_enabled": False,
        "speed": 1.0,
        "comparing": {
            "enable": False,
            "mode": "sidebyside"
        }
    }
}
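To illustrate how an in-memory structure like this can track who is in a room, here is a minimal sketch. The helper names `join_room_state` and `leave_room_state` are hypothetical, only a subset of the fields is initialized, and dropping a room once it is empty is an assumption rather than documented Zou behavior.

```python
def join_room_state(rooms_data, playlist_id, user_id):
    """Add a user to a room, creating the room with default state if needed."""
    room = rooms_data.setdefault(
        playlist_id,
        {
            "playlist_id": playlist_id,
            "user_id": user_id,  # the first user to join is recorded as creator
            "people": [],
            "is_playing": False,
            "current_frame": 0,
        },
    )
    # Joining twice must not duplicate the user in the list.
    if user_id not in room["people"]:
        room["people"].append(user_id)
    return room


def leave_room_state(rooms_data, playlist_id, user_id):
    """Remove a user from a room; drop the room once nobody is left (assumption)."""
    room = rooms_data.get(playlist_id)
    if room is None:
        return None
    if user_id in room["people"]:
        room["people"].remove(user_id)
    if not room["people"]:
        del rooms_data[playlist_id]
        return None
    return room
```

Because the state lives in the memory of one process, it is lost when the server restarts and is not shared between processes.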
Server Statistics
Get server stats:
curl http://localhost:5001/stats
Response: a JSON object with server statistics, such as the number of open connections (nb_connections).
Event Handlers
Create custom scripts that run when specific events occur.
Event Handler Structure
# event_handlers/on_task_new.py
def handle_event(data):
    """
    Called when a task is created.

    Args:
        data: Event data dictionary
    """
    task_id = data.get('task_id')
    print(f"New task created: {task_id}")
    # Custom logic here
    # - Send notifications
    # - Update external systems
    # - Trigger workflows
Available Events
Common events from zou/app/services/events_service.py:
task:new - New task created
task:update - Task updated
task:assign - Task assigned
comment:new - New comment posted
preview-file:add-file - Preview file uploaded
person:update - Person updated
project:update - Project updated
Handler Naming Convention
Name handlers after events:
event_handlers/
├── on_task_new.py
├── on_task_update.py
├── on_comment_new.py
└── on_preview_file_add_file.py
Prefix the event name with on_ and replace : and - with _.
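The naming convention can be expressed as a small helper. This is only a sketch of the mapping shown in the listing above (which also turns hyphens into underscores); `handler_filename` is a hypothetical name, not a Zou function.

```python
def handler_filename(event_name):
    """Map an event name such as 'task:new' to its handler file name."""
    # Both ':' and '-' become '_' so the result is a valid Python module name.
    slug = event_name.replace(":", "_").replace("-", "_")
    return f"on_{slug}.py"


if __name__ == "__main__":
    for name in ("task:new", "comment:new", "preview-file:add-file"):
        print(name, "->", handler_filename(name))
```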
Redis Configuration
Redis Connection URL
The connection URL is built from configuration in zou/app/utils/redis.py:
def get_redis_url(db_index):
    redis_host = config.KEY_VALUE_STORE["host"]
    redis_port = config.KEY_VALUE_STORE["port"]
    if config.KEY_VALUE_STORE["password"]:
        redis_password = f":{config.KEY_VALUE_STORE['password']}@"
    else:
        redis_password = ""
    return f"redis://{redis_password}{redis_host}:{redis_port}/{db_index}"
Example URLs:
redis://localhost:6379/2
redis://:password@localhost:6379/2
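To reproduce these URLs outside of Zou, for example in a diagnostic script, the same logic can be written without the `config` module. The sketch below takes the values as arguments; `build_redis_url` is a hypothetical stand-alone helper, and the index constant mirrors KV_EVENTS_DB_INDEX from the configuration.

```python
KV_EVENTS_DB_INDEX = 2  # event stream database, as listed in the configuration


def build_redis_url(host, port, db_index, password=None):
    """Build a redis:// URL; the credentials part is added only when a password is set."""
    auth = f":{password}@" if password else ""
    return f"redis://{auth}{host}:{port}/{db_index}"


if __name__ == "__main__":
    print(build_redis_url("localhost", 6379, KV_EVENTS_DB_INDEX))
    print(build_redis_url("localhost", 6379, KV_EVENTS_DB_INDEX, password="password"))
```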
Redis Setup
Install Redis:
# Ubuntu/Debian
sudo apt install redis-server
# macOS
brew install redis
# Docker
docker run -d --name redis -p 6379:6379 redis:7-alpine
Configure Redis (optional):
# /etc/redis/redis.conf
# Bind to all interfaces (default: localhost only)
bind 0.0.0.0
# Set password
requirepass your-secure-password
# Maximum memory
maxmemory 256mb
maxmemory-policy allkeys-lru
# Persistence (for cache, can disable)
save ""
appendonly no
Start Redis:
# Ubuntu/Debian
sudo systemctl start redis-server
sudo systemctl enable redis-server
# macOS
brew services start redis
# Docker
docker start redis
Clear Redis Cache
# Clear memory cache
zou clear-memory-cache
# Or directly with Redis
redis-cli -n 1 FLUSHDB # Clear cache database
redis-cli -n 2 FLUSHDB # Clear event stream database
Multiple Server Setup
For high availability, run multiple event stream servers with Redis:
┌─────────────┐     ┌──────────────┐
│  Client 1   │────►│ Event Server │
└─────────────┘     │      #1      │
                    └──────┬───────┘
┌─────────────┐            │
│  Client 2   │────►┌──────┴───────┐     ┌──────────────┐
└─────────────┘     │ Event Server │────►│    Redis     │
                    │      #2      │     │ Message Queue│
┌─────────────┐     └──────┬───────┘     └──────────────┘
│  Client 3   │────►       │
└─────────────┘     ┌──────┴───────┐
                    │ Event Server │
                    │      #3      │
                    └──────────────┘
Redis distributes events to all servers automatically.
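The fan-out idea can be illustrated with a toy in-memory model. This is not how Zou or Redis is implemented; `ToyQueue` and `ToyEventServer` are hypothetical classes that only show the principle that an event published by one server reaches every server subscribed to the shared queue.

```python
class ToyQueue:
    """In-memory stand-in for the Redis message queue (illustration only)."""

    def __init__(self):
        self.subscribers = []

    def subscribe(self, server):
        self.subscribers.append(server)

    def publish(self, event_name, data):
        # Every subscribed server gets a copy of the event.
        for server in self.subscribers:
            server.deliver(event_name, data)


class ToyEventServer:
    """Stand-in for one event stream server process."""

    def __init__(self, name, queue):
        self.name = name
        self.queue = queue
        self.received = []
        queue.subscribe(self)

    def emit(self, event_name, data):
        # An event raised on one server goes through the shared queue.
        self.queue.publish(event_name, data)

    def deliver(self, event_name, data):
        # A real server would forward the event to its connected clients here.
        self.received.append((event_name, data))


if __name__ == "__main__":
    queue = ToyQueue()
    servers = [ToyEventServer(f"event{i}", queue) for i in (1, 2, 3)]
    servers[0].emit("task:new", {"task_id": "abc-123"})
    for server in servers:
        print(server.name, server.received)
```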
Load Balancer Configuration
Nginx
upstream event_stream {
    ip_hash;  # Sticky sessions
    server event1.example.com:5001;
    server event2.example.com:5001;
    server event3.example.com:5001;
}
server {
    listen 443 ssl;
    server_name events.example.com;
    location /socket.io/ {
        proxy_pass http://event_stream;
        proxy_http_version 1.1;
        proxy_set_header Upgrade $http_upgrade;
        proxy_set_header Connection "upgrade";
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
        proxy_read_timeout 3600s;
        proxy_send_timeout 3600s;
    }
}
Use ip_hash or sticky sessions to ensure clients connect to the same server.
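The principle behind IP-based stickiness is a deterministic mapping from client address to upstream server. The sketch below mimics that idea for IPv4 addresses by keying on the first three octets, as Nginx's ip_hash does, but it uses SHA-256 rather than Nginx's own hash function, so the server it picks will not match what Nginx picks. `pick_server` is a hypothetical helper.

```python
import hashlib

SERVERS = [
    "event1.example.com:5001",
    "event2.example.com:5001",
    "event3.example.com:5001",
]


def pick_server(client_ip, servers=SERVERS):
    """Return the same upstream for every request from the same IPv4 /24 network."""
    # Key on the first three octets only, so 203.0.113.7 and 203.0.113.99 match.
    key = ".".join(client_ip.split(".")[:3])
    digest = hashlib.sha256(key.encode("ascii")).digest()
    # Reduce the hash to an index into the server list.
    return servers[int.from_bytes(digest[:4], "big") % len(servers)]


if __name__ == "__main__":
    for ip in ("203.0.113.7", "203.0.113.99", "198.51.100.20"):
        print(ip, "->", pick_server(ip))
```

Since the mapping depends only on the client address, a reconnecting client lands on the server it used before, as long as the server list does not change.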
Monitoring
Connection Count
curl http://localhost:5001/stats
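For scripted monitoring, the same endpoint can be polled from Python with the standard library. This sketch assumes the endpoint returns a JSON object containing the nb_connections counter kept by the server; the exact response shape is not shown here, and both function names are hypothetical.

```python
import json
from urllib.request import urlopen


def parse_connection_count(body):
    """Extract nb_connections from a /stats response body (str or bytes)."""
    stats = json.loads(body)
    # Returns None if the key is absent, since the response shape is an assumption.
    return stats.get("nb_connections")


def fetch_connection_count(base_url="http://localhost:5001", timeout=5):
    """Query the running event stream server and return its connection count."""
    with urlopen(f"{base_url}/stats", timeout=timeout) as response:
        return parse_connection_count(response.read())


if __name__ == "__main__":
    print("Open connections:", fetch_connection_count())
```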
Redis Monitoring
# Monitor Redis commands
redis-cli -n 2 MONITOR
# Check memory usage
redis-cli INFO memory
# Check connection count
redis-cli CLIENT LIST
Logs
The event stream server logs connections and errors:
INFO:zou.event_stream:New websocket client connected
INFO:zou.event_stream:Websocket client disconnected
ERROR:zou.event_stream:Error in event handler
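When the /stats endpoint is not reachable, a rough connection count can be derived from these log lines. The sketch below assumes the log messages match the examples above exactly; `count_open_connections` is a hypothetical helper.

```python
def count_open_connections(log_lines):
    """Estimate open connections by replaying connect and disconnect log lines."""
    open_connections = 0
    for line in log_lines:
        if "New websocket client connected" in line:
            open_connections += 1
        elif "Websocket client disconnected" in line:
            # Never go below zero if the log starts mid-session.
            open_connections = max(0, open_connections - 1)
    return open_connections


if __name__ == "__main__":
    sample = [
        "INFO:zou.event_stream:New websocket client connected",
        "INFO:zou.event_stream:New websocket client connected",
        "INFO:zou.event_stream:Websocket client disconnected",
    ]
    print(count_open_connections(sample))
```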
Troubleshooting
Clients Can’t Connect
Check server is running:
curl http://localhost:5001/
# Should return: {"name": "Zou Event stream"}
Check firewall:
Make sure port 5001 is open between the client and the server.
Check host binding:
Set EVENT_STREAM_HOST=0.0.0.0 to accept remote connections.
Events Not Received
Check Redis connection:
redis-cli -h localhost -p 6379 PING
# Should return: PONG
Check JWT token:
Ensure clients send valid JWT token in connection auth.
Check rooms:
Users must join rooms to receive room-specific events.
High Memory Usage
Clear Redis cache:
zou clear-memory-cache
Limit Redis memory:
# redis.conf
maxmemory 512mb
maxmemory-policy allkeys-lru
Disconnections
Increase timeouts:
# Nginx
proxy_read_timeout 7200s;
proxy_send_timeout 7200s;
# Gunicorn
gunicorn --timeout 7200 ...
Security
- JWT Authentication - All connections require valid JWT token
- CORS - Configure allowed origins
- SSL/TLS - Use HTTPS/WSS in production
- Redis Password - Set KV_PASSWORD for Redis authentication
- Firewall - Restrict port 5001 to trusted networks
- Use Redis - Required for multi-server setups
- Single Worker - Event stream requires only 1 worker
- Gevent - Uses gevent for async I/O
- Sticky Sessions - Required for load balancing
- Monitor Connections - Watch connection count for capacity planning