This quickstart will guide you through installing Apache Beam and running your first WordCount pipeline. WordCount is a classic example that demonstrates core Beam concepts: reading data, applying transformations, and writing results.
Choose your language
Install Python SDK
Apache Beam requires Python 3.8 or newer. Install the Beam SDK using pip:

pip install apache-beam

For Google Cloud Dataflow support, install with the gcp extra:

pip install 'apache-beam[gcp]'
Create your pipeline
Create a file named wordcount.py with the following code:

import re
import argparse

import apache_beam as beam
from apache_beam.io import ReadFromText, WriteToText
from apache_beam.options.pipeline_options import PipelineOptions


def main(argv=None):
    parser = argparse.ArgumentParser()
    parser.add_argument(
        '--input',
        dest='input',
        default='gs://dataflow-samples/shakespeare/kinglear.txt',
        help='Input file to process.')
    parser.add_argument(
        '--output',
        dest='output',
        default='output.txt',
        help='Output file to write results to.')
    known_args, pipeline_args = parser.parse_known_args(argv)
    pipeline_options = PipelineOptions(pipeline_args)

    with beam.Pipeline(options=pipeline_options) as p:
        # Read the text file into a PCollection
        lines = p | ReadFromText(known_args.input)

        # Count the occurrences of each word
        counts = (
            lines
            | 'Split' >> beam.FlatMap(lambda x: re.findall(r'[A-Za-z\']+', x))
            | 'PairWithOne' >> beam.Map(lambda x: (x, 1))
            | 'GroupAndSum' >> beam.CombinePerKey(sum))

        # Format the counts into strings
        output = counts | 'Format' >> beam.Map(
            lambda word_count: '%s: %s' % (word_count[0], word_count[1]))

        # Write the output
        output | WriteToText(known_args.output)


if __name__ == '__main__':
    main()
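Run the pipeline
Execute the pipeline locally using the DirectRunner:

python wordcount.py --output output.txt

This processes Shakespeare’s King Lear and writes the word counts to output.txt-00000-of-00001. The default input is a gs:// path, so reading it requires the gcp extra; to run without it, pass --input with the path to a local text file.
What’s happening?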
Read input
ReadFromText reads lines from the input file into a PCollection of strings.
Split into words
FlatMap applies a regex to extract individual words from each line.
Create key-value pairs
Map transforms each word into a tuple of (word, 1).
Count occurrences
CombinePerKey groups by word and sums the counts.
Format output
Another Map formats each word-count pair as a string.
Write results
WriteToText writes the formatted results to output files.
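To make the data flow concrete, the steps above can be mimicked in plain Python on a couple of in-memory lines. This is only an illustrative sketch of what each transform computes, not Beam code: the sample lines and the helper name count_words are made up, and the final sort exists only to make the result deterministic (a real runner processes elements in parallel and guarantees no output order).

```python
import re
from collections import defaultdict


def count_words(lines):
    """Plain-Python stand-in for Split -> PairWithOne -> GroupAndSum -> Format."""
    # Split: like FlatMap, each line yields zero or more words.
    words = [w for line in lines for w in re.findall(r"[A-Za-z']+", line)]
    # PairWithOne: like Map, each word becomes a (word, 1) tuple.
    pairs = [(w, 1) for w in words]
    # GroupAndSum: like CombinePerKey(sum), add up the values for each key.
    totals = defaultdict(int)
    for word, one in pairs:
        totals[word] += one
    # Format: like the final Map, turn each pair into a "word: count" string.
    return ['%s: %s' % (word, n) for word, n in sorted(totals.items())]


# Hypothetical sample input standing in for the lines ReadFromText would produce.
sample = ["Blow, winds, and crack your cheeks!", "Rage! Blow!"]
formatted = count_words(sample)
```

Here formatted holds one string per distinct word, which is the shape of the lines WriteToText puts in the output file.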
Install Java SDK
Apache Beam requires Java 8 or newer. Add the Beam SDK to your Maven pom.xml:

<dependencies>
  <dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-sdks-java-core</artifactId>
    <version>2.53.0</version>
  </dependency>
  <dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-runners-direct-java</artifactId>
    <version>2.53.0</version>
    <scope>runtime</scope>
  </dependency>
</dependencies>
Create your pipeline
Create WordCount.java:

package com.example;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.*;
import org.apache.beam.sdk.transforms.*;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;

import java.util.regex.Pattern;
import java.util.stream.Stream;

public class WordCount {

  static class ExtractWordsFn extends DoFn<String, String> {
    // splitAsStream splits around matches, so the pattern must match the
    // separators between words: anything that is not a letter or apostrophe.
    private final Pattern pattern = Pattern.compile("[^a-zA-Z']+");

    @ProcessElement
    public void processElement(@Element String element,
                               OutputReceiver<String> receiver) {
      Stream<String> stream = pattern.splitAsStream(element);
      stream.forEach(word -> {
        if (!word.isEmpty()) {
          receiver.output(word);
        }
      });
    }
  }

  static class FormatAsTextFn extends SimpleFunction<KV<String, Long>, String> {
    @Override
    public String apply(KV<String, Long> input) {
      return input.getKey() + ": " + input.getValue();
    }
  }

  public interface WordCountOptions extends PipelineOptions {
    @Description("Path of the file to read from")
    @Default.String("gs://apache-beam-samples/shakespeare/kinglear.txt")
    String getInputFile();
    void setInputFile(String value);

    @Description("Path of the file to write to")
    @Validation.Required
    String getOutput();
    void setOutput(String value);
  }

  public static void main(String[] args) {
    WordCountOptions options = PipelineOptionsFactory
        .fromArgs(args)
        .withValidation()
        .as(WordCountOptions.class);

    Pipeline p = Pipeline.create(options);

    p.apply("ReadLines", TextIO.read().from(options.getInputFile()))
        .apply("ExtractWords", ParDo.of(new ExtractWordsFn()))
        .apply("CountWords", Count.perElement())
        .apply("FormatResults", MapElements.via(new FormatAsTextFn()))
        .apply("WriteCounts", TextIO.write().to(options.getOutput()));

    p.run().waitUntilFinish();
  }
}
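Run the pipeline
Compile and run your pipeline:

mvn compile exec:java -Dexec.mainClass=com.example.WordCount \
  -Dexec.args="--output=output.txt"

The default inputFile is a gs:// path, and reading it needs a Google Cloud Storage filesystem on the classpath (for example, the beam-sdks-java-io-google-cloud-platform artifact). With only the two dependencies listed above, add --inputFile with the path to a local text file.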
What’s happening?
Read input
TextIO.read() reads lines from the input file into a PCollection.
Extract words
ParDo applies the ExtractWordsFn DoFn to split lines into words.
Count occurrences
Count.perElement() counts how many times each word appears.
Format output
MapElements formats each KV<String, Long> as a readable string.
Write results
TextIO.write() writes the formatted results to output files.
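How ExtractWordsFn behaves depends on how its regex is applied: splitting a line around a pattern and collecting the matches of a pattern return complementary pieces of the line. The Python sketch below illustrates the difference for a word pattern and for its negation. Python's re module stands in for java.util.regex here, and the sample line is made up.

```python
import re

line = "Blow, winds!"  # hypothetical sample line

# Collecting matches of a word pattern yields the words themselves.
matched = re.findall(r"[a-zA-Z']+", line)

# Splitting around the same word pattern yields what lies between the words.
between = [s for s in re.split(r"[a-zA-Z']+", line) if s]

# Splitting around the negated class (the separators) yields the words again.
split_on_separators = [s for s in re.split(r"[^a-zA-Z']+", line) if s]
```

A DoFn that tokenizes by splitting therefore needs a pattern describing the separators, and it still has to drop the empty strings that appear when a line starts or ends with one.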
Install Go SDK
Apache Beam requires Go 1.20 or newer. Install the Beam Go SDK:

go get -u github.com/apache/beam/sdks/v2/go/pkg/beam
Create your pipeline
Create wordcount.go:

package main

import (
	"context"
	"flag"
	"fmt"
	"log"
	"regexp"

	"github.com/apache/beam/sdks/v2/go/pkg/beam"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/io/textio"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/register"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/transforms/stats"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
)

var (
	input = flag.String("input", "gs://apache-beam-samples/shakespeare/kinglear.txt",
		"Input file")
	output = flag.String("output", "", "Output file (required)")
	wordRE = regexp.MustCompile(`[a-zA-Z]+('[a-z])?`)
)

func init() {
	register.DoFn3x0[context.Context, string, func(string)](&extractFn{})
	register.Function2x1(formatFn)
	register.Emitter1[string]()
}

type extractFn struct{}

// ProcessElement emits every word found in the line.
func (f *extractFn) ProcessElement(ctx context.Context, line string,
	emit func(string)) {
	for _, word := range wordRE.FindAllString(line, -1) {
		emit(word)
	}
}

func formatFn(w string, c int) string {
	return fmt.Sprintf("%s: %v", w, c)
}

func CountWords(s beam.Scope, lines beam.PCollection) beam.PCollection {
	s = s.Scope("CountWords")
	col := beam.ParDo(s, &extractFn{}, lines)
	return stats.Count(s, col)
}

func main() {
	flag.Parse()
	beam.Init()

	if *output == "" {
		log.Fatal("No output provided")
	}

	p := beam.NewPipeline()
	s := p.Root()

	lines := textio.Read(s, *input)
	counted := CountWords(s, lines)
	formatted := beam.ParDo(s, formatFn, counted)
	textio.Write(s, *output, formatted)

	if err := beamx.Run(context.Background(), p); err != nil {
		log.Fatalf("Failed to execute job: %v", err)
	}
}
Run the pipeline
Execute the pipeline locally:

go run wordcount.go --output=output.txt
What’s happening?
Read input
textio.Read() reads lines from the input file into a PCollection.
Extract words
The extractFn DoFn uses regex to extract words from each line and emits them.
Count occurrences
stats.Count() counts occurrences of each word.
Format output
The formatFn function formats word-count pairs as strings.
Write results
textio.Write() writes the formatted results to output files.
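The Go example uses a stricter word pattern than the Python one: an apostrophe is kept only when it follows letters and is followed by a single lowercase letter. The small Python sketch below shows how the two patterns tokenize the same line. Python's re stands in for Go's regexp, the group is made non-capturing so that findall returns whole matches, and the sample text is made up.

```python
import re

PYTHON_WORD_RE = re.compile(r"[A-Za-z']+")        # pattern from wordcount.py
GO_WORD_RE = re.compile(r"[a-zA-Z]+(?:'[a-z])?")  # pattern from wordcount.go

text = "the king's 'tis o'er"  # hypothetical sample line

python_tokens = PYTHON_WORD_RE.findall(text)
go_tokens = GO_WORD_RE.findall(text)
```

The Go pattern drops the leading apostrophe of 'tis and breaks o'er into two tokens, so the word counts produced by the two SDK examples can differ slightly on text that contains apostrophes.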
Running on other runners
By default, pipelines run locally using the DirectRunner. To run on a distributed backend:
Python - Dataflow
python wordcount.py \
--runner=DataflowRunner \
--project=YOUR_PROJECT_ID \
--region=us-central1 \
--temp_location=gs://YOUR_BUCKET/temp \
--output=gs://YOUR_BUCKET/output
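wordcount.py accepts these runner flags because it parses only --input and --output itself and forwards everything else to PipelineOptions. A minimal sketch of that split, using only argparse: the flag values are the placeholders from the command above, and the helper name split_args is hypothetical.

```python
import argparse


def split_args(argv):
    """Separate the script's own flags from the flags meant for the runner."""
    parser = argparse.ArgumentParser()
    parser.add_argument(
        '--input', default='gs://dataflow-samples/shakespeare/kinglear.txt')
    parser.add_argument('--output', default='output.txt')
    # parse_known_args returns unrecognized arguments instead of raising an error.
    return parser.parse_known_args(argv)


known_args, pipeline_args = split_args([
    '--runner=DataflowRunner',
    '--project=YOUR_PROJECT_ID',
    '--region=us-central1',
    '--temp_location=gs://YOUR_BUCKET/temp',
    '--output=gs://YOUR_BUCKET/output',
])
```

In the pipeline, known_args supplies the read and write paths, while pipeline_args is the list handed to PipelineOptions, which is where the runner, project, region, and temp location are interpreted.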
Next steps
Programming guide: Learn about advanced transforms, windowing, and triggers.
Pipeline I/O: Connect to databases, message queues, and cloud storage.
Testing pipelines: Write unit and integration tests for your pipelines.
Tour of Beam: Interactive tutorials covering all Beam concepts.