Skip to main content

@go-go-scope/stream

Lazy async iterable streams for go-go-scope with 50+ composable operations. Process async data with functional, lazy evaluation that integrates seamlessly with structured concurrency.

Installation

npm install @go-go-scope/stream

Quick Start

import { scope } from 'go-go-scope'
import { Stream } from '@go-go-scope/stream'

await using s = scope()

const [err, results] = await s.stream(fetchData())
  .map(x => x * 2)
  .filter(x => x > 10)
  .take(5)
  .toArray()

Core Concepts

Lazy Evaluation

All stream operations are lazy - they don’t execute until a terminal operation (like toArray(), forEach(), or drain()) is called. This allows for efficient composition without materializing intermediate results.

Automatic Cancellation

Streams integrate with go-go-scope’s structured concurrency. When the parent scope is disposed, all stream operations are automatically cancelled.

Result Tuples

Terminal operations return Result<Error, T> tuples for type-safe error handling:
const [err, results] = await stream.toArray()
if (err) {
  console.error('Stream failed:', err)
} else {
  console.log('Results:', results)
}

Stream Class

Constructor

source
AsyncIterable<T>
required
The async iterable data source
scope
Scope
required
The parent scope for cancellation propagation

Transformation Operations

map()

Transform each value using a mapping function.
stream.map((value, index) => value * 2)
fn
(value: T, index: number) => R
required
Transformation function applied to each element
Stream<R>
Stream<R>
New stream with transformed values

filter()

Filter values based on a predicate.
stream.filter((value, index) => value > 10)
predicate
(value: T, index: number) => boolean
required
Function that returns true to keep the element

flatMap()

Map each value to an iterable and flatten the results.
stream.flatMap((value, index) => [value, value * 2])
fn
(value: T, index: number) => Iterable<R> | AsyncIterable<R>
required
Function that returns an iterable for each element

flat()

Flatten a stream of iterables by one level.
stream.flat() // Stream<Iterable<R>> => Stream<R>

tap()

Perform side effects without modifying values.
stream.tap(value => console.log('Processing:', value))
fn
(value: T) => void | Promise<void>
required
Side effect function called for each element

filterMap()

Map and filter in one operation. Returns undefined/null to filter out values.
stream.filterMap(x => {
  const n = parseInt(x, 10)
  return isNaN(n) ? undefined : n
})

Error Handling

catchAll()

Catch errors and recover with a fallback stream.
stream.catchAll(error => {
  console.error('Stream error:', error)
  return (async function*() { yield 'fallback' })()
})
handler
(error: unknown) => AsyncIterable<R>
required
Handler that returns a fallback stream

orElse()

Provide fallback stream on error or empty source.
stream.orElse(fallbackStream)

orElseSucceed()

Provide a single fallback value on failure.
stream.orElseSucceed('default')

tapError()

Perform side effects on errors without modifying the error.
stream.tapError(err => console.error('Error occurred:', err))

mapError()

Transform errors using a mapping function.
stream.mapError(err => new CustomError(err.message))

ensuring()

Run cleanup effect when stream completes (success or error).
stream.ensuring(() => console.log('Stream completed'))

Slicing Operations

take()

Take first n elements.
stream.take(5)
n
number
required
Number of elements to take

takeWhile()

Take elements while predicate holds.
stream.takeWhile(x => x < 100)

takeUntil()

Take elements until predicate succeeds (inclusive).
stream.takeUntil(x => x === 'END')

drop()

Skip first n elements.
stream.drop(10)

dropWhile()

Skip elements while predicate holds.
stream.dropWhile(x => x < 10)

dropUntil()

Skip elements until predicate succeeds.
stream.dropUntil(x => x > 0)

skip()

Alias for drop().

Accumulating Operations

scan()

Running fold - emit intermediate accumulator results.
stream.scan((acc, value) => acc + value, 0)
// [1, 2, 3] => [1, 3, 6]
fn
(acc: R, value: T) => R
required
Accumulator function
initial
R
required
Initial accumulator value

buffer()

Buffer values into fixed-size chunks.
stream.buffer(3)
// [1, 2, 3, 4, 5] => [[1, 2, 3], [4, 5]]
size
number
required
Chunk size

bufferTime()

Buffer values within time windows.
stream.bufferTime(1000) // 1 second windows
windowMs
number
required
Time window in milliseconds

bufferTimeOrCount()

Buffer by size OR time window, whichever comes first.
stream.bufferTimeOrCount(1000, 10)
// Emit when 10 items OR 1 second elapsed

distinct()

Emit only unique values (using ===).
stream.distinct()
// [1, 2, 2, 3, 1] => [1, 2, 3]

distinctBy()

Emit only unique values based on key function.
stream.distinctBy(x => x.id)

distinctAdjacent()

Emit only values different from previous one.
stream.distinctAdjacent()
// [1, 1, 2, 2, 1] => [1, 2, 1]

distinctAdjacentBy()

Emit only values different from previous based on key.
stream.distinctAdjacentBy(x => x.type)

prepend()

Prepend values to the stream.
stream.prepend('a', 'b')

append()

Append values to the stream.
stream.append('x', 'y', 'z')

concat()

Concatenate another stream after this one.
stream.concat(otherStream)

intersperse()

Insert separator between values.
stream.intersperse(',')
// [1, 2, 3] => [1, ',', 2, ',', 3]

groupAdjacentBy()

Group consecutive elements by key function.
stream.groupAdjacentBy(x => x.category)

Timing Operations

delay()

Add delay between elements.
stream.delay(100) // 100ms between each element
ms
number
required
Delay in milliseconds

throttle()

Limit rate of emissions.
stream.throttle({ limit: 10, interval: 1000 })
// Max 10 items per second
options.limit
number
required
Maximum items per interval
options.interval
number
required
Time interval in milliseconds

debounce()

Emit only after silence period.
stream.debounce(300)
// Emits value only after 300ms of no new values
ms
number
required
Silence period in milliseconds

timeout()

Fail if stream doesn’t complete within duration.
stream.timeout(5000) // 5 second timeout
durationMs
number
required
Timeout duration in milliseconds

sample()

Sample the stream at regular intervals.
stream.sample(1000) // Emit latest value every second

auditTime()

Emit latest value after duration silence.
stream.auditTime(500)

Combining Operations

merge()

Merge with another stream (interleave values).
stream.merge(otherStream)

zip()

Zip with another stream - pair elements.
stream.zip(otherStream)
// [1, 2, 3] + ['a', 'b', 'c'] => [[1, 'a'], [2, 'b'], [3, 'c']]

zipWithIndex()

Zip with index.
stream.zipWithIndex()
// ['a', 'b', 'c'] => [['a', 0], ['b', 1], ['c', 2]]

zipWith()

Zip with another stream using combining function.
stream.zipWith(otherStream, (a, b) => a + b)

zipLatest()

Zip using latest value from each stream.
stream.zipLatest(otherStream)

zipAll()

Zip with padding for unequal length streams.
stream.zipAll(otherStream, 'defaultA', 'defaultB')

interleave()

Interleave values from multiple streams.
stream.interleave(stream2, stream3)

cross()

Cartesian product of two streams.
stream.cross(otherStream)
// [1, 2] × ['a', 'b'] => [[1, 'a'], [1, 'b'], [2, 'a'], [2, 'b']]

collect()

Collect values using a partial function.
stream.collect(x => {
  const n = parseInt(x)
  return isNaN(n) ? undefined : n
})

collectWhile()

Collect while partial function returns defined values.
stream.collectWhile(x => x < 100 ? x : undefined)

grouped()

Group elements into fixed-size chunks (alias for buffer).
stream.grouped(5)

groupedWithin()

Group by size and/or time window.
stream.groupedWithin(10, 1000)
// Group max 10 items or 1 second

groupByKey()

Group elements by key into separate streams.
const { groups, done } = stream.groupByKey(x => x.category)
const evenStream = groups.get('even')
const oddStream = groups.get('odd')
groups
Map<K, Stream<T>>
Map of group keys to streams
done
Promise<void>
Promise that resolves when distribution is complete

Advanced Operations

switchMap()

Cancel previous inner stream when new outer value arrives.
stream.switchMap(value => fetchPages(value))

concatMap()

Sequential flatMap - wait for each inner iterable to complete.
stream.concatMap(url => fetchPages(url))

exhaustMap()

Ignore new emissions while processing.
stream.exhaustMap(query => fetchResults(query))

pairwise()

Emit [previous, current] tuples.
stream.pairwise()
// [1, 2, 3, 4] => [[1, 2], [2, 3], [3, 4]]

window()

Sliding window of elements.
stream.window(3)
// [1, 2, 3, 4, 5] => [[1, 2, 3], [2, 3, 4], [3, 4, 5]]
size
number
required
Window size

share()

Multicast to multiple subscribers.
const shared = stream.share({ bufferSize: 1 })
shared.forEach(v => console.log('A:', v))
shared.forEach(v => console.log('B:', v))

Splitting Operations

partition()

Partition stream into two based on predicate.
const [evens, odds] = stream.partition(n => n % 2 === 0)
predicate
(value: T) => boolean
required
Predicate to split values
options.bufferSize
number
Buffer size for each partition (default: 16)
[Stream<T>, Stream<T>]
[Stream<T>, Stream<T>]
Tuple of [pass, fail] streams

splitAt()

Split stream at position n.
const [head, tail] = stream.splitAt(5)

broadcast()

Broadcast stream to N consumers.
const [s1, s2, s3] = stream.broadcast(3)
n
number
required
Number of output streams
options.bufferSize
number
Buffer size per stream (default: 0)

Terminal Operations

toArray()

Collect all values into an array.
const [err, results] = await stream.toArray()
Result<Error, T[]>
Result<Error, T[]>
Result tuple containing error or array of values

forEach()

Execute effect for each value.
const [err] = await stream.forEach(value => {
  console.log('Value:', value)
})
fn
(value: T) => void | Promise<void>
required
Effect function for each element

drain()

Consume stream without collecting values.
const [err] = await stream.drain()

fold()

Fold stream into single value.
const [err, sum] = await stream.fold(0, (acc, val) => acc + val)
initial
R
required
Initial accumulator value
fn
(acc: R, value: T) => R
required
Accumulator function

reduce()

Reduce stream to single value (no initial value).
const [err, result] = await stream.reduce((acc, val) => acc + val)

count()

Count elements in stream.
const [err, count] = await stream.count()

find()

Find first element matching predicate.
const [err, found] = await stream.find(x => x > 10)

first()

Get first element.
const [err, first] = await stream.first()

last()

Get last element.
const [err, last] = await stream.last()

elementAt()

Get element at specific index.
const [err, element] = await stream.elementAt(5)

some()

Check if any element satisfies predicate.
const [err, hasAny] = await stream.some(x => x > 100)

every()

Check if all elements satisfy predicate.
const [err, allMatch] = await stream.every(x => x > 0)

includes()

Check if stream includes a value.
const [err, hasValue] = await stream.includes(42)

groupBy()

Group elements by key into Map.
const [err, groups] = await stream.groupBy(x => x.category)

sum()

Sum all numeric values.
const [err, total] = await stream.sum()

avg()

Calculate average of numeric values.
const [err, average] = await stream.avg()

max()

Get maximum value.
const [err, maximum] = await stream.max()

min()

Get minimum value.
const [err, minimum] = await stream.min()

Utility Operations

pipe()

Pipe stream through transformation functions.
const result = await stream
  .pipe(
    s => s.filter(x => x > 0),
    s => s.map(x => x * 2),
    s => s.take(10)
  )
  .toArray()

retry()

Retry stream on failure.
stream.retry({ maxRetries: 3, delay: 1000 })
options.maxRetries
number
Maximum retry attempts (default: 3)
options.delay
number
Delay between retries in milliseconds (default: 0)

Stream Plugin

Add stream support directly to scope:
import { scope } from 'go-go-scope'
import { streamPlugin } from '@go-go-scope/stream'

const s = scope({ plugins: [streamPlugin] })
const stream = s.stream([1, 2, 3])

Examples

Basic Data Processing

await using s = scope()

const [err, results] = await s.stream(users)
  .filter(user => user.active)
  .map(user => user.email)
  .distinct()
  .toArray()

Rate-Limited API Calls

await using s = scope()

const [err] = await s.stream(urls)
  .throttle({ limit: 5, interval: 1000 }) // 5 requests/sec
  .flatMap(url => fetch(url).then(r => r.json()))
  .forEach(data => processData(data))

Error Recovery

await using s = scope()

const [err, data] = await s.stream(fetchData())
  .retry({ maxRetries: 3, delay: 1000 })
  .catchAll(error => {
    console.error('Failed:', error)
    return [{ fallback: true }]
  })
  .toArray()

Windowed Processing

await using s = scope()

const [err] = await s.stream(sensorData)
  .window(5)
  .map(window => {
    const avg = window.reduce((a, b) => a + b, 0) / window.length
    return { average: avg, samples: window.length }
  })
  .forEach(stats => console.log(stats))

Parallel Processing with Grouping

await using s = scope()

const { groups, done } = s.stream(tasks).groupByKey(t => t.priority)

// Process each priority in parallel
await Promise.all([
  groups.get('high').forEach(t => processHighPriority(t)),
  groups.get('medium').forEach(t => processMediumPriority(t)),
  groups.get('low').forEach(t => processLowPriority(t))
])

await done

Build docs developers (and LLMs) love