## Overview
The Channel class provides Go-style channels for concurrent communication between tasks. Channels support multiple producers and consumers with configurable backpressure strategies.
Channels are automatically closed when the parent scope is disposed. All pending send/receive operations are cancelled.
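The close-on-dispose contract can be sketched without the library. `MiniChannel` below is an illustrative stand-in, not go-go-scope's implementation: a pending `receive()` resolves to `undefined` once the channel is closed, which is what scope disposal triggers.

```typescript
// Toy channel illustrating the contract: closing resolves every pending receive.
class MiniChannel<T> {
  private buffer: T[] = []
  private waiters: ((v: T | undefined) => void)[] = []
  closed = false

  async send(value: T): Promise<boolean> {
    if (this.closed) return false
    const waiter = this.waiters.shift()
    if (waiter) waiter(value)       // hand off directly to a blocked receiver
    else this.buffer.push(value)
    return true
  }

  receive(): Promise<T | undefined> {
    if (this.buffer.length > 0) return Promise.resolve(this.buffer.shift())
    if (this.closed) return Promise.resolve(undefined)
    return new Promise(res => this.waiters.push(res))  // block until send or close
  }

  close() {
    this.closed = true
    for (const w of this.waiters.splice(0)) w(undefined)  // cancel pending receives
  }
}

async function main() {
  const ch = new MiniChannel<string>()
  const pending = ch.receive()  // blocks: buffer is empty
  ch.close()                    // what disposal does...
  console.log(await pending)    // ...the pending receive resolves: undefined
}
main()
```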
## Creation

Channels are created using `scope.channel()`:

```typescript
import { scope } from 'go-go-scope'

await using s = scope()

// Unbuffered channel (capacity = 0)
const ch1 = s.channel<string>()

// Buffered channel (capacity = 10)
const ch2 = s.channel<number>(10)

// With backpressure options
const ch3 = s.channel<Message>({
  capacity: 100,
  backpressure: 'drop-oldest',
  onDrop: (msg) => console.log('Dropped:', msg)
})
```

**Parameters**

- `capacityOrOptions` (`number | ChannelOptions<T>`): buffer capacity as a number, or a full options object.

`ChannelOptions<T>` properties:

- `capacity`: buffer capacity. Default: `0` (unbuffered)
- `backpressure`: backpressure strategy: `'block'`, `'drop-oldest'`, `'drop-latest'`, `'error'`, or `'sample'`. Default: `'block'`
- `onDrop`: callback invoked when a value is dropped due to backpressure
- `sampleWindow`: time window in milliseconds for the `'sample'` strategy. Default: `1000`
## Backpressure Strategies

### block (default)

Sender blocks until space is available in the buffer:

```typescript
const ch = s.channel<string>(2)  // Capacity: 2

await ch.send('a')  // OK - buffer: ['a']
await ch.send('b')  // OK - buffer: ['a', 'b']

const sendPromise = ch.send('c')  // Blocks until space available

// After receiving 'a', 'c' can be sent
const a = await ch.receive()  // 'a'
await sendPromise  // Now completes - buffer: ['b', 'c']
```
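The blocking handshake above can be sketched independently of the library. `BoundedBuffer` is a toy stand-in, not go-go-scope's `Channel`: `send()` stays pending while the buffer is at capacity and resumes once a receive frees a slot.

```typescript
// Toy bounded buffer illustrating 'block' semantics.
class BoundedBuffer<T> {
  private items: T[] = []
  private sendWaiters: (() => void)[] = []
  constructor(private capacity: number) {}

  async send(value: T): Promise<void> {
    while (this.items.length >= this.capacity) {
      // Park until a receive makes room, then re-check.
      await new Promise<void>(res => this.sendWaiters.push(res))
    }
    this.items.push(value)
  }

  receive(): T | undefined {
    const value = this.items.shift()
    this.sendWaiters.shift()?.()  // wake one blocked sender, if any
    return value
  }
}

async function demo() {
  const buf = new BoundedBuffer<string>(2)
  await buf.send('a')
  await buf.send('b')

  let settled = false
  const third = buf.send('c').then(() => { settled = true })
  await Promise.resolve()     // let the pending send run its first step

  console.log(settled)        // false - blocked at capacity
  console.log(buf.receive())  // 'a' - frees a slot
  await third
  console.log(settled)        // true - 'c' made it into the buffer
}
demo()
```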
### drop-oldest

Removes the oldest item to make room for the new item:

```typescript
const ch = s.channel<string>({
  capacity: 2,
  backpressure: 'drop-oldest',
  onDrop: (val) => console.log('Dropped:', val)
})

await ch.send('a')  // buffer: ['a']
await ch.send('b')  // buffer: ['a', 'b']
await ch.send('c')  // Drops 'a', buffer: ['b', 'c']

const first = await ch.receive()  // 'b' (not 'a'!)
```
### drop-latest

Drops the new item when the buffer is full:

```typescript
const ch = s.channel<string>({
  capacity: 2,
  backpressure: 'drop-latest',
  onDrop: (val) => console.log('Dropped:', val)
})

await ch.send('a')  // buffer: ['a']
await ch.send('b')  // buffer: ['a', 'b']
await ch.send('c')  // Drops 'c', buffer: ['a', 'b']

const first = await ch.receive()   // 'a'
const second = await ch.receive()  // 'b'
```
### error

Throws an error when the buffer is full:

```typescript
const ch = s.channel<string>({
  capacity: 2,
  backpressure: 'error'
})

await ch.send('a')  // OK
await ch.send('b')  // OK

try {
  await ch.send('c')  // Throws ChannelFullError
} catch (err) {
  console.error('Buffer full:', err)
}
```
### sample

Keeps only the most recent values, those received within the configured time window:

```typescript
const ch = s.channel<Measurement>({
  capacity: 1000,
  backpressure: 'sample',
  sampleWindow: 5000,  // 5-second window
  onDrop: (val) => console.log('Dropped old value:', val)
})

// Send 1000 measurements over 10 seconds
for (let i = 0; i < 1000; i++) {
  await ch.send({ timestamp: Date.now(), value: i })
  await sleep(10)
}

// Only measurements from the last 5 seconds are available
for await (const measurement of ch) {
  // These are all recent (< 5 seconds old)
}
```
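One plausible eviction rule behind `'sample'` (a sketch; the library's actual implementation may differ): on each send, buffered values whose timestamps fall outside the sliding window are dropped from the front of the buffer.

```typescript
// Illustrative eviction helper: keep only items stamped within the window.
interface Stamped<T> { at: number; value: T }

function evictExpired<T>(
  buffer: Stamped<T>[],
  windowMs: number,
  now: number,
  onDrop?: (value: T) => void
): Stamped<T>[] {
  const cutoff = now - windowMs
  const kept: Stamped<T>[] = []
  for (const item of buffer) {
    if (item.at >= cutoff) kept.push(item)
    else onDrop?.(item.value)  // expired: report and discard
  }
  return kept
}

// With a 5000 ms window at t = 10000, anything stamped before t = 5000 goes.
const buffer = [
  { at: 1000, value: 'old' },
  { at: 6000, value: 'recent' },
  { at: 9500, value: 'fresh' },
]
const kept = evictExpired(buffer, 5000, 10000, v => console.log('Dropped:', v))
console.log(kept.map(i => i.value))  // [ 'recent', 'fresh' ]
```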
## Core Methods

### send()

Sends a value to the channel. Behavior when the buffer is full depends on the backpressure strategy.

```typescript
const ch = s.channel<string>(10)

const success = await ch.send('hello')
if (!success) {
  console.log('Channel is closed')
}
```

Returns `true` if the value was sent successfully, or `false` if the channel is closed. Throws if the scope is aborted.
### receive()

Receives a value from the channel. Blocks if the channel is empty.

```typescript
const ch = s.channel<string>(10)

await ch.send('hello')
const value = await ch.receive()
console.log(value)  // 'hello'

// If channel is closed and empty
ch.close()
const value2 = await ch.receive()
console.log(value2)  // undefined
```

Returns the received value, or `undefined` if the channel is closed and the buffer is empty. Throws if the scope is aborted.
### close()

Closes the channel. No more sends are allowed, but consumers can still drain the buffer.

```typescript
const ch = s.channel<string>(10)
await ch.send('a')
await ch.send('b')

ch.close()

// Sending after close returns false
const success = await ch.send('c')
console.log(success)  // false

// Receiving still works until the buffer is drained
const a = await ch.receive()  // 'a'
const b = await ch.receive()  // 'b'
const c = await ch.receive()  // undefined (closed and empty)
```
### map()

Transforms each value using a mapping function. Returns a new channel with the mapped values.

```typescript
const numbers = s.channel<number>(10)
const doubled = numbers.map(x => x * 2)

await numbers.send(5)
await numbers.send(10)
numbers.close()

const a = await doubled.receive()  // 10
const b = await doubled.receive()  // 20
```
### filter()

Filters values based on a predicate. Returns a new channel with the values that pass.

```typescript
const numbers = s.channel<number>(10)
const evens = numbers.filter(x => x % 2 === 0)

await numbers.send(1)
await numbers.send(2)
await numbers.send(3)
await numbers.send(4)
numbers.close()

const a = await evens.receive()  // 2
const b = await evens.receive()  // 4
```

**Parameters**

- `predicate` (`(value: T) => boolean`, required): filter function
### reduce()

Reduces all values to a single value. Returns a promise that resolves once the channel is closed.

```typescript
const numbers = s.channel<number>(10)
const sumPromise = numbers.reduce((acc, x) => acc + x, 0)

await numbers.send(1)
await numbers.send(2)
await numbers.send(3)
numbers.close()

const sum = await sumPromise  // 6
```

**Parameters**

- `fn` (`(accumulator: R, value: T) => R`, required): reducer function
- Initial accumulator value (`R`)
### take()

Takes only the first `n` values. Returns a new channel that closes after `n` values.

```typescript
const numbers = s.channel<number>()
const first3 = numbers.take(3)

await numbers.send(1)
await numbers.send(2)
await numbers.send(3)
await numbers.send(4)  // Ignored by first3

for await (const n of first3) {
  console.log(n)  // 1, 2, 3
}
```
## Async Iteration

Channels implement `AsyncIterable`, allowing use with `for await...of`:

```typescript
const ch = s.channel<string>(10)

// Producer task
s.task(async () => {
  for (const item of ['a', 'b', 'c']) {
    await ch.send(item)
  }
  ch.close()
})

// Consumer task
s.task(async () => {
  for await (const item of ch) {
    console.log('Received:', item)
  }
  console.log('Channel closed')
})
```
## Properties

### isClosed

Whether the channel is closed.

```typescript
const ch = s.channel<string>()
console.log(ch.isClosed)  // false

ch.close()
console.log(ch.isClosed)  // true
```
### size

Current number of items in the buffer.

```typescript
const ch = s.channel<string>(10)
await ch.send('a')
await ch.send('b')
console.log(ch.size)  // 2

await ch.receive()
console.log(ch.size)  // 1
```
### cap

Buffer capacity.

```typescript
const ch = s.channel<string>(10)
console.log(ch.cap)  // 10
```
### strategy

Current backpressure strategy.

```typescript
const ch = s.channel<string>({
  capacity: 10,
  backpressure: 'drop-oldest'
})
console.log(ch.strategy)  // 'drop-oldest'
```
## Patterns

### Producer-Consumer

```typescript
await using s = scope()
const ch = s.channel<number>(10)

// Producer
s.task(async () => {
  for (let i = 0; i < 100; i++) {
    await ch.send(i)
  }
  ch.close()
})

// Consumer
s.task(async () => {
  for await (const num of ch) {
    await processNumber(num)
  }
})
```
### Multiple Producers

```typescript
await using s = scope()
const ch = s.channel<Message>(100)

// Producer 1
s.task(async () => {
  for (const msg of source1) {
    await ch.send(msg)
  }
})

// Producer 2
s.task(async () => {
  for (const msg of source2) {
    await ch.send(msg)
  }
})

// Single consumer
s.task(async () => {
  for await (const msg of ch) {
    await handleMessage(msg)
  }
})
```
### Multiple Consumers (Fan-out)

Each value is delivered to exactly one consumer; consumers compete for values (unlike BroadcastChannel, where every subscriber receives every message):

```typescript
await using s = scope()
const ch = s.channel<Job>(100)

// Producer
s.task(async () => {
  for (const job of jobs) {
    await ch.send(job)
  }
  ch.close()
})

// Consumer 1
s.task(async () => {
  for await (const job of ch) {
    await processJob(job)
  }
})

// Consumer 2
s.task(async () => {
  for await (const job of ch) {
    await processJob(job)
  }
})

// Consumer 3
s.task(async () => {
  for await (const job of ch) {
    await processJob(job)
  }
})
```
### Pipeline

```typescript
await using s = scope()
const input = s.channel<string>(10)

const processed = input
  .map(x => x.toUpperCase())
  .filter(x => x.length > 3)
  .map(x => ({ text: x, timestamp: Date.now() }))

// Producer
s.task(async () => {
  for (const item of items) {
    await input.send(item)
  }
  input.close()
})

// Consumer
s.task(async () => {
  for await (const item of processed) {
    await saveToDatabase(item)
  }
})
```
### Rate-Limited Channel

```typescript
await using s = scope()
const ch = s.channel<Request>({
  capacity: 100,
  backpressure: 'drop-latest',
  onDrop: (req) => {
    console.warn('Rate limit exceeded, dropped request:', req.id)
  }
})

// Fast producer
s.task(async () => {
  for (const req of requests) {
    await ch.send(req)  // May drop if consumer is slow
  }
})

// Slow consumer with rate limit
s.task(async () => {
  const rateLimiter = s.semaphore(10)  // Max 10 concurrent
  for await (const req of ch) {
    await rateLimiter.acquire(async () => {
      await processRequest(req)
    })
  }
})
```
### Buffered Stream Processing

```typescript
await using s = scope()
const ch = s.channel<Event>(1000)

// High-throughput event source
s.task(async () => {
  eventSource.on('data', async (event) => {
    await ch.send(event)
  })
  eventSource.on('end', () => {
    ch.close()
  })
})

// Batch processor
s.task(async () => {
  const batch: Event[] = []
  for await (const event of ch) {
    batch.push(event)
    if (batch.length >= 100) {
      await processBatch(batch)
      batch.length = 0
    }
  }
  // Process remaining
  if (batch.length > 0) {
    await processBatch(batch)
  }
})
```
### Time-Sampled Data

```typescript
await using s = scope()
const measurements = s.channel<Measurement>({
  capacity: 10000,
  backpressure: 'sample',
  sampleWindow: 60000,  // 1-minute window
  onDrop: (m) => console.log('Dropped old measurement:', m)
})

// High-frequency sensor
s.task(async () => {
  while (true) {
    const measurement = await readSensor()
    await measurements.send(measurement)
    await sleep(10)  // 100 readings/second
  }
})

// Low-frequency consumer (1 reading/second)
s.task(async () => {
  for await (const m of measurements) {
    await saveMeasurement(m)
    await sleep(1000)
  }
})
```
## BroadcastChannel

For pub/sub patterns where all subscribers receive every message, use `BroadcastChannel`:

```typescript
const bc = s.broadcast<string>()

// Unlike Channel, all subscribers get all messages
s.task(async () => {
  for await (const msg of bc.subscribe()) {
    console.log('Consumer 1:', msg)
  }
})

s.task(async () => {
  for await (const msg of bc.subscribe()) {
    console.log('Consumer 2:', msg)
  }
})

await bc.send('hello')  // Both consumers receive "hello"
```

See BroadcastChannel for details.
## Examples

### Request Queue

```typescript
import { scope } from 'go-go-scope'

interface Request {
  id: string
  data: unknown
}

await using s = scope()
const requests = s.channel<Request>(100)

// API endpoint adds requests to channel
app.post('/api/process', async (req, res) => {
  const request = { id: generateId(), data: req.body }
  const sent = await requests.send(request)
  if (!sent) {
    return res.status(503).send('Service unavailable')
  }
  res.json({ id: request.id })
})

// Background worker processes requests
s.task(async () => {
  for await (const req of requests) {
    try {
      await processRequest(req)
    } catch (err) {
      console.error('Failed to process request:', req.id, err)
    }
  }
})
```
### Data Pipeline

```typescript
import { scope } from 'go-go-scope'

await using s = scope()

// Stage 1: Fetch data
const raw = s.channel<RawData>(100)
s.task(async () => {
  for (const id of ids) {
    const data = await fetchData(id)
    await raw.send(data)
  }
  raw.close()
})

// Stage 2: Transform
const transformed = raw.map(data => transform(data))

// Stage 3: Validate
const validated = transformed.filter(data => validate(data))

// Stage 4: Enrich
const enriched = s.channel<EnrichedData>(100)
s.task(async () => {
  for await (const data of validated) {
    const enrichedData = await enrich(data)
    await enriched.send(enrichedData)
  }
  enriched.close()
})

// Stage 5: Save
s.task(async () => {
  for await (const data of enriched) {
    await saveToDatabase(data)
  }
})
```