Skip to main content

Documentation Index

Fetch the complete documentation index at: https://mintlify.com/apache/beam/llms.txt

Use this file to discover all available pages before exploring further.

The transforms package provides the fundamental PTransforms for processing data in Apache Beam pipelines.

ParDo

ParDo is the core element-wise PTransform in Apache Beam, invoking a user-specified function on each element of the input PCollection.

ParDo

Applies a DoFn to each element, producing a single output PCollection.
func ParDo(s Scope, dofn any, col PCollection, opts ...Option) PCollection
s
Scope
The scope to insert the transform into
dofn
any
A DoFn: either a function or a struct with a ProcessElement method
col
PCollection
The input PCollection to process
opts
...Option
Optional parameters including SideInput and TypeDefinition
Returns
PCollection
Output PCollection with transformed elements
Example:
import (
    "github.com/apache/beam/sdks/v2/go/pkg/beam"
    "github.com/apache/beam/sdks/v2/go/pkg/beam/register"
)

// Define a DoFn
func toUpper(word string) string {
    return strings.ToUpper(word)
}

// Register it
func init() {
    register.Function1x1(toUpper)
}

// Use in pipeline
words := beam.Create(root, "hello", "world")
upperWords := beam.ParDo(root, toUpper, words)

ParDo0

Applies a DoFn with zero outputs (side effects only).
func ParDo0(s Scope, dofn any, col PCollection, opts ...Option)
s
Scope
The scope to insert the transform into
dofn
any
A DoFn that produces no output elements
col
PCollection
The input PCollection to process
opts
...Option
Optional parameters
Example:
// logElement has neither an emitter parameter nor a return value, so it
// produces no output elements; it runs only for its side effect.
func logElement(elem string) {
    log.Printf("Processing: %s", elem)
}

func init() {
    register.Function1x0(logElement)
}

// ParDo0 returns nothing: there is no output PCollection to capture.
beam.ParDo0(root, logElement, data)

ParDo2

Applies a DoFn with two output PCollections.
func ParDo2(s Scope, dofn any, col PCollection, opts ...Option) (PCollection, PCollection)
Returns
(PCollection, PCollection)
Two output PCollections from the DoFn
Example:
// Partition words by length: each word is sent to exactly one of the two
// emitters, which become the two outputs of ParDo2 in parameter order
// (short first, long second).
func partitionByLength(word string, cutoff int, short, long func(string)) {
    if len(word) < cutoff {
        short(word)
    } else {
        long(word)
    }
}

func init() {
    register.Function4x0(partitionByLength)
    // Also register the emitter type func(string) used for both outputs.
    register.Emitter1[string]()
}

// cutoff is a single-element PCollection consumed as a side input.
cutoff := beam.Create(root, 5)
shortWords, longWords := beam.ParDo2(root, partitionByLength, words,
    beam.SideInput{Input: cutoff})

ParDo3, ParDo4, ParDo5, ParDo6, ParDo7

Apply a DoFn with 3-7 output PCollections.
func ParDo3(s Scope, dofn any, col PCollection, opts ...Option) (PCollection, PCollection, PCollection)
func ParDo4(s Scope, dofn any, col PCollection, opts ...Option) (PCollection, PCollection, PCollection, PCollection)
// ... etc

ParDoN

Applies a DoFn with any number of outputs.
func ParDoN(s Scope, dofn any, col PCollection, opts ...Option) []PCollection
Returns
[]PCollection
Slice of output PCollections

DoFn Structure

DoFns can be simple functions or structs with lifecycle methods.

Simple Function DoFn

// A plain function is the simplest DoFn: it receives one element, and its
// return value becomes the single output element for that input.
func processElement(element InputType) OutputType {
    return transform(element)
}

Struct DoFn with Lifecycle

type MyDoFn struct {
    // Exported fields are serialized (as JSON) with the DoFn and shipped to
    // workers, so use them for construction-time configuration. Unexported
    // fields are not serialized; initialize them in Setup instead.
    Config string
}

// Setup is called once per DoFn instance, before any bundle is processed.
func (fn *MyDoFn) Setup(ctx context.Context) error {
    // One-time initialization (e.g. clients or connections)
    return nil
}

// StartBundle is called at the start of each bundle of elements.
func (fn *MyDoFn) StartBundle(ctx context.Context) error {
    // Per-bundle initialization
    return nil
}

// ProcessElement is called for every input element; each call to emit
// produces one output element.
func (fn *MyDoFn) ProcessElement(ctx context.Context, element InputType, emit func(OutputType)) error {
    emit(transform(element))
    return nil
}

// FinishBundle is called after the last element of each bundle.
func (fn *MyDoFn) FinishBundle(ctx context.Context) error {
    // Per-bundle finalization (e.g. flush buffered output)
    return nil
}

// Teardown releases resources acquired in Setup. Runners call it on a
// best-effort basis, so don't rely on it for correctness.
func (fn *MyDoFn) Teardown() error {
    // Cleanup
    return nil
}

Side Inputs

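Side inputs let a DoFn read additional PCollections. Pass each one with beam.SideInput; it arrives as an extra DoFn parameter after the main input element.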
// filterByCutoff keeps words that are at least cutoff characters long.
// The side input value arrives as the parameter after the main element.
func filterByCutoff(word string, cutoff int, emit func(string)) {
    if len(word) >= cutoff {
        emit(word)
    }
}

func init() {
    register.Function3x0(filterByCutoff)
    // Also register the emitter type func(string).
    register.Emitter1[string]()
}

cutoff := beam.Create(root, 5)
filtered := beam.ParDo(root, filterByCutoff, words,
    beam.SideInput{Input: cutoff})

GroupByKey

Groups values by key and window, converting PCollection<KV<K,V>> to PCollection<KV<K,Iter<V>>>.

GroupByKey

func GroupByKey(s Scope, a PCollection) PCollection
s
Scope
The scope to insert the transform into
a
PCollection
Input PCollection of type KV<K,V>
Returns
PCollection
Output PCollection of type KV<K,Iter<V>> with values grouped by key
Example:
import "github.com/apache/beam/sdks/v2/go/pkg/beam"

// Create key-value pairs
type WordCount struct {
    Word  string
    Count int
}

func extractWord(wc WordCount) (string, int) {
    return wc.Word, wc.Count
}

func init() {
    register.Function1x2(extractWord)
}

wordCounts := beam.Create(root, 
    WordCount{"hello", 1},
    WordCount{"world", 1},
    WordCount{"hello", 1})

kvPairs := beam.ParDo(root, extractWord, wordCounts)
grouped := beam.GroupByKey(root, kvPairs)

CoGroupByKey

Groups multiple PCollections by a common key.
func CoGroupByKey(s Scope, cols ...PCollection) PCollection
s
Scope
The scope to insert the transform into
cols
...PCollection
Multiple input PCollections of type KV<K,V> with the same key type
Returns
PCollection
CoGBK result: for each key, one value iterator per input PCollection, in the order the inputs were passed
Example:
// Join two datasets by a common key. beam.Create cannot build KV
// PCollections directly, so create the pairs as structs and split each
// one into a key and a value with a DoFn.
type stringPair struct {
    K, V string
}

func splitStringPair(p stringPair) (string, string) {
    return p.K, p.V
}

// formatJoin consumes the CoGroupByKey result: the key, followed by one
// value iterator per input PCollection, in the order the inputs were
// passed to CoGroupByKey (emails, then names).
func formatJoin(user string, emailIter, nameIter func(*string) bool) string {
    out := user
    var v string
    for nameIter(&v) {
        out += " " + v
    }
    for emailIter(&v) {
        out += " <" + v + ">"
    }
    return out
}

func init() {
    register.Function1x2(splitStringPair)
    register.Function3x1(formatJoin)
    register.Iter1[string]()
}

emails := beam.ParDo(root, splitStringPair, beam.Create(root,
    stringPair{K: "user1", V: "user1@example.com"},
    stringPair{K: "user2", V: "user2@example.com"}))

names := beam.ParDo(root, splitStringPair, beam.Create(root,
    stringPair{K: "user1", V: "Alice"},
    stringPair{K: "user2", V: "Bob"}))

joined := beam.CoGroupByKey(root, emails, names)
// e.g. "user1 Alice <user1@example.com>"
formatted := beam.ParDo(root, formatJoin, joined)

Reshuffle

Breaks fusion and allows different sharding for subsequent operations.
func Reshuffle(s Scope, col PCollection) PCollection
s
Scope
The scope to insert the transform into
col
PCollection
Input PCollection to reshuffle
Returns
PCollection
Reshuffled PCollection with same elements and windowing
Example:
// High parallelism computation
processed := beam.ParDo(root, expensiveComputation, input)

// Reshuffle before writing to reduce output parallelism
reshuffled := beam.Reshuffle(root, processed)
beam.ParDo0(root, writeToFile, reshuffled)

Combine

Combines all elements of a PCollection, or all values per key, using a CombineFn or a simple binary combining function.

Combine

Globally combines all elements in a PCollection.
func Combine(s Scope, combinefn any, col PCollection, opts ...Option) PCollection
s
Scope
The scope to insert the transform into
combinefn
any
A CombineFn or simple combining function
col
PCollection
Input PCollection to combine
opts
...Option
Optional type definitions
Returns
PCollection
Single-element PCollection with the combined result
Example:
// Simple combining function
func sum(a, b int) int {
    return a + b
}

func init() {
    register.Function2x1(sum)
}

numbers := beam.Create(root, 1, 2, 3, 4, 5)
total := beam.Combine(root, sum, numbers)

CombinePerKey

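Groups a KV<K,V> PCollection by key and combines the values for each key. It inserts its own GroupByKey, so apply it directly to the KV PCollection rather than after a GroupByKey.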
func CombinePerKey(s Scope, combinefn any, col PCollection, opts ...Option) PCollection
s
Scope
The scope to insert the transform into
combinefn
any
A CombineFn, optionally taking a key parameter
col
PCollection
Input PCollection of type KV<K,V>
opts
...Option
Optional type definitions
Returns
PCollection
PCollection of type KV<K,CombinedValue>
Example:
// Word count using CombinePerKey.

// wordToKV pairs each word with an initial count of 1.
func wordToKV(word string) (string, int) {
    return word, 1
}

func init() {
    register.Function1x2(wordToKV)
    register.Function2x1(sum) // sum as defined in the Combine example
}

words := beam.Create(root, "hello", "world", "hello")
kvPairs := beam.ParDo(root, wordToKV, words)
// counts is KV<string,int>: "hello" -> 2, "world" -> 1
counts := beam.CombinePerKey(root, sum, kvPairs)

CombineFn Structure

For complex combining logic, implement a CombineFn: a struct with CreateAccumulator, AddInput, MergeAccumulators and ExtractOutput methods.
// AverageFn computes the arithmetic mean of float64 inputs. Its
// accumulator keeps a running sum and count, so partial results can be
// merged before the final division.
type AverageFn struct{}

// acc is AverageFn's accumulator: the running total and number of inputs.
type acc struct {
    Sum   float64
    Count int
}

// CreateAccumulator returns an empty accumulator.
func (f *AverageFn) CreateAccumulator() acc {
    var empty acc
    return empty
}

// AddInput folds one value into the accumulator.
func (f *AverageFn) AddInput(cur acc, value float64) acc {
    cur.Sum += value
    cur.Count++
    return cur
}

// MergeAccumulators combines two partial accumulators.
func (f *AverageFn) MergeAccumulators(x, y acc) acc {
    x.Sum += y.Sum
    x.Count += y.Count
    return x
}

// ExtractOutput returns the mean, or 0 when no inputs were added.
func (f *AverageFn) ExtractOutput(total acc) float64 {
    if total.Count != 0 {
        return total.Sum / float64(total.Count)
    }
    return 0
}

Registration

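Register every DoFn, function, CombineFn, emitter and iterator type for distributed execution. Registration lets workers resolve them at runtime, and the generic register helpers also avoid reflection overhead: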
import "github.com/apache/beam/sdks/v2/go/pkg/beam/register"

func init() {
    // Simple functions
    register.Function1x1(myFunc1x1)  // 1 input, 1 output
    register.Function2x1(myFunc2x1)  // 2 inputs, 1 output
    register.Function1x2(myFunc1x2)  // 1 input, 2 outputs
    
    // DoFn structs
    register.DoFn3x1[context.Context, InputType, func(OutputType), error](&MyDoFn{})
    
    // Emitters
    register.Emitter1[OutputType]()
    register.Emitter2[KeyType, ValueType]()
    
    // Iterators
    register.Iter1[ValueType]()
}

Best Practices

  • Keep DoFns stateless when possible
  • Use exported struct fields for construction-time configuration (only exported fields are serialized)
  • Implement Setup/Teardown for expensive resource initialization
  • Make ProcessElement deterministic for fault tolerance, since runners may retry bundles
  • GroupByKey triggers a shuffle operation (expensive)
  • Ensure key types have deterministic coders
  • Consider CombinePerKey instead of GroupByKey + ParDo for aggregations
  • Be aware of memory implications with large value lists per key
  • Use Combine instead of GroupByKey when aggregating all elements
  • CombineFns enable efficient parallel and incremental combining
  • Combining operations must be associative and commutative, because Beam may combine partial results in any order and grouping
  • Implement proper accumulator types for complex aggregations

Build docs developers (and LLMs) love