Overview
Brainbox uses WebSocket connections for real-time CRDT-based synchronization between clients and server. The protocol is built on synchronizers - bidirectional data streams that sync specific types of data.
Connection URL
ws://your-server.com/api/client/socket
Include JWT token in connection headers for authentication.
All WebSocket messages are JSON-encoded with a type field for discrimination.
Message Types
Client → Server Messages
Subscribe to a data stream with cursor-based pagination.
type SynchronizerInputMessage = {
type: 'synchronizer.input';
id: string; // Unique synchronizer instance ID
userId: string; // User ID for this sync
input: SynchronizerInput; // Type-specific input (see below)
cursor: string; // Revision cursor ("0" to start from beginning)
};
Example:
{
"type": "synchronizer.input",
"id": "sync_abc123",
"userId": "usr_xyz789",
"input": {
"type": "nodes.updates",
"rootId": "page_root_123"
},
"cursor": "0"
}
Server → Client Messages
Synchronizer Output
Data stream response with updates since the cursor.
type SynchronizerOutputMessage<TInput extends SynchronizerInput> = {
type: 'synchronizer.output';
userId: string;
id: string; // Matches input synchronizer ID
items: {
cursor: string; // New cursor position
data: any; // Type-specific data
}[];
};
Example:
{
"type": "synchronizer.output",
"userId": "usr_xyz789",
"id": "sync_abc123",
"items": [
{
"cursor": "42",
"data": {
"id": "upd_123",
"nodeId": "page_abc",
"revision": "42",
"data": "base64EncodedCRDTState..."
}
}
]
}
Event Messages
Server broadcasts events for real-time updates.
type AccountUpdatedMessage = {
type: 'account.updated';
accountId: string;
};
type WorkspaceUpdatedMessage = {
type: 'workspace.updated';
workspaceId: string;
};
type WorkspaceDeletedMessage = {
type: 'workspace.deleted';
accountId: string;
};
type UserCreatedMessage = {
type: 'user.created';
accountId: string;
workspaceId: string;
userId: string;
};
type UserUpdatedMessage = {
type: 'user.updated';
accountId: string;
userId: string;
};
Synchronizer Types
Node Updates
Syncs CRDT updates for node attributes (metadata, structure).
type SyncNodesUpdatesInput = {
type: 'nodes.updates';
rootId: string; // Root node ID to sync
};
Output Data
type SyncNodeUpdateData = {
id: string; // Update ID
nodeId: string; // Node being updated
rootId: string; // Root node ID
workspaceId: string; // Workspace ID
revision: string; // Monotonic revision number
data: string; // Base64-encoded Yjs state vector
createdAt: string; // ISO 8601 timestamp
createdBy: string; // User ID who created update
mergedUpdates: UpdateMergeMetadata[] | null; // Merge metadata
};
Behavior
- Fetches up to 20 updates per batch
- Triggers on
node.created and node.updated events
- Orders by revision ascending
Document Updates
Syncs CRDT updates for document content (rich text, blocks).
type SyncDocumentUpdatesInput = {
type: 'document.updates';
rootId: string; // Root node ID to sync
};
Output Data
type SyncDocumentUpdateData = {
id: string; // Update ID
documentId: string; // Document being updated
rootId: string; // Root node ID
workspaceId: string; // Workspace ID
revision: string; // Monotonic revision number
data: string; // Base64-encoded Yjs state vector
createdAt: string; // ISO 8601 timestamp
createdBy: string; // User ID who created update
mergedUpdates: UpdateMergeMetadata[] | null; // Merge metadata
};
Behavior
- Fetches up to 20 updates per batch
- Triggers on
document.update.created events
- Orders by revision ascending
Collaborations
Syncs node access permissions and collaborator roles.
type SyncCollaborationsInput = {
type: 'collaborations';
// No additional parameters - syncs all user's collaborations
};
Output Data
type SyncCollaborationData = {
collaboratorId: string; // User ID
nodeId: string; // Node ID
workspaceId: string; // Workspace ID
role: NodeRole; // 'admin', 'editor', or 'viewer'
createdAt: string; // ISO 8601 timestamp
createdBy: string; // User who granted access
updatedAt: string | null;
updatedBy: string | null;
deletedAt: string | null; // Soft delete timestamp
deletedBy: string | null;
revision: string; // Monotonic revision number
};
Behavior
- Fetches up to 50 collaborations per batch
- Filters by
collaborator_id = current user
- Triggers on
collaboration.created and collaboration.updated events
- Includes soft-deleted collaborations (check
deletedAt)
Users
Syncs workspace user list and profile updates.
type SyncUsersInput = {
type: 'users';
// No additional parameters - syncs workspace users
};
Output Data
type SyncUserData = {
id: string; // User ID
workspaceId: string; // Workspace ID
email: string; // User email
name: string; // User name
avatar: string | null; // Avatar URL or ID
role: WorkspaceRole; // Workspace role
customName: string | null; // Custom display name
customAvatar: string | null; // Custom avatar
createdAt: string; // ISO 8601 timestamp
updatedAt: string | null;
revision: string; // Monotonic revision number
status: UserStatus; // Active (1) or Removed (2)
};
Node Reactions
Syncs emoji reactions on nodes.
type SyncNodeReactionsInput = {
type: 'node.reactions';
rootId: string; // Root node ID to sync
};
Output Data
type SyncNodeReactionData = {
nodeId: string; // Node being reacted to
collaboratorId: string; // User who reacted
reaction: string; // Emoji or reaction identifier
rootId: string; // Root node ID
workspaceId: string; // Workspace ID
revision: string; // Monotonic revision number
createdAt: string; // ISO 8601 timestamp
deletedAt: string | null; // Soft delete timestamp
};
Node Interactions
Syncs user interaction timestamps (seen, opened).
type SyncNodeInteractionsInput = {
type: 'node.interactions';
rootId: string; // Root node ID to sync
};
Output Data
type SyncNodeInteractionData = {
nodeId: string; // Node being interacted with
collaboratorId: string; // User interacting
rootId: string; // Root node ID
workspaceId: string; // Workspace ID
revision: string; // Monotonic revision number
firstSeenAt: string | null; // First view timestamp
lastSeenAt: string | null; // Most recent view
firstOpenedAt: string | null; // First open timestamp
lastOpenedAt: string | null; // Most recent open
};
Node Tombstones
Syncs deleted node markers for CRDT garbage collection.
type SyncNodeTombstonesInput = {
type: 'node.tombstones';
rootId: string; // Root node ID to sync
};
Output Data
type SyncNodeTombstoneData = {
id: string; // Tombstone ID (deleted node ID)
rootId: string; // Root node ID
workspaceId: string; // Workspace ID
deletedBy: string; // User who deleted
deletedAt: string; // ISO 8601 timestamp
revision: string; // Monotonic revision number
};
Implementation Details
Base Synchronizer Class
All synchronizers extend from BaseSynchronizer:
export abstract class BaseSynchronizer<T extends SynchronizerInput> {
public readonly id: string;
public readonly user: ConnectedUser;
public readonly input: T;
public readonly cursor: string;
protected status: 'pending' | 'fetching';
public abstract fetchData(): Promise<SynchronizerOutputMessage<T> | null>;
public abstract fetchDataFromEvent(event: Event): Promise<SynchronizerOutputMessage<T> | null>;
}
- Client sends last known cursor (revision)
- Server returns items with
revision > cursor
- Client updates cursor to highest received revision
- Next request uses new cursor for incremental sync
Event-Driven Updates
- Server publishes internal events when data changes
- Synchronizers listen for relevant events
- Automatically fetch and broadcast updates to subscribed clients
- Prevents duplicate fetches with
fetching status flag
CRDT State Encoding
Node and document updates contain Yjs CRDT state:
import { encodeState } from '@brainbox/crdt';
const data = encodeState(nodeUpdate.data); // Base64 string
Clients decode and apply to local Yjs documents:
import { decodeState, applyUpdate } from '@brainbox/crdt';
const update = decodeState(data);
applyUpdate(doc, update);
Example Client Flow
1. Connect to WebSocket
const ws = new WebSocket('ws://server.com/api/client/socket', {
headers: {
Authorization: `Bearer ${token}`,
},
});
2. Subscribe to Node Updates
ws.send(JSON.stringify({
type: 'synchronizer.input',
id: 'sync_nodes_123',
userId: 'usr_abc',
input: {
type: 'nodes.updates',
rootId: 'page_xyz',
},
cursor: '0', // Start from beginning
}));
3. Receive and Apply Updates
ws.on('message', (data) => {
const msg = JSON.parse(data);
if (msg.type === 'synchronizer.output') {
for (const item of msg.items) {
// Apply CRDT update
const update = decodeState(item.data.data);
applyUpdate(localDoc, update);
// Update cursor for next sync
cursor = item.cursor;
}
}
if (msg.type === 'workspace.updated') {
// Refetch workspace metadata
}
});
4. Periodic Resync
// Re-send sync message with latest cursor to get new updates
setInterval(() => {
ws.send(JSON.stringify({
type: 'synchronizer.input',
id: 'sync_nodes_123',
userId: 'usr_abc',
input: { type: 'nodes.updates', rootId: 'page_xyz' },
cursor: cursor, // Use last received cursor
}));
}, 5000);
Best Practices
Follow these guidelines for robust synchronization.
Client-Side
- Persist cursors to local storage for offline resilience
- Handle reconnection by resyncing with stored cursors
- Deduplicate updates by tracking applied revision numbers
- Apply updates transactionally to avoid partial states
- Subscribe to multiple streams as needed (nodes, documents, collaborations)
Server-Side
- Use monotonic revisions (auto-incrementing integers)
- Index by revision for fast cursor-based queries
- Batch updates (limit 20-50 per message)
- Publish events immediately after DB writes
- Prevent concurrent fetches with status flags
Error Handling
- Reconnect on disconnect with exponential backoff
- Validate message types before processing
- Handle missing cursors by starting from 0
- Log sync errors for debugging
- Implement health checks (ping/pong)