Brainbox uses a custom sync engine to keep client SQLite databases in sync with the server PostgreSQL database. Sync handlers define what data gets synchronized and how.

Overview

The sync system has three parts:
  1. Schema (packages/core/src/synchronizers/) - Defines input/output types
  2. Client handler (packages/client/src/handlers/) - Receives and processes sync data
  3. Server synchronizer (apps/server/src/synchronizers/) - Fetches and broadcasts data

Architecture

Sync flow:
Client SQLite → WebSocket → Server → PostgreSQL
     ↑                              ↓
     └──────── Sync Updates ←───────┘

Existing synchronizers:
  • nodes.updates - Node attribute changes
  • document.updates - Rich text document changes
  • collaborations - Node collaborator updates
  • node-reactions - Reactions on nodes
  • node-interactions - User interactions
  • node-tombstones - Soft deletes
  • users - Workspace user updates

Step-by-step guide

1. Define the synchronizer schema

Create a schema file in packages/core/src/synchronizers/ with input and output types.
packages/core/src/synchronizers/task-assignments.ts
export type SyncTaskAssignmentsInput = {
  type: 'task.assignments';
  rootId: string;
};

export type SyncTaskAssignmentData = {
  id: string;
  taskId: string;
  assigneeId: string;
  workspaceId: string;
  assignedBy: string;
  assignedAt: string;
  completedAt: string | null;
  revision: string;
};

declare module '@brainbox/core' {
  interface SynchronizerMap {
    'task.assignments': {
      input: SyncTaskAssignmentsInput;
      data: SyncTaskAssignmentData;
    };
  }
}
Export from packages/core/src/synchronizers/index.ts:
export * from './task-assignments';
The declare module augmentation is required: it registers 'task.assignments' in SynchronizerMap, which is what makes sync operations for this type type-safe across the codebase.
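
To illustrate why a map interface gives type safety, here is a minimal, self-contained sketch. It does not use the real @brainbox/core types: DemoSynchronizerMap, InputOf, DataOf, describeInput, and wrapItem are hypothetical names chosen for this example only.

```typescript
// Hypothetical stand-in for the SynchronizerMap interface from @brainbox/core.
interface DemoSynchronizerMap {
  'task.assignments': {
    input: { type: 'task.assignments'; rootId: string };
    data: { id: string; taskId: string; revision: string };
  };
  users: {
    input: { type: 'users' };
    data: { id: string; revision: string };
  };
}

type SyncType = keyof DemoSynchronizerMap;
type InputOf<K extends SyncType> = DemoSynchronizerMap[K]['input'];
type DataOf<K extends SyncType> = DemoSynchronizerMap[K]['data'];

// Narrowing on the discriminant gives access to type-specific fields.
function describeInput(input: InputOf<SyncType>): string {
  switch (input.type) {
    case 'task.assignments':
      return `task assignments under root ${input.rootId}`;
    case 'users':
      return 'workspace users';
  }
}

// The key ties the payload type to the synchronizer type at compile time.
function wrapItem<K extends SyncType>(type: K, data: DataOf<K>) {
  return { type, cursor: data.revision, data };
}

// Compiles: the payload matches the 'task.assignments' entry.
const item = wrapItem('task.assignments', { id: 'a1', taskId: 't1', revision: '5' });
// Would not compile: 'users' data has no taskId field.
// wrapItem('users', { id: 'u1', taskId: 't1', revision: '5' });
```

Adding an entry to the map is enough for helpers like these to accept the new type, which is presumably the role the module augmentation plays for SynchronizerMap.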

2. Create the server synchronizer

Implement a synchronizer in apps/server/src/synchronizers/ that extends BaseSynchronizer.
apps/server/src/synchronizers/task-assignments.ts
import {
  SynchronizerOutputMessage,
  SyncTaskAssignmentsInput,
  SyncTaskAssignmentData,
} from '@brainbox/core';
import { database } from '@brainbox/server/data/database';
import { SelectTaskAssignment } from '@brainbox/server/data/schema';
import { createLogger } from '@brainbox/server/lib/logger';
import { BaseSynchronizer } from '@brainbox/server/synchronizers/base';
import { Event } from '@brainbox/server/types/events';

const logger = createLogger('task-assignment-synchronizer');

export class TaskAssignmentSynchronizer extends BaseSynchronizer<SyncTaskAssignmentsInput> {
  public async fetchData(): Promise<SynchronizerOutputMessage<SyncTaskAssignmentsInput> | null> {
    const assignments = await this.fetchAssignments();
    if (assignments.length === 0) {
      return null;
    }

    return this.buildMessage(assignments);
  }

  public async fetchDataFromEvent(
    event: Event
  ): Promise<SynchronizerOutputMessage<SyncTaskAssignmentsInput> | null> {
    if (!this.shouldFetch(event)) {
      return null;
    }

    const assignments = await this.fetchAssignments();
    if (assignments.length === 0) {
      return null;
    }

    return this.buildMessage(assignments);
  }

  private async fetchAssignments() {
    if (this.status === 'fetching') {
      return [];
    }

    this.status = 'fetching';
    try {
      const assignments = await database
        .selectFrom('task_assignments')
        .selectAll()
        .where('root_id', '=', this.input.rootId)
        .where('revision', '>', this.cursor)
        .orderBy('revision', 'asc')
        .limit(20)
        .execute();

      return assignments;
    } catch (error) {
      logger.error(error, 'Error fetching task assignments for sync');
    } finally {
      this.status = 'pending';
    }

    return [];
  }

  private buildMessage(
    unsyncedAssignments: SelectTaskAssignment[]
  ): SynchronizerOutputMessage<SyncTaskAssignmentsInput> {
    const items: SyncTaskAssignmentData[] = unsyncedAssignments.map(
      (assignment) => ({
        id: assignment.id,
        taskId: assignment.task_id,
        assigneeId: assignment.assignee_id,
        workspaceId: assignment.workspace_id,
        assignedBy: assignment.assigned_by,
        assignedAt: assignment.assigned_at.toISOString(),
        completedAt: assignment.completed_at?.toISOString() ?? null,
        revision: assignment.revision.toString(),
      })
    );

    return {
      type: 'synchronizer.output',
      userId: this.user.userId,
      id: this.id,
      items: items.map((item) => ({
        cursor: item.revision,
        data: item,
      })),
    };
  }

  private shouldFetch(event: Event) {
    if (
      event.type === 'task.assigned' &&
      event.rootId === this.input.rootId
    ) {
      return true;
    }

    if (
      event.type === 'task.completed' &&
      event.rootId === this.input.rootId
    ) {
      return true;
    }

    return false;
  }
}
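
The row-to-payload mapping inside buildMessage can also be written as a pure function, which makes it easy to test without a database. The sketch below is an illustration under assumptions: TaskAssignmentRow, TaskAssignmentPayload, and toPayload are hypothetical names, and the row shape is a trimmed-down version of the columns used above. Dates and bigint revisions are converted to strings, presumably because the message is serialized as JSON, and JSON.stringify cannot serialize bigint values.

```typescript
// Hypothetical row shape, mirroring a subset of the snake_case columns above.
type TaskAssignmentRow = {
  id: string;
  task_id: string;
  assigned_at: Date;
  completed_at: Date | null;
  revision: bigint;
};

// Hypothetical camelCase payload sent to clients.
type TaskAssignmentPayload = {
  id: string;
  taskId: string;
  assignedAt: string;
  completedAt: string | null;
  revision: string;
};

// Pure mapping: no database or synchronizer state involved.
function toPayload(row: TaskAssignmentRow): TaskAssignmentPayload {
  return {
    id: row.id,
    taskId: row.task_id,
    assignedAt: row.assigned_at.toISOString(),
    // A missing completion date stays null instead of becoming undefined.
    completedAt: row.completed_at?.toISOString() ?? null,
    revision: row.revision.toString(),
  };
}
```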

3. Register the synchronizer

Add your synchronizer to the sync engine in apps/server/src/services/sync-engine.ts:
import { TaskAssignmentSynchronizer } from '@brainbox/server/synchronizers/task-assignments';

private createSynchronizer(
  id: string,
  user: ConnectedUser,
  input: SynchronizerInput
): BaseSynchronizer<SynchronizerInput> | null {
  switch (input.type) {
    case 'task.assignments':
      return new TaskAssignmentSynchronizer(id, user, input, '0');
    // ... other cases
  }
}

4. Create client handler (if needed)

If your sync data requires special client-side processing, create a handler in packages/client/src/handlers/. In most cases the default sync engine handles insertion automatically; custom handlers are only needed for complex transformations or side effects.
packages/client/src/handlers/synchronizers/task-assignments.ts
import { SyncTaskAssignmentData } from '@brainbox/core';
import { WorkspaceService } from '@brainbox/client/services/workspaces/workspace-service';

export async function handleTaskAssignmentSync(
  workspace: WorkspaceService,
  data: SyncTaskAssignmentData
) {
  await workspace.database
    .insertInto('task_assignments')
    .values({
      id: data.id,
      task_id: data.taskId,
      assignee_id: data.assigneeId,
      workspace_id: data.workspaceId,
      assigned_by: data.assignedBy,
      assigned_at: data.assignedAt,
      completed_at: data.completedAt,
      revision: BigInt(data.revision),
    })
    .onConflict((oc) =>
      oc.column('id').doUpdateSet({
        assignee_id: data.assigneeId,
        completed_at: data.completedAt,
        revision: BigInt(data.revision),
      })
    )
    .execute();
}
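
The handler above overwrites the existing row on conflict unconditionally. If duplicate or out-of-order delivery is a concern, one option is to apply an update only when its revision is newer than the stored one. The following is a minimal in-memory sketch of that idea, not the Brainbox implementation: LocalAssignment and applyIfNewer are hypothetical names, and a Map stands in for the SQLite table.

```typescript
// Hypothetical local record; a Map stands in for the client SQLite table.
type LocalAssignment = {
  id: string;
  assigneeId: string;
  completedAt: string | null;
  revision: string;
};

// Returns true when the incoming record was stored, false when it was skipped.
function applyIfNewer(
  store: Map<string, LocalAssignment>,
  incoming: LocalAssignment
): boolean {
  const existing = store.get(incoming.id);
  // Compare as BigInt: as strings, '10' sorts before '9'.
  if (existing && BigInt(existing.revision) >= BigInt(incoming.revision)) {
    return false;
  }
  store.set(incoming.id, incoming);
  return true;
}
```

Comparing revisions numerically matters because they travel as strings in SyncTaskAssignmentData, and plain string comparison would order them incorrectly once they differ in length.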

Synchronizer patterns

Basic data synchronization

Most synchronizers follow this pattern:
  1. Fetch unsynced data where revision > cursor
  2. Transform database records to sync data format
  3. Return items with cursors for pagination
private async fetchData() {
  const items = await database
    .selectFrom('table_name')
    .selectAll()
    .where('workspace_id', '=', this.user.workspaceId)
    .where('revision', '>', this.cursor)
    .orderBy('revision', 'asc')
    .limit(20)  // Pagination limit
    .execute();

  return items;
}
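
The three steps above can be sketched without a database. This is an illustration only: DemoRow, DemoItem, and buildPage are hypothetical names, and an in-memory array stands in for the table.

```typescript
// Hypothetical row and output shapes for illustration.
type DemoRow = { id: string; workspace_id: string; revision: bigint };
type DemoItem = {
  cursor: string;
  data: { id: string; workspaceId: string; revision: string };
};

function buildPage(rows: DemoRow[], cursor: string, limit = 20): DemoItem[] {
  const after = BigInt(cursor);
  return rows
    // Step 1: keep only rows newer than the cursor, oldest first, one page at most.
    .filter((row) => row.revision > after)
    .sort((a, b) => (a.revision < b.revision ? -1 : a.revision > b.revision ? 1 : 0))
    .slice(0, limit)
    // Steps 2 and 3: transform each row and attach its revision as the cursor.
    .map((row) => ({
      cursor: row.revision.toString(),
      data: {
        id: row.id,
        workspaceId: row.workspace_id,
        revision: row.revision.toString(),
      },
    }));
}
```

Sorting by revision before applying the limit is what makes the last item's cursor a safe starting point for the next page.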

Event-based triggering

The shouldFetch method determines which events trigger sync updates:
private shouldFetch(event: Event) {
  if (
    event.type === 'collaboration.created' &&
    event.workspaceId === this.user.workspaceId
  ) {
    return true;
  }

  if (
    event.type === 'collaboration.updated' &&
    event.workspaceId === this.user.workspaceId
  ) {
    return true;
  }

  return false;
}
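
When several event types trigger the same synchronizer, the repeated if blocks can be collapsed into a set lookup. This is a stylistic sketch under assumptions, not the project's convention: DemoEvent, TRIGGER_TYPES, and the standalone shouldFetch signature are hypothetical.

```typescript
// Hypothetical event shape; the real Event type lives in @brainbox/server/types/events.
type DemoEvent = { type: string; workspaceId: string };

// Event types that should trigger a fetch for this synchronizer.
const TRIGGER_TYPES: ReadonlySet<string> = new Set([
  'collaboration.created',
  'collaboration.updated',
]);

// True only for a relevant event type in the synchronizer's own workspace.
function shouldFetch(event: DemoEvent, workspaceId: string): boolean {
  return TRIGGER_TYPES.has(event.type) && event.workspaceId === workspaceId;
}
```

Scoping by workspace (or root) in this check matters as much as the event type: without it, every connected user would refetch on every event.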

Pagination and cursors

The sync system uses revision-based cursors for efficient pagination:
// Server tracks last synced revision
const assignments = await database
  .selectFrom('task_assignments')
  .selectAll()
  .where('revision', '>', this.cursor)  // Only new items
  .orderBy('revision', 'asc')
  .limit(20)
  .execute();

// Return items with cursors
return {
  type: 'synchronizer.output',
  userId: this.user.userId,
  id: this.id,
  items: assignments.map((assignment) => ({
    cursor: assignment.revision.toString(),  // Next cursor position
    data: assignment,  // In practice, map the row to the sync data shape first
  })),
};
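
From the consumer's side, cursor-based pagination amounts to repeating the fetch with the last received cursor until a page comes back empty. The sketch below shows that loop in isolation. It is an assumption about how a consumer might drain pages, not a description of the Brainbox client: DemoPage and drainPages are hypothetical names, and fetchPage is a synchronous stand-in for a round trip to the server.

```typescript
// Hypothetical page shape: each item carries the cursor to resume from.
type DemoPage = { cursor: string; data: { id: string } }[];

function drainPages(
  fetchPage: (cursor: string) => DemoPage,
  start = '0'
): { ids: string[]; cursor: string; pages: number } {
  let cursor = start;
  let pages = 0;
  const ids: string[] = [];

  for (;;) {
    const page = fetchPage(cursor);
    if (page.length === 0) {
      break; // Nothing newer than the cursor: fully caught up.
    }
    pages += 1;
    for (const entry of page) {
      ids.push(entry.data.id);
    }
    // Resume after the last item received in this page.
    cursor = page[page.length - 1].cursor;
  }

  return { ids, cursor, pages };
}
```

With a page size of 20, 45 pending rows are delivered in three pages, and the final cursor equals the highest revision seen.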

Database schema requirements

Tables used in synchronizers must have:
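  1. revision column (BIGINT or BIGSERIAL) - Monotonically increasing sync cursor. BIGSERIAL assigns a value only on insert, so updates must also bump revision (for example via a trigger); otherwise changed rows are never picked up by the revision > cursor filter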
  2. Appropriate indexes on revision and filter columns
  3. Timestamp columns (created_at, updated_at, deleted_at)
CREATE TABLE task_assignments (
  id TEXT PRIMARY KEY,
  task_id TEXT NOT NULL,
  root_id TEXT NOT NULL,  -- filtered on by the server synchronizer
  assignee_id TEXT NOT NULL,
  workspace_id TEXT NOT NULL,
  assigned_by TEXT NOT NULL,
  assigned_at TIMESTAMP NOT NULL,
  completed_at TIMESTAMP,
  revision BIGSERIAL NOT NULL,
  created_at TIMESTAMP DEFAULT NOW()
);

-- Matches the synchronizer query: filter by root_id, order by revision
CREATE INDEX idx_task_assignments_revision
  ON task_assignments(root_id, revision);

Event system integration

Emit events to trigger sync updates:
apps/server/src/services/tasks.ts
import { eventBus } from '@brainbox/server/lib/event-bus';

await database
  .insertInto('task_assignments')
  .values({ ... })
  .execute();

eventBus.publish({
  type: 'task.assigned',
  taskId: task.id,
  rootId: task.rootId,
  assigneeId: assignment.assigneeId,
});
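
To make the publish side concrete, here is a minimal in-memory publish/subscribe sketch showing how a published event could reach interested listeners such as synchronizers. It is not the real event bus: DemoEventBus, its subscribe method, and the DemoTaskEvent shape are hypothetical, and the source only shows that eventBus exposes publish.

```typescript
// Hypothetical event shape based on the fields published above.
type DemoTaskEvent = {
  type: 'task.assigned' | 'task.completed';
  rootId: string;
  taskId: string;
};

type DemoListener = (event: DemoTaskEvent) => void;

class DemoEventBus {
  private listeners = new Map<number, DemoListener>();
  private nextId = 0;

  // Registers a listener and returns a function that removes it again.
  subscribe(listener: DemoListener): () => void {
    const id = this.nextId++;
    this.listeners.set(id, listener);
    return () => {
      this.listeners.delete(id);
    };
  }

  // Delivers the event synchronously to every current listener.
  publish(event: DemoTaskEvent): void {
    this.listeners.forEach((listener) => listener(event));
  }
}
```

Each listener decides for itself whether an event is relevant, which mirrors the role shouldFetch plays in a synchronizer.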

Testing sync handlers

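import { TaskAssignmentSynchronizer } from './task-assignments';

// Assumes mockUser is a ConnectedUser fixture and the test database is
// seeded with two assignments under 'root-1', the first for 'task-1'.
test('fetches unsynced assignments', async () => {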
  const synchronizer = new TaskAssignmentSynchronizer(
    'sync-1',
    mockUser,
    { type: 'task.assignments', rootId: 'root-1' },
    '0'
  );

  const result = await synchronizer.fetchData();
  
  expect(result?.items).toHaveLength(2);
  expect(result?.items[0].data.taskId).toBe('task-1');
});

Performance considerations

  1. Limit batch size: Keep limit(20) or similar to avoid overwhelming clients
  2. Index properly: Add indexes on revision and filter columns
  3. Avoid N+1 queries: Fetch related data in single queries when possible
  4. Use connection pooling: PostgreSQL connection pool handles concurrent sync requests
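
As an illustration of the N+1 point, the sketch below resolves assignee names with a single batched lookup instead of one lookup per assignment. All names here (DemoAssignment, DemoUser, attachAssignees, fetchUsersByIds) are hypothetical, and the lookup callback stands in for a single query such as a WHERE id IN (...) select.

```typescript
type DemoAssignment = { id: string; assigneeId: string };
type DemoUser = { id: string; name: string };

function attachAssignees(
  assignments: DemoAssignment[],
  fetchUsersByIds: (ids: string[]) => DemoUser[]
): Array<DemoAssignment & { assigneeName: string | null }> {
  // Collect each assignee id once, however many assignments share it.
  const ids = Array.from(new Set(assignments.map((a) => a.assigneeId)));

  // One batched lookup instead of one lookup per assignment.
  const users = ids.length > 0 ? fetchUsersByIds(ids) : [];
  const byId = new Map(users.map((u) => [u.id, u] as const));

  return assignments.map((a) => ({
    ...a,
    assigneeName: byId.get(a.assigneeId)?.name ?? null,
  }));
}
```

The number of lookups stays at one per batch regardless of batch size, which is the property that matters when a synchronizer runs for many connected clients.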

File locations

  • Schemas: packages/core/src/synchronizers/[name].ts
  • Server synchronizers: apps/server/src/synchronizers/[name].ts
  • Client handlers: packages/client/src/handlers/synchronizers/[name].ts
  • Sync engine: apps/server/src/services/sync-engine.ts
  • Event bus: apps/server/src/lib/event-bus.ts
