Worker Threads
The WorkerPool class provides a managed pool of Node.js worker threads for CPU-intensive operations. It integrates with go-go-scope’s structured concurrency model, ensuring workers are properly cleaned up when scopes are disposed.
Overview
Worker threads allow you to:
Offload CPU-intensive work - Keep the main thread responsive
Parallel processing - Utilize multiple CPU cores efficiently
Automatic lifecycle - Workers are created and terminated automatically
Structured concurrency - Implements AsyncDisposable for automatic cleanup
Task distribution - Automatic load balancing across workers
Error handling - Graceful error propagation from workers
Worker threads are ideal for CPU-bound tasks like image processing, data transformation, cryptography, or complex calculations. For I/O-bound tasks, use regular async operations instead.
Quick Start
import { WorkerPool } from 'go-go-scope' ;
// Create a worker pool
await using pool = new WorkerPool ({
size: 4 // 4 workers
});
// Execute CPU-intensive work
const result = await pool . execute (
( n : number ) => {
// This runs in a worker thread
function fibonacci ( n : number ) : number {
if ( n <= 1 ) return n ;
return fibonacci ( n - 1 ) + fibonacci ( n - 2 );
}
return fibonacci ( n );
},
40 // Input data
);
console . log ( `Fibonacci(40) = ${ result } ` );
// Pool automatically cleaned up when scope exits
Worker Pool Options
interface WorkerPoolOptions {
/** Number of worker threads (default: CPU count - 1) */
size ?: number ;
/** Timeout in ms before idle workers terminate (default: 60000) */
idleTimeout ?: number ;
/** Enable SharedArrayBuffer for zero-copy transfers (default: false) */
sharedMemory ?: boolean ;
/** Memory limits per worker */
resourceLimits ?: {
/** Max old generation heap size in MB (default: 512) */
maxOldGenerationSizeMb ?: number ;
/** Max young generation heap size in MB (default: 128) */
maxYoungGenerationSizeMb ?: number ;
};
}
Creating Worker Pools
Default Configuration
Custom Size
With Resource Limits
With Idle Timeout
Convenience Function
import { WorkerPool } from 'go-go-scope' ;
// Uses CPU count - 1 workers
await using pool = new WorkerPool ();
Executing Tasks
Single Task Execution
Execute a single CPU-intensive function:
const result = await pool . execute (
( data : { width : number ; height : number }) => {
// This function runs in a worker thread
// It's serialized to string, so no closures!
const area = data . width * data . height ;
const perimeter = 2 * ( data . width + data . height );
return { area , perimeter };
},
{ width: 100 , height: 50 }
);
console . log ( result ); // { area: 5000, perimeter: 300 }
The function passed to execute() is serialized and sent to the worker. You cannot use:
Variables from outer scope (closures)
Imported modules (must use require() inside)
Functions not defined in the worker function
Batch Execution
Process multiple items in parallel:
Ordered Results
Unordered Results
Image Processing
const items = [ 10 , 20 , 30 , 40 , 50 ];
const results = await pool . executeBatch (
items ,
( n : number ) => {
// Square each number
return n * n ;
},
{ ordered: true } // Results in same order as inputs
);
// Check results
for ( const [ error , value ] of results ) {
if ( error ) {
console . error ( 'Task failed:' , error );
} else {
console . log ( 'Result:' , value );
}
}
Transferable Objects
For large data (like ArrayBuffers), use transfer lists to avoid copying:
import { WorkerPool } from 'go-go-scope' ;
await using pool = new WorkerPool ();
// Create large buffer
const buffer = new ArrayBuffer ( 100_000_000 ); // 100MB
const view = new Uint8Array ( buffer );
for ( let i = 0 ; i < view . length ; i ++ ) {
view [ i ] = i % 256 ;
}
// Transfer buffer to worker (zero-copy)
const result = await pool . execute (
( buf : ArrayBuffer ) => {
const view = new Uint8Array ( buf );
let sum = 0 ;
for ( let i = 0 ; i < view . length ; i ++ ) {
sum += view [ i ];
}
return sum ;
},
buffer ,
[ buffer ] // Transfer list - buffer is moved, not copied
);
console . log ( `Sum: ${ result } ` );
// buffer is now detached in main thread
console . log ( buffer . byteLength ); // 0
After transferring, the original object is “neutered” (detached) in the sender thread. Use transfer lists for:
ArrayBuffer
MessagePort
ReadableStream
WritableStream
TransformStream
Worker Pool Statistics
Monitor pool health and utilization:
const stats = pool . stats ();
console . log ( `Total workers: ${ stats . total } ` );
console . log ( `Busy workers: ${ stats . busy } ` );
console . log ( `Idle workers: ${ stats . idle } ` );
console . log ( `Pending tasks: ${ stats . pending } ` );
// Calculate utilization
const utilization = ( stats . busy / stats . total * 100 ). toFixed ( 2 );
console . log ( `Utilization: ${ utilization } %` );
Use Cases
Image Processing
import { WorkerPool } from 'go-go-scope' ;
await using pool = new WorkerPool ({ size: 4 });
const images = [ 'photo1.jpg' , 'photo2.jpg' , 'photo3.jpg' ];
const thumbnails = await pool . executeBatch (
images ,
( path : string ) => {
const sharp = require ( 'sharp' );
const fs = require ( 'fs' );
const buffer = fs . readFileSync ( path );
return sharp ( buffer )
. resize ( 200 , 200 , { fit: 'cover' })
. toBuffer ();
}
);
// Save thumbnails
for ( let i = 0 ; i < thumbnails . length ; i ++ ) {
const [ error , buffer ] = thumbnails [ i ] ! ;
if ( ! error && buffer ) {
fs . writeFileSync ( `thumb_ ${ i } .jpg` , buffer );
}
}
Data Transformation
const csvRows = await fetchLargeCSV ();
await using pool = new WorkerPool ();
const transformed = await pool . executeBatch (
csvRows ,
( row : string ) => {
// Parse and transform each row
const fields = row . split ( ',' );
return {
id: fields [ 0 ],
name: fields [ 1 ],
value: parseFloat ( fields [ 2 ] || '0' ),
timestamp: new Date ( fields [ 3 ] || Date . now ())
};
},
{ ordered: true }
);
Cryptographic Operations
await using pool = new WorkerPool ({ size: 8 });
const passwords = [ 'user1pass' , 'user2pass' , 'user3pass' ];
const hashed = await pool . executeBatch (
passwords ,
( password : string ) => {
const crypto = require ( 'crypto' );
const salt = crypto . randomBytes ( 16 ). toString ( 'hex' );
const hash = crypto . pbkdf2Sync (
password ,
salt ,
100000 ,
64 ,
'sha512'
). toString ( 'hex' );
return { salt , hash };
}
);
Mathematical Computations
await using pool = new WorkerPool ();
// Calculate prime numbers
const ranges = [
{ start: 0 , end: 10000 },
{ start: 10000 , end: 20000 },
{ start: 20000 , end: 30000 },
{ start: 30000 , end: 40000 }
];
const primeSets = await pool . executeBatch (
ranges ,
( range : { start : number ; end : number }) => {
const primes : number [] = [];
for ( let n = range . start ; n < range . end ; n ++ ) {
if ( isPrime ( n )) primes . push ( n );
}
return primes ;
function isPrime ( n : number ) : boolean {
if ( n < 2 ) return false ;
for ( let i = 2 ; i <= Math . sqrt ( n ); i ++ ) {
if ( n % i === 0 ) return false ;
}
return true ;
}
}
);
// Flatten results
const allPrimes = primeSets
. flatMap (([ _ , primes ]) => primes || []);
Video Processing
await using pool = new WorkerPool ({
size: 4 ,
resourceLimits: {
maxOldGenerationSizeMb: 2048 // 2GB per worker
}
});
const videoFrames = await extractFrames ( 'video.mp4' );
const processed = await pool . executeBatch (
videoFrames ,
( frame : ArrayBuffer ) => {
const sharp = require ( 'sharp' );
// Apply filters
return sharp ( Buffer . from ( frame ))
. modulate ({ brightness: 1.2 , saturation: 1.1 })
. sharpen ()
. toBuffer ();
}
);
Error Handling
Workers propagate errors back to the main thread:
await using pool = new WorkerPool ();
try {
const result = await pool . execute (
( n : number ) => {
if ( n < 0 ) {
throw new Error ( 'Negative numbers not allowed' );
}
return Math . sqrt ( n );
},
- 1
);
} catch ( error ) {
console . error ( 'Worker error:' , error . message );
}
// Batch execution returns Result tuples
const results = await pool . executeBatch (
[ 1 , - 2 , 3 ],
( n : number ) => {
if ( n < 0 ) throw new Error ( 'Negative number' );
return n * n ;
}
);
for ( const [ error , value ] of results ) {
if ( error ) {
console . error ( 'Task failed:' , error . message );
} else {
console . log ( 'Result:' , value );
}
}
Idle Worker Cleanup
Workers are automatically terminated after being idle:
await using pool = new WorkerPool ({
size: 8 ,
idleTimeout: 30000 // 30 seconds
});
// Execute some work
await pool . execute (( n : number ) => n * 2 , 42 );
pool . stats (); // { total: 1, busy: 0, idle: 1, pending: 0 }
// After 30s of idle time
await new Promise ( r => setTimeout ( r , 31000 ));
pool . stats (); // { total: 0, busy: 0, idle: 0, pending: 0 }
// Workers automatically terminated
// New tasks will create workers again
await pool . execute (( n : number ) => n * 2 , 84 );
pool . stats (); // { total: 1, busy: 0, idle: 1, pending: 0 }
The pool always keeps at least 1 worker alive (if workers exist) to avoid cold start overhead.
Integration with Scope
Worker pools integrate seamlessly with scopes:
Automatic Cleanup
With Task
Parallel Processing
import { scope } from 'go-go-scope' ;
await using s = scope ();
const pool = new WorkerPool ({ size: 4 });
s . onDispose ( async () => {
await pool [ Symbol . asyncDispose ]();
});
// Pool cleaned up when scope exits
Choose Appropriate Pool Size
// CPU-bound: # of CPU cores - 1
const pool = new WorkerPool ({
size: require ( 'os' ). cpus (). length - 1
});
// Memory-intensive: Fewer workers
const memoryPool = new WorkerPool ({
size: 4 ,
resourceLimits: {
maxOldGenerationSizeMb: 2048
}
});
Minimize Data Transfer
// Bad: Sending large object
await pool . execute (
( data : LargeObject ) => process ( data ),
hugeObject // Serialized = slow
);
// Good: Send only what's needed
await pool . execute (
( id : string ) => {
const data = loadFromDisk ( id );
return process ( data );
},
objectId // Just an ID = fast
);
Use Transfer Lists for Large Buffers
const buffer = new ArrayBuffer ( 10_000_000 );
// Transfer instead of copy
await pool . execute (
( buf : ArrayBuffer ) => processBuffer ( buf ),
buffer ,
[ buffer ] // Zero-copy transfer
);
Batch Similar Tasks
// Better than individual executes
const results = await pool . executeBatch (
items ,
processingFunction ,
{ ordered: false } // Fastest
);
Monitor Pool Utilization
setInterval (() => {
const stats = pool . stats ();
const utilization = stats . busy / stats . total ;
if ( utilization > 0.9 ) {
console . warn ( 'Pool highly utilized, consider increasing size' );
} else if ( utilization < 0.1 ) {
console . info ( 'Pool underutilized, consider decreasing size' );
}
}, 60000 );
Limitations
Function Serialization Constraints:
No closures - Cannot access outer scope variables
Must use require() for dependencies (not import)
Function is converted to string and back
No access to main thread context
const multiplier = 10 ;
// ❌ WRONG: Uses closure
await pool . execute (
( n : number ) => n * multiplier , // multiplier undefined in worker!
5
);
// ✅ CORRECT: Pass data as argument
await pool . execute (
( data : { n : number ; multiplier : number }) => data . n * data . multiplier ,
{ n: 5 , multiplier: 10 }
);
Disposal and Cleanup
Worker pools implement AsyncDisposable:
await using pool = new WorkerPool ();
// Execute work
await pool . execute (( n ) => n * 2 , 21 );
// Automatic cleanup when leaving scope
// - All pending tasks rejected
// - All workers terminated
// - Resources freed
// Manual disposal
if ( pool . isDisposed ) {
console . log ( 'Pool already disposed' );
} else {
await pool [ Symbol . asyncDispose ]();
}
// After disposal, execute throws
try {
await pool . execute (( n ) => n , 1 );
} catch ( error ) {
console . error ( error . message ); // "WorkerPool has been disposed"
}
Complete Example
import { scope , WorkerPool , parallel } from 'go-go-scope' ;
import { readdir , readFile , writeFile } from 'fs/promises' ;
import { join } from 'path' ;
// Process images in parallel using worker pool
await using s = scope ();
await using pool = new WorkerPool ({
size: 8 ,
resourceLimits: {
maxOldGenerationSizeMb: 1024
}
});
const inputDir = './images' ;
const outputDir = './thumbnails' ;
// Get all image files
const files = await readdir ( inputDir );
const imageFiles = files . filter ( f => / \. ( jpg | png | webp ) $ / i . test ( f ));
console . log ( `Processing ${ imageFiles . length } images...` );
const startTime = Date . now ();
// Process in batches
const results = await pool . executeBatch (
imageFiles ,
( filename : string ) => {
const sharp = require ( 'sharp' );
const fs = require ( 'fs' );
const path = require ( 'path' );
const inputPath = path . join ( './images' , filename );
const buffer = fs . readFileSync ( inputPath );
return sharp ( buffer )
. resize ( 200 , 200 , { fit: 'cover' })
. jpeg ({ quality: 80 })
. toBuffer ();
},
{ ordered: false } // Process as fast as possible
);
// Save results
let succeeded = 0 ;
let failed = 0 ;
for ( let i = 0 ; i < results . length ; i ++ ) {
const [ error , buffer ] = results [ i ] ! ;
const filename = imageFiles [ i ] ! ;
if ( error ) {
console . error ( `Failed to process ${ filename } :` , error . message );
failed ++ ;
} else if ( buffer ) {
const outputPath = join ( outputDir , filename );
await writeFile ( outputPath , buffer );
succeeded ++ ;
}
}
const duration = Date . now () - startTime ;
console . log ( ` \n Processing complete:` );
console . log ( ` Succeeded: ${ succeeded } ` );
console . log ( ` Failed: ${ failed } ` );
console . log ( ` Duration: ${ duration } ms` );
console . log ( ` Avg per image: ${ ( duration / imageFiles . length ). toFixed ( 2 ) } ms` );
// Pool stats
const stats = pool . stats ();
console . log ( ` \n Pool stats:` );
console . log ( ` Total workers: ${ stats . total } ` );
console . log ( ` Active: ${ stats . busy } ` );
console . log ( ` Idle: ${ stats . idle } ` );