The FlinkRunner executes Apache Beam pipelines on Apache Flink clusters, providing high-throughput, low-latency stream and batch processing.
Overview
Apache Flink is a distributed stream processing framework that excels at:
Stream Processing: True streaming with low latency and high throughput
Exactly-Once Processing: Strong consistency guarantees with checkpointing
State Management: Distributed stateful computations
Event Time Processing: Native support for event time and watermarks
When to Use FlinkRunner
Best For
Real-time streaming applications
Stateful stream processing
Low-latency requirements
Exactly-once processing semantics
Complex event processing
Existing Flink infrastructure
Consider Alternatives
Simple batch jobs (DirectRunner)
GCP-based workloads (DataflowRunner)
Existing Spark clusters (SparkRunner)
Local development (PrismRunner)
Setup and Configuration
Prerequisites
Apache Flink cluster (1.15 or later recommended)
Java 8 or later
Network access to Flink JobManager
Dependencies
Add to your pom.xml:
<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-runners-flink-1.18</artifactId>
  <version>{beam-version}</version>
</dependency>
For Gradle:
implementation 'org.apache.beam:beam-runners-flink-1.18:{beam-version}'
Replace 1.18 with your Flink version. Supported versions include 1.15, 1.16, 1.17, and 1.18.
Install the Beam Python SDK:
pip install apache-beam
The Python SDK runs on Flink through Beam's portability framework:
# Start a Flink cluster first
# Then submit the job
python my_pipeline.py \
  --runner=FlinkRunner \
  --flink_master=localhost:8081
The Go SDK supports Flink through the portable runner:
import "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/flink"
Flink Cluster Setup
Local Flink Cluster
# Download Flink
wget https://archive.apache.org/dist/flink/flink-1.18.0/flink-1.18.0-bin-scala_2.12.tgz
tar -xzf flink-1.18.0-bin-scala_2.12.tgz
cd flink-1.18.0
# Start local cluster
./bin/start-cluster.sh
# Access web UI at http://localhost:8081
Flink on Kubernetes
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: beam-flink-cluster
spec:
  image: flink:1.18
  flinkVersion: v1_18
  jobManager:
    resource:
      memory: "2048m"
      cpu: 1
  taskManager:
    resource:
      memory: "2048m"
      cpu: 1
    replicas: 2
Running a Pipeline
Basic Example
import org.apache.beam.runners.flink.FlinkRunner;
import org.apache.beam.runners.flink.FlinkPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
public class MyFlinkPipeline {
  public static void main(String[] args) {
    FlinkPipelineOptions options =
        PipelineOptionsFactory.fromArgs(args)
            .withValidation()
            .as(FlinkPipelineOptions.class);
    // Set the runner
    options.setRunner(FlinkRunner.class);
    // Flink master address
    options.setFlinkMaster("localhost:8081");
    // Parallelism
    options.setParallelism(4);
    Pipeline p = Pipeline.create(options);
    // Build your pipeline
    p.apply(/* your transforms */);
    // Execute on Flink
    p.run().waitUntilFinish();
  }
}
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
def run():
    options = PipelineOptions([
        '--runner=FlinkRunner',
        '--flink_master=localhost:8081',
        '--parallelism=4',
        '--flink_submit_uber_jar',
    ])
    with beam.Pipeline(options=options) as p:
        # Build your pipeline
        (p
         | beam.Create(['Hello', 'Flink'])
         | beam.Map(lambda x: x.upper())
         | beam.Map(print))
if __name__ == '__main__':
    run()
package main
import (
	"context"
	"flag"
	"github.com/apache/beam/sdks/v2/go/pkg/beam"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/runners/flink"
)
var (
	flinkMaster = flag.String("flink_master", "localhost:8081", "Flink master")
)
func main() {
	flag.Parse()
	beam.Init()
	p := beam.NewPipeline()
	s := p.Root()
	// Build your pipeline on the root scope (placeholder input shown here)
	beam.Create(s, "Hello", "Flink")
	// Execute returns a pipeline result and an error; only the error is checked
	if _, err := flink.Execute(context.Background(), p); err != nil {
		panic(err)
	}
}
Execution Modes
Local Mode
Run with an embedded Flink cluster:
FlinkPipelineOptions options =
    PipelineOptionsFactory.as(FlinkPipelineOptions.class);
options.setRunner(FlinkRunner.class);
options.setFlinkMaster("[local]"); // Embedded local cluster
options.setParallelism(2);
Cluster Mode
Submit to an existing Flink cluster:
# Java
mvn package
flink run \
-c com.example.MyPipeline \
target/my-pipeline-bundled.jar \
--runner=FlinkRunner \
--flinkMaster=localhost:8081 \
--parallelism=4
Auto Mode
options.setFlinkMaster("[auto]"); // Detect based on environment
FlinkPipelineOptions
Core Options
flinkMaster
Address of the Flink JobManager. Can be:
host:port - Connect to a remote cluster
[local] - Start a local embedded cluster
[auto] - Auto-detect based on the environment
options.setFlinkMaster("localhost:8081");
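The three accepted forms of the Flink master address can be told apart with a few string checks. The sketch below is only an illustration of that distinction: `classify_flink_master` is a hypothetical helper, not part of Beam, and the runner's own parsing may differ.

```python
def classify_flink_master(value: str) -> str:
    """Classify a flinkMaster setting as 'local', 'auto', or 'remote'.

    Hypothetical helper for illustration only; it is not Beam's own parsing.
    """
    value = value.strip()
    if value == "[local]":
        return "local"   # embedded cluster started in-process
    if value == "[auto]":
        return "auto"    # decided from the execution environment
    # Anything else is expected to look like host:port
    host, sep, port = value.rpartition(":")
    if not sep or not host or not port.isdigit():
        raise ValueError(
            f"expected host:port, [local], or [auto], got {value!r}")
    return "remote"


if __name__ == "__main__":
    for setting in ("localhost:8081", "[local]", "[auto]"):
        print(setting, "->", classify_flink_master(setting))
```

A check like this can be useful in launch scripts to fail fast on a mistyped address before a job is submitted.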
parallelism
Degree of parallelism for the pipeline. -1 uses Flink's default.
options.setParallelism(8);
maxParallelism
Maximum degree of parallelism. Sets the upper limit for dynamic scaling.
options.setMaxParallelism(128);
Checkpointing Options
checkpointingInterval
Interval in milliseconds for triggering checkpoints. -1 disables checkpointing.
options.setCheckpointingInterval(60000L); // Every 60 seconds
checkpointingMode
string
default: "EXACTLY_ONCE"
Checkpointing mode: EXACTLY_ONCE or AT_LEAST_ONCE.
options.setCheckpointingMode("EXACTLY_ONCE");
checkpointTimeoutMillis
Maximum time in milliseconds for a checkpoint to complete.
options.setCheckpointTimeoutMillis(600000L); // 10 minutes
minPauseBetweenCheckpoints
Minimum pause in milliseconds between checkpoints.
options.setMinPauseBetweenCheckpoints(5000L); // 5 seconds
numConcurrentCheckpoints
Maximum number of concurrent checkpoints.
options.setNumConcurrentCheckpoints(1);
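To see how the checkpoint interval and the minimum pause interact, the following sketch models when the next checkpoint can start. It is a simplified model that assumes at most one concurrent checkpoint; `next_checkpoint_start` is a hypothetical name, and Flink's actual scheduler has more inputs than shown here.

```python
def next_checkpoint_start(prev_start_ms: int,
                          prev_duration_ms: int,
                          interval_ms: int,
                          min_pause_ms: int) -> int:
    """Earliest start time of the next checkpoint, in milliseconds.

    Simplified model (assumes one concurrent checkpoint): the next
    checkpoint waits for both the configured interval since the previous
    start and the minimum pause since the previous checkpoint finished.
    """
    by_interval = prev_start_ms + interval_ms
    by_pause = prev_start_ms + prev_duration_ms + min_pause_ms
    return max(by_interval, by_pause)


if __name__ == "__main__":
    # Fast checkpoint: the 60 s interval decides.
    print(next_checkpoint_start(0, 2_000, 60_000, 5_000))
    # Slow checkpoint (58 s): the 5 s minimum pause pushes the start back.
    print(next_checkpoint_start(0, 58_000, 60_000, 5_000))
```

When checkpoints take nearly as long as the interval, the minimum pause is what keeps the job from checkpointing back to back.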
State Backend Options
stateBackend
State backend for storing state. Options: filesystem, rocksdb, memory.
options.setStateBackend("rocksdb");
stateBackendStoragePath
Path where the state backend persists checkpoint data.
options.setStateBackendStoragePath("hdfs:///checkpoints");
Streaming Options
streaming
Enable streaming mode for unbounded sources.
options.setStreaming(true);
autoWatermarkInterval
Interval for automatic watermark emission in milliseconds.
options.setAutoWatermarkInterval(200L);
Advanced Configuration
Exactly-Once Processing
Configure for exactly-once semantics:
FlinkPipelineOptions options =
    PipelineOptionsFactory.as(FlinkPipelineOptions.class);
// Enable checkpointing for exactly-once
options.setCheckpointingInterval(60000L);
options.setCheckpointingMode("EXACTLY_ONCE");
options.setStateBackendStoragePath("hdfs:///beam/checkpoints");
// State backend
options.setStateBackend("rocksdb");
// Fault tolerance
options.setFailOnCheckpointingErrors(true);
options.setMinPauseBetweenCheckpoints(5000L);
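Python pipelines pass the same settings as command-line flags rather than setter calls. The sketch below assumes the snake_case equivalents of the Java option names are accepted (as with `--flink_master` and `--parallelism` elsewhere on this page); `build_flink_args` is a hypothetical helper, and whether a particular flag is honored depends on the SDK and job server version.

```python
import re


def camel_to_snake(name: str) -> str:
    """Convert a Java-style option name to snake_case."""
    return re.sub(r"(?<!^)(?=[A-Z])", "_", name).lower()


def build_flink_args(options: dict) -> list:
    """Render {javaOptionName: value} as --snake_case=value flags.

    Hypothetical helper; assumes each Java option has a snake_case
    command-line equivalent in the Python SDK.
    """
    args = []
    for name, value in options.items():
        if isinstance(value, bool):
            value = "true" if value else "false"
        args.append(f"--{camel_to_snake(name)}={value}")
    return args


if __name__ == "__main__":
    print(build_flink_args({
        "checkpointingInterval": 60000,
        "checkpointingMode": "EXACTLY_ONCE",
        "minPauseBetweenCheckpoints": 5000,
    }))
```

The resulting list can be appended to the arguments passed to `PipelineOptions`.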
State Management
// Use RocksDB for large state
options.setStateBackend("rocksdb");
// RocksDB tuning keys are Flink settings. Put them in the flink-conf.yaml
// inside the directory passed to setFlinkConfDir:
//   state.backend.rocksdb.memory.managed: true
//   state.backend.rocksdb.block.cache-size: 256m
options.setFlinkConfDir("/path/to/flink-conf");
Savepoints
Start from a savepoint:
flink run \
  -s hdfs:///savepoints/savepoint-123 \
  -c com.example.MyPipeline \
  target/my-pipeline.jar
// In code
options.setSavepointPath("hdfs:///savepoints/savepoint-123");
Window Configuration
import org.apache.beam.sdk.transforms.windowing.*;
import org.joda.time.Duration;
pipeline
    .apply(/* source */)
    .apply(Window.<String>into(
            FixedWindows.of(Duration.standardMinutes(1)))
        .triggering(AfterWatermark.pastEndOfWindow()
            .withEarlyFirings(AfterProcessingTime
                .pastFirstElementInPane()
                .plusDelayOf(Duration.standardSeconds(30)))
            .withLateFirings(AfterPane.elementCountAtLeast(1)))
        .withAllowedLateness(Duration.standardMinutes(5))
        .discardingFiredPanes());
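The arithmetic behind the one-minute fixed windows and the five-minute allowed lateness above can be sketched in a few lines. This is a plain-Python model with millisecond timestamps and zero window offset, not Beam code; `fixed_window` and `is_droppable` are hypothetical names.

```python
def fixed_window(timestamp_ms: int, size_ms: int) -> tuple:
    """Return the [start, end) fixed window containing the timestamp."""
    start = timestamp_ms - (timestamp_ms % size_ms)
    return start, start + size_ms


def is_droppable(window_end_ms: int, watermark_ms: int,
                 allowed_lateness_ms: int) -> bool:
    """True once the watermark has passed the window end plus lateness.

    Elements arriving for such a window are too late to be processed.
    """
    return watermark_ms >= window_end_ms + allowed_lateness_ms


if __name__ == "__main__":
    minute = 60_000
    start, end = fixed_window(125_000, minute)
    print(start, end)
    # With 5 minutes of allowed lateness, the window stays open for late
    # data until the watermark reaches end + 300000 ms.
    print(is_droppable(end, end + 4 * minute, 5 * minute))
    print(is_droppable(end, end + 5 * minute, 5 * minute))
```

Between the end of the window and that cutoff, each late element triggers a late firing under the configuration shown above.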
Batch vs Streaming
Batch Pipeline
FlinkPipelineOptions options =
    PipelineOptionsFactory.as(FlinkPipelineOptions.class);
options.setRunner(FlinkRunner.class);
// streaming defaults to false for batch
Pipeline p = Pipeline.create(options);
p.apply(TextIO.read().from("/path/to/input"))
    .apply(/* transforms */)
    .apply(TextIO.write().to("/path/to/output"));
p.run().waitUntilFinish();
Streaming Pipeline
FlinkPipelineOptions options =
    PipelineOptionsFactory.as(FlinkPipelineOptions.class);
options.setRunner(FlinkRunner.class);
options.setStreaming(true);
options.setCheckpointingInterval(30000L);
Pipeline p = Pipeline.create(options);
p.apply(KafkaIO.<String, String>read()
        .withBootstrapServers("localhost:9092")
        .withTopic("input-topic")
        .withKeyDeserializer(StringDeserializer.class)
        .withValueDeserializer(StringDeserializer.class))
    .apply(/* transforms that produce a PCollection<String> */)
    .apply(KafkaIO.<Void, String>write()
        .withBootstrapServers("localhost:9092")
        .withTopic("output-topic")
        .withValueSerializer(StringSerializer.class)
        .values()); // Write values only, without keys
p.run().waitUntilFinish();
Monitoring and Debugging
Flink Web UI
Access the Flink web UI at http://jobmanager-host:8081:
View running and completed jobs
Monitor task metrics
Inspect checkpoints and savepoints
View logs and exceptions
Track watermarks and event time
Metrics
Beam metrics are exposed as Flink metrics:
import org.apache.beam.sdk.metrics.*;
import org.apache.beam.sdk.transforms.DoFn;
public class MyDoFn extends DoFn<String, String> {
  // Counter named "processed", namespaced by this class
  private final Counter counter =
      Metrics.counter(MyDoFn.class, "processed");
  @ProcessElement
  public void processElement(ProcessContext c) {
    counter.inc();
    c.output(c.element());
  }
}
Logging
import org.apache.beam.sdk.transforms.DoFn;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class MyDoFn extends DoFn<String, String> {
  private static final Logger LOG = LoggerFactory.getLogger(MyDoFn.class);
  @ProcessElement
  public void processElement(ProcessContext c) {
    LOG.info("Processing: {}", c.element());
    c.output(c.element());
  }
}
Logs are available in:
Flink Web UI (per task)
TaskManager log files
Configured log aggregation system
Parallelism
// Set appropriate parallelism
options.setParallelism(numTaskManagers * slotsPerTaskManager);
// Set max parallelism for better key distribution
options.setMaxParallelism(512);
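Why max parallelism affects key distribution: Flink hashes each key into one of `maxParallelism` key groups and assigns key group `g` to subtask `g * parallelism / maxParallelism` (integer division). The sketch below counts key groups per subtask under that rule; the function names are hypothetical, and it models only the group-to-subtask mapping, not the key hashing itself.

```python
def suggested_parallelism(num_task_managers: int, slots_per_tm: int) -> int:
    """Total task slots, the usual starting point for parallelism."""
    return num_task_managers * slots_per_tm


def key_groups_per_subtask(parallelism: int, max_parallelism: int) -> list:
    """Count how many key groups each subtask owns."""
    counts = [0] * parallelism
    for key_group in range(max_parallelism):
        # Key group -> subtask index, using integer division
        counts[key_group * parallelism // max_parallelism] += 1
    return counts


if __name__ == "__main__":
    print(suggested_parallelism(2, 4))
    # A small max parallelism leaves one subtask with twice the key groups.
    print(key_groups_per_subtask(3, 4))
    # A larger max parallelism evens out the split.
    print(key_groups_per_subtask(3, 512))
```

A max parallelism that is large relative to the parallelism keeps the per-subtask share of key groups close to even.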
Memory Configuration
// Configure task manager memory
// In flink-conf.yaml:
// taskmanager.memory.process.size: 4096m
// taskmanager.memory.managed.fraction: 0.4
Network Buffers
# In flink-conf.yaml
taskmanager.memory.network.fraction: 0.1
taskmanager.memory.network.min: 64mb
taskmanager.memory.network.max: 1gb
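The three network settings combine as a fraction of memory clamped between a minimum and a maximum. The sketch below works through that arithmetic in megabytes; `network_memory_mb` is a hypothetical helper, and it assumes the fraction applies to the TaskManager's total Flink memory.

```python
def network_memory_mb(total_flink_memory_mb: int,
                      fraction: float = 0.1,
                      min_mb: int = 64,
                      max_mb: int = 1024) -> int:
    """Network buffer memory: fraction of total, clamped to [min, max]."""
    derived = round(total_flink_memory_mb * fraction)
    return min(max(derived, min_mb), max_mb)


if __name__ == "__main__":
    for total in (256, 4096, 20480):
        print(total, "->", network_memory_mb(total))
```

With the values shown above, small TaskManagers are held at the 64 MB floor and very large ones are capped at 1 GB.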
State Backend Tuning
# RocksDB configuration in flink-conf.yaml
state.backend: rocksdb
state.backend.rocksdb.memory.managed: true
state.backend.rocksdb.block.cache-size: 256m
state.backend.incremental: true
Best Practices
Checkpointing Strategy
Enable checkpointing for production
options.setCheckpointingInterval(60000L); // Every minute
options.setCheckpointingMode("EXACTLY_ONCE");
Use appropriate intervals
Shorter intervals: Less work to reprocess after a failure, higher overhead
Longer intervals: More work to reprocess after a failure, lower overhead
Configure timeout appropriately
options.setCheckpointTimeoutMillis(600000L); // 10 minutes
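The interval trade-off can be put into rough numbers. The sketch below is a back-of-the-envelope estimate, not a measurement: it assumes a fixed checkpoint duration, and `checkpoint_tradeoff` is a hypothetical helper.

```python
def checkpoint_tradeoff(interval_ms: int, checkpoint_duration_ms: int) -> tuple:
    """Estimate (overhead fraction, worst-case replay in ms).

    Overhead: share of time spent checkpointing.
    Worst case: failure just before the next checkpoint completes, so
    processing restarts from the previous checkpoint.
    """
    overhead = checkpoint_duration_ms / interval_ms
    worst_case_replay_ms = interval_ms + checkpoint_duration_ms
    return overhead, worst_case_replay_ms


if __name__ == "__main__":
    for interval in (10_000, 60_000, 300_000):
        overhead, replay = checkpoint_tradeoff(interval, 3_000)
        print(f"interval={interval} ms overhead={overhead:.1%} "
              f"worst-case replay={replay} ms")
```

Running it for a few candidate intervals makes the choice concrete: the overhead falls as the interval grows, while the amount of work to replay after a failure rises.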
State Management
Choose the right state backend
Memory: Small state, fast access
Filesystem: Medium state
RocksDB: Large state, slower access
Use incremental checkpoints
state.backend.incremental: true
Monitor state size
Check in Flink UI
Set up alerts for growth
Resource Management
Right-size workers
taskmanager.numberOfTaskSlots: 4
taskmanager.memory.process.size: 4096m
Use appropriate parallelism
Generally: slots_per_tm * num_tm
Consider data skew
Configure backpressure handling
Monitor in Flink UI
Increase parallelism if needed
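Data skew can be quantified from per-subtask record counts, such as those shown in the Flink UI. The sketch below uses a simple max-to-mean ratio; `skew_ratio` is a hypothetical helper and the threshold for acting on it is a judgment call.

```python
def skew_ratio(records_per_subtask: list) -> float:
    """Ratio of the busiest subtask's load to the mean load.

    1.0 means perfectly even; a value near the number of subtasks means
    one subtask is doing almost all the work.
    """
    if not records_per_subtask:
        raise ValueError("need at least one subtask")
    mean = sum(records_per_subtask) / len(records_per_subtask)
    if mean == 0:
        return 1.0  # no records at all, so nothing is skewed
    return max(records_per_subtask) / mean


if __name__ == "__main__":
    print(skew_ratio([100, 100, 100, 100]))
    print(skew_ratio([400, 0, 0, 0]))
```

When the ratio is high, adding parallelism helps little because the hot keys still land on one subtask; changing the key or pre-aggregating is usually more effective.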
Troubleshooting
Job submission fails
Check:
Flink cluster is running
JobManager address is correct
JAR contains all dependencies
Sufficient resources available
Checkpoints fail or time out
Increase checkpoint timeout
Check state backend configuration
Verify checkpoint directory is accessible
Monitor state size growth
Out-of-memory errors
Increase TaskManager memory
Reduce parallelism
Use RocksDB state backend
Enable incremental checkpoints
Backpressure or low throughput
Check for backpressure in UI
Increase parallelism
Optimize transforms
Reduce checkpoint frequency
Watermarks not advancing
Check for stuck sources
Verify watermark generation
Look for slow tasks in UI
Check for data skew
Runner Capabilities
Supported Features
✅ Batch and streaming
✅ Exactly-once processing
✅ Event time processing
✅ State and timers
✅ Side inputs
✅ All window types
✅ Custom triggers
✅ Savepoints
Limitations
Limited support for some Beam transforms
Requires Flink cluster management
State size limited by backend choice
Next Steps
Flink Documentation: Learn more about Apache Flink
SparkRunner: Alternative distributed runner
State & Timers: Advanced stateful processing
Windowing: Learn about windowing strategies