Worker Threads

The WorkerPool class provides a managed pool of Node.js worker threads for CPU-intensive operations. It integrates with go-go-scope’s structured concurrency model, ensuring workers are properly cleaned up when scopes are disposed.

Overview

Worker threads allow you to:
  • Offload CPU-intensive work - keep the main thread responsive
  • Process in parallel - utilize multiple CPU cores efficiently
  • Automate worker lifecycle - workers are created and terminated for you
  • Get structured concurrency - WorkerPool implements AsyncDisposable for automatic cleanup
  • Distribute tasks - load is balanced across workers automatically
  • Handle worker errors - failures propagate gracefully to the main thread
Worker threads are ideal for CPU-bound tasks like image processing, data transformation, cryptography, or complex calculations. For I/O-bound tasks, use regular async operations instead.
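
As a rule of thumb, work that keeps the CPU busy in a tight loop belongs in the pool, while work that waits on the network or disk does not. A minimal sketch of the distinction (the pool calls are commented out and assume a pool like the one in the Quick Start below):

```typescript
// CPU-bound work: a tight counting loop that blocks whichever thread runs it.
// This is the kind of function worth shipping to a worker.
function countPrimes(limit: number): number {
  let count = 0;
  for (let n = 2; n < limit; n++) {
    let isPrime = true;
    for (let i = 2; i * i <= n; i++) {
      if (n % i === 0) { isPrime = false; break; }
    }
    if (isPrime) count++;
  }
  return count;
}

// CPU-bound: offload to the pool (hypothetical usage)
// const primes = await pool.execute(countPrimes, 1_000_000);

// I/O-bound: plain async is enough — the event loop handles the waiting
// const body = await fetch(url).then(r => r.text());

console.log(countPrimes(100)); // 25 primes below 100
```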

Quick Start

import { WorkerPool } from 'go-go-scope';

// Create a worker pool
await using pool = new WorkerPool({
  size: 4 // 4 workers
});

// Execute CPU-intensive work
const result = await pool.execute(
  (n: number) => {
    // This runs in a worker thread
    function fibonacci(n: number): number {
      if (n <= 1) return n;
      return fibonacci(n - 1) + fibonacci(n - 2);
    }
    return fibonacci(n);
  },
  40 // Input data
);

console.log(`Fibonacci(40) = ${result}`);

// Pool automatically cleaned up when scope exits

Worker Pool Options

interface WorkerPoolOptions {
  /** Number of worker threads (default: CPU count - 1) */
  size?: number;
  
  /** Timeout in ms before idle workers terminate (default: 60000) */
  idleTimeout?: number;
  
  /** Enable SharedArrayBuffer for zero-copy transfers (default: false) */
  sharedMemory?: boolean;
  
  /** Memory limits per worker */
  resourceLimits?: {
    /** Max old generation heap size in MB (default: 512) */
    maxOldGenerationSizeMb?: number;
    /** Max young generation heap size in MB (default: 128) */
    maxYoungGenerationSizeMb?: number;
  };
}

Creating Worker Pools

import { WorkerPool } from 'go-go-scope';

// Uses CPU count - 1 workers
await using pool = new WorkerPool();
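
To override the defaults, pass an options object to the constructor. Each field below mirrors WorkerPoolOptions above; the values are illustrative, not recommendations:

```typescript
import { WorkerPool } from 'go-go-scope';

// Explicitly configured pool
await using pool = new WorkerPool({
  size: 4,                 // fixed size instead of CPU count - 1
  idleTimeout: 30_000,     // terminate workers after 30 s idle
  sharedMemory: false,     // keep SharedArrayBuffer support off
  resourceLimits: {
    maxOldGenerationSizeMb: 1024,  // larger per-worker heap
    maxYoungGenerationSizeMb: 64,
  },
});
```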

Executing Tasks

Single Task Execution

Execute a single CPU-intensive function:
const result = await pool.execute(
  (data: { width: number; height: number }) => {
    // This function runs in a worker thread
    // It's serialized to string, so no closures!
    const area = data.width * data.height;
    const perimeter = 2 * (data.width + data.height);
    return { area, perimeter };
  },
  { width: 100, height: 50 }
);

console.log(result); // { area: 5000, perimeter: 300 }
The function passed to execute() is serialized and sent to the worker. You cannot use:
  • Variables from outer scope (closures)
  • Imported modules (must use require() inside)
  • Functions not defined in the worker function
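
A pattern that satisfies all three constraints: the worker function takes everything it needs as its argument and loads any modules inside its own body (a sketch; the `pool.execute` call is hypothetical usage of the API above):

```typescript
// Self-contained worker function: no closures, no top-level imports.
const parseRows = (rows: string[]) => {
  // Dependencies would be loaded here, inside the function body:
  // const zlib = require('zlib');
  return rows.map((row) => JSON.parse(row) as { id: number; name: string });
};

// Hypothetical usage:
// const parsed = await pool.execute(parseRows, rawRows);

// Because it is self-contained, it also runs on the main thread,
// which makes it easy to unit-test:
console.log(parseRows(['{"id":1,"name":"a"}'])[0]!.name); // "a"
```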

Batch Execution

Process multiple items in parallel:
const items = [10, 20, 30, 40, 50];

const results = await pool.executeBatch(
  items,
  (n: number) => {
    // Square each number
    return n * n;
  },
  { ordered: true } // Results in same order as inputs
);

// Check results
for (const [error, value] of results) {
  if (error) {
    console.error('Task failed:', error);
  } else {
    console.log('Result:', value);
  }
}

Transferable Objects

For large data (like ArrayBuffers), use transfer lists to avoid copying:
import { WorkerPool } from 'go-go-scope';

await using pool = new WorkerPool();

// Create large buffer
const buffer = new ArrayBuffer(100_000_000); // 100MB
const view = new Uint8Array(buffer);
for (let i = 0; i < view.length; i++) {
  view[i] = i % 256;
}

// Transfer buffer to worker (zero-copy)
const result = await pool.execute(
  (buf: ArrayBuffer) => {
    const view = new Uint8Array(buf);
    let sum = 0;
    for (let i = 0; i < view.length; i++) {
      sum += view[i];
    }
    return sum;
  },
  buffer,
  [buffer] // Transfer list - buffer is moved, not copied
);

console.log(`Sum: ${result}`);

// buffer is now detached in main thread
console.log(buffer.byteLength); // 0
After transferring, the original object is “neutered” (detached) in the sender thread. Use transfer lists for:
  • ArrayBuffer
  • MessagePort
  • ReadableStream
  • WritableStream
  • TransformStream
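
Detachment can be observed without a worker: structuredClone with a transfer list performs the same ownership move as the pool's transfer list, so this sketch shows exactly what happens to the sender's buffer.

```typescript
// What "detached" means, demonstrated with structuredClone's transfer option.
const original = new ArrayBuffer(16);
console.log(original.byteLength); // 16

// Ownership moves to the clone; the original is detached (zero-length).
const moved = structuredClone(original, { transfer: [original] });

console.log(original.byteLength); // 0
console.log(moved.byteLength);    // 16
```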

Worker Pool Statistics

Monitor pool health and utilization:
const stats = pool.stats();

console.log(`Total workers: ${stats.total}`);
console.log(`Busy workers: ${stats.busy}`);
console.log(`Idle workers: ${stats.idle}`);
console.log(`Pending tasks: ${stats.pending}`);

// Calculate utilization
const utilization = (stats.busy / stats.total * 100).toFixed(2);
console.log(`Utilization: ${utilization}%`);

Use Cases

1. Image Processing

import { WorkerPool } from 'go-go-scope';
import * as fs from 'fs';

await using pool = new WorkerPool({ size: 4 });

const images = ['photo1.jpg', 'photo2.jpg', 'photo3.jpg'];

const thumbnails = await pool.executeBatch(
  images,
  (path: string) => {
    const sharp = require('sharp');
    const fs = require('fs');
    
    const buffer = fs.readFileSync(path);
    return sharp(buffer)
      .resize(200, 200, { fit: 'cover' })
      .toBuffer();
  }
);

// Save thumbnails
for (let i = 0; i < thumbnails.length; i++) {
  const [error, buffer] = thumbnails[i]!;
  if (!error && buffer) {
    fs.writeFileSync(`thumb_${i}.jpg`, buffer);
  }
}

2. Data Transformation

const csvRows = await fetchLargeCSV();

await using pool = new WorkerPool();

const transformed = await pool.executeBatch(
  csvRows,
  (row: string) => {
    // Parse and transform each row
    const fields = row.split(',');
    return {
      id: fields[0],
      name: fields[1],
      value: parseFloat(fields[2] || '0'),
      timestamp: new Date(fields[3] || Date.now())
    };
  },
  { ordered: true }
);

3. Cryptographic Operations

await using pool = new WorkerPool({ size: 8 });

const passwords = ['user1pass', 'user2pass', 'user3pass'];

const hashed = await pool.executeBatch(
  passwords,
  (password: string) => {
    const crypto = require('crypto');
    const salt = crypto.randomBytes(16).toString('hex');
    const hash = crypto.pbkdf2Sync(
      password,
      salt,
      100000,
      64,
      'sha512'
    ).toString('hex');
    return { salt, hash };
  }
);

4. Mathematical Computations

await using pool = new WorkerPool();

// Calculate prime numbers
const ranges = [
  { start: 0, end: 10000 },
  { start: 10000, end: 20000 },
  { start: 20000, end: 30000 },
  { start: 30000, end: 40000 }
];

const primeSets = await pool.executeBatch(
  ranges,
  (range: { start: number; end: number }) => {
    const primes: number[] = [];
    for (let n = range.start; n < range.end; n++) {
      if (isPrime(n)) primes.push(n);
    }
    return primes;
    
    function isPrime(n: number): boolean {
      if (n < 2) return false;
      for (let i = 2; i <= Math.sqrt(n); i++) {
        if (n % i === 0) return false;
      }
      return true;
    }
  }
);

// Flatten results
const allPrimes = primeSets
  .flatMap(([_, primes]) => primes || []);

5. Video Processing

await using pool = new WorkerPool({ 
  size: 4,
  resourceLimits: {
    maxOldGenerationSizeMb: 2048 // 2GB per worker
  }
});

const videoFrames = await extractFrames('video.mp4');

const processed = await pool.executeBatch(
  videoFrames,
  (frame: ArrayBuffer) => {
    const sharp = require('sharp');
    
    // Apply filters
    return sharp(Buffer.from(frame))
      .modulate({ brightness: 1.2, saturation: 1.1 })
      .sharpen()
      .toBuffer();
  }
);

Error Handling

Workers propagate errors back to the main thread:
await using pool = new WorkerPool();

try {
  const result = await pool.execute(
    (n: number) => {
      if (n < 0) {
        throw new Error('Negative numbers not allowed');
      }
      return Math.sqrt(n);
    },
    -1
  );
} catch (error) {
  console.error('Worker error:', error.message);
}

// Batch execution returns Result tuples
const results = await pool.executeBatch(
  [1, -2, 3],
  (n: number) => {
    if (n < 0) throw new Error('Negative number');
    return n * n;
  }
);

for (const [error, value] of results) {
  if (error) {
    console.error('Task failed:', error.message);
  } else {
    console.log('Result:', value);
  }
}

Idle Worker Cleanup

Workers are automatically terminated after being idle:
await using pool = new WorkerPool({
  size: 8,
  idleTimeout: 30000 // 30 seconds
});

// Execute some work
await pool.execute((n: number) => n * 2, 42);

pool.stats(); // { total: 1, busy: 0, idle: 1, pending: 0 }

// After 30s of idle time
await new Promise(r => setTimeout(r, 31000));

pool.stats(); // { total: 0, busy: 0, idle: 0, pending: 0 }
// Workers automatically terminated

// New tasks will create workers again
await pool.execute((n: number) => n * 2, 84);

pool.stats(); // { total: 1, busy: 0, idle: 1, pending: 0 }
The pool always keeps at least 1 worker alive (if workers exist) to avoid cold start overhead.

Integration with Scope

Worker pools integrate seamlessly with scopes:
import { scope } from 'go-go-scope';

await using s = scope();

const pool = new WorkerPool({ size: 4 });
s.onDispose(async () => {
  await pool[Symbol.asyncDispose]();
});

// Pool cleaned up when scope exits

Performance Considerations

1. Choose Appropriate Pool Size

// CPU-bound: # of CPU cores - 1
const pool = new WorkerPool({
  size: require('os').cpus().length - 1
});

// Memory-intensive: Fewer workers
const memoryPool = new WorkerPool({
  size: 4,
  resourceLimits: {
    maxOldGenerationSizeMb: 2048
  }
});

2. Minimize Data Transfer

// Bad: Sending large object
await pool.execute(
  (data: LargeObject) => process(data),
  hugeObject // Serialized = slow
);

// Good: Send only what's needed
await pool.execute(
  (id: string) => {
    const data = loadFromDisk(id);
    return process(data);
  },
  objectId // Just an ID = fast
);

3. Use Transfer Lists for Large Buffers

const buffer = new ArrayBuffer(10_000_000);

// Transfer instead of copy
await pool.execute(
  (buf: ArrayBuffer) => processBuffer(buf),
  buffer,
  [buffer] // Zero-copy transfer
);

4. Batch Similar Tasks

// Better than individual executes
const results = await pool.executeBatch(
  items,
  processingFunction,
  { ordered: false } // Fastest
);

5. Monitor Pool Utilization

setInterval(() => {
  const stats = pool.stats();
  const utilization = stats.total > 0 ? stats.busy / stats.total : 0;
  
  if (utilization > 0.9) {
    console.warn('Pool highly utilized, consider increasing size');
  } else if (utilization < 0.1) {
    console.info('Pool underutilized, consider decreasing size');
  }
}, 60000);

Limitations

Function Serialization Constraints:
  1. No closures - Cannot access outer scope variables
  2. Must use require() for dependencies (not import)
  3. Function is converted to string and back
  4. No access to main thread context
const multiplier = 10;

// ❌ WRONG: Uses closure
await pool.execute(
  (n: number) => n * multiplier, // multiplier undefined in worker!
  5
);

// ✅ CORRECT: Pass data as argument
await pool.execute(
  (data: { n: number; multiplier: number }) => data.n * data.multiplier,
  { n: 5, multiplier: 10 }
);

Disposal and Cleanup

Worker pools implement AsyncDisposable:
await using pool = new WorkerPool();

// Execute work
await pool.execute((n) => n * 2, 21);

// Automatic cleanup when leaving scope
// - All pending tasks rejected
// - All workers terminated
// - Resources freed

// Manual disposal
if (pool.isDisposed) {
  console.log('Pool already disposed');
} else {
  await pool[Symbol.asyncDispose]();
}

// After disposal, execute throws
try {
  await pool.execute((n) => n, 1);
} catch (error) {
  console.error(error.message); // "WorkerPool has been disposed"
}

Complete Example

import { scope, WorkerPool } from 'go-go-scope';
import { readdir, writeFile } from 'fs/promises';
import { join } from 'path';

// Process images in parallel using worker pool
await using s = scope();
await using pool = new WorkerPool({ 
  size: 8,
  resourceLimits: {
    maxOldGenerationSizeMb: 1024
  }
});

const inputDir = './images';
const outputDir = './thumbnails';

// Get all image files
const files = await readdir(inputDir);
const imageFiles = files.filter(f => /\.(jpg|png|webp)$/i.test(f));

console.log(`Processing ${imageFiles.length} images...`);

const startTime = Date.now();

// Process in batches
const results = await pool.executeBatch(
  imageFiles,
  (filename: string) => {
    const sharp = require('sharp');
    const fs = require('fs');
    const path = require('path');
    
    const inputPath = path.join('./images', filename);
    const buffer = fs.readFileSync(inputPath);
    
    return sharp(buffer)
      .resize(200, 200, { fit: 'cover' })
      .jpeg({ quality: 80 })
      .toBuffer();
  },
  { ordered: false } // Process as fast as possible
);

// Save results
let succeeded = 0;
let failed = 0;

for (let i = 0; i < results.length; i++) {
  const [error, buffer] = results[i]!;
  const filename = imageFiles[i]!;
  
  if (error) {
    console.error(`Failed to process ${filename}:`, error.message);
    failed++;
  } else if (buffer) {
    const outputPath = join(outputDir, filename);
    await writeFile(outputPath, buffer);
    succeeded++;
  }
}

const duration = Date.now() - startTime;

console.log(`\nProcessing complete:`);
console.log(`  Succeeded: ${succeeded}`);
console.log(`  Failed: ${failed}`);
console.log(`  Duration: ${duration}ms`);
console.log(`  Avg per image: ${(duration / imageFiles.length).toFixed(2)}ms`);

// Pool stats
const stats = pool.stats();
console.log(`\nPool stats:`);
console.log(`  Total workers: ${stats.total}`);
console.log(`  Active: ${stats.busy}`);
console.log(`  Idle: ${stats.idle}`);
