
Overview

The Scope class is the foundational building block of go-go-scope. It provides structured concurrency by managing tasks, resources, and cancellation signals in a hierarchical manner. All tasks spawned within a scope are automatically cancelled when the scope is disposed.
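To make the idea of hierarchical cancellation concrete, here is a minimal sketch built only on the standard AbortController. `MiniScope` and its methods are hypothetical names used for illustration; this is not how go-go-scope is implemented, and the real Scope does considerably more.

```typescript
// Hypothetical illustration only: not the go-go-scope implementation.
class MiniScope {
  private readonly controller = new AbortController()
  private readonly children: MiniScope[] = []

  // Tasks observe this signal to learn that the scope was disposed.
  get signal(): AbortSignal {
    return this.controller.signal
  }

  // A child is disposed whenever its parent is disposed.
  createChild(): MiniScope {
    const child = new MiniScope()
    this.children.push(child)
    return child
  }

  dispose(): void {
    for (const child of this.children) child.dispose()
    this.controller.abort()
  }
}

const root = new MiniScope()
const child = root.createChild()
root.dispose()
console.log(root.signal.aborted, child.signal.aborted) // true true
```

Disposing the root aborts every signal below it, which is the property that lets work started in a scope be cancelled together.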

Factory Function

scope()

The recommended way to create a scope is to use the scope() factory function:
import { scope } from 'go-go-scope'

await using s = scope({
  name: 'my-scope',
  timeout: 5000,
  concurrency: 10
})
options
ScopeOptions<Services>
Configuration options for the scope
scope
Scope<Services>
A new Scope instance configured with the given options

Core Methods

task()

Spawn a task within the scope. The task receives a context object with services, signal, logger, and optional checkpoint/progress utilities.
const t = s.task(async ({ services, signal, logger, context }) => {
  logger.info('Task started')
  
  // Check if cancelled
  if (signal.aborted) {
    throw new Error('Cancelled')
  }
  
  // Use provided services
  const db = services.database
  const result = await db.query('SELECT * FROM users')
  
  return result
}, {
  timeout: 5000,
  retry: 'exponential',
  errorClass: DatabaseError
})

const [err, result] = await t
fn
(ctx: TaskContext<Services>) => Promise<T>
required
The task function to execute
options
TaskOptions<E>
Optional task configuration
task
Task<Result<E, T>>
A Task instance that resolves to a Result tuple [error, value]
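The Result tuple pattern can be sketched independently of the library. The `Result` type and `toResult` helper below are hypothetical stand-ins written for this example; they only illustrate how a rejecting promise can be turned into an [error, value] pair.

```typescript
// Hypothetical sketch of the Result-tuple pattern; names are illustrative.
type Result<E, T> = [E, undefined] | [undefined, T]

// Convert a promise that may reject into a promise that always resolves
// to an [error, value] tuple.
async function toResult<T>(promise: Promise<T>): Promise<Result<Error, T>> {
  try {
    return [undefined, await promise]
  } catch (e) {
    return [e instanceof Error ? e : new Error(String(e)), undefined]
  }
}

async function demo(): Promise<void> {
  const [err, value] = await toResult(Promise.resolve(42))
  if (err) {
    console.error('failed:', err.message)
  } else {
    console.log('value:', value) // value: 42
  }
}

void demo()
```

Because the tuple always resolves, callers handle failure with an ordinary `if (err)` check instead of a try/catch block.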

parallel()

Run multiple tasks in parallel with optional concurrency limit and progress tracking.
const results = await s.parallel([
  async () => fetchUser(1),
  async () => fetchUser(2),
  async () => fetchUser(3)
], {
  concurrency: 2,
  onProgress: (completed, total, result) => {
    console.log(`${completed}/${total} completed`)
  },
  continueOnError: true
})

// results is [Result<E, User>, Result<E, User>, Result<E, User>]
factories
(() => Promise<T>)[]
required
Array of factory functions that create promises
options
ParallelOptions
results
Result<E, T>[]
Array of Result tuples, one per factory function
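One common way to bound concurrency is a small pool of workers that each pull the next factory from a shared index. The sketch below assumes that approach; `runLimited` and `Settled` are hypothetical names, and the library may implement parallel() differently.

```typescript
// Hypothetical sketch of a concurrency-limited runner; not the library's code.
type Settled<T> = [unknown, undefined] | [undefined, T]

async function runLimited<T>(
  factories: Array<() => Promise<T>>,
  concurrency: number
): Promise<Settled<T>[]> {
  const results: Settled<T>[] = new Array(factories.length)
  let next = 0

  // Each worker repeatedly claims the next unstarted factory.
  async function worker(): Promise<void> {
    while (next < factories.length) {
      const index = next++
      try {
        results[index] = [undefined, await factories[index]()]
      } catch (error) {
        results[index] = [error, undefined]
      }
    }
  }

  const workerCount = Math.max(1, Math.min(concurrency, factories.length))
  await Promise.all(Array.from({ length: workerCount }, () => worker()))
  return results
}

let inFlight = 0
let peak = 0
const jobs = [1, 2, 3, 4, 5].map((n) => async () => {
  inFlight++
  peak = Math.max(peak, inFlight)
  await new Promise((resolve) => setTimeout(resolve, 10))
  inFlight--
  return n * 2
})

runLimited(jobs, 2).then((results) => {
  console.log(results.map(([, value]) => value)) // [ 2, 4, 6, 8, 10 ]
  console.log('peak concurrency:', peak) // peak concurrency: 2
})
```

Results are stored by index, so the output order matches the input order regardless of which job finishes first.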

race()

Race multiple tasks against each other; the first to settle wins.
const [err, result] = await s.race([
  async () => fetchFromPrimary(),
  async () => fetchFromBackup(),
  async () => fetchFromFallback()
], {
  requireSuccess: true,  // Only successful results count as winners
  timeout: 5000,         // Fail if no winner within 5s
  concurrency: 2         // Only start 2 tasks at a time
})
factories
(() => Promise<T>)[]
required
Array of factory functions to race
options
RaceOptions
result
Result<E, T>
Result of the winning task

channel()

Create a Go-style buffered channel for concurrent communication.
const ch = s.channel<string>(10)  // Buffer capacity of 10

// Producer
s.task(async () => {
  for (const item of items) {
    await ch.send(item)
  }
  ch.close()
})

// Consumer
for await (const item of ch) {
  await process(item)
}
capacity
number
Buffer capacity. Default: 0 (unbuffered)
options
ChannelOptions<T>
channel
Channel<T>
A new Channel instance

broadcast()

Create a broadcast channel for pub/sub patterns. Unlike regular channels, all subscribers receive every message.
const bc = s.broadcast<string>()

// Subscribe multiple consumers
s.task(async () => {
  for await (const msg of bc.subscribe()) {
    console.log('Consumer 1:', msg)
  }
})

s.task(async () => {
  for await (const msg of bc.subscribe()) {
    console.log('Consumer 2:', msg)
  }
})

// Publish messages
await bc.send('hello')
await bc.send('world')
bc.close()
broadcast
BroadcastChannel<T>
A new BroadcastChannel instance

semaphore()

Create a semaphore that limits how many operations can run concurrently.
const sem = s.semaphore(3)  // Max 3 concurrent operations

await sem.acquire(async () => {
  // Critical section - at most 3 concurrent
  await heavyOperation()
})
initialPermits
number
required
Number of available permits
semaphore
Semaphore
A new Semaphore instance
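For readers unfamiliar with the concept, a counting semaphore can be sketched in a few lines. `SimpleSemaphore` and `withPermit` are hypothetical names for this example and do not describe the library's Semaphore class.

```typescript
// Hypothetical counting semaphore; illustrates the concept only.
class SimpleSemaphore {
  private permits: number
  private readonly waiters: Array<() => void> = []

  constructor(initialPermits: number) {
    this.permits = initialPermits
  }

  // Run fn while holding one permit; the permit is returned even if fn throws.
  async withPermit<T>(fn: () => Promise<T>): Promise<T> {
    if (this.permits > 0) {
      this.permits--
    } else {
      // Wait until a finishing holder hands its permit to us.
      await new Promise<void>((resolve) => this.waiters.push(resolve))
    }
    try {
      return await fn()
    } finally {
      const next = this.waiters.shift()
      if (next) next() // hand the permit directly to the next waiter
      else this.permits++
    }
  }
}
```

Handing the permit straight to the next waiter, rather than incrementing the counter and letting waiters compete for it, keeps the number of active holders from ever exceeding the initial permit count.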

pool()

Create a resource pool for managing connections, workers, or other resources.
const pool = s.pool({
  create: () => createDatabaseConnection(),
  destroy: (conn) => conn.close(),
  min: 2,
  max: 10,
  acquireTimeout: 5000,
  healthCheck: async (conn) => {
    try {
      await conn.query('SELECT 1')
      return { healthy: true }
    } catch {
      return { healthy: false }
    }
  },
  healthCheckInterval: 30000
})

// Acquire and use
const conn = await pool.acquire()
try {
  await conn.query('SELECT * FROM users')
} finally {
  await pool.release(conn)
}

// Or use execute() for automatic release
await pool.execute(async (conn) => {
  return conn.query('SELECT * FROM users')
})
options
ResourcePoolOptions<T>
required
pool
ResourcePool<T>
A new ResourcePool instance

Dependency Injection

provide()

Provide a service to the scope, making it available to all tasks.
const s = scope()
  .provide('database', new Database())
  .provide('logger', new Logger())

const t = s.task(async ({ services }) => {
  const db = services.database
  const logger = services.logger
  // Use services...
})
key
string
required
Service key
value
T | (() => T)
required
Service value or factory function
dispose
(value: T) => void | Promise<void>
Optional cleanup function called when scope is disposed
scope
Scope<Services & Record<K, T>>
The scope with the new service added

use()

Retrieve a service from the scope.
const db = s.use('database')
key
keyof Services
required
Service key to retrieve
value
Services[K]
The service value

has()

Check if a service exists in the scope.
if (s.has('database')) {
  const db = s.use('database')
}
key
keyof Services
required
Service key to check
exists
boolean
True if the service exists

override()

Override an existing service with a new value.
const s = scope()
  .provide('database', new Database())
  .override('database', new MockDatabase())
key
keyof Services
required
Service key to override
value
Services[K] | (() => Services[K])
required
New service value or factory
dispose
(value: Services[K]) => void | Promise<void>
Optional cleanup function
scope
Scope<Services>
The scope with the service overridden
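The way each provide() call widens the service type (`Services & Record<K, T>`) can be shown with a small standalone container. `Container` below is a hypothetical class written for this example; it mirrors the method names above but is not the library's implementation.

```typescript
// Hypothetical sketch of a typed service container; not go-go-scope's code.
class Container<Services = {}> {
  private constructor(private readonly entries: Map<string, unknown>) {}

  static empty(): Container {
    return new Container(new Map())
  }

  // Returns a new container whose type includes the added key.
  provide<K extends string, T>(key: K, value: T): Container<Services & Record<K, T>> {
    const next = new Map(this.entries)
    next.set(key, value)
    return new Container<Services & Record<K, T>>(next)
  }

  use<K extends keyof Services & string>(key: K): Services[K] {
    if (!this.entries.has(key)) throw new Error(`Unknown service: ${key}`)
    return this.entries.get(key) as Services[K]
  }

  has(key: string): boolean {
    return this.entries.has(key)
  }

  // Replaces the value for an existing key while keeping its declared type.
  override<K extends keyof Services & string>(key: K, value: Services[K]): Container<Services> {
    if (!this.entries.has(key)) throw new Error(`Unknown service: ${key}`)
    const next = new Map(this.entries)
    next.set(key, value)
    return new Container<Services>(next)
  }
}

const services = Container.empty()
  .provide('greeting', 'hello')
  .provide('retries', 3)

const greeting: string = services.use('greeting')
const retries: number = services.use('retries')
console.log(greeting, retries, services.has('missing')) // hello 3 false
```

In this sketch the compiler rejects `use('unknown')` because the key is not part of the accumulated type, which is the main benefit of threading the service map through the generic parameter.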

Rate Limiting

debounce()

Create a debounced function that delays execution until calls have stopped arriving for the configured wait period.
const search = s.debounce(async (query: string) => {
  return await api.search(query)
}, { wait: 300 })

// Multiple calls within 300ms: only the last one executes
search('a')
search('ab')
await search('abc')  // Only this executes
fn
(...args: Args) => Promise<T>
required
Function to debounce
options
DebounceOptions
debounced
(...args: Args) => Promise<Result<E, T>>
Debounced function that returns a Result tuple

throttle()

Create a throttled function that executes at most once per interval.
const save = s.throttle(async (data: string) => {
  await api.save(data)
}, { interval: 1000 })

// Multiple calls - executes at most once per second
await save('data1')
await save('data2')  // Throttled
await save('data3')  // Throttled
fn
(...args: Args) => Promise<T>
required
Function to throttle
options
ThrottleOptions
throttled
(...args: Args) => Promise<Result<E, T>>
Throttled function that returns a Result tuple
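The "at most once per interval" rule can be illustrated with a leading-edge throttle whose clock is injectable, which makes the behavior easy to verify. `throttleSketch` is a hypothetical helper; it drops throttled calls outright, which is an assumption and may differ from how the library treats them.

```typescript
// Hypothetical leading-edge throttle; the clock is injectable for testing.
function throttleSketch<Args extends unknown[]>(
  fn: (...args: Args) => void,
  interval: number,
  now: () => number = Date.now
): (...args: Args) => boolean {
  let lastRun = -Infinity
  return (...args: Args): boolean => {
    const t = now()
    if (t - lastRun < interval) return false // dropped: still inside the interval
    lastRun = t
    fn(...args)
    return true
  }
}

let fakeTime = 0
const saved: string[] = []
const save = throttleSketch((data: string) => saved.push(data), 1000, () => fakeTime)

save('data1') // runs
fakeTime = 500
save('data2') // dropped
fakeTime = 1000
save('data3') // runs: a full interval has passed
console.log(saved) // [ 'data1', 'data3' ]
```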

poll()

Create a polling operation that executes a function at regular intervals.
const controller = s.poll(
  async (signal) => {
    const status = await api.getStatus()
    return status
  },
  (status) => {
    console.log('Status:', status)
  },
  { interval: 5000, immediate: true }
)

// Control polling
controller.stop()
controller.start()

const { running, pollCount } = controller.status()
fn
(signal: AbortSignal) => Promise<T>
required
Function to poll
onValue
(value: T) => void | Promise<void>
required
Callback for each polled value
options
PollOptions
controller
PollController
Poll controller with start(), stop(), and status() methods

Utilities

select()

Go-style select statement for channel operations.
const ch1 = s.channel<string>()
const ch2 = s.channel<number>()

const [err, result] = await s.select(
  new Map([
    [ch1, async (value) => `Got string: ${value}`],
    [ch2, async (value) => `Got number: ${value}`]
  ]),
  { timeout: 5000 }
)
cases
Map<Channel<T>, (value: T) => Promise<R>>
required
Map of channels to handler functions
options
SelectOptions
result
Result<Error, R>
Result of the first channel to receive a value

createChild()

Create a child scope that inherits from this scope.
const parent = scope({ name: 'parent' })
  .provide('config', config)

const child = parent.createChild({ 
  name: 'child',
  timeout: 5000 
})

// Child has access to parent's services
const t = child.task(async ({ services }) => {
  const config = services.config  // Inherited from parent
})
options
ScopeOptions
Child scope options (same as scope() options)
child
Scope<ParentServices & ChildServices>
New child scope

onDispose()

Register a cleanup callback to run when the scope is disposed.
s.onDispose(() => {
  console.log('Scope disposed')
  cleanup()
})
fn
() => void | Promise<void>
required
Cleanup function

onBeforeTask()

Register a callback to run before each task starts.
s.onBeforeTask((name, index, options) => {
  console.log(`Starting task ${name} (#${index})`)
})
fn
(name: string, index: number, options?: TaskOptions) => void
required
Callback function

onAfterTask()

Register a callback to run after each task completes.
s.onAfterTask((name, duration, error, index) => {
  if (error) {
    console.log(`Task ${name} failed after ${duration}ms`)
  } else {
    console.log(`Task ${name} succeeded in ${duration}ms`)
  }
})
fn
(name: string, duration: number, error?: unknown, index?: number) => void
required
Callback function

debugTree()

Generate a visual tree representation of the scope hierarchy.
const parent = scope({ name: 'root' })
const child = parent.createChild({ name: 'child' })
const grandchild = child.createChild({ name: 'grandchild' })

console.log(parent.debugTree())
// Output:
// 📦 root (id: 1)
//    ├─ 📦 child (id: 2)
//    │  └─ 📦 grandchild (id: 3)

console.log(parent.debugTree({ format: 'mermaid' }))
// Output: Mermaid diagram syntax
options
DebugTreeOptions
tree
string
Tree representation as a string
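Rendering a hierarchy as text is a short recursive walk. The sketch below assumes a hypothetical `ScopeNode` shape and its own output format; it does not reproduce the exact layout that debugTree() emits.

```typescript
// Hypothetical tree renderer; the node shape and output format are assumptions.
interface ScopeNode {
  label: string
  children: ScopeNode[]
}

function renderTree(node: ScopeNode, prefix = '', isRoot = true, isLast = true): string[] {
  const connector = isRoot ? '' : isLast ? '└─ ' : '├─ '
  const lines = [prefix + connector + node.label]
  // Descendants of a non-last node keep a vertical bar in their prefix.
  const childPrefix = isRoot ? '' : prefix + (isLast ? '   ' : '│  ')
  node.children.forEach((childNode, i) => {
    const last = i === node.children.length - 1
    lines.push(...renderTree(childNode, childPrefix, false, last))
  })
  return lines
}

const tree: ScopeNode = {
  label: 'root',
  children: [
    { label: 'child-a', children: [{ label: 'grandchild', children: [] }] },
    { label: 'child-b', children: [] }
  ]
}

console.log(renderTree(tree).join('\n'))
// root
// ├─ child-a
// │  └─ grandchild
// └─ child-b
```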

Properties

signal

The AbortSignal for this scope. Becomes aborted when the scope is disposed or times out.
const signal = s.signal
if (signal.aborted) {
  console.log('Scope is aborted')
}
signal
AbortSignal
The scope’s AbortSignal
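Checking `signal.aborted` works between steps, but long waits usually need to react to the abort event itself. The helper below is a hypothetical example of that pattern using only the standard AbortSignal API; `abortableDelay` is not part of go-go-scope.

```typescript
// Hypothetical helper: a delay that rejects as soon as the signal aborts.
function abortableDelay(ms: number, signal: AbortSignal): Promise<void> {
  return new Promise<void>((resolve, reject) => {
    if (signal.aborted) {
      reject(new Error('aborted'))
      return
    }
    const onAbort = (): void => {
      clearTimeout(timer) // stop the pending timer so nothing leaks
      reject(new Error('aborted'))
    }
    const timer = setTimeout(() => {
      signal.removeEventListener('abort', onAbort)
      resolve()
    }, ms)
    signal.addEventListener('abort', onAbort, { once: true })
  })
}

const controller = new AbortController()
abortableDelay(60000, controller.signal).catch((error: Error) => {
  console.log(error.message) // aborted
})
controller.abort()
```

Inside a task, the same helper could be given the task's `signal` so that a pending wait ends promptly when the scope is disposed or times out.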

isDisposed

Whether the scope has been disposed.
if (s.isDisposed) {
  console.log('Scope is disposed')
}
isDisposed
boolean
True if disposed

scopeName

The name of the scope.
console.log(s.scopeName)  // "my-scope"
scopeName
string
Scope name

concurrency

The concurrency limit for this scope, or undefined if unlimited.
const limit = s.concurrency  // 10 or undefined
concurrency
number | undefined
Concurrency limit

traceId

The trace ID for log correlation (if logCorrelation is enabled).
const s = scope({ logCorrelation: true })
console.log(s.traceId)  // "abc123..."
traceId
string | undefined
Trace ID for distributed tracing

spanId

The span ID for log correlation (if logCorrelation is enabled).
const s = scope({ logCorrelation: true })
console.log(s.spanId)  // "def456..."
spanId
string | undefined
Span ID for distributed tracing

Examples

Basic Usage

import { scope } from 'go-go-scope'

await using s = scope()

const t = s.task(async ({ signal }) => {
  // Do work...
  return 'result'
})

const [err, result] = await t
if (err) {
  console.error('Task failed:', err)
} else {
  console.log('Task succeeded:', result)
}
// Scope automatically disposed here

With Services

class Database {
  async query(sql: string) { /* ... */ }
}

class Logger {
  log(msg: string) { console.log(msg) }
}

await using s = scope()
  .provide('db', new Database())
  .provide('logger', new Logger())

const t = s.task(async ({ services }) => {
  const { db, logger } = services
  logger.log('Querying database')
  const result = await db.query('SELECT * FROM users')
  return result
})

With Concurrency Limit

await using s = scope({ concurrency: 3 })

// Only 3 tasks run concurrently
const results = await s.parallel([
  async () => fetchUser(1),
  async () => fetchUser(2),
  async () => fetchUser(3),
  async () => fetchUser(4),
  async () => fetchUser(5)
])

With Circuit Breaker

await using s = scope({
  circuitBreaker: {
    failureThreshold: 5,
    resetTimeout: 30000,
    onOpen: (failures) => {
      console.log(`Circuit opened after ${failures} failures`)
    }
  }
})

// All tasks protected by circuit breaker
for (let i = 0; i < 10; i++) {
  const [err, result] = await s.task(() => unreliableApi())
  if (err) {
    console.error('API call failed:', err)
  }
}
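The roles of failureThreshold and resetTimeout can be illustrated with a small state machine. `BreakerSketch` is a hypothetical class written for this example, with an injectable clock so the behavior is deterministic; the library's circuit breaker may track state differently.

```typescript
// Hypothetical circuit breaker state machine; the clock is injectable for testing.
class BreakerSketch {
  private failures = 0
  private openedAt: number | undefined

  constructor(
    private readonly failureThreshold: number,
    private readonly resetTimeout: number,
    private readonly now: () => number = Date.now
  ) {}

  // Calls are allowed while closed, or once resetTimeout has elapsed since opening.
  canAttempt(): boolean {
    if (this.openedAt === undefined) return true
    return this.now() - this.openedAt >= this.resetTimeout
  }

  recordSuccess(): void {
    this.failures = 0
    this.openedAt = undefined
  }

  // Reaching the threshold opens the circuit (or re-opens it after a failed trial).
  recordFailure(): void {
    this.failures++
    if (this.failures >= this.failureThreshold) this.openedAt = this.now()
  }
}

let clock = 0
const breaker = new BreakerSketch(2, 30000, () => clock)
breaker.recordFailure()
breaker.recordFailure()
console.log(breaker.canAttempt()) // false: circuit is open
clock = 30000
console.log(breaker.canAttempt()) // true: a trial call is allowed
```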

With Checkpointing

import { FileCheckpointProvider } from './checkpoint-provider'

await using s = scope({
  persistence: {
    checkpoint: new FileCheckpointProvider('./checkpoints')
  }
})

const [err, result] = await s.task(
  async ({ checkpoint, progress }) => {
    const items = await loadItems()
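    // Resume from the last saved position, or start from the beginning
    const processed = checkpoint?.data?.processed ?? 0
    
    for (let i = processed; i < items.length; i++) {
      await processItem(items[i])
      progress.update(((i + 1) / items.length) * 100)
      
      // Save the number of completed items so a resume does not repeat item i
      if ((i + 1) % 100 === 0) {
        await checkpoint.save({ processed: i + 1 })
      }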
    }
    
    return { total: items.length }
  },
  {
    id: 'batch-job',
    checkpoint: {
      interval: 60000,
      onCheckpoint: (cp) => console.log(`Saved checkpoint ${cp.sequence}`)
    }
  }
)

Parent-Child Scopes

await using parent = scope({ name: 'parent' })
  .provide('config', { apiUrl: 'https://api.example.com' })

// Create child scope
const child = parent.createChild({ 
  name: 'child',
  timeout: 5000 
})

// Child has access to parent services
const t = child.task(async ({ services }) => {
  const config = services.config
  // Use inherited config...
})

// When parent is disposed, child is also disposed
