## Overview

The Stream API provides lazy, composable operations on async iterables. Streams integrate with scopes for automatic cancellation and resource cleanup.

Streams are lazy: operations only execute when you consume the stream (e.g., with `toArray()` or `for await`).
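The laziness guarantee can be demonstrated with a plain async generator, independent of the Stream API itself (the names below are illustrative): nothing is produced until consumption begins.

```typescript
// Demonstrating laziness: the body of numbers() does not run until the
// pipeline is actually consumed.
const produced: number[] = [];

async function* numbers(): AsyncGenerator<number> {
  for (const n of [1, 2, 3]) {
    produced.push(n); // records when a value is actually produced
    yield n;
  }
}

async function demo(): Promise<number[]> {
  const pipeline = numbers(); // building the pipeline produces nothing
  if (produced.length !== 0) throw new Error('expected lazy production');
  for await (const _ of pipeline) {
    // consumption drives production, one value at a time
  }
  return produced; // [1, 2, 3] only after consumption
}
```

The same principle applies to Stream pipelines: composing `map`, `filter`, and friends builds a description of work; the work happens at the terminal operation.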
## Creating Streams

Create streams from various sources:

```typescript
import { scope } from 'go-go-scope';
import { Stream } from '@go-go-scope/stream';

await using s = scope();

// From an async iterable
const stream1 = new Stream(asyncIterable, s);

// From an array
async function* fromArray<T>(arr: T[]): AsyncGenerator<T> {
  for (const item of arr) yield item;
}
const stream2 = new Stream(fromArray([1, 2, 3, 4, 5]), s);

// From a channel
const ch = s.channel<number>(10);
const stream3 = new Stream(ch, s);
```
## Map

Transform each value:

```typescript
const stream = new Stream(fromArray([1, 2, 3, 4, 5]), s);

const doubled = stream
  .map(x => x * 2)
  .map(x => x + 1);

const [err, result] = await doubled.toArray();
// result = [3, 5, 7, 9, 11]
```

## Filter

Keep only matching values:

```typescript
const evens = stream
  .filter(x => x % 2 === 0);

const [err, result] = await evens.toArray();
// result = [2, 4]
```

## FlatMap

Map and flatten in one operation:

```typescript
const stream = new Stream(fromArray([1, 2, 3]), s);

const flattened = stream
  .flatMap(x => [x, x * 10]);

const [err, result] = await flattened.toArray();
// result = [1, 10, 2, 20, 3, 30]
```

## FilterMap

Combine filter and map in a single pass:

```typescript
const stream = new Stream(fromArray(['1', 'a', '2', 'b', '3']), s);

const numbers = stream
  .filterMap(x => {
    const n = parseInt(x, 10);
    return Number.isNaN(n) ? undefined : n;
  });

const [err, result] = await numbers.toArray();
// result = [1, 2, 3]
```
## Slicing

### Take / Drop

```typescript
const stream = new Stream(fromArray([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]), s);

// Take the first 3
const first3 = stream.take(3);
const [err1, result1] = await first3.toArray();
// result1 = [1, 2, 3]

// Drop the first 3
const afterDrop = stream.drop(3);
const [err2, result2] = await afterDrop.toArray();
// result2 = [4, 5, 6, 7, 8, 9, 10]

// skip() is an alias for drop()
const afterSkip = stream.skip(3);
```

### TakeWhile / DropWhile

```typescript
// Take while the condition holds
const lessThan5 = stream.takeWhile(x => x < 5);
const [err1, result1] = await lessThan5.toArray();
// result1 = [1, 2, 3, 4]

// Drop while the condition holds
const afterDrop = stream.dropWhile(x => x < 5);
const [err2, result2] = await afterDrop.toArray();
// result2 = [5, 6, 7, 8, 9, 10]
```

### TakeUntil / DropUntil

```typescript
// Take until the condition holds (inclusive)
const until5 = stream.takeUntil(x => x === 5);
const [err1, result1] = await until5.toArray();
// result1 = [1, 2, 3, 4, 5]

// Drop until the condition holds (inclusive)
const afterDrop = stream.dropUntil(x => x === 5);
const [err2, result2] = await afterDrop.toArray();
// result2 = [5, 6, 7, 8, 9, 10]
```
## Buffering

### Buffer by Count

```typescript
const stream = new Stream(fromArray([1, 2, 3, 4, 5, 6, 7]), s);

const chunked = stream.buffer(3);
const [err, result] = await chunked.toArray();
// result = [[1, 2, 3], [4, 5, 6], [7]]
```
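The chunking behaviour above can be sketched with a plain async generator. This is an illustrative helper, not the library implementation; `bufferBy`, `items`, and `collect` are names invented for the sketch.

```typescript
// A sketch of buffer(n) semantics over any async iterable. Note the
// final partial chunk is flushed rather than dropped.
async function* bufferBy<T>(src: AsyncIterable<T>, size: number): AsyncGenerator<T[]> {
  let chunk: T[] = [];
  for await (const value of src) {
    chunk.push(value);
    if (chunk.length === size) {
      yield chunk;
      chunk = [];
    }
  }
  if (chunk.length > 0) yield chunk; // flush the partial tail
}

async function* items<T>(arr: T[]): AsyncGenerator<T> {
  for (const item of arr) yield item;
}

async function collect<T>(src: AsyncIterable<T>): Promise<T[]> {
  const out: T[] = [];
  for await (const v of src) out.push(v);
  return out;
}
```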
### Buffer by Time

```typescript
const buffered = stream.bufferTime(1000); // 1-second windows
// Emits a chunk every second
```

### Buffer by Time or Count

```typescript
const buffered = stream.bufferTimeOrCount(1000, 10);
// Emits when 10 items are collected OR 1 second passes
```
## Timing Operations

### Delay

Add a delay between elements:

```typescript
const delayed = stream.delay(100); // 100ms delay between items
```

### Throttle

Limit the emission rate:

```typescript
const throttled = stream.throttle({
  limit: 5,       // max 5 items
  interval: 1000  // per second
});
```

### Debounce

Emit only after a quiet period:

```typescript
const debounced = stream.debounce(300); // 300ms quiet period
```

### Sample

Emit the latest value at fixed intervals:

```typescript
const sampled = stream.sample(1000); // every 1 second
```

### Timeout

Fail if the stream doesn't complete in time:

```typescript
const withTimeout = stream.timeout(5000); // 5-second timeout
```
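The essence of a whole-stream timeout can be sketched with `Promise.race` over a plain async iterable: every pull races against a single overall deadline. The helper name `withDeadline` is illustrative; the library's `timeout` may differ in detail.

```typescript
// A sketch of timeout semantics: race each pull against one overall
// deadline, and clean up the timer when the source completes.
async function* withDeadline<T>(src: AsyncIterable<T>, ms: number): AsyncGenerator<T> {
  const it = src[Symbol.asyncIterator]();
  let timer: ReturnType<typeof setTimeout> | undefined;
  const deadline = new Promise<never>((_, reject) => {
    timer = setTimeout(() => reject(new Error('stream timed out')), ms);
  });
  try {
    while (true) {
      const res = await Promise.race([it.next(), deadline]);
      if (res.done) return;
      yield res.value;
    }
  } finally {
    if (timer !== undefined) clearTimeout(timer); // avoid leaking the timer
  }
}
```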
## Accumulation

### Scan

Running fold that emits intermediate results:

```typescript
const stream = new Stream(fromArray([1, 2, 3, 4, 5]), s);

const running = stream.scan((acc, x) => acc + x, 0);
const [err, result] = await running.toArray();
// result = [1, 3, 6, 10, 15]
```

### Distinct

Remove duplicates:

```typescript
const stream = new Stream(fromArray([1, 2, 2, 3, 1, 4]), s);

const unique = stream.distinct();
const [err, result] = await unique.toArray();
// result = [1, 2, 3, 4]
```

### DistinctBy

Remove duplicates by key:

```typescript
const users = [
  { id: 1, name: 'Alice' },
  { id: 2, name: 'Bob' },
  { id: 1, name: 'Alice2' },
];
const stream = new Stream(fromArray(users), s);

const uniqueUsers = stream.distinctBy(u => u.id);
// Only the first occurrence of each id is kept
```

### DistinctAdjacent

Remove consecutive duplicates:

```typescript
const stream = new Stream(fromArray([1, 1, 2, 2, 3, 1, 1]), s);

const deduped = stream.distinctAdjacent();
const [err, result] = await deduped.toArray();
// result = [1, 2, 3, 1]
```
## Combining Streams

### Concat

```typescript
const stream1 = new Stream(fromArray([1, 2, 3]), s);
const stream2 = new Stream(fromArray([4, 5, 6]), s);

const combined = stream1.concat(stream2);
const [err, result] = await combined.toArray();
// result = [1, 2, 3, 4, 5, 6]
```

### Merge

Interleave values from multiple streams:

```typescript
const merged = stream1.merge(stream2);
// Emits values from both streams as they arrive
```
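Merge's interleaving semantics can be sketched over plain async iterables: poll every source concurrently and emit whichever value resolves first. This is an illustrative model, not the library implementation; `mergeAll`, `fromList`, and `collectAll` are invented names.

```typescript
// A sketch of merge semantics: one pending next() per source, tagged
// with its index; Promise.race picks whichever settles first.
async function* mergeAll<T>(...sources: AsyncIterable<T>[]): AsyncGenerator<T> {
  const iters = sources.map(s => s[Symbol.asyncIterator]());
  const pending = new Map<number, Promise<{ i: number; res: IteratorResult<T> }>>();
  iters.forEach((it, i) => pending.set(i, it.next().then(res => ({ i, res }))));
  while (pending.size > 0) {
    const { i, res } = await Promise.race(pending.values());
    if (res.done) {
      pending.delete(i); // this source is exhausted
    } else {
      pending.set(i, iters[i].next().then(r => ({ i, res: r }))); // re-arm
      yield res.value;
    }
  }
}

async function* fromList<T>(arr: T[]): AsyncGenerator<T> {
  for (const item of arr) yield item;
}

async function collectAll<T>(src: AsyncIterable<T>): Promise<T[]> {
  const out: T[] = [];
  for await (const v of src) out.push(v);
  return out;
}
```

Because arrival order depends on scheduling, merged output is interleaved but not ordered across sources; only the relative order within each source is preserved.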
### Zip

Pair elements from two streams:

```typescript
const stream1 = new Stream(fromArray([1, 2, 3]), s);
const stream2 = new Stream(fromArray(['a', 'b', 'c']), s);

const zipped = stream1.zip(stream2);
const [err, result] = await zipped.toArray();
// result = [[1, 'a'], [2, 'b'], [3, 'c']]
```

### ZipWith

Zip with a custom combiner:

```typescript
const combined = stream1.zipWith(stream2, (a, b) => `${a}-${b}`);
const [err, result] = await combined.toArray();
// result = ['1-a', '2-b', '3-c']
```
## Error Handling

### CatchError

Recover from errors:

```typescript
const stream = new Stream(failingSource, s);

const recovered = stream.catchError(error => {
  console.error('Stream error:', error);
  return fromArray([0]); // fallback value
});
```

### OrElse

Provide a fallback stream:

```typescript
const fallback = new Stream(fromArray([0]), s);
const withFallback = stream.orElse(fallback);
```

### TapError

Run a side effect on errors:

```typescript
const logged = stream.tapError(error => {
  console.error('Error:', error);
});
```

### Ensuring

Run cleanup on completion:

```typescript
const withCleanup = stream.ensuring(async () => {
  console.log('Stream completed');
  await cleanup();
});
```
## Advanced Operations

### SwitchMap

Switch to a new inner stream on each outer value:

```typescript
const outer = new Stream(fromArray([1, 2, 3]), s);

const switched = outer.switchMap(x =>
  new Stream(fromArray([x, x * 10, x * 100]), s)
);
// Cancels the previous inner stream when a new outer value arrives
```

### GroupBy

Group elements by key:

```typescript
const stream = new Stream(fromArray([1, 2, 3, 4, 5, 6]), s);

const { groups, done } = stream.groupByKey(x => x % 2 === 0 ? 'even' : 'odd');
const evens = groups.get('even');
const odds = groups.get('odd');

await done; // wait for grouping to complete
```

### Pairwise

Emit consecutive pairs:

```typescript
const stream = new Stream(fromArray([1, 2, 3, 4, 5]), s);

const pairs = stream.pairwise();
const [err, result] = await pairs.toArray();
// result = [[1, 2], [2, 3], [3, 4], [4, 5]]
```

### Window

Emit a sliding window of values:

```typescript
const stream = new Stream(fromArray([1, 2, 3, 4, 5]), s);

const windows = stream.window(3);
const [err, result] = await windows.toArray();
// result = [[1, 2, 3], [2, 3, 4], [3, 4, 5]]
```
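The sliding-window behaviour can be sketched with a plain async generator (an illustrative helper, not the library implementation):

```typescript
// A sketch of window(size) semantics: keep the last `size` values and
// emit a copy of the window once it is full.
async function* slidingWindow<T>(src: AsyncIterable<T>, size: number): AsyncGenerator<T[]> {
  const buf: T[] = [];
  for await (const value of src) {
    buf.push(value);
    if (buf.length > size) buf.shift(); // slide: drop the oldest value
    if (buf.length === size) yield [...buf];
  }
}

async function* ofArray<T>(arr: T[]): AsyncGenerator<T> {
  for (const item of arr) yield item;
}

async function gather<T>(src: AsyncIterable<T>): Promise<T[]> {
  const out: T[] = [];
  for await (const v of src) out.push(v);
  return out;
}
```

Note the contrast with `buffer(n)`: buffering partitions the stream into disjoint chunks, while windowing emits overlapping views that advance one element at a time.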
## Terminal Operations

### ToArray

Collect all values:

```typescript
const [err, values] = await stream.toArray();
```

### ForEach

Execute a side effect per value:

```typescript
const [err] = await stream.forEach(value => {
  console.log(value);
});
```

### Reduce

Fold to a single value:

```typescript
const [err, sum] = await stream.reduce((acc, x) => acc + x, 0);
```

### First / Last

```typescript
const [err1, first] = await stream.first();
const [err2, last] = await stream.last();
```

### Count

```typescript
const [err, count] = await stream.count();
```

### Some / Every

```typescript
const [err1, hasEven] = await stream.some(x => x % 2 === 0);
const [err2, allPositive] = await stream.every(x => x > 0);
```
## Real-World Examples

### Processing Log Files

```typescript
import { createReadStream } from 'fs';
import { createInterface } from 'readline';

await using s = scope();

const fileStream = createReadStream('app.log');
const rl = createInterface({ input: fileStream });

const logStream = new Stream(rl, s);

const [err, errors] = await logStream
  .filter(line => line.includes('ERROR'))
  .map(line => JSON.parse(line))
  .filter(log => log.level === 'error')
  .map(log => ({ timestamp: log.timestamp, message: log.message }))
  .take(100)
  .toArray();
```
### Paginated API Fetching

```typescript
async function* fetchPages(baseUrl: string) {
  let page = 1;
  while (true) {
    const res = await fetch(`${baseUrl}?page=${page}`);
    const data = await res.json();
    if (data.items.length === 0) break;
    yield* data.items;
    page++;
  }
}

await using s = scope();

const items = new Stream(fetchPages('/api/items'), s);

const [err] = await items
  .filter(item => item.active)
  .map(item => transformItem(item))
  .buffer(100) // process in batches of 100
  .forEach(batch => processBatch(batch));
```
### Real-Time Event Processing

```typescript
await using s = scope();

const events = s.channel<Event>(1000);
const eventStream = new Stream(events, s);

const [err] = await eventStream
  .filter(e => e.type === 'user.action')
  .debounce(100) // debounce rapid events
  .buffer(10)    // batch for efficiency
  .forEach(batch => {
    console.log(`Processing ${batch.length} events`);
    return processBatch(batch);
  });
```
## Best Practices

- **Lazy evaluation** - Streams are lazy; operations compose without executing until consumed.
- **Buffer wisely** - Use buffering to batch operations and reduce overhead.
- **Filter early** - Filter early in the pipeline to reduce downstream processing.
- **Limit memory** - Use `take()` or `takeWhile()` to avoid unbounded memory usage.
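The memory-bounding point can be illustrated with plain async generators: a `take`-style stage stops pulling after `n` values, so even an infinite source is safe to consume. The helper names `naturals` and `takeN` are invented for the sketch.

```typescript
// An infinite source is fine as long as something downstream bounds
// how many values are pulled.
async function* naturals(): AsyncGenerator<number> {
  for (let n = 1; ; n++) yield n; // never completes on its own
}

async function* takeN<T>(src: AsyncIterable<T>, n: number): AsyncGenerator<T> {
  if (n <= 0) return;
  let remaining = n;
  for await (const value of src) {
    yield value;
    if (--remaining === 0) return; // stop pulling; the source is never drained
  }
}
```

Returning early from the `for await` loop also closes the source iterator, so upstream resources are released rather than produced indefinitely.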
## Next Steps

- **Scheduler** - Distributed job scheduling with cron support
- **Channels** - Go-style channels for concurrent communication