
Pipeline Processing Pattern

Build composable data processing pipelines using streams and channels. Pipelines transform data through multiple stages with automatic backpressure and cancellation.

Basic Pipeline

Create a simple transformation pipeline:
import { scope, Stream } from '@go-go-scope/stream';

async function processPipeline() {
  await using s = scope();

  // Create a stream from data source
  const data = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
  
  const [err, results] = await s.stream(data)
    .map(x => x * 2)           // Double each value
    .filter(x => x > 10)       // Keep only values > 10
    .map(x => ({ value: x }))  // Transform to objects
    .toArray();

  if (err) {
    console.error('Pipeline failed:', err);
    return;
  }

  console.log('Results:', results);
  // [{ value: 12 }, { value: 14 }, { value: 16 }, { value: 18 }, { value: 20 }]
}
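Under the hood, each stage wraps the previous one lazily: values flow through `map` and `filter` one at a time instead of materializing intermediate arrays. A library-free sketch of the same pipeline over an async iterable (the `s.stream` API above is the real thing; everything here is illustrative):

```typescript
// Each stage is an async generator that pulls from the one before it.
async function* mapStage<T, U>(src: AsyncIterable<T>, fn: (x: T) => U): AsyncGenerator<U> {
  for await (const x of src) yield fn(x);
}

async function* filterStage<T>(src: AsyncIterable<T>, pred: (x: T) => boolean): AsyncGenerator<T> {
  for await (const x of src) if (pred(x)) yield x;
}

async function* fromArray<T>(items: T[]): AsyncGenerator<T> {
  yield* items;
}

async function toArray<T>(src: AsyncIterable<T>): Promise<T[]> {
  const out: T[] = [];
  for await (const x of src) out.push(x);
  return out;
}

// Same pipeline as above: double, keep > 10, wrap in objects.
async function demo(): Promise<{ value: number }[]> {
  const doubled = mapStage(fromArray([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]), x => x * 2);
  const big = filterStage(doubled, x => x > 10);
  return toArray(mapStage(big, x => ({ value: x })));
}
```

Because each stage only pulls when the next stage asks, cancellation and backpressure fall out naturally: stop iterating, and upstream stops producing.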

Channel-Based Pipeline

Build pipelines using channels for producer-consumer patterns:

Step 1: Create input channel

Set up the data source:
await using s = scope();
const input = s.channel<string>(100);

// Producer: Feed data into pipeline
// (listFiles/readFile stand in for your file helpers, e.g. built on node:fs/promises)
s.task(async () => {
  const files = await listFiles('./data');
  
  for (const file of files) {
    const content = await readFile(file);
    await input.send(content);
  }
  
  input.close();
});

Step 2: Add transformation stages

Chain multiple processing stages:
// Stage 1: Parse JSON
const parsed = input.map(content => {
  try {
    return JSON.parse(content);
  } catch {
    return null;
  }
}).filter(data => data !== null);

// Stage 2: Validate
const validated = parsed.filter(data => 
  data.id && data.timestamp
);

// Stage 3: Transform
const transformed = validated.map(data => ({
  id: data.id,
  timestamp: new Date(data.timestamp),
  value: data.value ?? 0,
}));

Step 3: Consume results

Process the final output:
// Consumer: Write to database (db stands in for your database client)
s.task(async () => {
  const batch: unknown[] = [];
  const batchSize = 100;

  for await (const item of transformed) {
    batch.push(item);

    if (batch.length >= batchSize) {
      await db.insertMany(batch);
      batch.length = 0;
    }
  }

  // Flush remaining items
  if (batch.length > 0) {
    await db.insertMany(batch);
  }
});
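The batch-and-flush loop above is common enough to factor into a reusable async generator. A plain-TypeScript sketch (`batched` is our own helper name, not part of the library):

```typescript
// Groups an async iterable into arrays of at most `size` items,
// flushing any partial batch at the end.
async function* batched<T>(src: AsyncIterable<T>, size: number): AsyncGenerator<T[]> {
  let batch: T[] = [];
  for await (const item of src) {
    batch.push(item);
    if (batch.length >= size) {
      yield batch;
      batch = [];
    }
  }
  if (batch.length > 0) yield batch; // flush the remainder
}
```

The consumer then collapses to `for await (const batch of batched(transformed, 100)) await db.insertMany(batch);`.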

Parallel Pipeline Stages

Process stages in parallel for CPU-intensive operations:
import { scope } from 'go-go-scope';

async function parallelPipeline() {
  await using s = scope();

  // Input data
  const input = s.channel<number>(1000);
  const output = s.channel<number>(1000);

  // Producer
  s.task(async () => {
    for (let i = 0; i < 10000; i++) {
      await input.send(i);
    }
    input.close();
  });

  // Parallel processing stage (4 workers)
  // Each worker drains the shared input; the last one to finish closes
  // the output, so no in-flight result is lost to an early close.
  const workers = 4;
  let active = workers;
  for (let i = 0; i < workers; i++) {
    s.task(async ({ signal }) => {
      for await (const value of input) {
        if (signal.aborted) break;

        // CPU-intensive operation
        const result = await processData(value);
        await output.send(result);
      }

      if (--active === 0) output.close();
    });
  }

  // Consumer
  const results: number[] = [];
  for await (const result of output) {
    results.push(result);
  }

  return results;
}
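A common way to fan work out to N workers and close the sink safely is to count live workers and finish only when that count hits zero. A library-free sketch of fan-out/fan-in over a shared async iterator (`fanOut` and all names here are illustrative, not part of go-go-scope):

```typescript
// Fans a single async iterable out to `n` concurrent workers; resolves
// with all results once every worker has drained the shared iterator.
async function fanOut<T, U>(
  src: AsyncIterable<T>,
  n: number,
  work: (x: T) => Promise<U>,
): Promise<U[]> {
  // One shared iterator: workers pull from it cooperatively, so each
  // item is processed exactly once.
  const it = src[Symbol.asyncIterator]();
  const results: U[] = [];

  const worker = async () => {
    while (true) {
      const next = await it.next();
      if (next.done) return;
      results.push(await work(next.value));
    }
  };

  // Promise.all here plays the role of the "last worker closes" counter.
  await Promise.all(Array.from({ length: n }, worker));
  return results;
}
```

Concurrent `it.next()` calls on an async generator are queued internally by the runtime, which is what makes the cooperative pull safe.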

Backpressure Handling

Manage flow control with different backpressure strategies:
import { scope } from 'go-go-scope';

async function pipelineWithBackpressure() {
  await using s = scope();

  // Fast producer channel: block the sender when the buffer is full
  const fastInput = s.channel({
    capacity: 100,
    backpressure: 'block',
  });

  // Slow consumer buffer: shed load by dropping the oldest items
  const slowBuffer = s.channel({
    capacity: 10,
    backpressure: 'drop-oldest',
    onDrop: (item) => {
      console.warn('Dropped item:', item);
    },
  });

  // Fast producer
  s.task(async () => {
    for (let i = 0; i < 1000; i++) {
      await fastInput.send(i);
    }
    fastInput.close();
  });

  // Transform and feed to slow buffer
  s.task(async () => {
    for await (const item of fastInput) {
      const processed = await processData(item);
      await slowBuffer.send(processed);
    }
    slowBuffer.close();
  });

  // Slow consumer
  s.task(async () => {
    for await (const item of slowBuffer) {
      await slowOperation(item);
    }
  });
}
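The 'drop-oldest' strategy is just a bounded queue that evicts from the front on overflow. A minimal synchronous sketch of that policy (the channel's real implementation also handles async blocking; all names here are ours):

```typescript
// Bounded buffer implementing the drop-oldest overflow policy.
class DropOldestBuffer<T> {
  private items: T[] = [];

  constructor(
    private capacity: number,
    private onDrop?: (item: T) => void,
  ) {}

  push(item: T): void {
    if (this.items.length >= this.capacity) {
      const dropped = this.items.shift()!; // evict the oldest item
      this.onDrop?.(dropped);              // surface the drop for monitoring
    }
    this.items.push(item);
  }

  shift(): T | undefined {
    return this.items.shift();
  }

  get size(): number {
    return this.items.length;
  }
}
```

The `onDrop` hook is the piece worth copying: silent data loss is the main hazard of lossy backpressure, so every drop should be observable.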

Error Handling in Pipelines

Handle errors gracefully without breaking the pipeline:
import { scope } from 'go-go-scope';

interface Result<T> {
  success: boolean;
  data?: T;
  error?: string;
}

async function resilientPipeline() {
  await using s = scope();

  const input = s.channel<string>(100);
  const output = s.channel<Result<unknown>>(100);

  // Error-handling stage
  s.task(async ({ signal }) => {
    for await (const item of input) {
      if (signal.aborted) break;

      try {
        const result = await processItem(item);
        await output.send({
          success: true,
          data: result,
        });
      } catch (error) {
        // Continue processing even on error
        await output.send({
          success: false,
          error: error instanceof Error ? error.message : 'Unknown error',
        });
      }
    }
    output.close();
  });

  // Consumer: Separate success and errors
  const successes: unknown[] = [];
  const errors: string[] = [];

  for await (const result of output) {
    if (result.success) {
      successes.push(result.data);
    } else if (result.error) {
      errors.push(result.error);
    }
  }

  return { successes, errors };
}
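Wrapping the try/catch into a small helper keeps each stage terse (`toResult` is our own helper name, sketched against the `Result` shape above):

```typescript
interface Result<T> {
  success: boolean;
  data?: T;
  error?: string;
}

// Runs an async operation and captures the outcome as a Result
// instead of letting the rejection propagate up the pipeline.
async function toResult<T>(op: () => Promise<T>): Promise<Result<T>> {
  try {
    return { success: true, data: await op() };
  } catch (error) {
    return {
      success: false,
      error: error instanceof Error ? error.message : 'Unknown error',
    };
  }
}
```

The processing stage then reduces to `await output.send(await toResult(() => processItem(item)))`.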

Advanced Stream Operations

Use specialized stream operations for complex pipelines:
import { scope, Stream } from '@go-go-scope/stream';

async function advancedPipeline() {
  await using s = scope();

  const data = generateDataStream(); // Returns AsyncIterable

  const [err, results] = await s.stream(data)
    // Batch processing
    // Batch processing (assumes processBatch reduces each batch to a number)
    .buffer(50)
    .map(batch => processBatch(batch))
    
    // Debounce rapid changes
    .debounce(100)
    
    // Window for time-based aggregation
    .window(1000)  // 1 second windows
    .map(window => ({
      timestamp: Date.now(),
      count: window.length,
      sum: window.reduce((a, b) => a + b, 0),
    }))
    
    // Take only first 100 results
    .take(100)
    
    // Collect to array
    .toArray();

  if (err) throw err;
  return results;
}
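Operators like `take` are easiest to reason about as generator wrappers: the stage simply stops pulling from upstream once it has enough, which is what keeps the pipeline lazy end to end. A sketch of that idea, not the library's implementation:

```typescript
// Yields at most `n` items, then stops pulling from upstream entirely.
async function* take<T>(src: AsyncIterable<T>, n: number): AsyncGenerator<T> {
  if (n <= 0) return;
  let count = 0;
  for await (const item of src) {
    yield item;
    if (++count >= n) return; // upstream is never asked for another item
  }
}
```

Returning out of the `for await` loop also runs the upstream iterator's cleanup (`return()`), so even an infinite source is shut down promptly.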

Best Practices

  • Lazy evaluation: Streams only process what’s needed
  • Bounded channels: Always set capacity limits
  • Error boundaries: Handle errors per-stage, not globally
  • Monitor backpressure: Log dropped items for debugging
  • Graceful shutdown: Close channels properly to avoid leaks
  • Parallel stages: Use workers for CPU-intensive transformations
  • Batch operations: Buffer items for efficient I/O
