@go-go-scope/stream
Lazy async iterable streams for go-go-scope with 50+ composable operations. Process async data with functional, lazy evaluation that integrates seamlessly with structured concurrency.
Installation
npm install @go-go-scope/stream
Quick Start
import { scope } from 'go-go-scope'
import { Stream } from '@go-go-scope/stream'
await using s = scope()
const [err, results] = await s.stream(fetchData())
.map(x => x * 2)
.filter(x => x > 10)
.take(5)
.toArray()
Core Concepts
Lazy Evaluation
All stream operations are lazy - they don’t execute until a terminal operation (like toArray(), forEach(), or drain()) is called. This allows for efficient composition without materializing intermediate results.
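The same laziness can be sketched with plain generator functions (lazyMap and lazyTake are illustrative helpers, not part of this library's API): no callback runs until the pipeline is consumed, and only the elements actually needed are ever produced.

```typescript
// Minimal sketch of lazy evaluation with plain (sync) generators.
function* lazyMap<T, R>(src: Iterable<T>, fn: (x: T) => R): Generator<R> {
  for (const x of src) yield fn(x)
}

function* lazyTake<T>(src: Iterable<T>, n: number): Generator<T> {
  if (n <= 0) return
  let taken = 0
  for (const x of src) {
    yield x
    if (++taken >= n) return
  }
}

const calls: number[] = []
const pipeline = lazyTake(
  lazyMap([1, 2, 3, 4, 5], x => { calls.push(x); return x * 2 }),
  2
)
// Nothing has executed yet: calls is still [].
const out = [...pipeline]
// out is [2, 4], and only the first two elements were ever mapped: calls is [1, 2].
```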
Automatic Cancellation
Streams integrate with go-go-scope’s structured concurrency. When the parent scope is disposed, all stream operations are automatically cancelled.
Result Tuples
Terminal operations return Result<Error, T> tuples for type-safe error handling:
const [err, results] = await stream.toArray()
if (err) {
console.error('Stream failed:', err)
} else {
console.log('Results:', results)
}
Stream Class
Constructor
Parameters:
- The async iterable data source
- The parent scope for cancellation propagation
map()
Transform each value using a mapping function.
stream.map((value, index) => value * 2)
Parameters:
- fn ((value: T, index: number) => R, required): Transformation function applied to each element
Returns: A new stream with transformed values
filter()
Filter values based on a predicate.
stream.filter((value, index) => value > 10)
Parameters:
- predicate ((value: T, index: number) => boolean, required): Function that returns true to keep the element
flatMap()
Map each value to an iterable and flatten the results.
stream.flatMap((value, index) => [value, value * 2])
Parameters:
- fn ((value: T, index: number) => Iterable&lt;R&gt; | AsyncIterable&lt;R&gt;, required): Function that returns an iterable for each element
flat()
Flatten a stream of iterables by one level.
stream.flat() // Stream<Iterable<R>> => Stream<R>
tap()
Perform side effects without modifying values.
stream.tap(value => console.log('Processing:', value))
Parameters:
- fn ((value: T) => void | Promise&lt;void&gt;, required): Side effect function called for each element
filterMap()
Map and filter in one operation. Return undefined or null from the function to drop a value.
stream.filterMap(x => {
const n = parseInt(x, 10)
return isNaN(n) ? undefined : n
})
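The semantics can be sketched over a plain array (filterMapArray is an illustrative helper, not the library implementation): map each element, then keep only the defined results.

```typescript
// Sketch of filterMap semantics: map, then drop undefined/null results.
function filterMapArray<T, R>(xs: T[], fn: (x: T) => R | null | undefined): R[] {
  const out: R[] = []
  for (const x of xs) {
    const mapped = fn(x)
    if (mapped !== undefined && mapped !== null) out.push(mapped)
  }
  return out
}

const parsed = filterMapArray(['1', 'a', '3'], x => {
  const n = parseInt(x, 10)
  return isNaN(n) ? undefined : n
})
// parsed is [1, 3]: 'a' parses to NaN and is filtered out.
```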
Error Handling
catchAll()
Catch errors and recover with a fallback stream.
stream.catchAll(error => {
console.error('Stream error:', error)
return (async function*() { yield 'fallback' })()
})
Parameters:
- handler ((error: unknown) => AsyncIterable&lt;R&gt;, required): Handler that returns a fallback stream
orElse()
Provide fallback stream on error or empty source.
stream.orElse(fallbackStream)
orElseSucceed()
Provide a single fallback value on failure.
stream.orElseSucceed('default')
tapError()
Perform side effects on errors without modifying the error.
stream.tapError(err => console.error('Error occurred:', err))
mapError()
Transform errors using a mapping function.
stream.mapError(err => new CustomError(err.message))
ensuring()
Run cleanup effect when stream completes (success or error).
stream.ensuring(() => console.log('Stream completed'))
Slicing Operations
take()
Take first n elements.
Parameters:
- n (number, required): Number of elements to take
takeWhile()
Take elements while predicate holds.
stream.takeWhile(x => x < 100)
takeUntil()
Take elements until the predicate returns true (the matching element is included).
stream.takeUntil(x => x === 'END')
drop()
Skip first n elements.
dropWhile()
Skip elements while predicate holds.
stream.dropWhile(x => x < 10)
dropUntil()
Skip elements until the predicate returns true.
stream.dropUntil(x => x > 0)
skip()
Alias for drop().
Accumulating Operations
scan()
Running fold - emit intermediate accumulator results.
stream.scan((acc, value) => acc + value, 0)
// [1, 2, 3] => [1, 3, 6]
Parameters:
- fn ((acc: R, value: T) => R, required): Accumulator function
- The initial accumulator value
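The "running fold" behavior can be sketched over a plain array (scanArray is an illustrative helper): unlike fold, every intermediate accumulator is emitted, not just the final one.

```typescript
// Sketch of scan semantics: emit each intermediate accumulator value.
function scanArray<T, R>(xs: T[], fn: (acc: R, value: T) => R, initial: R): R[] {
  const out: R[] = []
  let acc = initial
  for (const x of xs) {
    acc = fn(acc, x)
    out.push(acc) // fold would keep only the last value; scan emits them all
  }
  return out
}

const running = scanArray([1, 2, 3], (acc, x) => acc + x, 0)
// running is [1, 3, 6]
```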
buffer()
Buffer values into fixed-size chunks.
stream.buffer(3)
// [1, 2, 3, 4, 5] => [[1, 2, 3], [4, 5]]
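The chunking rule can be sketched over a plain array (chunk is an illustrative helper): full chunks of the requested size, with a shorter final chunk for any remainder.

```typescript
// Sketch of buffer semantics: split into fixed-size chunks, last chunk may be short.
function chunk<T>(xs: T[], size: number): T[][] {
  const out: T[][] = []
  for (let i = 0; i < xs.length; i += size) out.push(xs.slice(i, i + size))
  return out
}

const chunks = chunk([1, 2, 3, 4, 5], 3)
// chunks is [[1, 2, 3], [4, 5]]
```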
bufferTime()
Buffer values within time windows.
stream.bufferTime(1000) // 1 second windows
Parameters:
- Time window in milliseconds
bufferTimeOrCount()
Buffer by size OR time window, whichever comes first.
stream.bufferTimeOrCount(1000, 10)
// Emit when 10 items OR 1 second elapsed
distinct()
Emit only unique values (using ===).
stream.distinct()
// [1, 2, 2, 3, 1] => [1, 2, 3]
distinctBy()
Emit only unique values based on key function.
stream.distinctBy(x => x.id)
distinctAdjacent()
Emit only values different from previous one.
stream.distinctAdjacent()
// [1, 1, 2, 2, 1] => [1, 2, 1]
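The difference from distinct() is that only consecutive duplicates are dropped, as this sketch over a plain array shows (dedupeAdjacent is an illustrative helper):

```typescript
// Sketch of distinctAdjacent semantics: drop a value only if it equals the previous one.
function dedupeAdjacent<T>(xs: T[]): T[] {
  const out: T[] = []
  for (const x of xs) {
    if (out.length === 0 || out[out.length - 1] !== x) out.push(x)
  }
  return out
}

const deduped = dedupeAdjacent([1, 1, 2, 2, 1])
// deduped is [1, 2, 1]: the trailing 1 survives because it is not adjacent to the first run of 1s.
```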
distinctAdjacentBy()
Emit only values different from previous based on key.
stream.distinctAdjacentBy(x => x.type)
prepend()
Prepend values to the stream.
append()
Append values to the stream.
stream.append('x', 'y', 'z')
concat()
Concatenate another stream after this one.
stream.concat(otherStream)
intersperse()
Insert separator between values.
stream.intersperse(',')
// [1, 2, 3] => [1, ',', 2, ',', 3]
groupAdjacentBy()
Group consecutive elements by key function.
stream.groupAdjacentBy(x => x.category)
Timing Operations
delay()
Add delay between elements.
stream.delay(100) // 100ms between each element
throttle()
Limit rate of emissions.
stream.throttle({ limit: 10, interval: 1000 })
// Max 10 items per second
Parameters:
- limit (number): Maximum items per interval
- interval (number): Time interval in milliseconds
debounce()
Emit only after silence period.
stream.debounce(300)
// Emits value only after 300ms of no new values
Parameters:
- Silence period in milliseconds
timeout()
Fail if stream doesn’t complete within duration.
stream.timeout(5000) // 5 second timeout
Parameters:
- Timeout duration in milliseconds
sample()
Sample the stream at regular intervals.
stream.sample(1000) // Emit latest value every second
auditTime()
Emit the latest value after a duration of silence.
Combining Operations
merge()
Merge with another stream (interleave values).
stream.merge(otherStream)
zip()
Zip with another stream - pair elements.
stream.zip(otherStream)
// [1, 2, 3] + ['a', 'b', 'c'] => [[1, 'a'], [2, 'b'], [3, 'c']]
zipWithIndex()
Zip with index.
stream.zipWithIndex()
// ['a', 'b', 'c'] => [['a', 0], ['b', 1], ['c', 2]]
zipWith()
Zip with another stream using combining function.
stream.zipWith(otherStream, (a, b) => a + b)
zipLatest()
Zip using latest value from each stream.
stream.zipLatest(otherStream)
zipAll()
Zip with padding for unequal length streams.
stream.zipAll(otherStream, 'defaultA', 'defaultB')
interleave()
Interleave values from multiple streams.
stream.interleave(stream2, stream3)
cross()
Cartesian product of two streams.
stream.cross(otherStream)
// [1, 2] × ['a', 'b'] => [[1, 'a'], [1, 'b'], [2, 'a'], [2, 'b']]
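The pairing order can be sketched over plain arrays (crossProduct is an illustrative helper): for each element of the first stream, pair it with every element of the second.

```typescript
// Sketch of cross semantics: Cartesian product, outer stream varies slowest.
function crossProduct<A, B>(a: A[], b: B[]): Array<[A, B]> {
  const out: Array<[A, B]> = []
  for (const x of a) {
    for (const y of b) out.push([x, y])
  }
  return out
}

const pairs = crossProduct([1, 2], ['a', 'b'])
// pairs is [[1, 'a'], [1, 'b'], [2, 'a'], [2, 'b']]
```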
collect()
Collect values using a partial function.
stream.collect(x => {
const n = parseInt(x, 10)
return isNaN(n) ? undefined : n
})
collectWhile()
Collect while partial function returns defined values.
stream.collectWhile(x => x < 100 ? x : undefined)
grouped()
Group elements into fixed-size chunks (alias for buffer).
groupedWithin()
Group by size and/or time window.
stream.groupedWithin(10, 1000)
// Group max 10 items or 1 second
groupByKey()
Group elements by key into separate streams.
const { groups, done } = stream.groupByKey(x => x.category)
const evenStream = groups.get('even')
const oddStream = groups.get('odd')
Returns:
- groups: Map of group keys to streams
- done: Promise that resolves when distribution is complete
Advanced Operations
switchMap()
Cancel previous inner stream when new outer value arrives.
stream.switchMap(value => fetchPages(value))
concatMap()
Sequential flatMap - wait for each inner iterable to complete.
stream.concatMap(url => fetchPages(url))
exhaustMap()
Ignore new emissions while processing.
stream.exhaustMap(query => fetchResults(query))
pairwise()
Emit [previous, current] tuples.
stream.pairwise()
// [1, 2, 3, 4] => [[1, 2], [2, 3], [3, 4]]
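The pairing rule can be sketched over a plain array (pairwiseArray is an illustrative helper): each element after the first is paired with its predecessor, so n elements yield n - 1 pairs.

```typescript
// Sketch of pairwise semantics: emit [previous, current] for each element after the first.
function pairwiseArray<T>(xs: T[]): Array<[T, T]> {
  const out: Array<[T, T]> = []
  for (let i = 1; i < xs.length; i++) out.push([xs[i - 1], xs[i]])
  return out
}

const adjacentPairs = pairwiseArray([1, 2, 3, 4])
// adjacentPairs is [[1, 2], [2, 3], [3, 4]]
```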
window()
Sliding window of elements.
stream.window(3)
// [1, 2, 3, 4, 5] => [[1, 2, 3], [2, 3, 4], [3, 4, 5]]
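Note the difference from buffer(): windows overlap, advancing by one element at a time, as this sketch over a plain array shows (slidingWindow is an illustrative helper):

```typescript
// Sketch of window semantics: overlapping windows that slide by one element.
function slidingWindow<T>(xs: T[], size: number): T[][] {
  const out: T[][] = []
  for (let i = 0; i + size <= xs.length; i++) out.push(xs.slice(i, i + size))
  return out
}

const windows = slidingWindow([1, 2, 3, 4, 5], 3)
// windows is [[1, 2, 3], [2, 3, 4], [3, 4, 5]]
```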
share()
Multicast to multiple subscribers.
const shared = stream.share({ bufferSize: 1 })
shared.forEach(v => console.log('A:', v))
shared.forEach(v => console.log('B:', v))
Splitting Operations
partition()
Partition stream into two based on predicate.
const [evens, odds] = stream.partition(n => n % 2 === 0)
Parameters:
- predicate ((value: T) => boolean, required): Predicate to split values
- Buffer size for each partition (default: 16)
Returns: Tuple of [pass, fail] streams
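The split itself can be sketched over a plain array (partitionArray is an illustrative helper; the stream version additionally buffers so both halves can be consumed concurrently):

```typescript
// Sketch of partition semantics: route each value into pass or fail by predicate.
function partitionArray<T>(xs: T[], predicate: (x: T) => boolean): [T[], T[]] {
  const pass: T[] = []
  const fail: T[] = []
  for (const x of xs) (predicate(x) ? pass : fail).push(x)
  return [pass, fail]
}

const [evenNums, oddNums] = partitionArray([1, 2, 3, 4], n => n % 2 === 0)
// evenNums is [2, 4], oddNums is [1, 3]
```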
splitAt()
Split stream at position n.
const [head, tail] = stream.splitAt(5)
broadcast()
Broadcast stream to N consumers.
const [s1, s2, s3] = stream.broadcast(3)
Parameters:
- Buffer size per stream (default: 0)
Terminal Operations
toArray()
Collect all values into an array.
const [err, results] = await stream.toArray()
Returns: Result tuple containing an error or the array of values
forEach()
Execute effect for each value.
const [err] = await stream.forEach(value => {
console.log('Value:', value)
})
Parameters:
- fn ((value: T) => void | Promise&lt;void&gt;, required): Effect function for each element
drain()
Consume stream without collecting values.
const [err] = await stream.drain()
fold()
Fold stream into single value.
const [err, sum] = await stream.fold(0, (acc, val) => acc + val)
Parameters:
- The initial accumulator value
- fn ((acc: R, value: T) => R, required): Accumulator function
reduce()
Reduce stream to a single value, using the first element as the initial accumulator.
const [err, result] = await stream.reduce((acc, val) => acc + val)
count()
Count elements in stream.
const [err, count] = await stream.count()
find()
Find first element matching predicate.
const [err, found] = await stream.find(x => x > 10)
first()
Get first element.
const [err, first] = await stream.first()
last()
Get last element.
const [err, last] = await stream.last()
elementAt()
Get element at specific index.
const [err, element] = await stream.elementAt(5)
some()
Check if any element satisfies predicate.
const [err, hasAny] = await stream.some(x => x > 100)
every()
Check if all elements satisfy predicate.
const [err, allMatch] = await stream.every(x => x > 0)
includes()
Check if stream includes a value.
const [err, hasValue] = await stream.includes(42)
groupBy()
Group elements by key into Map.
const [err, groups] = await stream.groupBy(x => x.category)
sum()
Sum all numeric values.
const [err, total] = await stream.sum()
avg()
Calculate average of numeric values.
const [err, average] = await stream.avg()
max()
Get maximum value.
const [err, maximum] = await stream.max()
min()
Get minimum value.
const [err, minimum] = await stream.min()
Utility Operations
pipe()
Pipe stream through transformation functions.
const result = await stream
.pipe(
s => s.filter(x => x > 0),
s => s.map(x => x * 2),
s => s.take(10)
)
.toArray()
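The underlying pattern is ordinary left-to-right function composition, sketched here over a plain value (pipeValue is an illustrative helper, not the library's pipe()):

```typescript
// Sketch of the pipe pattern: each function receives the previous step's result.
function pipeValue<A>(value: A, ...fns: Array<(x: any) => any>): any {
  return fns.reduce((acc, fn) => fn(acc), value)
}

const piped = pipeValue(
  [1, -2, 3, 4],
  (xs: number[]) => xs.filter(x => x > 0),
  (xs: number[]) => xs.map(x => x * 2)
)
// piped is [2, 6, 8]
```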
retry()
Retry stream on failure.
stream.retry({ maxRetries: 3, delay: 1000 })
Parameters:
- maxRetries (number, default: 3): Maximum retry attempts
- delay (number, default: 0): Delay between retries in milliseconds
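The retry-with-delay pattern can be sketched for a single async operation (retrySketch is a hypothetical helper; the stream's retry() applies the same idea to re-running the failed source):

```typescript
// Sketch of retry semantics: re-attempt up to maxRetries times, sleeping between attempts.
async function retrySketch<T>(
  op: () => Promise<T>,
  maxRetries = 3,
  delayMs = 0
): Promise<T> {
  let lastError: unknown
  for (let attempt = 0; attempt <= maxRetries; attempt++) {
    try {
      return await op()
    } catch (err) {
      lastError = err
      if (attempt < maxRetries && delayMs > 0) {
        await new Promise(resolve => setTimeout(resolve, delayMs))
      }
    }
  }
  throw lastError // all attempts exhausted: surface the last failure
}
```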
Stream Plugin
Add stream support directly to scope:
import { scope } from 'go-go-scope'
import { streamPlugin } from '@go-go-scope/stream'
const s = scope({ plugins: [streamPlugin] })
const stream = s.stream([1, 2, 3])
Examples
Basic Data Processing
await using s = scope()
const [err, results] = await s.stream(users)
.filter(user => user.active)
.map(user => user.email)
.distinct()
.toArray()
Rate-Limited API Calls
await using s = scope()
const [err] = await s.stream(urls)
.throttle({ limit: 5, interval: 1000 }) // 5 requests/sec
.flatMap(async function* (url) { yield await fetch(url).then(r => r.json()) })
.forEach(data => processData(data))
Error Recovery
await using s = scope()
const [err, data] = await s.stream(fetchData())
.retry({ maxRetries: 3, delay: 1000 })
.catchAll(error => {
console.error('Failed:', error)
return (async function* () { yield { fallback: true } })()
})
.toArray()
Windowed Processing
await using s = scope()
const [err] = await s.stream(sensorData)
.window(5)
.map(window => {
const avg = window.reduce((a, b) => a + b, 0) / window.length
return { average: avg, samples: window.length }
})
.forEach(stats => console.log(stats))
Parallel Processing with Grouping
await using s = scope()
const { groups, done } = s.stream(tasks).groupByKey(t => t.priority)
// Process each priority in parallel
await Promise.all([
groups.get('high').forEach(t => processHighPriority(t)),
groups.get('medium').forEach(t => processMediumPriority(t)),
groups.get('low').forEach(t => processLowPriority(t))
])
await done