Overview
Framefox provides comprehensive WebSocket support for building real-time applications. The WebSocket system includes connection management, room-based messaging, and user session tracking.
WebSocket Features
- Connection Management: Handle multiple concurrent WebSocket connections
- Room System: Organize connections into logical groups
- User Sessions: Track multiple connections per user
- Broadcasting: Send messages to rooms or specific users
- Event Handlers: Custom logic for connection lifecycle
WebSocketManager
The WebSocketManager is a singleton that manages all WebSocket connections:
from framefox.core.websocket.web_socket_manager import WebSocketManager
# Get the manager instance
manager = WebSocketManager()
# Connect a client
await manager.connect(
websocket=websocket,
connection_id="conn_123",
user_id="user_456",
room="chat_room"
)
# Send a message
await manager.send_to_connection(
"conn_123",
{"type": "notification", "message": "Hello!"}
)
# Broadcast to a room
await manager.broadcast_to_room(
"chat_room",
{"type": "message", "text": "New message!"}
)
# Disconnect
await manager.disconnect("conn_123")
Basic WebSocket Route
Create a WebSocket endpoint:
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from framefox.core.websocket.web_socket_manager import WebSocketManager
import uuid
app = FastAPI()
manager = WebSocketManager()


@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    """Echo every text frame back to the client that sent it."""
    connection_id = str(uuid.uuid4())

    # Register the socket with the manager under a fresh ID
    await manager.connect(websocket, connection_id)

    try:
        while True:
            # Wait for the next text frame and send it straight back
            received = await websocket.receive_text()
            echo = {"type": "echo", "data": received}
            await manager.send_to_connection(connection_id, echo)
    except WebSocketDisconnect:
        # The client went away: release the connection
        await manager.disconnect(connection_id)
Connection Management
Connecting Clients
from framefox.core.websocket.web_socket_manager import WebSocketManager
import uuid
manager = WebSocketManager()


@app.websocket("/ws/{user_id}")
async def websocket_endpoint(websocket: WebSocket, user_id: str):
    """Attach the caller's socket to ``user_id`` in the "general" room."""
    connection_id = str(uuid.uuid4())

    # Register with a user ID and an optional initial room
    connected = await manager.connect(
        websocket=websocket,
        connection_id=connection_id,
        user_id=user_id,
        room="general",
    )
    if not connected:
        # The manager reported failure: close the socket and stop here
        await websocket.close(code=1008, reason="Connection failed")
        return

    try:
        # Message loop
        while True:
            data = await websocket.receive_text()
            # Process data...
    except WebSocketDisconnect:
        await manager.disconnect(connection_id)
Multiple Connections Per User
Users can have multiple simultaneous connections (e.g., multiple browser tabs):
# Send to all connections for a user
await manager.send_to_user(
user_id="user_456",
message={"type": "notification", "text": "You have a new message"}
)
Room-Based Messaging
Joining Rooms
# Join a room after connection
await manager.join_room(connection_id, "tech_chat")
# Join multiple rooms
await manager.join_room(connection_id, "general")
await manager.join_room(connection_id, "announcements")
Leaving Rooms
# Leave a specific room
await manager.leave_room(connection_id, "tech_chat")
Broadcasting to Rooms
# Broadcast to all members of a room
count = await manager.broadcast_to_room(
room="chat_room",
message={
"type": "chat",
"user": "Alice",
"text": "Hello everyone!"
}
)
print(f"Message sent to {count} connections")
# Broadcast excluding sender
await manager.broadcast_to_room(
room="chat_room",
message={"type": "user_typing", "user": "Bob"},
exclude=sender_connection_id
)
Event Handlers
Framefox provides event handler classes for WebSocket events:
Echo Handler
Simple echo functionality:
from framefox.core.websocket.handlers.web_socket_handler import EchoHandler
from framefox.core.websocket.web_socket_manager import WebSocketManager
handler = EchoHandler()
manager = WebSocketManager()


@app.websocket("/ws/echo")
async def echo_endpoint(websocket: WebSocket):
    """Route one connection's lifecycle events through ``EchoHandler``."""
    connection_id = str(uuid.uuid4())
    await manager.connect(websocket, connection_id)
    await handler.on_connect(connection_id)

    try:
        while True:
            # Hand each incoming text frame to the handler
            incoming = await websocket.receive_text()
            await handler.on_message(connection_id, incoming)
    except WebSocketDisconnect:
        # Let the handler react first, then drop the connection
        await handler.on_disconnect(connection_id)
        await manager.disconnect(connection_id)
Broadcast Handler
Room-based broadcasting:
from framefox.core.websocket.handlers.web_socket_handler import BroadcastHandler
handler = BroadcastHandler()
manager = WebSocketManager()


@app.websocket("/ws/chat/{room}")
async def chat_endpoint(websocket: WebSocket, room: str, user_id: str = None):
    """Route one room member's lifecycle events through ``BroadcastHandler``."""
    connection_id = str(uuid.uuid4())
    await manager.connect(websocket, connection_id, user_id, room)
    await handler.on_connect(connection_id, user_id, room)

    try:
        while True:
            # Hand each incoming text frame to the handler
            incoming = await websocket.receive_text()
            await handler.on_message(connection_id, incoming)
    except WebSocketDisconnect:
        # Let the handler react first, then drop the connection
        await handler.on_disconnect(connection_id)
        await manager.disconnect(connection_id)
Custom Handler
Create your own event handler:
from framefox.core.websocket.handlers.web_socket_handler import WebSocketEventHandler
from framefox.core.websocket.web_socket_manager import WebSocketManager
import json
class GameHandler(WebSocketEventHandler):
    """Handle real-time game events.

    Welcomes a player on connect, announces joins and departures to the
    rooms involved, and relays ``move`` and ``chat`` messages to every
    room the sending connection belongs to.
    """

    @staticmethod
    def _connection_info(manager, connection_id: str):
        """Return ``(user_id, rooms)`` recorded for ``connection_id``.

        ``rooms`` is returned as a tuple snapshot so callers can ``await``
        inside a loop over it without the manager's underlying set changing
        size mid-iteration. Unknown connections yield ``(None, ())``.
        """
        metadata = manager.connection_metadata.get(connection_id, {})
        return metadata.get("user_id"), tuple(metadata.get("rooms", ()))

    async def on_connect(self, connection_id: str, user_id: str = None, room: str = None):
        """Welcome the new connection and tell the rest of ``room`` about it."""
        manager = WebSocketManager()

        # Send welcome message
        await manager.send_to_connection(
            connection_id,
            {
                "type": "welcome",
                "message": f"Welcome to game room {room}!",
            },
        )

        # Notify others (only when the player actually joined a room)
        if room:
            await manager.broadcast_to_room(
                room,
                {"type": "player_joined", "user_id": user_id},
                exclude=connection_id,
            )

    async def on_disconnect(self, connection_id: str):
        """Announce the player's departure to every room they were in.

        Intended to run before ``manager.disconnect`` so the connection's
        metadata is still available.
        """
        manager = WebSocketManager()

        # Get room info before disconnect
        user_id, rooms = self._connection_info(manager, connection_id)

        # Notify rooms; the departing socket is already closed, so leave it
        # out of the broadcast rather than attempting a send to it.
        for room in rooms:
            await manager.broadcast_to_room(
                room,
                {"type": "player_left", "user_id": user_id},
                exclude=connection_id,
            )

    async def on_message(self, connection_id: str, raw_message: str):
        """Decode a JSON message and relay ``move`` / ``chat`` events.

        Anything that is not a JSON object (malformed JSON, or valid JSON
        such as a list or number) is answered with an ``error`` message
        instead of raising. Unknown message types are ignored.
        """
        manager = WebSocketManager()

        try:
            message = json.loads(raw_message)
        except json.JSONDecodeError:
            message = None

        # json.loads may legitimately return a list, str, number, ...;
        # only an object supports the ``.get`` lookups below.
        if not isinstance(message, dict):
            await manager.send_to_connection(
                connection_id,
                {"type": "error", "message": "Invalid message format"},
            )
            return

        message_type = message.get("type")

        if message_type == "move":
            # Broadcast player move to room, skipping the mover
            user_id, rooms = self._connection_info(manager, connection_id)
            for room in rooms:
                await manager.broadcast_to_room(
                    room,
                    {
                        "type": "player_move",
                        "user_id": user_id,
                        "position": message.get("position"),
                    },
                    exclude=connection_id,
                )
        elif message_type == "chat":
            # Chat messages go to everyone in the room, sender included
            user_id, rooms = self._connection_info(manager, connection_id)
            for room in rooms:
                await manager.broadcast_to_room(
                    room,
                    {
                        "type": "chat",
                        "user_id": user_id,
                        "text": message.get("text"),
                    },
                )
WebSocket Messages
Use the WebSocketMessage dataclass for structured messages:
from framefox.core.websocket.web_socket_connection import WebSocketMessage
from datetime import datetime
# Create a message
message = WebSocketMessage(
type="notification",
data={"title": "New Alert", "body": "You have an update"},
sender_id="user_123",
room="notifications",
timestamp=datetime.now()
)
# Convert to dict for sending
await manager.send_to_connection(
connection_id,
message.to_dict()
)
Statistics and Monitoring
Get connection statistics:
stats = manager.get_stats()
# Headline totals
for label, key in (
    ("Total connections", "total_connections"),
    ("Total users", "total_users"),
    ("Total rooms", "total_rooms"),
):
    print(f"{label}: {stats[key]}")

# Room details
for room_name, connection_count in stats["rooms"].items():
    print(f"Room '{room_name}': {connection_count} connections")
Complete Chat Example
import json
import uuid
from datetime import datetime

from fastapi import FastAPI, WebSocket, WebSocketDisconnect

from framefox.core.websocket.web_socket_manager import WebSocketManager

app = FastAPI()
manager = WebSocketManager()


@app.websocket("/ws/chat/{room}/{username}")
async def chat_endpoint(websocket: WebSocket, room: str, username: str):
    """Run one user's chat session in ``room``.

    Registers the socket, welcomes the user, announces the join to the
    rest of the room, then relays every JSON text message to the whole
    room until the client disconnects. The connection is always released
    and the departure announced, even if the loop ends with an unexpected
    error.
    """
    connection_id = str(uuid.uuid4())

    # Connect to room
    await manager.connect(websocket, connection_id, username, room)

    # Send welcome message
    await manager.send_to_connection(
        connection_id,
        {
            "type": "system",
            "message": f"Welcome to {room}, {username}!",
        },
    )

    # Notify room (everyone except the user who just joined)
    await manager.broadcast_to_room(
        room,
        {
            "type": "user_joined",
            "username": username,
            "message": f"{username} joined the room",
        },
        exclude=connection_id,
    )

    try:
        while True:
            # Receive message
            data = await websocket.receive_text()

            try:
                message = json.loads(data)
            except json.JSONDecodeError:
                message = None

            # A bad payload should not end the session: report it to the
            # sender and keep listening. Non-object JSON (lists, numbers,
            # ...) is rejected too because ``.get`` below needs a dict.
            if not isinstance(message, dict):
                await manager.send_to_connection(
                    connection_id,
                    {"type": "error", "message": "Invalid message format"},
                )
                continue

            # Broadcast to room
            await manager.broadcast_to_room(
                room,
                {
                    "type": "message",
                    "username": username,
                    "text": message.get("text"),
                    "timestamp": datetime.now().isoformat(),
                },
            )
    except WebSocketDisconnect:
        # Normal client departure; cleanup happens in ``finally``.
        pass
    finally:
        # Disconnect first so the departing socket is not a broadcast target
        await manager.disconnect(connection_id)

        # Notify room
        await manager.broadcast_to_room(
            room,
            {
                "type": "user_left",
                "username": username,
                "message": f"{username} left the room",
            },
        )
Client-Side Example
// Open the chat socket for user "Alice" in the "general" room
const ws = new WebSocket('ws://localhost:8000/ws/chat/general/Alice');

// Connection established
ws.onopen = () => {
    console.log('Connected to chat');
};

// Incoming messages: dispatch on the "type" field
ws.onmessage = (event) => {
    const message = JSON.parse(event.data);

    switch (message.type) {
        case 'system':
            console.log('[System]', message.message);
            break;
        case 'message':
            console.log(`[${message.username}]`, message.text);
            break;
        // Join and leave notices are both logged as-is
        case 'user_joined':
        case 'user_left':
            console.log(message.message);
            break;
    }
};

// Send a chat message as a JSON text frame
function sendMessage(text) {
    const payload = { type: 'message', text: text };
    ws.send(JSON.stringify(payload));
}

// Connection closed
ws.onclose = () => {
    console.log('Disconnected from chat');
};
Best Practices
- Connection IDs: Use UUIDs for unique connection identifiers
- Error Handling: Always handle WebSocketDisconnect exceptions
- Message Format: Use consistent JSON message structures
- Resource Cleanup: Ensure connections are properly closed
- Rate Limiting: Implement rate limiting for message sending
- Authentication: Verify users before establishing connections
- Monitoring: Track connection statistics and health