
Overview

Brainbox uses WebSocket connections for real-time, CRDT-based synchronization between clients and the server. The protocol is built on synchronizers: bidirectional data streams that each sync a specific type of data.

Connection URL

ws://your-server.com/api/client/socket
Include a JWT in the connection headers for authentication.

Message Format

All WebSocket messages are JSON-encoded and carry a type field that identifies the message kind.
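Because every message carries a type field, a client can model incoming messages as a discriminated union and narrow on it. The sketch below is illustrative only: parseMessage and the trimmed-down message shapes are assumptions made for this example, not part of the Brainbox API.

```typescript
// Trimmed-down message shapes for illustration; the full definitions appear later in this document.
type OutputMessage = {
  type: 'synchronizer.output';
  userId: string;
  id: string;
  items: { cursor: string; data: unknown }[];
};
type WorkspaceUpdated = { type: 'workspace.updated'; workspaceId: string };
type KnownMessage = OutputMessage | WorkspaceUpdated;

// Hypothetical helper: parse raw JSON and return null for anything unrecognized.
function parseMessage(raw: string): KnownMessage | null {
  let value: unknown;
  try {
    value = JSON.parse(raw);
  } catch {
    return null; // Not valid JSON
  }
  if (typeof value !== 'object' || value === null) return null;
  const type = (value as { type?: unknown }).type;
  if (type === 'synchronizer.output' || type === 'workspace.updated') {
    return value as KnownMessage; // Field-level validation is omitted in this sketch
  }
  return null; // Unknown or missing type
}
```

Returning null for unknown types lets a client ignore message kinds it does not understand instead of failing on them.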

Message Types

Client → Server Messages

Synchronizer Input

Subscribe to a data stream with cursor-based pagination.
type SynchronizerInputMessage = {
  type: 'synchronizer.input';
  id: string;              // Unique synchronizer instance ID
  userId: string;          // User ID for this sync
  input: SynchronizerInput; // Type-specific input (see below)
  cursor: string;          // Revision cursor ("0" to start from beginning)
};
Example:
{
  "type": "synchronizer.input",
  "id": "sync_abc123",
  "userId": "usr_xyz789",
  "input": {
    "type": "nodes.updates",
    "rootId": "page_root_123"
  },
  "cursor": "0"
}

Server → Client Messages

Synchronizer Output

Data stream response with updates since the cursor.
type SynchronizerOutputMessage<TInput extends SynchronizerInput> = {
  type: 'synchronizer.output';
  userId: string;
  id: string;              // Matches input synchronizer ID
  items: {
    cursor: string;        // New cursor position
    data: any;             // Type-specific data
  }[];
};
Example:
{
  "type": "synchronizer.output",
  "userId": "usr_xyz789",
  "id": "sync_abc123",
  "items": [
    {
      "cursor": "42",
      "data": {
        "id": "upd_123",
        "nodeId": "page_abc",
        "revision": "42",
        "data": "base64EncodedCRDTState..."
      }
    }
  ]
}

Event Messages

Server broadcasts events for real-time updates.
type AccountUpdatedMessage = {
  type: 'account.updated';
  accountId: string;
};

type WorkspaceUpdatedMessage = {
  type: 'workspace.updated';
  workspaceId: string;
};

type WorkspaceDeletedMessage = {
  type: 'workspace.deleted';
  accountId: string;
};

type UserCreatedMessage = {
  type: 'user.created';
  accountId: string;
  workspaceId: string;
  userId: string;
};

type UserUpdatedMessage = {
  type: 'user.updated';
  accountId: string;
  userId: string;
};
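One way a client might route these events is a switch on type with an exhaustiveness check, so that adding a new event type produces a compile error until it is handled. The cache-key scheme and the cacheKeyFor name below are hypothetical; only the event shapes come from the definitions above.

```typescript
type ServerEvent =
  | { type: 'account.updated'; accountId: string }
  | { type: 'workspace.updated'; workspaceId: string }
  | { type: 'workspace.deleted'; accountId: string }
  | { type: 'user.created'; accountId: string; workspaceId: string; userId: string }
  | { type: 'user.updated'; accountId: string; userId: string };

// Hypothetical routing: returns the key of the local cache entry to refresh.
function cacheKeyFor(event: ServerEvent): string {
  switch (event.type) {
    case 'account.updated':
    case 'workspace.deleted':
      return `account:${event.accountId}`;
    case 'workspace.updated':
      return `workspace:${event.workspaceId}`;
    case 'user.created':
    case 'user.updated':
      return `user:${event.userId}`;
    default: {
      // Compile-time guard: this assignment fails if a case is missing.
      const unreachable: never = event;
      return unreachable;
    }
  }
}
```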

Synchronizer Types

Node Updates

Syncs CRDT updates for node attributes (metadata, structure).

Input

type SyncNodesUpdatesInput = {
  type: 'nodes.updates';
  rootId: string;  // Root node ID to sync
};

Output Data

type SyncNodeUpdateData = {
  id: string;              // Update ID
  nodeId: string;          // Node being updated
  rootId: string;          // Root node ID
  workspaceId: string;     // Workspace ID
  revision: string;        // Monotonic revision number
  data: string;            // Base64-encoded Yjs update
  createdAt: string;       // ISO 8601 timestamp
  createdBy: string;       // User ID who created update
  mergedUpdates: UpdateMergeMetadata[] | null; // Merge metadata
};

Behavior

  • Fetches up to 20 updates per batch
  • Triggers on node.created and node.updated events
  • Orders by revision ascending

Document Updates

Syncs CRDT updates for document content (rich text, blocks).

Input

type SyncDocumentUpdatesInput = {
  type: 'document.updates';
  rootId: string;  // Root node ID to sync
};

Output Data

type SyncDocumentUpdateData = {
  id: string;              // Update ID
  documentId: string;      // Document being updated
  rootId: string;          // Root node ID
  workspaceId: string;     // Workspace ID
  revision: string;        // Monotonic revision number
  data: string;            // Base64-encoded Yjs update
  createdAt: string;       // ISO 8601 timestamp
  createdBy: string;       // User ID who created update
  mergedUpdates: UpdateMergeMetadata[] | null; // Merge metadata
};

Behavior

  • Fetches up to 20 updates per batch
  • Triggers on document.update.created events
  • Orders by revision ascending

Collaborations

Syncs node access permissions and collaborator roles.

Input

type SyncCollaborationsInput = {
  type: 'collaborations';
  // No additional parameters - syncs all user's collaborations
};

Output Data

type SyncCollaborationData = {
  collaboratorId: string;  // User ID
  nodeId: string;          // Node ID
  workspaceId: string;     // Workspace ID
  role: NodeRole;          // 'admin', 'editor', or 'viewer'
  createdAt: string;       // ISO 8601 timestamp
  createdBy: string;       // User who granted access
  updatedAt: string | null;
  updatedBy: string | null;
  deletedAt: string | null; // Soft delete timestamp
  deletedBy: string | null;
  revision: string;        // Monotonic revision number
};

Behavior

  • Fetches up to 50 collaborations per batch
  • Filters by collaborator_id = current user
  • Triggers on collaboration.created and collaboration.updated events
  • Includes soft-deleted collaborations (check deletedAt)
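Since soft-deleted collaborations are included in the stream, a client has to check deletedAt before treating an item as an active grant. A minimal sketch, assuming the client keeps a local map from node ID to the current user's role (applyCollaborations and that map are hypothetical):

```typescript
type NodeRole = 'admin' | 'editor' | 'viewer';

// Subset of SyncCollaborationData needed for this example.
type CollaborationItem = {
  nodeId: string;
  role: NodeRole;
  deletedAt: string | null;
};

// Hypothetical local cache: node ID -> the current user's role on that node.
function applyCollaborations(
  roles: Map<string, NodeRole>,
  items: CollaborationItem[],
): void {
  for (const item of items) {
    if (item.deletedAt !== null) {
      roles.delete(item.nodeId); // Soft-deleted: access was revoked
    } else {
      roles.set(item.nodeId, item.role); // Access granted or role changed
    }
  }
}
```

Items are applied in the order received, so a later soft delete overrides an earlier grant for the same node.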

Users

Syncs workspace user list and profile updates.

Input

type SyncUsersInput = {
  type: 'users';
  // No additional parameters - syncs workspace users
};

Output Data

type SyncUserData = {
  id: string;              // User ID
  workspaceId: string;     // Workspace ID
  email: string;           // User email
  name: string;            // User name
  avatar: string | null;   // Avatar URL or ID
  role: WorkspaceRole;     // Workspace role
  customName: string | null;    // Custom display name
  customAvatar: string | null;  // Custom avatar
  createdAt: string;       // ISO 8601 timestamp
  updatedAt: string | null;
  revision: string;        // Monotonic revision number
  status: UserStatus;      // Active (1) or Removed (2)
};

Node Reactions

Syncs emoji reactions on nodes.

Input

type SyncNodeReactionsInput = {
  type: 'node.reactions';
  rootId: string;  // Root node ID to sync
};

Output Data

type SyncNodeReactionData = {
  nodeId: string;          // Node being reacted to
  collaboratorId: string;  // User who reacted
  reaction: string;        // Emoji or reaction identifier
  rootId: string;          // Root node ID
  workspaceId: string;     // Workspace ID
  revision: string;        // Monotonic revision number
  createdAt: string;       // ISO 8601 timestamp
  deletedAt: string | null; // Soft delete timestamp
};

Node Interactions

Syncs user interaction timestamps (seen, opened).

Input

type SyncNodeInteractionsInput = {
  type: 'node.interactions';
  rootId: string;  // Root node ID to sync
};

Output Data

type SyncNodeInteractionData = {
  nodeId: string;          // Node being interacted with
  collaboratorId: string;  // User interacting
  rootId: string;          // Root node ID
  workspaceId: string;     // Workspace ID
  revision: string;        // Monotonic revision number
  firstSeenAt: string | null;    // First view timestamp
  lastSeenAt: string | null;     // Most recent view
  firstOpenedAt: string | null;  // First open timestamp
  lastOpenedAt: string | null;   // Most recent open
};

Node Tombstones

Syncs deleted node markers for CRDT garbage collection.

Input

type SyncNodeTombstonesInput = {
  type: 'node.tombstones';
  rootId: string;  // Root node ID to sync
};

Output Data

type SyncNodeTombstoneData = {
  id: string;              // Tombstone ID (deleted node ID)
  rootId: string;          // Root node ID
  workspaceId: string;     // Workspace ID
  deletedBy: string;       // User who deleted
  deletedAt: string;       // ISO 8601 timestamp
  revision: string;        // Monotonic revision number
};
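On the client, a tombstone typically means the corresponding node should be dropped from local state. The sketch below assumes a local map keyed by node ID; applyTombstones and the map are hypothetical, and how Brainbox clients actually store nodes is not described here.

```typescript
// Subset of SyncNodeTombstoneData needed for this example.
type TombstoneItem = { id: string; deletedAt: string };

// Removes tombstoned nodes from a hypothetical local store and
// returns how many entries were actually deleted.
function applyTombstones<T>(
  nodes: Map<string, T>,
  tombstones: TombstoneItem[],
): number {
  let removed = 0;
  for (const tombstone of tombstones) {
    // Map.delete returns false when the node was never stored locally.
    if (nodes.delete(tombstone.id)) removed += 1;
  }
  return removed;
}
```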

Implementation Details

Base Synchronizer Class

All synchronizers extend BaseSynchronizer:
export abstract class BaseSynchronizer<T extends SynchronizerInput> {
  public readonly id: string;
  public readonly user: ConnectedUser;
  public readonly input: T;
  public readonly cursor: string;
  
  protected status: 'pending' | 'fetching';
  
  public abstract fetchData(): Promise<SynchronizerOutputMessage<T> | null>;
  public abstract fetchDataFromEvent(event: Event): Promise<SynchronizerOutputMessage<T> | null>;
}

Cursor-Based Pagination

  • Client sends last known cursor (revision)
  • Server returns items with revision > cursor
  • Client updates cursor to highest received revision
  • Next request uses new cursor for incremental sync
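The four steps above can be sketched with an in-memory list standing in for the database. Revisions travel as strings, so the example compares them as integers rather than lexicographically (otherwise "10" would sort before "2"). All function names are hypothetical, and the comparison assumes non-negative decimal strings without leading zeros.

```typescript
type Row = { revision: string };

// Compare revisions as integers without losing precision on large values.
// Assumes non-negative decimal strings with no leading zeros.
function compareRevisions(a: string, b: string): number {
  if (a.length !== b.length) return a.length - b.length;
  return a < b ? -1 : a > b ? 1 : 0;
}

// Server side (sketch): rows with revision > cursor, ascending, capped at `limit`.
function fetchPage<T extends Row>(rows: T[], cursor: string, limit: number): T[] {
  return rows
    .filter((row) => compareRevisions(row.revision, cursor) > 0)
    .sort((x, y) => compareRevisions(x.revision, y.revision))
    .slice(0, limit);
}

// Client side (sketch): advance the cursor to the highest revision received.
function advanceCursor(cursor: string, page: Row[]): string {
  return page.reduce(
    (max, row) => (compareRevisions(row.revision, max) > 0 ? row.revision : max),
    cursor,
  );
}
```

An empty page leaves the cursor unchanged, so repeating the request is safe when there is nothing new.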

Event-Driven Updates

  • Server publishes internal events when data changes
  • Synchronizers listen for relevant events
  • Automatically fetch and broadcast updates to subscribed clients
  • Prevents duplicate fetches with fetching status flag
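The duplicate-fetch guard in the last bullet can be illustrated with a small wrapper around an async fetch. This is a sketch of the general pattern, not the BaseSynchronizer implementation; FetchGuard is a hypothetical name.

```typescript
type Status = 'pending' | 'fetching';

// Hypothetical wrapper: runs `fetch` unless a previous call is still in flight.
class FetchGuard<T> {
  private status: Status = 'pending';
  private readonly fetch: () => Promise<T>;

  constructor(fetch: () => Promise<T>) {
    this.fetch = fetch;
  }

  // Resolves to the fetched value, or null if a fetch was already running.
  async run(): Promise<T | null> {
    if (this.status === 'fetching') return null;
    this.status = 'fetching';
    try {
      return await this.fetch();
    } finally {
      this.status = 'pending'; // Reset even if the fetch throws
    }
  }
}
```

If two events arrive back to back, the second call returns null immediately instead of issuing an overlapping query.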

CRDT State Encoding

Node and document updates carry Yjs CRDT state, which the server encodes as a Base64 string:
import { encodeState } from '@brainbox/crdt';

const data = encodeState(nodeUpdate.data); // Base64 string
Clients decode the update and apply it to a local Yjs document:
import { decodeState, applyUpdate } from '@brainbox/crdt';

const update = decodeState(data);
applyUpdate(doc, update);
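The internals of decodeState are not shown in this document. As an illustration of the Base64-to-bytes step that any such helper needs, here is a dependency-free sketch; base64ToBytes is a hypothetical name and is not part of @brainbox/crdt.

```typescript
const BASE64_ALPHABET =
  'ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/';

// Decode a standard (RFC 4648) Base64 string into raw bytes.
function base64ToBytes(input: string): Uint8Array {
  const clean = input.replace(/=+$/, ''); // Strip trailing padding
  const bytes = new Uint8Array(Math.floor((clean.length * 3) / 4));
  let buffer = 0; // Bit accumulator
  let bits = 0; // Number of valid bits currently in the accumulator
  let offset = 0;
  for (let i = 0; i < clean.length; i++) {
    const value = BASE64_ALPHABET.indexOf(clean.charAt(i));
    if (value === -1) {
      throw new Error(`Invalid Base64 character at index ${i}`);
    }
    buffer = (buffer << 6) | value;
    bits += 6;
    if (bits >= 8) {
      bits -= 8;
      bytes[offset++] = (buffer >> bits) & 0xff; // Emit the top 8 bits
      buffer &= (1 << bits) - 1; // Keep only the leftover low bits
    }
  }
  return bytes;
}
```

In Node.js, Buffer.from(data, 'base64') performs the same conversion and is the more practical choice outside of a demonstration.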

Example Client Flow

1. Connect to WebSocket

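import WebSocket from 'ws'; // Node.js client; the browser WebSocket API cannot set custom headers

const ws = new WebSocket('ws://server.com/api/client/socket', {
  headers: {
    Authorization: `Bearer ${token}`, // JWT for authentication
  },
});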

2. Subscribe to Node Updates

ws.send(JSON.stringify({
  type: 'synchronizer.input',
  id: 'sync_nodes_123',
  userId: 'usr_abc',
  input: {
    type: 'nodes.updates',
    rootId: 'page_xyz',
  },
  cursor: '0', // Start from beginning
}));

3. Receive and Apply Updates

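let cursor = '0'; // Last cursor received for this synchronizer

ws.on('message', (data) => {
  // The ws library delivers a Buffer, so convert it before parsing
  const msg = JSON.parse(data.toString());

  // Only handle output for the synchronizer registered in step 2
  if (msg.type === 'synchronizer.output' && msg.id === 'sync_nodes_123') {
    for (const item of msg.items) {
      // Apply CRDT update
      const update = decodeState(item.data.data);
      applyUpdate(localDoc, update);

      // Update cursor for next sync
      cursor = item.cursor;
    }
  }

  if (msg.type === 'workspace.updated') {
    // Refetch workspace metadata
  }
});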

4. Periodic Resync

// Re-send sync message with latest cursor to get new updates
setInterval(() => {
  if (ws.readyState !== ws.OPEN) return; // Skip while disconnected; send() would throw

  ws.send(JSON.stringify({
    type: 'synchronizer.input',
    id: 'sync_nodes_123',
    userId: 'usr_abc',
    input: { type: 'nodes.updates', rootId: 'page_xyz' },
    cursor: cursor, // Use last received cursor
  }));
}, 5000);

Best Practices

Follow these guidelines for robust synchronization.

Client-Side

  • Persist cursors to local storage for offline resilience
  • Handle reconnection by resyncing with stored cursors
  • Deduplicate updates by tracking applied revision numbers
  • Apply updates transactionally to avoid partial states
  • Subscribe to multiple streams as needed (nodes, documents, collaborations)
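The first three points can be combined in a small state helper. The sketch below is an assumption-laden illustration: SyncState, the cursor: key prefix, and the KeyValueStorage interface are hypothetical, chosen so that the browser's localStorage or an in-memory map can back it.

```typescript
// Minimal key-value interface; the browser's localStorage satisfies it.
interface KeyValueStorage {
  getItem(key: string): string | null;
  setItem(key: string, value: string): void;
}

// Hypothetical helper that persists one cursor per synchronizer ID
// and remembers which revisions were applied during this session.
class SyncState {
  private readonly storage: KeyValueStorage;
  private readonly applied = new Set<string>();

  constructor(storage: KeyValueStorage) {
    this.storage = storage;
  }

  // Returns the stored cursor, or "0" to sync from the beginning.
  getCursor(syncId: string): string {
    return this.storage.getItem(`cursor:${syncId}`) ?? '0';
  }

  // True if this revision was already applied (duplicate delivery).
  isApplied(syncId: string, revision: string): boolean {
    return this.applied.has(`${syncId}:${revision}`);
  }

  // Call after the update has been applied locally, so a crash in
  // between never advances the cursor past an unapplied update.
  markApplied(syncId: string, revision: string, cursor: string): void {
    this.applied.add(`${syncId}:${revision}`);
    this.storage.setItem(`cursor:${syncId}`, cursor);
  }
}
```

On reconnect, a client would call getCursor for each synchronizer and send that value in its synchronizer.input message.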

Server-Side

  • Use monotonic revisions (auto-incrementing integers)
  • Index by revision for fast cursor-based queries
  • Batch updates (limit 20-50 per message)
  • Publish events immediately after DB writes
  • Prevent concurrent fetches with status flags

Error Handling

  • Reconnect on disconnect with exponential backoff
  • Validate message types before processing
  • Handle missing cursors by starting from "0" (full resync)
  • Log sync errors for debugging
  • Implement health checks (ping/pong)
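Exponential backoff for reconnects can be computed with a short pure function. The base delay, the cap, and the jitter range below are illustrative assumptions, not values prescribed by Brainbox.

```typescript
// Delay before reconnect attempt `attempt` (0-based): base * 2^attempt, capped at `maxMs`.
function backoffDelay(attempt: number, baseMs = 1000, maxMs = 30000): number {
  return Math.min(maxMs, baseMs * Math.pow(2, attempt));
}

// Optional jitter: scales the delay into the range [50%, 100%) so that
// many clients do not reconnect at the same instant.
function withJitter(delayMs: number, random: () => number = Math.random): number {
  return Math.floor(delayMs * (0.5 + random() * 0.5));
}
```

A client would reset the attempt counter to 0 after a successful connection and otherwise wait withJitter(backoffDelay(attempt)) milliseconds before the next try.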
