Overview
The Scope class is the foundational building block of go-go-scope. It provides structured concurrency by managing tasks, resources, and cancellation signals in a hierarchical manner. All tasks spawned within a scope are automatically cancelled when the scope is disposed.
Factory Function
scope()
The recommended way to create a scope is using the scope() factory function:
import { scope } from 'go-go-scope'

await using s = scope({
  name: 'my-scope',
  timeout: 5000,
  concurrency: 10
})
Configuration options for the scope:
- timeout — Optional timeout in milliseconds. The scope will be aborted after this duration.
- Optional parent AbortSignal to link cancellation. When the parent aborts, this scope aborts.
- name — Optional name for the scope. Defaults to "scope-{id}".
- concurrency — Optional concurrency limit for tasks. If set, at most this many tasks execute concurrently.
- circuitBreaker — Optional circuit breaker configuration. When set, all tasks execute through a circuit breaker.
- Optional parent scope. A child inherits the parent's AbortSignal and services.
- Optional lifecycle hooks for scope events (beforeTask, afterTask, onCancel, etc.).
- Optional custom logger for structured logging.
- logLevel ('debug' | 'info' | 'warn' | 'error') — Minimum log level for console logging (used when no custom logger is provided).
- persistence — Optional persistence providers for distributed features (locks, circuit breaker state, idempotency, checkpoints).
- Idempotency configuration for the scope.
- taskPooling ({ maxSize?: number; enabled?: boolean }) — Task pooling configuration for reducing GC pressure.
- workerPool ({ size?: number; idleTimeout?: number }) — Worker pool configuration for CPU-intensive tasks. Used only when tasks are spawned with worker: true.
- context — Optional context object accessible in all tasks via the context parameter.
- logCorrelation — Enable log correlation with traceId and spanId. Default: false.
- External trace ID for log correlation (for continuing traces from external sources).

Returns: A new Scope instance configured with the given options.
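When a parent AbortSignal is supplied, cancellation flows downward. The linking semantics can be sketched with plain AbortController chaining — a self-contained illustration of the behavior described above, not go-go-scope's implementation (`linkToParent` is a hypothetical helper):

```typescript
// When the parent signal aborts, the linked child controller aborts too.
// `linkToParent` is a hypothetical helper, not part of go-go-scope.
function linkToParent(parent: AbortSignal): AbortController {
  const child = new AbortController()
  if (parent.aborted) {
    child.abort(parent.reason)
  } else {
    parent.addEventListener('abort', () => child.abort(parent.reason), { once: true })
  }
  return child
}

const parentCtl = new AbortController()
const child = linkToParent(parentCtl.signal)

parentCtl.abort(new Error('parent cancelled'))
console.log(child.signal.aborted) // true
```

Per the option description above, scope() performs equivalent wiring for you when given a parent signal.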
Core Methods
task()
Spawn a task within the scope. The task receives a context object with services, signal, logger, and optional checkpoint/progress utilities.
const t = s.task(async ({ services, signal, logger, context }) => {
  logger.info('Task started')

  // Check if cancelled
  if (signal.aborted) {
    throw new Error('Cancelled')
  }

  // Use provided services
  const db = services.database
  const result = await db.query('SELECT * FROM users')
  return result
}, {
  timeout: 5000,
  retry: 'exponential',
  errorClass: DatabaseError
})

const [err, result] = await t
fn ((ctx: TaskContext<Services>) => Promise<T>, required) — The task function to execute. Its context provides:
- services — Services provided to the scope via provide()
- signal — AbortSignal for cancellation detection
- logger — Structured logger with task context
- context — Context object from scope options
- checkpoint ({ save: (data: unknown) => Promise<void>; data?: unknown }) — Checkpoint utilities (available when a checkpoint provider is configured)
- progress — Progress tracking utilities (available when a checkpoint provider is configured)
options (optional) — Task configuration:
- id — Unique task identifier for checkpointing, idempotency, and observability
- timeout — Task timeout in milliseconds
- retry ('exponential' | 'linear' | 'fixed' | RetryConfig) — Retry configuration or preset strategy
- errorClass — Error class to wrap all errors for typed error handling
- Error class to wrap only system errors (preserves tagged errors)
- Deduplication key: tasks with the same key share results while in flight
- memo ({ key: string | symbol; ttl: number }) — Memoization config: cache successful results for ttl milliseconds
- idempotency ({ key: string | (() => string); ttl?: number }) — Idempotency config: persist results across executions (requires a persistence provider)
- worker — Execute the task in a worker thread for CPU-intensive operations
- checkpoint — Checkpoint configuration for long-running tasks
- otel ({ name?: string; attributes?: Record<string, unknown> }) — OpenTelemetry tracing configuration

Returns: A Task instance that resolves to a Result tuple [error, value].
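The [error, value] tuple pattern itself can be sketched independently of the library — a minimal illustration of the shape tasks resolve to (`toResult` is a hypothetical wrapper, and go-go-scope's actual Result type may carry more information):

```typescript
// A Result is an [error, value] pair: exactly one side is defined.
type Result<E, T> = [E, undefined] | [undefined, T]

// Wrap a promise so failures come back as a value instead of a throw.
async function toResult<T>(p: Promise<T>): Promise<Result<Error, T>> {
  try {
    return [undefined, await p]
  } catch (e) {
    return [e instanceof Error ? e : new Error(String(e)), undefined]
  }
}

const r1 = await toResult(Promise.resolve(42))
console.log(r1) // [ undefined, 42 ]

const r2 = await toResult(Promise.reject(new Error('boom')))
console.log(r2[0]?.message) // boom
```

Because errors arrive as values, callers branch with a plain `if (err)` check instead of try/catch.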
parallel()
Run multiple tasks in parallel with optional concurrency limit and progress tracking.
const results = await s.parallel([
  async () => fetchUser(1),
  async () => fetchUser(2),
  async () => fetchUser(3)
], {
  concurrency: 2,
  onProgress: (completed, total, result) => {
    console.log(`${completed}/${total} completed`)
  },
  continueOnError: true
})
// results is [Result<E, User>, Result<E, User>, Result<E, User>]
factories ((() => Promise<T>)[], required) — Array of factory functions that create promises.

Options:
- concurrency — Maximum number of concurrent tasks (0 = unlimited)
- onProgress ((completed: number, total: number, result: Result<E, T>) => void) — Progress callback invoked after each task completes
- continueOnError — If true, continue executing remaining tasks even if one fails. Default: false

Returns: Array of Result tuples, one per factory function.
race()
Race multiple tasks against each other - first to settle wins.
const [err, result] = await s.race([
  async () => fetchFromPrimary(),
  async () => fetchFromBackup(),
  async () => fetchFromFallback()
], {
  requireSuccess: true, // Only successful results count as winners
  timeout: 5000,        // Fail if no winner within 5s
  concurrency: 2        // Only start 2 tasks at a time
})
factories ((() => Promise<T>)[], required) — Array of factory functions to race.

Options:
- requireSuccess — If true, only successful results count as winners. Default: false
- timeout — Optional timeout in milliseconds
- concurrency — Optional concurrency limit

Returns: Result of the winning task.
channel()
Create a Go-style buffered channel for concurrent communication.
const ch = s.channel<string>(10) // Buffer capacity of 10

// Producer
s.task(async () => {
  for (const item of items) {
    await ch.send(item)
  }
  ch.close()
})

// Consumer
for await (const item of ch) {
  await process(item)
}
Options:
- Buffer capacity. Default: 0 (unbuffered)
- backpressure ('block' | 'drop-oldest' | 'drop-latest' | 'error' | 'sample') — Backpressure strategy. Default: 'block'
- Callback invoked when a value is dropped due to backpressure
- Time window for the 'sample' strategy. Default: 1000ms
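What 'drop-oldest' means can be illustrated with a tiny self-contained buffer — a sketch of the strategy's semantics only, not the library's channel implementation (`DropOldestBuffer` is hypothetical):

```typescript
// Bounded buffer with drop-oldest semantics: when full, evict the
// oldest value to make room for the newest, reporting each drop.
class DropOldestBuffer<T> {
  private items: T[] = []
  constructor(
    private capacity: number,
    private onDrop?: (dropped: T) => void
  ) {}

  push(value: T): void {
    if (this.items.length >= this.capacity) {
      const dropped = this.items.shift()!
      this.onDrop?.(dropped)
    }
    this.items.push(value)
  }

  drain(): T[] {
    const out = this.items
    this.items = []
    return out
  }
}

const dropped: number[] = []
const buf = new DropOldestBuffer<number>(2, (v) => dropped.push(v))
for (const v of [1, 2, 3, 4]) buf.push(v)

const remaining = buf.drain()
console.log(remaining) // [ 3, 4 ]
console.log(dropped)   // [ 1, 2 ]
```

'drop-latest' is the mirror image (the incoming value is discarded instead), while 'block' makes the producer wait, as in the send/receive example above.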
broadcast()
Create a broadcast channel for pub/sub patterns. Unlike regular channels, all subscribers receive every message.
const bc = s.broadcast<string>()

// Subscribe multiple consumers
s.task(async () => {
  for await (const msg of bc.subscribe()) {
    console.log('Consumer 1:', msg)
  }
})

s.task(async () => {
  for await (const msg of bc.subscribe()) {
    console.log('Consumer 2:', msg)
  }
})

// Publish messages
await bc.send('hello')
await bc.send('world')
bc.close()

Returns: A new BroadcastChannel instance.
semaphore()
Create a semaphore for rate limiting concurrent access.
const sem = s.semaphore(3) // Max 3 concurrent operations

await sem.acquire(async () => {
  // Critical section - at most 3 concurrent
  await heavyOperation()
})

Parameter: Number of available permits.
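The acquire semantics can be sketched with a minimal promise-based counting semaphore — an illustration of the concept under stated assumptions, making no claims about the library's internals (`SimpleSemaphore` is hypothetical):

```typescript
// Minimal counting semaphore: at most `permits` callbacks run at once;
// the rest wait in FIFO order for a permit to be handed over.
class SimpleSemaphore {
  private queue: (() => void)[] = []
  constructor(private permits: number) {}

  async acquire<T>(fn: () => Promise<T>): Promise<T> {
    if (this.permits === 0) {
      await new Promise<void>((resolve) => this.queue.push(resolve))
    } else {
      this.permits--
    }
    try {
      return await fn()
    } finally {
      const next = this.queue.shift()
      if (next) next()      // hand the permit directly to the next waiter
      else this.permits++
    }
  }
}

const sem = new SimpleSemaphore(2)
let active = 0
let peak = 0

await Promise.all(
  [1, 2, 3, 4, 5].map(() =>
    sem.acquire(async () => {
      active++
      peak = Math.max(peak, active)
      await new Promise((r) => setTimeout(r, 10))
      active--
    })
  )
)

console.log(peak) // 2
```

Releasing in a finally block is the important detail: a thrown error must still return the permit, or the semaphore leaks capacity.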
pool()
Create a resource pool for managing connections, workers, or other resources.
const pool = s.pool({
  create: () => createDatabaseConnection(),
  destroy: (conn) => conn.close(),
  min: 2,
  max: 10,
  acquireTimeout: 5000,
  healthCheck: async (conn) => {
    try {
      await conn.query('SELECT 1')
      return { healthy: true }
    } catch {
      return { healthy: false }
    }
  },
  healthCheckInterval: 30000
})

// Acquire and use
const conn = await pool.acquire()
try {
  await conn.query('SELECT * FROM users')
} finally {
  await pool.release(conn)
}

// Or use execute() for automatic release
await pool.execute(async (conn) => {
  return conn.query('SELECT * FROM users')
})
options (ResourcePoolOptions<T>, required):
- create — Factory function to create resources (required)
- destroy ((resource: T) => Promise<void> | void, required) — Cleanup function to destroy resources
- min — Minimum pool size. Default: 0
- acquireTimeout — Maximum wait time for resource acquisition. Default: 30000ms
- healthCheck ((resource: T) => Promise<{ healthy: boolean; message?: string }>) — Optional health check function
- healthCheckInterval — Interval between health checks. Default: 30000ms

Returns: A new ResourcePool instance.
Dependency Injection
provide()
Provide a service to the scope, making it available to all tasks.
const s = scope()
  .provide('database', new Database())
  .provide('logger', new Logger())

const t = s.task(async ({ services }) => {
  const db = services.database
  const logger = services.logger
  // Use services...
})
- value — Service value or factory function
- dispose ((value: T) => void | Promise<void>) — Optional cleanup function called when the scope is disposed

Returns: scope (Scope<Services & Record<K, T>>) — The scope with the new service added.
use()
Retrieve a service from the scope.
const db = s.use('database')
has()
Check if a service exists in the scope.
if (s.has('database')) {
  const db = s.use('database')
}

Returns: True if the service exists.
override()
Override an existing service with a new value.
const s = scope()
  .provide('database', new Database())
  .override('database', new MockDatabase())
- value (Services[K] | (() => Services[K]), required) — New service value or factory
- dispose ((value: Services[K]) => void | Promise<void>) — Optional cleanup function

Returns: The scope with the service overridden.
Rate Limiting
debounce()
Create a debounced function that delays execution.
const search = s.debounce(async (query: string) => {
  return await api.search(query)
}, { wait: 300 })

// Multiple calls within 300ms - only the last one executes
await search('a')
await search('ab')
await search('abc') // Only this executes
fn ((...args: Args) => Promise<T>, required) — Function to debounce.

Options:
- wait — Wait time in milliseconds. Default: 300
- Trigger on leading edge. Default: false
- Trigger on trailing edge. Default: true

Returns: debounced ((...args: Args) => Promise<Result<E, T>>) — Debounced function that returns a Result tuple.
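The trailing-edge behavior described above can be sketched with a few lines of self-contained code — an illustration of the debounce concept, not the library's implementation (this `debounce` helper is hypothetical and omits the Result tuple and leading-edge handling):

```typescript
// Trailing-edge debounce: each call resets the timer; only the last
// call within the wait window actually runs.
function debounce<A extends unknown[]>(
  fn: (...args: A) => void,
  wait: number
): (...args: A) => void {
  let timer: ReturnType<typeof setTimeout> | undefined
  return (...args: A) => {
    clearTimeout(timer)
    timer = setTimeout(() => fn(...args), wait)
  }
}

const calls: string[] = []
const record = debounce((q: string) => calls.push(q), 20)

record('a')
record('ab')
record('abc')

await new Promise((r) => setTimeout(r, 50))
console.log(calls) // [ 'abc' ]
```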
throttle()
Create a throttled function that executes at most once per interval.
const save = s.throttle(async (data: string) => {
  await api.save(data)
}, { interval: 1000 })

// Multiple calls - executes at most once per second
await save('data1')
await save('data2') // Throttled
await save('data3') // Throttled
fn ((...args: Args) => Promise<T>, required) — Function to throttle.

Options:
- interval — Interval in milliseconds. Default: 300
- Trigger on leading edge. Default: true
- Trigger on trailing edge. Default: false

Returns: throttled ((...args: Args) => Promise<Result<E, T>>) — Throttled function that returns a Result tuple.
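The leading-edge default contrasts with debounce: the first call runs immediately and subsequent calls inside the interval are dropped. A minimal sketch of that behavior (this `throttle` helper is hypothetical, not the library's implementation):

```typescript
// Leading-edge throttle: run immediately, then ignore calls until
// `interval` ms have passed since the last run.
function throttle<A extends unknown[]>(
  fn: (...args: A) => void,
  interval: number
): (...args: A) => void {
  let last = -Infinity
  return (...args: A) => {
    const now = Date.now()
    if (now - last >= interval) {
      last = now
      fn(...args)
    }
  }
}

const saved: string[] = []
const save = throttle((d: string) => saved.push(d), 1000)

save('data1')
save('data2') // ignored: within the interval
save('data3') // ignored

console.log(saved) // [ 'data1' ]
```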
poll()
Create a polling operation that executes a function at regular intervals.
const controller = s.poll(
  async (signal) => {
    const status = await api.getStatus()
    return status
  },
  (status) => {
    console.log('Status:', status)
  },
  { interval: 5000, immediate: true }
)

// Control polling
controller.stop()
controller.start()
const { running, pollCount } = controller.status()
fn ((signal: AbortSignal) => Promise<T>, required) — Function to poll.
onValue ((value: T) => void | Promise<void>, required) — Callback for each polled value.

Options:
- interval — Poll interval in milliseconds. Default: 5000
- immediate — Run immediately on start. Default: true

Returns: Poll controller with start(), stop(), and status() methods.
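The core loop behind a poll controller can be sketched in a few lines — a simplified illustration only (this `poll` helper is hypothetical; the library's version also threads an AbortSignal and exposes start() and status()):

```typescript
// Minimal polling loop: call fn, hand the value to onValue, then wait
// `interval` ms; repeat until stop() is called.
function poll<T>(
  fn: () => Promise<T>,
  onValue: (v: T) => void,
  interval: number
): { stop: () => void } {
  let running = true
  void (async () => {
    while (running) {
      onValue(await fn())
      await new Promise((r) => setTimeout(r, interval))
    }
  })()
  return { stop: () => { running = false } }
}

const seen: number[] = []
let n = 0
const controller = poll(async () => ++n, (v) => seen.push(v), 5)

await new Promise((r) => setTimeout(r, 18))
controller.stop()
console.log(seen.length >= 1) // true
```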
Utilities
select()
Go-style select statement for channel operations.
const ch1 = s.channel<string>()
const ch2 = s.channel<number>()

const [err, result] = await s.select(
  new Map([
    [ch1, async (value) => `Got string: ${value}`],
    [ch2, async (value) => `Got number: ${value}`]
  ]),
  { timeout: 5000 }
)
cases (Map<Channel<T>, (value: T) => Promise<R>>, required) — Map of channels to handler functions.

Returns: Result of the first channel to receive a value.
createChild()
Create a child scope that inherits from this scope.
const parent = scope({ name: 'parent' })
  .provide('config', config)

const child = parent.createChild({
  name: 'child',
  timeout: 5000
})

// Child has access to parent's services
const t = child.task(async ({ services }) => {
  const config = services.config // Inherited from parent
})
options — Child scope options (same as scope() options).

Returns: child (Scope<ParentServices & ChildServices>) — New child scope.
onDispose()
Register a cleanup callback to run when the scope is disposed.
s.onDispose(() => {
  console.log('Scope disposed')
  cleanup()
})
fn (() => void | Promise<void>, required) — Cleanup function.
onBeforeTask()
Register a callback to run before each task starts.
s.onBeforeTask((name, index, options) => {
  console.log(`Starting task ${name} (#${index})`)
})
fn ((name: string, index: number, options?: TaskOptions) => void, required) — Callback function.
onAfterTask()
Register a callback to run after each task completes.
s.onAfterTask((name, duration, error, index) => {
  if (error) {
    console.log(`Task ${name} failed after ${duration}ms`)
  } else {
    console.log(`Task ${name} succeeded in ${duration}ms`)
  }
})
fn ((name: string, duration: number, error?: unknown, index?: number) => void, required) — Callback function.
debugTree()
Generate a visual tree representation of the scope hierarchy.
const parent = scope({ name: 'root' })
const child = parent.createChild({ name: 'child' })
const grandchild = child.createChild({ name: 'grandchild' })

console.log(parent.debugTree())
// Output:
// 📦 root (id: 1)
// ├─ 📦 child (id: 2)
// │  └─ 📦 grandchild (id: 3)

console.log(parent.debugTree({ format: 'mermaid' }))
// Output: Mermaid diagram syntax
Options:
- format — Output format. Default: 'ascii'
- Include statistics (tasks, concurrency). Default: true

Returns: Tree representation as a string.
Properties
signal
The AbortSignal for this scope. Becomes aborted when the scope is disposed or times out.
const signal = s.signal
if (signal.aborted) {
  console.log('Scope is aborted')
}
isDisposed
Whether the scope has been disposed.
if (s.isDisposed) {
  console.log('Scope is disposed')
}
scopeName
The name of the scope.
console.log(s.scopeName) // "my-scope"
concurrency
The concurrency limit for this scope, or undefined if unlimited.
const limit = s.concurrency // 10 or undefined
traceId
The trace ID for log correlation (if logCorrelation is enabled).
const s = scope({ logCorrelation: true })
console.log(s.traceId) // "abc123..."
Trace ID for distributed tracing
spanId
The span ID for log correlation (if logCorrelation is enabled).
const s = scope({ logCorrelation: true })
console.log(s.spanId) // "def456..."
Span ID for distributed tracing
Examples
Basic Usage
import { scope } from 'go-go-scope'

await using s = scope()

const t = s.task(async ({ signal }) => {
  // Do work...
  return 'result'
})

const [err, result] = await t
if (err) {
  console.error('Task failed:', err)
} else {
  console.log('Task succeeded:', result)
}
// Scope automatically disposed here
With Services
class Database {
  async query(sql: string) { /* ... */ }
}

class Logger {
  log(msg: string) { console.log(msg) }
}

await using s = scope()
  .provide('db', new Database())
  .provide('logger', new Logger())

const t = s.task(async ({ services }) => {
  const { db, logger } = services
  logger.log('Querying database')
  const result = await db.query('SELECT * FROM users')
  return result
})
With Concurrency Limit
await using s = scope({ concurrency: 3 })

// Only 3 tasks run concurrently
const results = await s.parallel([
  async () => fetchUser(1),
  async () => fetchUser(2),
  async () => fetchUser(3),
  async () => fetchUser(4),
  async () => fetchUser(5)
])
With Circuit Breaker
await using s = scope({
  circuitBreaker: {
    failureThreshold: 5,
    resetTimeout: 30000,
    onOpen: (failures) => {
      console.log(`Circuit opened after ${failures} failures`)
    }
  }
})

// All tasks protected by the circuit breaker
for (let i = 0; i < 10; i++) {
  const [err, result] = await s.task(() => unreliableApi())
  if (err) {
    console.error('API call failed:', err)
  }
}
With Checkpointing
import { FileCheckpointProvider } from './checkpoint-provider'

await using s = scope({
  persistence: {
    checkpoint: new FileCheckpointProvider('./checkpoints')
  }
})

const [err, result] = await s.task(
  async ({ checkpoint, progress }) => {
    const items = await loadItems()
    const processed = checkpoint?.data?.processed ?? 0
    for (let i = processed; i < items.length; i++) {
      await processItem(items[i])
      progress.update((i / items.length) * 100)
      if (i % 100 === 0) {
        await checkpoint.save({ processed: i })
      }
    }
    return { total: items.length }
  },
  {
    id: 'batch-job',
    checkpoint: {
      interval: 60000,
      onCheckpoint: (cp) => console.log(`Saved checkpoint ${cp.sequence}`)
    }
  }
)
Parent-Child Scopes
await using parent = scope({ name: 'parent' })
  .provide('config', { apiUrl: 'https://api.example.com' })

// Create child scope
const child = parent.createChild({
  name: 'child',
  timeout: 5000
})

// Child has access to parent services
const t = child.task(async ({ services }) => {
  const config = services.config
  // Use inherited config...
})

// When the parent is disposed, the child is also disposed