Skip to main content

Overview

The Stream API provides lazy, composable operations on async iterables. Streams integrate with scopes for automatic cancellation and resource cleanup.
Streams are lazy - operations only execute when you consume the stream (e.g., with toArray() or for await).

Creating Streams

Create streams from various sources:
import { scope } from 'go-go-scope';
import { Stream } from '@go-go-scope/stream';

await using s = scope();

// From async iterable
const stream1 = new Stream(asyncIterable, s);

// From array
async function* fromArray<T>(arr: T[]): AsyncGenerator<T> {
  for (const item of arr) yield item;
}
const stream2 = new Stream(fromArray([1, 2, 3, 4, 5]), s);

// From channel
const ch = s.channel<number>(10);
const stream3 = new Stream(ch, s);

Transformations

Map

Transform each value:
const stream = new Stream(fromArray([1, 2, 3, 4, 5]), s);

const doubled = stream
  .map(x => x * 2)
  .map(x => x + 1);

const [err, result] = await doubled.toArray();
// result = [3, 5, 7, 9, 11]

Filter

Keep only matching values:
const evens = stream
  .filter(x => x % 2 === 0);

const [err, result] = await evens.toArray();
// result = [2, 4]

FlatMap

Map and flatten in one operation:
const stream = new Stream(fromArray([1, 2, 3]), s);

const flattened = stream
  .flatMap(x => [x, x * 10]);

const [err, result] = await flattened.toArray();
// result = [1, 10, 2, 20, 3, 30]

FilterMap

Combine filter and map:
const stream = new Stream(fromArray(['1', 'a', '2', 'b', '3']), s);

const numbers = stream
  .filterMap(x => {
    const n = parseInt(x, 10);
    return isNaN(n) ? undefined : n;
  });

const [err, result] = await numbers.toArray();
// result = [1, 2, 3]

Slicing

Take / Drop

const stream = new Stream(fromArray([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]), s);

// Take first 3
const first3 = stream.take(3);
const [err1, result1] = await first3.toArray();
// result1 = [1, 2, 3]

// Drop first 3
const afterDrop = stream.drop(3);
const [err2, result2] = await afterDrop.toArray();
// result2 = [4, 5, 6, 7, 8, 9, 10]

// Skip (alias for drop)
const afterSkip = stream.skip(3);

TakeWhile / DropWhile

// Take while condition holds
const lessThan5 = stream.takeWhile(x => x < 5);
const [err1, result1] = await lessThan5.toArray();
// result1 = [1, 2, 3, 4]

// Drop while condition holds
const afterDrop = stream.dropWhile(x => x < 5);
const [err2, result2] = await afterDrop.toArray();
// result2 = [5, 6, 7, 8, 9, 10]

TakeUntil / DropUntil

// Take until condition holds (inclusive)
const until5 = stream.takeUntil(x => x === 5);
const [err1, result1] = await until5.toArray();
// result1 = [1, 2, 3, 4, 5]

// Drop until condition holds (inclusive)
const afterDrop = stream.dropUntil(x => x === 5);
const [err2, result2] = await afterDrop.toArray();
// result2 = [5, 6, 7, 8, 9, 10]

Buffering

Buffer by Count

const stream = new Stream(fromArray([1, 2, 3, 4, 5, 6, 7]), s);

const chunked = stream.buffer(3);
const [err, result] = await chunked.toArray();
// result = [[1, 2, 3], [4, 5, 6], [7]]

Buffer by Time

const buffered = stream.bufferTime(1000); // 1 second windows
// Emits chunks every 1 second

Buffer by Time or Count

const buffered = stream.bufferTimeOrCount(1000, 10);
// Emits when 10 items collected OR 1 second passes

Timing Operations

Delay

Add delay between elements:
const delayed = stream.delay(100); // 100ms delay between items

Throttle

Limit emission rate:
const throttled = stream.throttle({
  limit: 5,      // Max 5 items
  interval: 1000 // per second
});

Debounce

Emit only after silence:
const debounced = stream.debounce(300); // 300ms quiet period

Sample

Emit latest value at intervals:
const sampled = stream.sample(1000); // Every 1 second

Timeout

Fail if stream doesn’t complete in time:
const withTimeout = stream.timeout(5000); // 5 second timeout

Accumulation

Scan

Running fold (emit intermediate results):
const stream = new Stream(fromArray([1, 2, 3, 4, 5]), s);

const running = stream.scan((acc, x) => acc + x, 0);
const [err, result] = await running.toArray();
// result = [1, 3, 6, 10, 15]

Distinct

Remove duplicates:
const stream = new Stream(fromArray([1, 2, 2, 3, 1, 4]), s);

const unique = stream.distinct();
const [err, result] = await unique.toArray();
// result = [1, 2, 3, 4]

DistinctBy

Remove duplicates by key:
const users = [
  { id: 1, name: 'Alice' },
  { id: 2, name: 'Bob' },
  { id: 1, name: 'Alice2' },
];

const stream = new Stream(fromArray(users), s);
const uniqueUsers = stream.distinctBy(u => u.id);
// Only first occurrence of each id

DistinctAdjacent

Remove consecutive duplicates:
const stream = new Stream(fromArray([1, 1, 2, 2, 3, 1, 1]), s);

const deduped = stream.distinctAdjacent();
const [err, result] = await deduped.toArray();
// result = [1, 2, 3, 1]

Combining Streams

Concat

const stream1 = new Stream(fromArray([1, 2, 3]), s);
const stream2 = new Stream(fromArray([4, 5, 6]), s);

const combined = stream1.concat(stream2);
const [err, result] = await combined.toArray();
// result = [1, 2, 3, 4, 5, 6]

Merge

Interleave values from multiple streams:
const merged = stream1.merge(stream2);
// Emits values from both streams as they arrive

Zip

Pair elements from two streams:
const stream1 = new Stream(fromArray([1, 2, 3]), s);
const stream2 = new Stream(fromArray(['a', 'b', 'c']), s);

const zipped = stream1.zip(stream2);
const [err, result] = await zipped.toArray();
// result = [[1, 'a'], [2, 'b'], [3, 'c']]

ZipWith

Zip with custom combiner:
const combined = stream1.zipWith(stream2, (a, b) => `${a}-${b}`);
const [err, result] = await combined.toArray();
// result = ['1-a', '2-b', '3-c']

Error Handling

CatchError

Recover from errors:
const stream = new Stream(failingSource, s);

const recovered = stream.catchError(error => {
  console.error('Stream error:', error);
  return fromArray([0]); // Fallback value
});

OrElse

Provide fallback stream:
const fallback = new Stream(fromArray([0]), s);
const withFallback = stream.orElse(fallback);

TapError

Side effect on errors:
const logged = stream.tapError(error => {
  console.error('Error:', error);
});

Ensuring

Cleanup on completion:
const withCleanup = stream.ensuring(async () => {
  console.log('Stream completed');
  await cleanup();
});

Advanced Operations

SwitchMap

Switch to new stream on each outer value:
const outer = new Stream(fromArray([1, 2, 3]), s);

const switched = outer.switchMap(x => 
  new Stream(fromArray([x, x * 10, x * 100]), s)
);
// Cancels previous inner stream when new outer value arrives

GroupBy

Group elements by key:
const stream = new Stream(fromArray([1, 2, 3, 4, 5, 6]), s);

const { groups, done } = stream.groupByKey(x => x % 2 === 0 ? 'even' : 'odd');

const evens = groups.get('even');
const odds = groups.get('odd');

await done; // Wait for grouping to complete

Pairwise

Emit consecutive pairs:
const stream = new Stream(fromArray([1, 2, 3, 4, 5]), s);

const pairs = stream.pairwise();
const [err, result] = await pairs.toArray();
// result = [[1, 2], [2, 3], [3, 4], [4, 5]]

Window

Sliding window of values:
const stream = new Stream(fromArray([1, 2, 3, 4, 5]), s);

const windows = stream.window(3);
const [err, result] = await windows.toArray();
// result = [[1, 2, 3], [2, 3, 4], [3, 4, 5]]

Terminal Operations

ToArray

Collect all values:
const [err, values] = await stream.toArray();

ForEach

Execute side effect per value:
const [err] = await stream.forEach(value => {
  console.log(value);
});

Reduce

Fold to single value:
const [err, sum] = await stream.reduce((acc, x) => acc + x, 0);

First / Last

const [err1, first] = await stream.first();
const [err2, last] = await stream.last();

Count

const [err, count] = await stream.count();

Some / Every

const [err1, hasEven] = await stream.some(x => x % 2 === 0);
const [err2, allPositive] = await stream.every(x => x > 0);

Real-World Examples

Processing Log Files

import { createReadStream } from 'fs';
import { createInterface } from 'readline';

await using s = scope();

const fileStream = createReadStream('app.log');
const rl = createInterface({ input: fileStream });

const logStream = new Stream(rl, s);

const errors = await logStream
  .filter(line => line.includes('ERROR'))
  .map(line => JSON.parse(line))
  .filter(log => log.level === 'error')
  .map(log => ({ timestamp: log.timestamp, message: log.message }))
  .take(100)
  .toArray();

API Pagination

async function* fetchPages(baseUrl: string) {
  let page = 1;
  while (true) {
    const res = await fetch(`${baseUrl}?page=${page}`);
    const data = await res.json();
    if (data.items.length === 0) break;
    yield* data.items;
    page++;
  }
}

await using s = scope();

const items = new Stream(fetchPages('/api/items'), s);

const processed = await items
  .filter(item => item.active)
  .map(item => transformItem(item))
  .buffer(100)  // Process in batches of 100
  .forEach(batch => processBatch(batch));

Real-Time Event Processing

await using s = scope();

const events = s.channel<Event>(1000);
const eventStream = new Stream(events, s);

const processed = eventStream
  .filter(e => e.type === 'user.action')
  .debounce(100)  // Debounce rapid events
  .buffer(10)     // Batch for efficiency
  .forEach(batch => {
    console.log(`Processing ${batch.length} events`);
    return processBatch(batch);
  });

Performance Tips

Lazy Evaluation

Streams are lazy - operations compose without executing until consumed

Buffer Wisely

Use buffering to batch operations and reduce overhead

Early Filtering

Filter early in the pipeline to reduce processing

Limit Memory

Use take() or takeWhile() to avoid unbounded memory usage

Next Steps

Scheduler

Distributed job scheduling with cron support

Channels

Go-style channels for concurrent communication

Build docs developers (and LLMs) love