Overview
The Task class represents a lazy, disposable asynchronous operation. Tasks are created by calling scope.task() and implement both PromiseLike (for await support) and Disposable (for using keyword support).
Tasks are lazy - they don’t start executing until you await them or call .then(). This allows you to compose and configure tasks before execution.
Creation
Tasks are created using the task() method on a Scope:
import { scope } from 'go-go-scope'
await using s = scope()
const t = s.task(async ({ signal }) => {
// Task implementation
return 'result'
})
// Task hasn't started yet - it's lazy
// Start the task by awaiting
const [err, result] = await t
Lifecycle
Lazy Execution
Tasks don’t start executing until awaited:
const t = s.task(async () => {
console.log('Task started')
return 'result'
})
console.log('Task created')
// Output: "Task created"
// Task hasn't started yet!
await t
// Output: "Task started"
Automatic Cleanup
Tasks automatically clean up when:
- The task completes (success or failure)
- The parent scope is disposed
- The task is disposed via
using
{
using t = s.task(async ({ signal }) => {
// Setup abort listener
signal.addEventListener('abort', () => {
console.log('Task aborted')
})
return await longOperation()
})
// Task is disposed here, abort handler removed
}
Cancellation Propagation
Tasks inherit cancellation from their parent scope:
await using s = scope({ timeout: 1000 })
const t = s.task(async ({ signal }) => {
// Check if cancelled
if (signal.aborted) {
throw new Error('Cancelled')
}
// Listen for cancellation
signal.addEventListener('abort', () => {
console.log('Task was cancelled')
})
await longOperation()
})
// After 1000ms, scope aborts and task is cancelled
Task Context
Tasks receive a context object with useful utilities:
const t = s.task(async (ctx) => {
// Available context properties:
const {
services, // Services from scope.provide()
signal, // AbortSignal for cancellation
logger, // Structured logger
context, // Custom context from scope options
checkpoint, // Checkpoint utilities (if configured)
progress // Progress tracking (if configured)
} = ctx
// Use context...
})
services
Services provided via scope.provide():
const s = scope()
.provide('database', new Database())
.provide('cache', new Cache())
const t = s.task(async ({ services }) => {
const db = services.database
const cache = services.cache
const cached = await cache.get('users')
if (cached) return cached
const users = await db.query('SELECT * FROM users')
await cache.set('users', users)
return users
})
signal
AbortSignal for detecting cancellation:
const t = s.task(async ({ signal }) => {
// Check before long operations
if (signal.aborted) {
throw new Error('Cancelled before start')
}
// Pass to fetch/axios for automatic cancellation
const response = await fetch('/api/data', { signal })
// Listen for cancellation during processing
signal.addEventListener('abort', () => {
console.log('Cancelled during processing')
})
return response.json()
})
logger
Structured logger with task context:
const t = s.task(async ({ logger }) => {
logger.debug('Task started')
logger.info('Processing item', { itemId: 123 })
logger.warn('Cache miss', { key: 'users' })
logger.error('Operation failed', { error: err })
})
context
Custom context object from scope options:
const s = scope({
context: {
userId: '123',
tenantId: 'acme'
}
})
const t = s.task(async ({ context }) => {
const userId = context.userId
const tenantId = context.tenantId
// Use context values...
})
checkpoint
Checkpoint utilities for long-running tasks (requires checkpoint provider):
const t = s.task(
async ({ checkpoint, signal }) => {
// Resume from checkpoint if available
let processed = checkpoint?.data?.processed ?? 0
for (let i = processed; i < items.length; i++) {
if (signal.aborted) break
await processItem(items[i])
// Save checkpoint periodically
if (i % 100 === 0) {
await checkpoint.save({ processed: i })
}
}
return { total: items.length }
},
{
id: 'batch-job',
checkpoint: {
interval: 60000, // Auto-save every 60s
onCheckpoint: (cp) => console.log('Checkpoint saved:', cp.sequence)
}
}
)
progress
Progress tracking utilities (requires checkpoint provider):
const t = s.task(
async ({ progress }) => {
const items = await loadItems()
for (let i = 0; i < items.length; i++) {
await processItem(items[i])
// Update progress
progress.update((i / items.length) * 100)
}
return { total: items.length }
},
{ id: 'batch-job' }
)
// Subscribe to progress updates
const unsubscribe = t.progress?.onUpdate((p) => {
console.log(`Progress: ${p.percentage}%`)
if (p.eta) {
console.log(`ETA: ${p.eta}ms`)
}
})
Task Options
Tasks can be configured with various options:
timeout
Set a timeout for the task:
const t = s.task(
async () => {
await longOperation()
},
{ timeout: 5000 } // 5 second timeout
)
const [err, result] = await t
if (err?.message.includes('timeout')) {
console.log('Task timed out')
}
retry
Automatically retry failed tasks:
import { exponentialBackoff } from 'go-go-scope'
const t = s.task(
async () => {
return await unreliableApi()
},
{
retry: {
maxRetries: 3,
delay: exponentialBackoff({ initial: 100, max: 5000, jitter: 0.3 }),
retryCondition: (err) => {
// Only retry on network errors
return err.message.includes('network')
},
onRetry: (err, attempt) => {
console.log(`Retry attempt ${attempt} after error:`, err)
}
}
}
)
// Or use preset strategies
const t2 = s.task(
async () => unreliableApi(),
{ retry: 'exponential' } // Uses default exponential backoff
)
errorClass
Wrap all errors in a custom error class:
import { taggedError } from 'go-go-try'
const DatabaseError = taggedError('DatabaseError')
const t = s.task(
async () => {
return await db.query('SELECT * FROM users')
},
{ errorClass: DatabaseError }
)
const [err, result] = await t
if (err instanceof DatabaseError) {
// Type-safe error handling
console.error('Database error:', err)
}
systemErrorClass
Wrap only system errors (preserves tagged errors):
import { taggedError } from 'go-go-try'
const NotFoundError = taggedError('NotFoundError')
const DatabaseError = taggedError('DatabaseError')
const t = s.task(
async () => {
const user = await db.query('SELECT * FROM users WHERE id = ?', [id])
if (!user) {
throw new NotFoundError('User not found') // Preserved!
}
return user
},
{ systemErrorClass: DatabaseError } // Only wraps connection errors
)
const [err, user] = await t
if (err instanceof NotFoundError) {
return { status: 404 } // Business error
} else if (err) {
return { status: 500 } // System error
}
dedupe
Deduplicate concurrent tasks with the same key:
// These tasks share the same result
const t1 = s.task(() => fetchUser(1), { dedupe: 'user:1' })
const t2 = s.task(() => fetchUser(1), { dedupe: 'user:1' })
const [r1, r2] = await Promise.all([t1, t2])
// Only one API call was made, both got the same result
memo
Cache successful results for a TTL:
// First call executes and caches
const r1 = await s.task(
() => fetchUser(1),
{ memo: { key: 'user:1', ttl: 60000 } }
)
// Within 60 seconds, returns cached result
const r2 = await s.task(
() => fetchUser(1),
{ memo: { key: 'user:1', ttl: 60000 } }
)
// No API call made
idempotency
Persist results across executions (requires persistence provider):
const s = scope({
persistence: {
idempotency: new RedisIdempotencyProvider(redis)
}
})
const [err, result] = await s.task(
() => processPayment(orderId),
{
idempotency: {
key: `payment:${orderId}`,
ttl: 3600000 // 1 hour
}
}
)
// If called again within 1 hour, returns cached result
worker
Execute task in a worker thread for CPU-intensive operations:
const [err, result] = await s.task(
() => {
// This runs in a worker thread
let sum = 0
for (let i = 0; i < 10000000; i++) {
sum += Math.sqrt(i)
}
return sum
},
{ worker: true }
)
checkpoint
Configure checkpointing for long-running tasks:
const [err, result] = await s.task(
async ({ checkpoint, progress }) => {
const items = await loadItems()
let processed = checkpoint?.data?.processed ?? 0
for (let i = processed; i < items.length; i++) {
await processItem(items[i])
progress.update((i / items.length) * 100)
if (i % 100 === 0) {
await checkpoint.save({ processed: i })
}
}
return { total: items.length }
},
{
id: 'batch-job',
checkpoint: {
interval: 60000, // Auto-checkpoint every minute
maxCheckpoints: 10,
onCheckpoint: (cp) => console.log(`Checkpoint ${cp.sequence} saved`),
onResume: (cp) => console.log(`Resumed from checkpoint ${cp.sequence}`)
}
}
)
otel
OpenTelemetry tracing configuration:
const t = s.task(
async () => {
return await fetchData()
},
{
otel: {
name: 'fetchData',
attributes: {
userId: '123',
tenantId: 'acme'
}
}
}
)
Methods
then()
Standard Promise-like .then() method. Starts task execution.
const t = s.task(async () => 'result')
t.then(
([err, result]) => {
if (err) {
console.error('Failed:', err)
} else {
console.log('Success:', result)
}
}
)
onfulfilled
(value: Result<E, T>) => TResult1 | PromiseLike<TResult1>
Success callback
onrejected
(reason: unknown) => TResult2 | PromiseLike<TResult2>
Rejection callback
promise
Promise<TResult1 | TResult2>
Promise that resolves with the callback result
catch()
Catch errors during task execution.
const t = s.task(async () => {
throw new Error('Task failed')
})
await t.catch((err) => {
console.error('Caught error:', err)
return [undefined, 'fallback'] as const
})
onrejected
(reason: unknown) => TResult | PromiseLike<TResult>
Error callback
promise
Promise<Result<E, T> | TResult>
Promise that resolves with the task result or callback result
finally()
Execute cleanup code after task completes.
const t = s.task(async () => {
return await fetchData()
})
await t.finally(() => {
console.log('Task finished')
})
Promise that resolves with the task result
Properties
Unique task identifier.
const t = s.task(async () => 'result')
console.log(t.id) // 1, 2, 3, ...
signal
AbortSignal for this task. Lazy - only created when accessed.
const t = s.task(async () => 'result')
const signal = t.signal // AbortSignal created here
if (signal.aborted) {
console.log('Task is aborted')
}
isStarted
Whether the task has started executing.
const t = s.task(async () => 'result')
console.log(t.isStarted) // false
await t
console.log(t.isStarted) // true
isSettled
Whether the task has completed (success or failure).
const t = s.task(async () => 'result')
console.log(t.isSettled) // false
await t
console.log(t.isSettled) // true
True if task has completed
Disposal
Tasks implement the Disposable protocol:
// Manual disposal
const t = s.task(async ({ signal }) => {
signal.addEventListener('abort', () => {
console.log('Task disposed')
})
return await longOperation()
})
t[Symbol.dispose]() // Clean up immediately
// Using 'using' keyword
{
using t = s.task(async () => 'result')
// Task is automatically disposed at end of block
}
Examples
Basic Task
import { scope } from 'go-go-scope'
await using s = scope()
const t = s.task(async ({ signal }) => {
const response = await fetch('/api/data', { signal })
return response.json()
})
const [err, data] = await t
if (err) {
console.error('Failed to fetch data:', err)
} else {
console.log('Data:', data)
}
Task with Retry
import { scope, exponentialBackoff } from 'go-go-scope'
await using s = scope()
const t = s.task(
async () => {
return await unreliableApi()
},
{
retry: {
maxRetries: 5,
delay: exponentialBackoff({ initial: 100, max: 10000, jitter: 0.3 }),
onRetry: (err, attempt) => {
console.log(`Retry ${attempt}/5 after error:`, err)
}
}
}
)
const [err, result] = await t
Task with Services
class ApiClient {
async get(url: string) { /* ... */ }
}
await using s = scope()
.provide('api', new ApiClient())
const t = s.task(async ({ services }) => {
const api = services.api
return await api.get('/users')
})
Task with Checkpointing
import { scope } from 'go-go-scope'
import { FileCheckpointProvider } from './providers'
await using s = scope({
persistence: {
checkpoint: new FileCheckpointProvider('./checkpoints')
}
})
const t = s.task(
async ({ checkpoint, progress }) => {
const items = await loadLargeDataset()
let processed = checkpoint?.data?.processed ?? 0
for (let i = processed; i < items.length; i++) {
await processItem(items[i])
progress.update((i / items.length) * 100)
if (i % 1000 === 0) {
await checkpoint.save({ processed: i })
}
}
return { total: items.length }
},
{
id: 'batch-processing',
checkpoint: {
interval: 300000, // Auto-checkpoint every 5 minutes
onCheckpoint: (cp) => console.log(`Saved checkpoint ${cp.sequence}`)
}
}
)
const [err, result] = await t
Parallel Tasks
await using s = scope()
const tasks = [
s.task(() => fetchUser(1)),
s.task(() => fetchUser(2)),
s.task(() => fetchUser(3))
]
const results = await Promise.all(tasks)
for (const [err, user] of results) {
if (err) {
console.error('Failed to fetch user:', err)
} else {
console.log('User:', user)
}
}
Task Cancellation
await using s = scope({ timeout: 5000 })
const t = s.task(async ({ signal }) => {
// Abort listener
signal.addEventListener('abort', () => {
console.log('Task cancelled:', signal.reason)
})
// Long operation
for (let i = 0; i < 100; i++) {
if (signal.aborted) {
throw new Error('Cancelled')
}
await sleep(100)
}
return 'completed'
})
try {
const [err, result] = await t
} catch (err) {
console.error('Task error:', err)
}