
Overview

The Task class represents a lazy, disposable asynchronous operation. Tasks are created by calling scope.task() and implement both PromiseLike (for await support) and Disposable (for using keyword support).
Because tasks are lazy, they don’t start executing until you await them or call .then(). This lets you compose and configure tasks before they run.
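A minimal sketch of how laziness can follow from implementing PromiseLike: await calls then(), so deferring the work until then() is invoked delays execution until first use. LazyTask is a hypothetical stand-in written for illustration; it is not go-go-scope's Task implementation.

```typescript
// Hypothetical stand-in for a lazy task; not go-go-scope's actual Task class.
class LazyTask<T> implements PromiseLike<T> {
  private promise: Promise<T> | undefined
  private readonly run: () => Promise<T>

  constructor(run: () => Promise<T>) {
    this.run = run
  }

  // True once then() has triggered the underlying function.
  get isStarted(): boolean {
    return this.promise !== undefined
  }

  // `await` calls then(), so the work starts on first use and runs only once.
  then<R1 = T, R2 = never>(
    onfulfilled?: ((value: T) => R1 | PromiseLike<R1>) | null,
    onrejected?: ((reason: unknown) => R2 | PromiseLike<R2>) | null
  ): Promise<R1 | R2> {
    this.promise ??= this.run()
    return this.promise.then(onfulfilled, onrejected)
  }
}

let runs = 0
const lazy = new LazyTask(async () => {
  runs += 1
  return 'result'
})
// Nothing has run yet: runs is 0 and lazy.isStarted is false
```

Awaiting lazy (or calling lazy.then()) starts the function; later awaits reuse the same promise rather than running it again.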

Creation

Tasks are created using the task() method on a Scope:
import { scope } from 'go-go-scope'

await using s = scope()

const t = s.task(async ({ signal }) => {
  // Task implementation
  return 'result'
})

// Task hasn't started yet - it's lazy

// Start the task by awaiting
const [err, result] = await t

Lifecycle

Lazy Execution

Tasks don’t start executing until awaited:
const t = s.task(async () => {
  console.log('Task started')
  return 'result'
})

console.log('Task created')
// Output: "Task created"
// Task hasn't started yet!

await t
// Output: "Task started"

Automatic Cleanup

Tasks automatically clean up when:
  • The task completes (success or failure)
  • The parent scope is disposed
  • The task is disposed via using
{
  using t = s.task(async ({ signal }) => {
    // Setup abort listener
    signal.addEventListener('abort', () => {
      console.log('Task aborted')
    })
    
    return await longOperation()
  })
  
  // Task is disposed here, abort handler removed
}

Cancellation Propagation

Tasks inherit cancellation from their parent scope:
await using s = scope({ timeout: 1000 })

const t = s.task(async ({ signal }) => {
  // Check if cancelled
  if (signal.aborted) {
    throw new Error('Cancelled')
  }
  
  // Listen for cancellation
  signal.addEventListener('abort', () => {
    console.log('Task was cancelled')
  })
  
  await longOperation()
})

// After 1000ms, scope aborts and task is cancelled
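One way to picture this inheritance is a child AbortController that follows its parent's signal. The sketch below uses only the standard AbortController API; linkToParent is a hypothetical helper, and the way go-go-scope wires scopes to tasks internally may differ.

```typescript
// Hypothetical helper: derive a child AbortController that follows a parent signal.
function linkToParent(parent: AbortSignal): AbortController {
  const child = new AbortController()
  if (parent.aborted) {
    child.abort(parent.reason)
    return child
  }
  const onAbort = () => child.abort(parent.reason)
  parent.addEventListener('abort', onAbort, { once: true })
  // Stop listening to the parent if the child is aborted on its own first.
  child.signal.addEventListener(
    'abort',
    () => parent.removeEventListener('abort', onAbort),
    { once: true }
  )
  return child
}

const scopeController = new AbortController()
const taskController = linkToParent(scopeController.signal)

scopeController.abort(new Error('scope timed out'))
// taskController.signal.aborted is now true, with the same reason
```

Cancellation flows one way only: aborting the child leaves the parent untouched.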

Task Context

Tasks receive a context object with useful utilities:
const t = s.task(async (ctx) => {
  // Available context properties:
  const {
    services,    // Services from scope.provide()
    signal,      // AbortSignal for cancellation
    logger,      // Structured logger
    context,     // Custom context from scope options
    checkpoint,  // Checkpoint utilities (if configured)
    progress     // Progress tracking (if configured)
  } = ctx
  
  // Use context...
})

services

Services provided via scope.provide():
const s = scope()
  .provide('database', new Database())
  .provide('cache', new Cache())

const t = s.task(async ({ services }) => {
  const db = services.database
  const cache = services.cache
  
  const cached = await cache.get('users')
  if (cached) return cached
  
  const users = await db.query('SELECT * FROM users')
  await cache.set('users', users)
  return users
})

signal

AbortSignal for detecting cancellation:
const t = s.task(async ({ signal }) => {
  // Check before long operations
  if (signal.aborted) {
    throw new Error('Cancelled before start')
  }
  
  // Pass to fetch/axios for automatic cancellation
  const response = await fetch('/api/data', { signal })
  
  // Listen for cancellation during processing
  signal.addEventListener('abort', () => {
    console.log('Cancelled during processing')
  })
  
  return response.json()
})
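For operations that do not accept a signal themselves, the abort event can be bridged by hand. The following is a sketch of a cancellable delay built on the standard AbortSignal API; sleep is a hypothetical helper and is not exported by go-go-scope as far as this page shows.

```typescript
// Hypothetical helper: a delay that rejects as soon as the signal aborts.
function sleep(ms: number, signal?: AbortSignal): Promise<void> {
  return new Promise<void>((resolve, reject) => {
    if (signal?.aborted) {
      reject(signal.reason)
      return
    }
    const onAbort = () => {
      clearTimeout(timer) // do not leave the timer running after cancellation
      reject(signal?.reason)
    }
    const timer = setTimeout(() => {
      signal?.removeEventListener('abort', onAbort)
      resolve()
    }, ms)
    signal?.addEventListener('abort', onAbort, { once: true })
  })
}
```

Inside a task, a call such as await sleep(100, signal) would then end early when the scope is cancelled instead of waiting out the full delay.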

logger

Structured logger with task context:
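const t = s.task(async ({ logger }) => {
  logger.debug('Task started')
  logger.info('Processing item', { itemId: 123 })
  logger.warn('Cache miss', { key: 'users' })
  try {
    // riskyOperation() is a placeholder for your own code
    await riskyOperation()
  } catch (err) {
    logger.error('Operation failed', { error: err })
    throw err
  }
})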

context

Custom context object from scope options:
const s = scope({
  context: {
    userId: '123',
    tenantId: 'acme'
  }
})

const t = s.task(async ({ context }) => {
  const userId = context.userId
  const tenantId = context.tenantId
  // Use context values...
})

checkpoint

Checkpoint utilities for long-running tasks (requires checkpoint provider):
const t = s.task(
  async ({ checkpoint, signal }) => {
    const items = await loadItems()

    // Resume from checkpoint if available
    const processed = checkpoint?.data?.processed ?? 0

    for (let i = processed; i < items.length; i++) {
      if (signal.aborted) break

      await processItem(items[i])

      // Save a checkpoint every 100 items; store the count of completed
      // items so a resumed run starts at the next unprocessed one
      if ((i + 1) % 100 === 0) {
        await checkpoint.save({ processed: i + 1 })
      }
    }
    }
    
    return { total: items.length }
  },
  {
    id: 'batch-job',
    checkpoint: {
      interval: 60000,  // Auto-save every 60s
      onCheckpoint: (cp) => console.log('Checkpoint saved:', cp.sequence)
    }
  }
)
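The resume logic can be tried in isolation with an in-memory stand-in. This is a sketch under stated assumptions: MemoryCheckpointStore and runBatch are hypothetical names, and the real checkpoint provider interface is not shown on this page. The point it illustrates is that saving the count of completed items lets a resumed run start at the next unprocessed one.

```typescript
// Hypothetical in-memory checkpoint store; not the library's provider interface.
interface Checkpoint<T> {
  sequence: number
  data: T
}

class MemoryCheckpointStore<T> {
  private latest: Checkpoint<T> | undefined

  load(): Checkpoint<T> | undefined {
    return this.latest
  }

  save(data: T): Checkpoint<T> {
    const sequence = (this.latest?.sequence ?? 0) + 1
    this.latest = { sequence, data }
    return this.latest
  }
}

// Processes items from the last saved position; `stopAfter` simulates an interruption.
function runBatch(
  items: string[],
  store: MemoryCheckpointStore<{ processed: number }>,
  handled: string[],
  stopAfter = Infinity
): void {
  let processed = store.load()?.data.processed ?? 0
  for (const item of items.slice(processed, stopAfter)) {
    handled.push(item)
    processed += 1
    // Save the count of completed items so a resume starts at the next one.
    store.save({ processed })
  }
}

const store = new MemoryCheckpointStore<{ processed: number }>()
const handled: string[] = []
const batch = ['a', 'b', 'c', 'd']

runBatch(batch, store, handled, 2) // interrupted after two items
runBatch(batch, store, handled)    // resumes at index 2
// handled is ['a', 'b', 'c', 'd'], with no item processed twice
```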

progress

Progress tracking utilities (requires checkpoint provider):
const t = s.task(
  async ({ progress }) => {
    const items = await loadItems()
    
    for (let i = 0; i < items.length; i++) {
      await processItem(items[i])
      
      // Update progress; (i + 1) counts completed items, so the last one reports 100
      progress.update(((i + 1) / items.length) * 100)
    }
    
    return { total: items.length }
  },
  { id: 'batch-job' }
)

// Subscribe to progress updates
const unsubscribe = t.progress?.onUpdate((p) => {
  console.log(`Progress: ${p.percentage}%`)
  if (p.eta) {
    console.log(`ETA: ${p.eta}ms`)
  }
})

Task Options

Pass an options object as the second argument to task() to configure a task:

timeout

Set a timeout for the task:
const t = s.task(
  async () => {
    await longOperation()
  },
  { timeout: 5000 }  // 5 second timeout
)

const [err, result] = await t
if (err?.message.includes('timeout')) {
  console.log('Task timed out')
}
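Conceptually, a task timeout is a race between the work and a timer, with the outcome reported as an [err, result] tuple. The sketch below shows that idea with standard APIs only; withTimeout, the Result type alias, and the error message text are assumptions made for illustration, not the library's implementation.

```typescript
// Assumed shape of the [err, result] tuple used throughout this page.
type Result<E, T> = readonly [E, undefined] | readonly [undefined, T]

// Hypothetical helper: run `fn` with a deadline and report the outcome as a Result.
function withTimeout<T>(
  fn: (signal: AbortSignal) => Promise<T>,
  ms: number
): Promise<Result<Error, T>> {
  const controller = new AbortController()
  return new Promise<Result<Error, T>>((resolve) => {
    const timer = setTimeout(() => {
      const err = new Error(`timeout after ${ms}ms`)
      controller.abort(err) // give the work a chance to stop itself
      resolve([err, undefined])
    }, ms)

    fn(controller.signal).then(
      (value) => {
        clearTimeout(timer)
        resolve([undefined, value])
      },
      (reason: unknown) => {
        clearTimeout(timer)
        resolve([reason instanceof Error ? reason : new Error(String(reason)), undefined])
      }
    )
  })
}
```

Whichever side settles first wins; a later call to resolve has no effect, so a slow function cannot overwrite the timeout result.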

retry

Automatically retry failed tasks:
import { exponentialBackoff } from 'go-go-scope'

const t = s.task(
  async () => {
    return await unreliableApi()
  },
  {
    retry: {
      maxRetries: 3,
      delay: exponentialBackoff({ initial: 100, max: 5000, jitter: 0.3 }),
      retryCondition: (err) => {
        // Only retry on network errors
        return err.message.includes('network')
      },
      onRetry: (err, attempt) => {
        console.log(`Retry attempt ${attempt} after error:`, err)
      }
    }
  }
)

// Or use preset strategies
const t2 = s.task(
  async () => unreliableApi(),
  { retry: 'exponential' }  // Uses default exponential backoff
)
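To see what the initial, max, and jitter values mean in practice, here is one common way to compute exponential backoff delays. This is a sketch only: backoffDelay is a hypothetical function, and the exact formula used by exponentialBackoff is not stated on this page.

```typescript
interface BackoffOptions {
  initial: number // delay before the first retry, in ms
  max: number     // upper bound for any delay, in ms
  jitter: number  // fraction of the delay to randomize, from 0 to 1
}

// Hypothetical delay calculation: doubles per attempt, caps at `max`, then applies jitter.
function backoffDelay(
  attempt: number,
  { initial, max, jitter }: BackoffOptions,
  random: () => number = Math.random
): number {
  const base = Math.min(max, initial * 2 ** (attempt - 1))
  // Shift the delay by a random amount within plus or minus `jitter` of the base.
  const spread = base * jitter * (random() * 2 - 1)
  // Keep the final value inside [0, max] even after jitter is applied.
  return Math.min(max, Math.max(0, Math.round(base + spread)))
}

const opts = { initial: 100, max: 5000, jitter: 0 }
const delays = [1, 2, 3, 4, 5, 6, 7].map((attempt) => backoffDelay(attempt, opts))
// With jitter 0: [100, 200, 400, 800, 1600, 3200, 5000]
```

Jitter spreads retries from many clients over time so they do not all hit a recovering service at the same moment.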

errorClass

Wrap all errors in a custom error class:
import { taggedError } from 'go-go-try'

const DatabaseError = taggedError('DatabaseError')

const t = s.task(
  async () => {
    return await db.query('SELECT * FROM users')
  },
  { errorClass: DatabaseError }
)

const [err, result] = await t
if (err instanceof DatabaseError) {
  // Type-safe error handling
  console.error('Database error:', err)
}

systemErrorClass

Wrap only system errors (preserves tagged errors):
import { taggedError } from 'go-go-try'

const NotFoundError = taggedError('NotFoundError')
const DatabaseError = taggedError('DatabaseError')

const t = s.task(
  async () => {
    const user = await db.query('SELECT * FROM users WHERE id = ?', [id])
    if (!user) {
      throw new NotFoundError('User not found')  // Preserved!
    }
    return user
  },
  { systemErrorClass: DatabaseError }  // Only wraps connection errors
)

const [err, user] = await t
if (err instanceof NotFoundError) {
  return { status: 404 }  // Business error
} else if (err) {
  return { status: 500 }  // System error
}

dedupe

Deduplicate concurrent tasks with the same key:
// These tasks share the same result
const t1 = s.task(() => fetchUser(1), { dedupe: 'user:1' })
const t2 = s.task(() => fetchUser(1), { dedupe: 'user:1' })

const [r1, r2] = await Promise.all([t1, t2])
// Only one API call was made, both got the same result
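Deduplication of this kind is usually built on a map of in-flight promises keyed by the dedupe string. The following sketch shows the idea with plain promises; the dedupe function and inFlight map are hypothetical, and go-go-scope's internal bookkeeping may differ.

```typescript
// Hypothetical in-flight registry keyed by dedupe string.
const inFlight = new Map<string, Promise<unknown>>()

function dedupe<T>(key: string, fn: () => Promise<T>): Promise<T> {
  const existing = inFlight.get(key)
  if (existing) return existing as Promise<T>

  // Remove the entry once settled so later calls run the function again.
  const promise = fn().finally(() => inFlight.delete(key))
  inFlight.set(key, promise)
  return promise
}

let calls = 0
async function fetchUser(id: number): Promise<{ id: number }> {
  calls += 1
  return { id }
}

const first = dedupe('user:1', () => fetchUser(1))
const second = dedupe('user:1', () => fetchUser(1))
// first and second are the same promise, and fetchUser has run once
```

Unlike a cache, the entry lives only while the call is pending, so this shares work between concurrent callers without serving stale results later.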

memo

Cache successful results for a time-to-live (TTL), given in milliseconds:
// First call executes and caches
const r1 = await s.task(
  () => fetchUser(1),
  { memo: { key: 'user:1', ttl: 60000 } }
)

// Within 60 seconds, returns cached result
const r2 = await s.task(
  () => fetchUser(1),
  { memo: { key: 'user:1', ttl: 60000 } }
)
// No API call made
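The TTL behaviour can be pictured as a map whose entries carry an expiry timestamp. This is a sketch under assumptions: TtlCache is a hypothetical class, and its clock is injectable only so that expiry can be demonstrated without waiting; the library's memo storage is not described on this page.

```typescript
// Hypothetical TTL cache; `now` is injectable so expiry can be shown without waiting.
class TtlCache<T> {
  private entries = new Map<string, { value: T; expiresAt: number }>()
  private now: () => number

  constructor(now: () => number = Date.now) {
    this.now = now
  }

  get(key: string): T | undefined {
    const entry = this.entries.get(key)
    if (!entry) return undefined
    if (this.now() >= entry.expiresAt) {
      this.entries.delete(key) // drop expired entries on read
      return undefined
    }
    return entry.value
  }

  set(key: string, value: T, ttl: number): void {
    this.entries.set(key, { value, expiresAt: this.now() + ttl })
  }
}

let clock = 0
const cache = new TtlCache<string>(() => clock)

cache.set('user:1', 'Ada', 60000)
clock = 59999
const hit = cache.get('user:1')  // 'Ada'
clock = 60000
const miss = cache.get('user:1') // undefined: the entry has expired
```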

idempotency

Persist results across executions (requires persistence provider):
const s = scope({
  persistence: {
    idempotency: new RedisIdempotencyProvider(redis)
  }
})

const [err, result] = await s.task(
  () => processPayment(orderId),
  {
    idempotency: {
      key: `payment:${orderId}`,
      ttl: 3600000  // 1 hour
    }
  }
)
// If called again within 1 hour, returns cached result

worker

Execute the task in a worker thread for CPU-intensive operations:
const [err, result] = await s.task(
  () => {
    // This runs in a worker thread
    let sum = 0
    for (let i = 0; i < 10000000; i++) {
      sum += Math.sqrt(i)
    }
    return sum
  },
  { worker: true }
)

checkpoint

Configure checkpointing for long-running tasks:
const [err, result] = await s.task(
  async ({ checkpoint, progress }) => {
    const items = await loadItems()
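    const processed = checkpoint?.data?.processed ?? 0

    for (let i = processed; i < items.length; i++) {
      await processItem(items[i])
      // Report the share of items completed so far, reaching 100 on the last one
      progress.update(((i + 1) / items.length) * 100)

      // Store the count of completed items so a resumed run skips them
      if ((i + 1) % 100 === 0) {
        await checkpoint.save({ processed: i + 1 })
      }
    }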
    }
    
    return { total: items.length }
  },
  {
    id: 'batch-job',
    checkpoint: {
      interval: 60000,  // Auto-checkpoint every minute
      maxCheckpoints: 10,
      onCheckpoint: (cp) => console.log(`Checkpoint ${cp.sequence} saved`),
      onResume: (cp) => console.log(`Resumed from checkpoint ${cp.sequence}`)
    }
  }
)

otel

OpenTelemetry tracing configuration:
const t = s.task(
  async () => {
    return await fetchData()
  },
  {
    otel: {
      name: 'fetchData',
      attributes: {
        userId: '123',
        tenantId: 'acme'
      }
    }
  }
)

Methods

then()

Standard Promise-like .then() method. Starts task execution.
const t = s.task(async () => 'result')

t.then(
  ([err, result]) => {
    if (err) {
      console.error('Failed:', err)
    } else {
      console.log('Success:', result)
    }
  }
)
  • onfulfilled: Success callback
    Type: (value: Result<E, T>) => TResult1 | PromiseLike<TResult1>
  • onrejected: Rejection callback
    Type: (reason: unknown) => TResult2 | PromiseLike<TResult2>
  • promise: Returned promise that resolves with the callback result
    Type: Promise<TResult1 | TResult2>

catch()

Catch errors during task execution.
const t = s.task(async () => {
  throw new Error('Task failed')
})

await t.catch((err) => {
  console.error('Caught error:', err)
  return [undefined, 'fallback'] as const
})
  • onrejected: Error callback
    Type: (reason: unknown) => TResult | PromiseLike<TResult>
  • promise: Returned promise that resolves with the task result or the callback result
    Type: Promise<Result<E, T> | TResult>

finally()

Execute cleanup code after task completes.
const t = s.task(async () => {
  return await fetchData()
})

await t.finally(() => {
  console.log('Task finished')
})
  • onfinally: Cleanup callback
    Type: () => void
  • promise: Returned promise that resolves with the task result
    Type: Promise<Result<E, T>>

Properties

id

Unique task identifier.
const t = s.task(async () => 'result')
console.log(t.id)  // 1, 2, 3, ...
  • id: Unique task ID
    Type: number

signal

AbortSignal for this task. It is created lazily, on first access.
const t = s.task(async () => 'result')
const signal = t.signal  // AbortSignal created here

if (signal.aborted) {
  console.log('Task is aborted')
}
  • signal: Task’s AbortSignal
    Type: AbortSignal

isStarted

Whether the task has started executing.
const t = s.task(async () => 'result')

console.log(t.isStarted)  // false

await t

console.log(t.isStarted)  // true
  • isStarted: True if the task has started
    Type: boolean

isSettled

Whether the task has completed (success or failure).
const t = s.task(async () => 'result')

console.log(t.isSettled)  // false

await t

console.log(t.isSettled)  // true
  • isSettled: True if the task has completed
    Type: boolean

Disposal

Tasks implement the Disposable protocol:
// Manual disposal
const t = s.task(async ({ signal }) => {
  signal.addEventListener('abort', () => {
    console.log('Task disposed')
  })
  
  return await longOperation()
})

t[Symbol.dispose]()  // Clean up immediately

// Using 'using' keyword
{
  using t = s.task(async () => 'result')
  
  // Task is automatically disposed at end of block
}

Examples

Basic Task

import { scope } from 'go-go-scope'

await using s = scope()

const t = s.task(async ({ signal }) => {
  const response = await fetch('/api/data', { signal })
  return response.json()
})

const [err, data] = await t
if (err) {
  console.error('Failed to fetch data:', err)
} else {
  console.log('Data:', data)
}

Task with Retry

import { scope, exponentialBackoff } from 'go-go-scope'

await using s = scope()

const t = s.task(
  async () => {
    return await unreliableApi()
  },
  {
    retry: {
      maxRetries: 5,
      delay: exponentialBackoff({ initial: 100, max: 10000, jitter: 0.3 }),
      onRetry: (err, attempt) => {
        console.log(`Retry ${attempt}/5 after error:`, err)
      }
    }
  }
)

const [err, result] = await t

Task with Services

class ApiClient {
  async get(url: string) { /* ... */ }
}

await using s = scope()
  .provide('api', new ApiClient())

const t = s.task(async ({ services }) => {
  const api = services.api
  return await api.get('/users')
})

Task with Checkpointing

import { scope } from 'go-go-scope'
import { FileCheckpointProvider } from './providers'

await using s = scope({
  persistence: {
    checkpoint: new FileCheckpointProvider('./checkpoints')
  }
})

const t = s.task(
  async ({ checkpoint, progress }) => {
    const items = await loadLargeDataset()
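    const processed = checkpoint?.data?.processed ?? 0

    for (let i = processed; i < items.length; i++) {
      await processItem(items[i])
      // Report the share of items completed so far, reaching 100 on the last one
      progress.update(((i + 1) / items.length) * 100)

      // Store the count of completed items so a resumed run skips them
      if ((i + 1) % 1000 === 0) {
        await checkpoint.save({ processed: i + 1 })
      }
    }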
    }
    
    return { total: items.length }
  },
  {
    id: 'batch-processing',
    checkpoint: {
      interval: 300000,  // Auto-checkpoint every 5 minutes
      onCheckpoint: (cp) => console.log(`Saved checkpoint ${cp.sequence}`)
    }
  }
)

const [err, result] = await t

Parallel Tasks

await using s = scope()

const tasks = [
  s.task(() => fetchUser(1)),
  s.task(() => fetchUser(2)),
  s.task(() => fetchUser(3))
]

const results = await Promise.all(tasks)

for (const [err, user] of results) {
  if (err) {
    console.error('Failed to fetch user:', err)
  } else {
    console.log('User:', user)
  }
}
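When many tasks run in parallel, it is often convenient to separate failures from successes before continuing. The sketch below does this for [err, result] tuples; the Result alias mirrors the shape used on this page, and partition is a hypothetical helper rather than part of go-go-scope.

```typescript
// Assumed shape of the [err, result] tuple used throughout this page.
type Result<E, T> = readonly [E, undefined] | readonly [undefined, T]

// Hypothetical helper: split settled Result tuples into failures and values.
function partition<E, T>(results: ReadonlyArray<Result<E, T>>): { errors: E[]; values: T[] } {
  const errors: E[] = []
  const values: T[] = []
  for (const [err, value] of results) {
    if (err !== undefined) errors.push(err)
    else values.push(value as T)
  }
  return { errors, values }
}

const settled: Result<Error, string>[] = [
  [undefined, 'Ada'],
  [new Error('not found'), undefined],
  [undefined, 'Grace']
]
const { errors, values } = partition(settled)
// errors holds one Error ('not found'); values is ['Ada', 'Grace']
```

Because each task resolves to a tuple instead of rejecting, one failed task does not prevent the other results from being collected.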

Task Cancellation

await using s = scope({ timeout: 5000 })

const t = s.task(async ({ signal }) => {
  // Abort listener
  signal.addEventListener('abort', () => {
    console.log('Task cancelled:', signal.reason)
  })
  
  // Long operation
  for (let i = 0; i < 100; i++) {
    if (signal.aborted) {
      throw new Error('Cancelled')
    }
    // Wait 100ms between checks
    await new Promise((resolve) => setTimeout(resolve, 100))
  }
  
  return 'completed'
})

const [err, result] = await t
if (err) {
  console.error('Task error:', err)
} else {
  console.log('Result:', result)
}
