Overview

Channels provide Go-style concurrent communication between tasks. They support buffering, backpressure strategies, and automatic cleanup when the scope is disposed.
Channels implement AsyncIterable, so you can use for await...of to consume values.

Basic Channel Usage

Create a buffered channel and use it for producer-consumer communication:
import { scope } from 'go-go-scope';

await using s = scope();

// Create a buffered channel with capacity of 10
const ch = s.channel<string>(10);

// Producer task
s.task(async () => {
  await ch.send('message 1');
  await ch.send('message 2');
  await ch.send('message 3');
  ch.close(); // Signal no more messages
});

// Consumer task
s.task(async () => {
  for await (const msg of ch) {
    console.log('Received:', msg);
  }
  console.log('Channel closed');
});
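
The consumer loop above works because the channel is an AsyncIterable. As a rough mental model (a sketch, not go-go-scope's actual implementation), iterating amounts to calling receive() until it resolves to undefined. The names `drain`, `ReceiveFn`, and `drainDemo` are invented for this illustration:

```typescript
// Hypothetical stand-in for a channel's receive(): resolves to the next
// value, or to undefined once the channel is closed and empty.
type ReceiveFn<T> = () => Promise<T | undefined>;

// Turns repeated receive() calls into an async iterator.
async function* drain<T>(receive: ReceiveFn<T>): AsyncGenerator<T, void, undefined> {
  while (true) {
    const value = await receive();
    if (value === undefined) return; // closed and empty: end the loop
    yield value;
  }
}

// Usage with a fake receive() backed by a plain array.
async function drainDemo(): Promise<string[]> {
  const pending = ['message 1', 'message 2', 'message 3'];
  const receive: ReceiveFn<string> = async () => pending.shift();
  const seen: string[] = [];
  for await (const msg of drain(receive)) {
    seen.push(msg);
  }
  return seen;
}
```

One consequence of this model is that undefined cannot be told apart from "closed", so it is not a useful value to send through a channel.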

Buffering

Buffered Channels

Buffered channels can hold multiple values before blocking:
// Create channel with buffer size 5
const ch = s.channel<number>(5);

// Send up to 5 items without blocking
await ch.send(1);
await ch.send(2);
await ch.send(3);
await ch.send(4);
await ch.send(5);

// 6th send will block until someone receives
await ch.send(6); // Blocks here

Unbuffered Channels

Unbuffered channels (capacity 0) require sender and receiver to rendezvous:
const ch = s.channel<string>(0);

// Sender and receiver must meet
s.task(async () => {
  await ch.send('hello'); // Blocks until received
});

s.task(async () => {
  const msg = await ch.receive(); // Blocks until sent
  console.log(msg);
});
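
To make the blocking rules concrete, here is a minimal self-contained sketch of a channel with the semantics described above. `MiniChannel` is a hypothetical class written for this page, not part of go-go-scope, and it ignores scopes and cancellation entirely:

```typescript
// Illustrative only: a tiny channel with a fixed capacity.
class MiniChannel<T> {
  private readonly buffer: T[] = [];
  private closed = false;
  private readonly waitingReceivers: Array<(value: T | undefined) => void> = [];
  private readonly waitingSenders: Array<{ value: T; resolve: (ok: boolean) => void }> = [];

  constructor(private readonly capacity: number) {}

  send(value: T): Promise<boolean> {
    if (this.closed) return Promise.resolve(false);
    // A receiver is already waiting: hand the value over directly.
    const receiver = this.waitingReceivers.shift();
    if (receiver) {
      receiver(value);
      return Promise.resolve(true);
    }
    // Room in the buffer: store the value and return immediately.
    if (this.buffer.length < this.capacity) {
      this.buffer.push(value);
      return Promise.resolve(true);
    }
    // Buffer full (or capacity 0): park the sender until a receive arrives.
    return new Promise<boolean>((resolve) => {
      this.waitingSenders.push({ value, resolve });
    });
  }

  receive(): Promise<T | undefined> {
    if (this.buffer.length > 0) {
      const value = this.buffer.shift() as T;
      // A slot was freed: move one parked sender's value into the buffer.
      const sender = this.waitingSenders.shift();
      if (sender) {
        this.buffer.push(sender.value);
        sender.resolve(true);
      }
      return Promise.resolve(value);
    }
    // Empty buffer but a parked sender: the rendezvous case for capacity 0.
    const sender = this.waitingSenders.shift();
    if (sender) {
      sender.resolve(true);
      return Promise.resolve(sender.value);
    }
    if (this.closed) return Promise.resolve(undefined);
    // Nothing available yet: park the receiver.
    return new Promise<T | undefined>((resolve) => {
      this.waitingReceivers.push(resolve);
    });
  }

  close(): void {
    this.closed = true;
    for (const receiver of this.waitingReceivers.splice(0)) receiver(undefined);
    // In this sketch, values of senders still parked at close time are dropped.
    for (const sender of this.waitingSenders.splice(0)) sender.resolve(false);
  }
}
```

With `new MiniChannel<number>(2)`, the first two sends resolve immediately and the third stays pending until a receive frees a slot. With capacity 0, every send waits for a matching receive.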

Backpressure Strategies

Control what happens when the channel buffer is full:
// Block sender until space is available
const ch = s.channel<number>({
  capacity: 10,
  backpressure: 'block'
});

// Send blocks when buffer is full
for (let i = 0; i < 100; i++) {
  await ch.send(i); // Waits for consumer
}
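
The general idea behind a backpressure strategy can be sketched as a single decision made when a value arrives at a full buffer. Only 'block' appears on this page; the 'drop-oldest' and 'drop-newest' names below are hypothetical, used to show what an alternative policy could look like, and `offer` is not a go-go-scope function:

```typescript
// Only 'block' is documented above; the other two names are assumptions.
type Policy = 'block' | 'drop-oldest' | 'drop-newest';
type Outcome = 'accepted' | 'dropped' | 'must-wait';

// Decides what happens to `value` given the current buffer state.
// Assumes capacity >= 1.
function offer<T>(buffer: T[], capacity: number, value: T, policy: Policy): Outcome {
  if (buffer.length < capacity) {
    buffer.push(value);
    return 'accepted';
  }
  switch (policy) {
    case 'block':
      // The caller has to wait until a consumer frees a slot.
      return 'must-wait';
    case 'drop-oldest':
      // Evict the oldest buffered value to make room for the new one.
      buffer.shift();
      buffer.push(value);
      return 'accepted';
    case 'drop-newest':
      // Keep the buffer as it is and discard the incoming value.
      return 'dropped';
  }
}
```

Blocking never loses data but slows the producer to the consumer's pace. Dropping policies keep the producer fast at the cost of lost values, which is typically acceptable only for data such as metrics or UI updates.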

Channel Operations

Send and Receive

const ch = s.channel<string>(10);

// Send a value (returns false if closed)
const sent = await ch.send('hello');

// Receive a value (returns undefined if closed and empty)
const msg = await ch.receive();
if (msg !== undefined) {
  console.log('Received:', msg);
}

// Close the channel
ch.close();

// Check if closed
if (ch.isClosed) {
  console.log('Channel is closed');
}

Transformations

Channels support functional transformations:

Map

Transform each value:
const numbers = s.channel<number>(10);
const doubled = numbers.map(x => x * 2);

await numbers.send(5);
const result = await doubled.receive(); // 10

Filter

Filter values based on a predicate:
const numbers = s.channel<number>(10);
const evens = numbers.filter(x => x % 2 === 0);

await numbers.send(1);
await numbers.send(2);
await numbers.send(3);
numbers.close(); // No more values will arrive

const first = await evens.receive(); // 2 (1 is filtered out)
const second = await evens.receive(); // undefined (3 is filtered out and the source is closed)

Reduce

Accumulate values:
const numbers = s.channel<number>(10);

const sumPromise = numbers.reduce((acc, x) => acc + x, 0);

await numbers.send(1);
await numbers.send(2);
await numbers.send(3);
numbers.close();

const sum = await sumPromise; // 6

Take

Limit the number of values:
const numbers = s.channel<number>(10);
const first3 = numbers.take(3);

for await (const n of first3) {
  console.log(n); // Stops after 3 values
}
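
The four transformations follow the familiar shape of operators over any AsyncIterable. The sketch below shows that shape with plain async generators. The helper names (`mapAsync`, `filterAsync`, `takeAsync`, `reduceAsync`, `range`) are made up for this example and are not the library's internals:

```typescript
// Applies fn to every value as it flows through.
async function* mapAsync<T, U>(source: AsyncIterable<T>, fn: (value: T) => U): AsyncGenerator<U> {
  for await (const value of source) yield fn(value);
}

// Passes through only the values for which keep() returns true.
async function* filterAsync<T>(source: AsyncIterable<T>, keep: (value: T) => boolean): AsyncGenerator<T> {
  for await (const value of source) {
    if (keep(value)) yield value;
  }
}

// Yields at most `count` values, then stops pulling from upstream.
async function* takeAsync<T>(source: AsyncIterable<T>, count: number): AsyncGenerator<T> {
  if (count <= 0) return;
  let taken = 0;
  for await (const value of source) {
    yield value;
    taken += 1;
    if (taken >= count) return;
  }
}

// Folds all values into one result; resolves only when the source ends.
async function reduceAsync<T, A>(source: AsyncIterable<T>, fn: (acc: A, value: T) => A, initial: A): Promise<A> {
  let acc = initial;
  for await (const value of source) acc = fn(acc, value);
  return acc;
}

// Simple finite source used for the demo.
async function* range(from: number, to: number): AsyncGenerator<number> {
  for (let i = from; i <= to; i++) yield i;
}

async function transformDemo(): Promise<number[]> {
  const squares = mapAsync(range(1, 100), (x) => x * x);
  const evenSquares = filterAsync(squares, (x) => x % 2 === 0);
  const out: number[] = [];
  for await (const n of takeAsync(evenSquares, 5)) out.push(n);
  return out; // [4, 16, 36, 64, 100]
}
```

Note that a reduce can only resolve once its source ends, which is why the Reduce example closes the channel before awaiting the sum.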

Broadcast Channels

Broadcast channels send each message to ALL subscribers (pub/sub pattern):
await using s = scope();

const broadcast = s.broadcast<string>();

// Subscribe consumer 1
s.task(async () => {
  for await (const msg of broadcast.subscribe()) {
    console.log('Consumer 1:', msg);
  }
});

// Subscribe consumer 2
s.task(async () => {
  for await (const msg of broadcast.subscribe()) {
    console.log('Consumer 2:', msg);
  }
});

// Producer - all subscribers receive each message
s.task(async () => {
  await broadcast.send('hello');
  await broadcast.send('world');
  broadcast.close();
});

// Output:
// Consumer 1: hello
// Consumer 2: hello
// Consumer 1: world
// Consumer 2: world
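
What separates a broadcast from a regular channel is that every subscriber gets its own queue, so each value is delivered once per subscriber rather than once overall. A minimal sketch of that idea, assuming nothing about go-go-scope's internals (`MiniBroadcast` and `Subscriber` are hypothetical names):

```typescript
// Per-subscriber state: a private queue plus a hook to wake a waiting reader.
type Subscriber<T> = { queue: T[]; wake: (() => void) | null };

class MiniBroadcast<T> {
  private readonly subscribers = new Set<Subscriber<T>>();
  private closed = false;

  get subscriberCount(): number {
    return this.subscribers.size;
  }

  get isClosed(): boolean {
    return this.closed;
  }

  // Copies the value into every subscriber's queue.
  send(value: T): void {
    if (this.closed) return;
    for (const sub of this.subscribers) {
      sub.queue.push(value);
      sub.wake?.();
    }
  }

  // Marks the broadcast closed and wakes readers so they can finish.
  close(): void {
    this.closed = true;
    for (const sub of this.subscribers) sub.wake?.();
  }

  // Registers a subscriber immediately and returns its private stream.
  subscribe(): AsyncIterable<T> {
    const sub: Subscriber<T> = { queue: [], wake: null };
    this.subscribers.add(sub);
    const self = this;
    return {
      async *[Symbol.asyncIterator]() {
        try {
          while (true) {
            while (sub.queue.length > 0) yield sub.queue.shift() as T;
            if (self.closed) return;
            await new Promise<void>((resolve) => { sub.wake = resolve; });
            sub.wake = null;
          }
        } finally {
          self.subscribers.delete(sub); // unsubscribe when iteration ends
        }
      },
    };
  }
}
```

In this sketch a subscriber only sees values sent after it subscribed, and queues are unbounded, so a slow subscriber accumulates memory. Whether go-go-scope behaves the same way is not stated on this page.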

Broadcast Properties

const broadcast = s.broadcast<string>();

// Get subscriber count
console.log(broadcast.subscriberCount); // 0

// Check if closed
if (broadcast.isClosed) {
  console.log('Broadcast is closed');
}

Select Statement

Go-style select to wait on multiple channels:
await using s = scope();

const ch1 = s.channel<string>(10);
const ch2 = s.channel<number>(10);

// Wait for whichever channel delivers a value first (or for the timeout)
const [err, result] = await s.select(
  new Map([
    [ch1, async (msg) => `String: ${msg}`],
    [ch2, async (num) => `Number: ${num}`],
  ]),
  { timeout: 5000 }
);

if (!err) {
  console.log(result); // Result from whichever channel received first
}
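
The shape of the call above (race several sources, give up after a timeout, return an [err, result] pair) can be sketched with Promise.race. This is only an approximation under stated assumptions: a real select also has to avoid consuming values from the channels that lose the race, which this sketch does not address. `raceWithTimeout` and `Result` are hypothetical names:

```typescript
// Error-first tuple, mirroring the [err, result] destructuring shown above.
type Result<T> = [Error, undefined] | [undefined, T];

// Resolves with the first candidate to settle, or with an error after timeoutMs.
async function raceWithTimeout<T>(candidates: Promise<T>[], timeoutMs: number): Promise<Result<T>> {
  let timer: ReturnType<typeof setTimeout> | undefined;
  const timeout = new Promise<never>((_, reject) => {
    timer = setTimeout(() => reject(new Error(`timed out after ${timeoutMs}ms`)), timeoutMs);
  });
  try {
    const value = await Promise.race([...candidates, timeout]);
    return [undefined, value];
  } catch (err) {
    return [err instanceof Error ? err : new Error(String(err)), undefined];
  } finally {
    clearTimeout(timer); // do not keep the process alive for the unused timer
  }
}
```

Returning a tuple instead of throwing keeps the timeout case on the normal control path, which is why the caller above checks `err` rather than wrapping the call in try/catch.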

Producer-Consumer Pattern

Classic pattern with multiple producers and consumers:
await using s = scope();

const jobs = s.channel<number>(100);
const results = s.channel<number>(100);

// Multiple producers (keep the task handles so we can tell when they finish)
const producers = [];
for (let i = 0; i < 3; i++) {
  producers.push(s.task(async ({ signal }) => {
    for (let j = 0; j < 10; j++) {
      if (signal.aborted) break;
      await jobs.send(i * 10 + j);
    }
  }));
}

// Multiple consumers
for (let i = 0; i < 5; i++) {
  s.task(async () => {
    for await (const job of jobs) {
      const result = await processJob(job);
      await results.send(result);
    }
  });
}

// Collector
s.task(async () => {
  for await (const result of results) {
    console.log('Result:', result);
  }
});

// Close jobs channel when all producers are done
Promise.all(producers).then(() => jobs.close());

Pipeline Pattern

Chain processing stages with channels:
await using s = scope();

// Stage 1: Generate numbers
const stage1 = s.channel<number>(10);
s.task(async () => {
  for (let i = 1; i <= 100; i++) {
    await stage1.send(i);
  }
  stage1.close();
});

// Stage 2: Square numbers
const stage2 = stage1.map(x => x * x);

// Stage 3: Filter even squares
const stage3 = stage2.filter(x => x % 2 === 0);

// Stage 4: Take first 5
const stage4 = stage3.take(5);

// Consumer
for await (const n of stage4) {
  console.log(n); // 4, 16, 36, 64, 100
}

Fan-Out / Fan-In

await using s = scope();

const input = s.channel<number>(10);

// Single producer
s.task(async () => {
  for (let i = 0; i < 100; i++) {
    await input.send(i);
  }
  input.close();
});

// Multiple consumers (fan-out)
const workers = 5;
for (let i = 0; i < workers; i++) {
  s.task(async ({ logger }) => {
    for await (const item of input) {
      logger.info(`Worker ${i} processing ${item}`);
      await processItem(item); // processItem is your own async handler
    }
  });
}

Fan-in is the reverse: several producers write to one channel that a single consumer drains:
await using s = scope();

const output = s.channel<number>(100);

// Multiple producers (fan-in)
const producers = 5;
for (let i = 0; i < producers; i++) {
  s.task(async () => {
    for (let j = 0; j < 20; j++) {
      await output.send(i * 20 + j);
    }
  });
}

// Single consumer
s.task(async () => {
  for await (const item of output) {
    console.log('Received:', item);
  }
});
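
Fan-out works because several workers can pull from the same source and each item goes to exactly one of them. The sketch below shows this with a shared async iterator instead of a channel. `fanOut` and its parameters are hypothetical names, not a go-go-scope API:

```typescript
// Runs `workers` concurrent loops that all pull from one shared iterator.
// Each item is handled by exactly one worker; result order is not guaranteed.
async function fanOut<T, R>(
  source: AsyncIterable<T>,
  workers: number,
  handle: (item: T, workerId: number) => Promise<R>,
): Promise<R[]> {
  const iterator = source[Symbol.asyncIterator]();
  const results: R[] = [];

  const runWorker = async (workerId: number): Promise<void> => {
    while (true) {
      const next = await iterator.next();
      if (next.done) return; // source exhausted: this worker is finished
      results.push(await handle(next.value, workerId));
    }
  };

  // Fan-in of completion: wait until every worker has seen the end.
  await Promise.all(Array.from({ length: workers }, (_, id) => runWorker(id)));
  return results;
}

// Finite source standing in for the input channel.
async function* numbers(count: number): AsyncGenerator<number> {
  for (let i = 0; i < count; i++) yield i;
}
```

Calling `fanOut(numbers(100), 5, async (n) => n * 2)` processes each number once across five workers. The same Promise.all step is one way to know when it is safe to close a shared output channel in the fan-in case.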

Error Handling

Channels integrate with scope cancellation:
await using s = scope({ timeout: 5000 });

const ch = s.channel<string>(10);

try {
  // If scope times out, send/receive are aborted
  await ch.send('message');
} catch (err) {
  console.error('Channel operation cancelled');
}

// Channels are automatically closed when scope is disposed
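
How a pending send or receive gets interrupted is not spelled out here, but the usual mechanism for this kind of cancellation is an AbortSignal. The sketch below shows that general pattern with the standard AbortController API. `abortable` is a hypothetical helper, and the exact error go-go-scope raises may differ:

```typescript
// Settles with the operation's outcome, or rejects as soon as the signal aborts.
function abortable<T>(operation: Promise<T>, signal: AbortSignal): Promise<T> {
  if (signal.aborted) return Promise.reject(new Error('operation aborted'));
  return new Promise<T>((resolve, reject) => {
    const onAbort = () => reject(new Error('operation aborted'));
    signal.addEventListener('abort', onAbort, { once: true });
    operation.then(
      (value) => {
        signal.removeEventListener('abort', onAbort); // avoid leaking the listener
        resolve(value);
      },
      (error) => {
        signal.removeEventListener('abort', onAbort);
        reject(error);
      },
    );
  });
}
```

The underlying operation is not stopped by this wrapper. The caller simply stops waiting for it, which is why real implementations also remove the parked sender or receiver from the channel's wait queue.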

Best Practices

Close Producers

Always close channels when done sending to signal completion to consumers.

Use Buffering

Buffer channels to decouple producer and consumer speeds.

Handle Backpressure

Choose the right backpressure strategy for your use case.

Check Closed

Check isClosed before sending, or check the boolean returned by send(), so that writes to a closed channel do not go unnoticed.

Next Steps

Parallel Execution

Run multiple tasks concurrently with channels

Streams

Use streams for lazy async iterable processing
