Skip to main content

Overview

The Channel class provides Go-style channels for concurrent communication between tasks. Channels support multiple producers and consumers with configurable backpressure strategies.
Channels are automatically closed when the parent scope is disposed. All pending send/receive operations are cancelled.

Creation

Channels are created using scope.channel():
import { scope } from 'go-go-scope'

await using s = scope()

// Unbuffered channel (capacity = 0)
const ch1 = s.channel<string>()

// Buffered channel (capacity = 10)
const ch2 = s.channel<number>(10)

// With backpressure options
const ch3 = s.channel<Message>({
  capacity: 100,
  backpressure: 'drop-oldest',
  onDrop: (msg) => console.log('Dropped:', msg)
})
capacityOrOptions
number | ChannelOptions<T>
Buffer capacity (number) or full options object
channel
Channel<T>
A new Channel instance

Backpressure Strategies

block (default)

Sender blocks until space is available in the buffer:
const ch = s.channel<string>(2)  // Capacity: 2

await ch.send('a')  // OK - buffer: ['a']
await ch.send('b')  // OK - buffer: ['a', 'b']

const sendPromise = ch.send('c')  // Blocks until space available

// After receiving 'a', 'c' can be sent
const a = await ch.receive()  // 'a'
await sendPromise  // Now completes - buffer: ['b', 'c']

drop-oldest

Remove oldest item to make room for new item:
const ch = s.channel<string>({
  capacity: 2,
  backpressure: 'drop-oldest',
  onDrop: (val) => console.log('Dropped:', val)
})

await ch.send('a')  // buffer: ['a']
await ch.send('b')  // buffer: ['a', 'b']
await ch.send('c')  // Drops 'a', buffer: ['b', 'c']

const first = await ch.receive()  // 'b' (not 'a'!)

drop-latest

Drop the new item when buffer is full:
const ch = s.channel<string>({
  capacity: 2,
  backpressure: 'drop-latest',
  onDrop: (val) => console.log('Dropped:', val)
})

await ch.send('a')  // buffer: ['a']
await ch.send('b')  // buffer: ['a', 'b']
await ch.send('c')  // Drops 'c', buffer: ['a', 'b']

const first = await ch.receive()  // 'a'
const second = await ch.receive()  // 'b'

error

Throw error when buffer is full:
const ch = s.channel<string>({
  capacity: 2,
  backpressure: 'error'
})

await ch.send('a')  // OK
await ch.send('b')  // OK

try {
  await ch.send('c')  // Throws ChannelFullError
} catch (err) {
  console.error('Buffer full:', err)
}

sample

Keep only values within a time window (most recent):
const ch = s.channel<Measurement>({
  capacity: 1000,
  backpressure: 'sample',
  sampleWindow: 5000,  // 5 second window
  onDrop: (val) => console.log('Dropped old value:', val)
})

// Send 1000 measurements over 10 seconds
for (let i = 0; i < 1000; i++) {
  await ch.send({ timestamp: Date.now(), value: i })
  await sleep(10)
}

// Only measurements from last 5 seconds are available
for await (const measurement of ch) {
  // These are all recent (< 5 seconds old)
}

Core Methods

send()

Send a value to the channel. Behavior depends on backpressure strategy.
const ch = s.channel<string>(10)

const success = await ch.send('hello')
if (!success) {
  console.log('Channel is closed')
}
value
T
required
Value to send
success
Promise<boolean>
True if sent successfully, false if channel is closed
Returns false if channel is closed. Throws if scope is aborted. Behavior when buffer is full depends on backpressure strategy.

receive()

Receive a value from the channel. Blocks if channel is empty.
const ch = s.channel<string>(10)

await ch.send('hello')

const value = await ch.receive()
console.log(value)  // 'hello'

// If channel is closed and empty
ch.close()
const value2 = await ch.receive()
console.log(value2)  // undefined
value
Promise<T | undefined>
Received value, or undefined if channel is closed and empty
Returns undefined if channel is closed and buffer is empty. Throws if scope is aborted.

close()

Close the channel. No more sends allowed. Consumers can still drain the buffer.
const ch = s.channel<string>(10)

await ch.send('a')
await ch.send('b')

ch.close()

// Sending after close returns false
const success = await ch.send('c')
console.log(success)  // false

// Receiving still works until buffer is drained
const a = await ch.receive()  // 'a'
const b = await ch.receive()  // 'b'
const c = await ch.receive()  // undefined (closed and empty)

map()

Transform each value using a mapping function. Returns a new channel.
const numbers = s.channel<number>(10)
const doubled = numbers.map(x => x * 2)

await numbers.send(5)
await numbers.send(10)
numbers.close()

const a = await doubled.receive()  // 10
const b = await doubled.receive()  // 20
fn
(value: T) => R
required
Mapping function
channel
Channel<R>
New channel with mapped values

filter()

Filter values based on a predicate. Returns a new channel.
const numbers = s.channel<number>(10)
const evens = numbers.filter(x => x % 2 === 0)

await numbers.send(1)
await numbers.send(2)
await numbers.send(3)
await numbers.send(4)
numbers.close()

const a = await evens.receive()  // 2
const b = await evens.receive()  // 4
predicate
(value: T) => boolean
required
Filter function
channel
Channel<T>
New channel with filtered values

reduce()

Reduce all values to a single value. Returns a promise.
const numbers = s.channel<number>(10)

const sumPromise = numbers.reduce((acc, x) => acc + x, 0)

await numbers.send(1)
await numbers.send(2)
await numbers.send(3)
numbers.close()

const sum = await sumPromise  // 6
fn
(accumulator: R, value: T) => R
required
Reducer function
initial
R
required
Initial accumulator value
result
Promise<R>
Final accumulated value

take()

Take only the first n values. Returns a new channel that closes after n values.
const numbers = s.channel<number>()
const first3 = numbers.take(3)

await numbers.send(1)
await numbers.send(2)
await numbers.send(3)
await numbers.send(4)  // Ignored by first3

for await (const n of first3) {
  console.log(n)  // 1, 2, 3
}
count
number
required
Number of values to take
channel
Channel<T>
New channel limited to n values

Async Iteration

Channels implement AsyncIterable, allowing use with for await...of:
const ch = s.channel<string>(10)

// Producer task
s.task(async () => {
  for (const item of ['a', 'b', 'c']) {
    await ch.send(item)
  }
  ch.close()
})

// Consumer task
s.task(async () => {
  for await (const item of ch) {
    console.log('Received:', item)
  }
  console.log('Channel closed')
})

Properties

isClosed

Whether the channel is closed.
const ch = s.channel<string>()

console.log(ch.isClosed)  // false

ch.close()

console.log(ch.isClosed)  // true
isClosed
boolean
True if channel is closed

size

Current number of items in the buffer.
const ch = s.channel<string>(10)

await ch.send('a')
await ch.send('b')

console.log(ch.size)  // 2

await ch.receive()

console.log(ch.size)  // 1
size
number
Number of buffered items

cap

Buffer capacity.
const ch = s.channel<string>(10)

console.log(ch.cap)  // 10
cap
number
Buffer capacity

strategy

Current backpressure strategy.
const ch = s.channel<string>({
  capacity: 10,
  backpressure: 'drop-oldest'
})

console.log(ch.strategy)  // 'drop-oldest'
strategy
BackpressureStrategy
Backpressure strategy

Patterns

Producer-Consumer

await using s = scope()

const ch = s.channel<number>(10)

// Producer
s.task(async () => {
  for (let i = 0; i < 100; i++) {
    await ch.send(i)
  }
  ch.close()
})

// Consumer
s.task(async () => {
  for await (const num of ch) {
    await processNumber(num)
  }
})

Multiple Producers

await using s = scope()

const ch = s.channel<Message>(100)

// Producer 1
s.task(async () => {
  for (const msg of source1) {
    await ch.send(msg)
  }
})

// Producer 2
s.task(async () => {
  for (const msg of source2) {
    await ch.send(msg)
  }
})

// Single consumer
s.task(async () => {
  for await (const msg of ch) {
    await handleMessage(msg)
  }
})

Multiple Consumers (Fan-out)

await using s = scope()

const ch = s.channel<Job>(100)

// Producer
s.task(async () => {
  for (const job of jobs) {
    await ch.send(job)
  }
  ch.close()
})

// Consumer 1
s.task(async () => {
  for await (const job of ch) {
    await processJob(job)
  }
})

// Consumer 2
s.task(async () => {
  for await (const job of ch) {
    await processJob(job)
  }
})

// Consumer 3
s.task(async () => {
  for await (const job of ch) {
    await processJob(job)
  }
})

Pipeline

await using s = scope()

const input = s.channel<string>(10)
const processed = input
  .map(x => x.toUpperCase())
  .filter(x => x.length > 3)
  .map(x => ({ text: x, timestamp: Date.now() }))

// Producer
s.task(async () => {
  for (const item of items) {
    await input.send(item)
  }
  input.close()
})

// Consumer
s.task(async () => {
  for await (const item of processed) {
    await saveToDatabase(item)
  }
})

Rate-Limited Channel

await using s = scope()

const ch = s.channel<Request>({
  capacity: 100,
  backpressure: 'drop-latest',
  onDrop: (req) => {
    console.warn('Rate limit exceeded, dropped request:', req.id)
  }
})

// Fast producer
s.task(async () => {
  for (const req of requests) {
    await ch.send(req)  // May drop if consumer is slow
  }
})

// Slow consumer with rate limit
s.task(async () => {
  const rateLimiter = s.semaphore(10)  // Max 10 concurrent
  
  for await (const req of ch) {
    await rateLimiter.acquire(async () => {
      await processRequest(req)
    })
  }
})

Buffered Stream Processing

await using s = scope()

const ch = s.channel<Event>(1000)

// High-throughput event source
s.task(async () => {
  eventSource.on('data', async (event) => {
    await ch.send(event)
  })
  
  eventSource.on('end', () => {
    ch.close()
  })
})

// Batch processor
s.task(async () => {
  const batch: Event[] = []
  
  for await (const event of ch) {
    batch.push(event)
    
    if (batch.length >= 100) {
      await processBatch(batch)
      batch.length = 0
    }
  }
  
  // Process remaining
  if (batch.length > 0) {
    await processBatch(batch)
  }
})

Time-Sampled Data

await using s = scope()

const measurements = s.channel<Measurement>({
  capacity: 10000,
  backpressure: 'sample',
  sampleWindow: 60000,  // 1 minute window
  onDrop: (m) => console.log('Dropped old measurement:', m)
})

// High-frequency sensor
s.task(async () => {
  while (true) {
    const measurement = await readSensor()
    await measurements.send(measurement)
    await sleep(10)  // 100 readings/second
  }
})

// Low-frequency consumer (1 reading/second)
s.task(async () => {
  for await (const m of measurements) {
    await saveMeasurement(m)
    await sleep(1000)
  }
})

BroadcastChannel

For pub/sub patterns where all subscribers receive every message, use BroadcastChannel:
const bc = s.broadcast<string>()

// Unlike Channel, all subscribers get all messages
s.task(async () => {
  for await (const msg of bc.subscribe()) {
    console.log('Consumer 1:', msg)
  }
})

s.task(async () => {
  for await (const msg of bc.subscribe()) {
    console.log('Consumer 2:', msg)
  }
})

await bc.send('hello')  // Both consumers receive "hello"
See BroadcastChannel for details.

Examples

Request Queue

import { scope } from 'go-go-scope'

interface Request {
  id: string
  data: unknown
}

await using s = scope()

const requests = s.channel<Request>(100)

// API endpoint adds requests to channel
app.post('/api/process', async (req, res) => {
  const request = { id: generateId(), data: req.body }
  const sent = await requests.send(request)
  
  if (!sent) {
    return res.status(503).send('Service unavailable')
  }
  
  res.json({ id: request.id })
})

// Background worker processes requests
s.task(async () => {
  for await (const req of requests) {
    try {
      await processRequest(req)
    } catch (err) {
      console.error('Failed to process request:', req.id, err)
    }
  }
})

Data Pipeline

import { scope } from 'go-go-scope'

await using s = scope()

// Stage 1: Fetch data
const raw = s.channel<RawData>(100)
s.task(async () => {
  for (const id of ids) {
    const data = await fetchData(id)
    await raw.send(data)
  }
  raw.close()
})

// Stage 2: Transform
const transformed = raw.map(data => transform(data))

// Stage 3: Validate
const validated = transformed.filter(data => validate(data))

// Stage 4: Enrich
const enriched = s.channel<EnrichedData>(100)
s.task(async () => {
  for await (const data of validated) {
    const enrichedData = await enrich(data)
    await enriched.send(enrichedData)
  }
  enriched.close()
})

// Stage 5: Save
s.task(async () => {
  for await (const data of enriched) {
    await saveToDatabase(data)
  }
})

Build docs developers (and LLMs) love