The SparkRunner executes Apache Beam pipelines on Apache Spark clusters, enabling you to leverage existing Spark infrastructure for both batch and streaming workloads.
Overview
Apache Spark is a unified analytics engine that provides:
Batch Processing: Fast in-memory batch processing
Streaming: Structured Streaming for real-time processing
SQL Support: Integrate with Spark SQL and DataFrames
Ecosystem: Rich ecosystem of libraries and connectors
Scalability: Scale from a single laptop to large data-center clusters
When to Use SparkRunner
Best For
Existing Spark infrastructure
Spark ecosystem integration
Unified batch and streaming
On-premise deployments
Cost-effective large-scale batch
Teams familiar with Spark
Consider Alternatives
Pure streaming apps (FlinkRunner)
GCP workloads (DataflowRunner)
Local development (DirectRunner)
Low-latency streaming (FlinkRunner)
Setup and Configuration
Prerequisites
Apache Spark 3.0 or later
Java 8 or later
Spark cluster or local Spark installation
Dependencies
Add to your pom.xml:
<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-runners-spark-3</artifactId>
  <version>{beam-version}</version>
</dependency>
For Gradle:
implementation 'org.apache.beam:beam-runners-spark-3:{beam-version}'
Use beam-runners-spark-3 for Spark 3.x. For Spark 2.x, use beam-runners-spark.
Install the Beam Python SDK: pip install apache-beam
The Spark runner uses the portable model:
python my_pipeline.py \
  --runner=SparkRunner \
  --spark_master_url=spark://localhost:7077
Add to your build.sbt: libraryDependencies += "org.apache.beam" % "beam-runners-spark-3" % beamVersion
Spark Cluster Setup
Local Spark
# Download Spark
wget https://archive.apache.org/dist/spark/spark-3.5.0/spark-3.5.0-bin-hadoop3.tgz
tar -xzf spark-3.5.0-bin-hadoop3.tgz
cd spark-3.5.0-bin-hadoop3
# Start local master and worker
./sbin/start-master.sh
./sbin/start-worker.sh spark://localhost:7077
# Access UI at http://localhost:8080
Spark on YARN
# Submit to YARN cluster
spark-submit \
--master yarn \
--deploy-mode cluster \
--class com.example.MyPipeline \
target/my-pipeline.jar
Spark on Kubernetes
# Submit to Kubernetes
spark-submit \
--master k8s://https://kubernetes-api:6443 \
--deploy-mode cluster \
--class com.example.MyPipeline \
target/my-pipeline.jar
Running a Pipeline
Basic Example
import org.apache.beam.runners.spark.SparkPipelineOptions;
import org.apache.beam.runners.spark.SparkRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.TypeDescriptors;
public class MySparkPipeline {
  public static void main(String[] args) {
    SparkPipelineOptions options =
        PipelineOptionsFactory.fromArgs(args)
            .withValidation()
            .as(SparkPipelineOptions.class);
    // Set the runner
    options.setRunner(SparkRunner.class);
    // Spark master URL
    options.setSparkMaster("spark://localhost:7077");
    // Or local mode
    // options.setSparkMaster("local[4]");
    Pipeline p = Pipeline.create(options);
    // Build your pipeline (replace with your own transforms)
    p.apply(Create.of("Hello", "Spark"))
        .apply(MapElements.into(TypeDescriptors.strings()).via((String s) -> s.toUpperCase()));
    // Execute on Spark
    p.run().waitUntilFinish();
  }
}
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
def run():
    options = PipelineOptions([
        '--runner=SparkRunner',
        '--spark_master_url=spark://localhost:7077',
    ])
    with beam.Pipeline(options=options) as p:
        # Build your pipeline
        (p
         | beam.Create(['Hello', 'Spark'])
         | beam.Map(lambda x: x.upper())
         | beam.Map(print))
if __name__ == '__main__':
    run()
import org.apache.beam.runners.spark.{SparkPipelineOptions, SparkRunner}
import org.apache.beam.sdk.Pipeline
import org.apache.beam.sdk.options.PipelineOptionsFactory
object MySparkPipeline {
  def main(args: Array[String]): Unit = {
    val options = PipelineOptionsFactory
      .fromArgs(args: _*)
      .withValidation()
      .as(classOf[SparkPipelineOptions])
    options.setRunner(classOf[SparkRunner])
    options.setSparkMaster("local[4]")
    val pipeline = Pipeline.create(options)
    // Build your pipeline
    pipeline.run().waitUntilFinish()
  }
}
Execution Modes
Local Mode
Run with local Spark:
SparkPipelineOptions options =
    PipelineOptionsFactory.as(SparkPipelineOptions.class);
options.setRunner(SparkRunner.class);
options.setSparkMaster("local[*]"); // Use all cores
// or
// options.setSparkMaster("local[4]"); // Use 4 cores
Cluster Mode
Submit to a Spark cluster:
spark-submit \
--master spark://master:7077 \
--class com.example.MyPipeline \
--deploy-mode cluster \
target/my-pipeline.jar \
--runner=SparkRunner
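In the command above, everything after the application JAR is handed to your main method, so Beam options such as --runner=SparkRunner must follow the JAR, while Spark properties go before it as --conf key=value. The following is a minimal sketch of assembling that command in Python. It uses only standard spark-submit flags; the helper name build_spark_submit is hypothetical.

```python
import shlex
from typing import List, Mapping, Optional, Sequence


def build_spark_submit(master: str, main_class: str, jar: str,
                       deploy_mode: str = "cluster",
                       conf: Optional[Mapping[str, str]] = None,
                       pipeline_args: Sequence[str] = ()) -> List[str]:
    """Assemble a spark-submit argv list (hypothetical helper)."""
    cmd = ["spark-submit", "--master", master,
           "--deploy-mode", deploy_mode, "--class", main_class]
    # Spark properties are passed before the JAR, one --conf flag per entry.
    for key, value in sorted((conf or {}).items()):
        cmd += ["--conf", f"{key}={value}"]
    # Everything after the JAR goes to main(), i.e. to Beam's PipelineOptions.
    cmd.append(jar)
    cmd.extend(pipeline_args)
    return cmd


if __name__ == "__main__":
    cmd = build_spark_submit(
        "spark://master:7077", "com.example.MyPipeline", "target/my-pipeline.jar",
        conf={"spark.executor.memory": "4g"},
        pipeline_args=["--runner=SparkRunner"])
    print(shlex.join(cmd))
```

If you launch the command from Python, passing the list directly to subprocess.run(cmd, check=True) avoids shell-quoting problems.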
SparkPipelineOptions
Core Options
sparkMaster: Spark master URL. Common values:
local[n] - Local mode with n threads
local[*] - Local mode with all cores
spark://host:port - Spark standalone cluster
yarn - YARN cluster
k8s://host:port - Kubernetes cluster
options.setSparkMaster("spark://localhost:7077");
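The master URL forms listed above are distinct enough that you can check them before submitting a job. The following is a minimal sketch, assuming you only need to recognize those five forms. Spark itself accepts more variants, such as plain local or local[n,maxFailures], which this sketch rejects. classify_master is a hypothetical helper and is not part of Beam or Spark.

```python
import re

# Patterns for the master URL forms listed above (not Spark's full grammar).
_PATTERNS = [
    ("local", re.compile(r"local\[(\d+|\*)\]")),
    ("standalone", re.compile(r"spark://([^:/]+):(\d+)")),
    ("yarn", re.compile(r"yarn")),
    ("kubernetes", re.compile(r"k8s://(https?://)?([^:/]+):(\d+)")),
]


def classify_master(url: str) -> str:
    """Return the cluster manager a master URL points at, or raise ValueError."""
    for kind, pattern in _PATTERNS:
        if pattern.fullmatch(url):
            return kind
    raise ValueError(f"unrecognized Spark master URL: {url!r}")


if __name__ == "__main__":
    print(classify_master("spark://localhost:7077"))  # standalone
```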
appName: Application name displayed in the Spark UI.
options.setAppName("My Beam Pipeline");
Streaming Options
streaming: Enable streaming mode for unbounded sources.
options.setStreaming(true);
batchIntervalMillis: Batch interval for Spark Streaming, in milliseconds.
options.setBatchIntervalMillis(1000L); // 1 second
checkpointDurationMillis: Checkpoint interval, in milliseconds. -1 uses Spark’s default.
options.setCheckpointDurationMillis(10000L); // 10 seconds
checkpointDir: Directory for Spark checkpoints.
options.setCheckpointDir("hdfs:///checkpoints");
bundleSize: Bundle size, in bytes, for splitting bounded sources. 0 uses Spark’s default parallelism.
options.setBundleSize(64L * 1024 * 1024); // 64 MiB
cacheDisabled: Disable caching of reused PCollections.
options.setCacheDisabled(true);
maxRecordsPerBatch: Maximum records per micro-batch for streaming sources.
options.setMaxRecordsPerBatch(10000L);
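Two of the streaming options above interact with the batch interval. Spark Streaming only checkpoints on batch boundaries, so a checkpoint duration that is a whole multiple of the batch interval keeps checkpoints evenly spaced. Separately, maxRecordsPerBatch divided by the batch interval gives an upper bound on the ingest rate.

The following sketch implements both checks. The function names are hypothetical. It also assumes that a negative maxRecordsPerBatch means "no cap", by analogy with the -1 default for the checkpoint duration.

```python
from typing import Optional


def checkpoint_aligned(batch_interval_ms: int, checkpoint_duration_ms: int) -> bool:
    """True if checkpoints fall on every n-th batch (-1 means Spark's default)."""
    if batch_interval_ms <= 0:
        raise ValueError("batch interval must be positive")
    if checkpoint_duration_ms == -1:
        return True
    return (checkpoint_duration_ms >= batch_interval_ms
            and checkpoint_duration_ms % batch_interval_ms == 0)


def max_records_per_second(batch_interval_ms: int,
                           max_records_per_batch: int) -> Optional[float]:
    """Upper bound on ingest rate implied by maxRecordsPerBatch, or None if uncapped."""
    if batch_interval_ms <= 0:
        raise ValueError("batch interval must be positive")
    if max_records_per_batch < 0:
        return None  # assumption: a negative value means "no cap"
    return max_records_per_batch * 1000.0 / batch_interval_ms


if __name__ == "__main__":
    print(checkpoint_aligned(1000, 10000))      # True
    print(max_records_per_second(1000, 10000))  # 10000.0
```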
Resource Configuration
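Spark configuration properties, passed through setSparkSubmitConf. If your Beam version does not provide this option, supply the same properties as spark-submit --conf key=value flags or in spark-defaults.conf.
Map<String, String> conf = new HashMap<>();
conf.put("spark.executor.memory", "4g");
conf.put("spark.executor.cores", "2");
options.setSparkSubmitConf(conf);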
Advanced Configuration
Using Existing SparkContext
import org.apache.beam.runners.spark.SparkContextOptions;
import org.apache.beam.runners.spark.SparkRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
// Create Spark context
SparkConf conf = new SparkConf()
    .setAppName("My Pipeline")
    .setMaster("local[4]");
JavaSparkContext jsc = new JavaSparkContext(conf);
// Use with Beam; set the runner explicitly, otherwise the default runner is used
SparkContextOptions options =
    PipelineOptionsFactory.as(SparkContextOptions.class);
options.setRunner(SparkRunner.class);
options.setUsesProvidedSparkContext(true);
options.setProvidedSparkContext(jsc);
Pipeline p = Pipeline.create(options);
// Build pipeline
p.run().waitUntilFinish();
jsc.stop();
Spark Configuration
import java.util.HashMap;
import java.util.Map;
SparkPipelineOptions options =
    PipelineOptionsFactory.as(SparkPipelineOptions.class);
Map<String, String> sparkConf = new HashMap<>();
// Executor configuration
sparkConf.put("spark.executor.memory", "4g");
sparkConf.put("spark.executor.cores", "2");
sparkConf.put("spark.executor.instances", "10");
// Driver configuration
sparkConf.put("spark.driver.memory", "2g");
sparkConf.put("spark.driver.cores", "1");
// Dynamic allocation
sparkConf.put("spark.dynamicAllocation.enabled", "true");
sparkConf.put("spark.dynamicAllocation.minExecutors", "2");
sparkConf.put("spark.dynamicAllocation.maxExecutors", "20");
// Shuffle configuration
sparkConf.put("spark.shuffle.service.enabled", "true");
options.setSparkSubmitConf(sparkConf);
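According to Spark's dynamic allocation documentation, setting spark.dynamicAllocation.enabled is not enough on its own. The application also needs either the external shuffle service or shuffle tracking (spark.dynamicAllocation.shuffleTracking.enabled, Spark 3.0+), so that executors can be removed without losing shuffle files.

The following minimal sketch checks that requirement, along with the min/max executor bounds, in a configuration map like the one above. dynamic_allocation_problems is a hypothetical helper. It ignores the other options Spark offers, such as decommissioning.

```python
from typing import List, Mapping


def _is_true(conf: Mapping[str, str], key: str) -> bool:
    return conf.get(key, "false").strip().lower() == "true"


def dynamic_allocation_problems(conf: Mapping[str, str]) -> List[str]:
    """Return configuration problems for dynamic allocation (empty list: none found)."""
    if not _is_true(conf, "spark.dynamicAllocation.enabled"):
        return []
    problems = []
    # Executors can only be released safely if shuffle files outlive them.
    if not (_is_true(conf, "spark.shuffle.service.enabled")
            or _is_true(conf, "spark.dynamicAllocation.shuffleTracking.enabled")):
        problems.append("enable spark.shuffle.service.enabled or "
                        "spark.dynamicAllocation.shuffleTracking.enabled")
    min_exec = int(conf.get("spark.dynamicAllocation.minExecutors", "0"))
    max_exec = conf.get("spark.dynamicAllocation.maxExecutors")
    if max_exec is not None and min_exec > int(max_exec):
        problems.append("minExecutors is greater than maxExecutors")
    return problems


if __name__ == "__main__":
    conf = {
        "spark.dynamicAllocation.enabled": "true",
        "spark.dynamicAllocation.minExecutors": "2",
        "spark.dynamicAllocation.maxExecutors": "20",
        "spark.shuffle.service.enabled": "true",
    }
    print(dynamic_allocation_problems(conf))  # []
```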
Checkpointing
For streaming pipelines:
SparkPipelineOptions options =
    PipelineOptionsFactory.as(SparkPipelineOptions.class);
options.setStreaming(true);
options.setCheckpointDir("hdfs:///beam/checkpoints");
options.setCheckpointDurationMillis(10000L); // 10 seconds
Batch vs Streaming
Batch Pipeline
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.TypeDescriptors;
SparkPipelineOptions options =
    PipelineOptionsFactory.as(SparkPipelineOptions.class);
options.setRunner(SparkRunner.class);
options.setSparkMaster("local[*]");
// streaming defaults to false
Pipeline p = Pipeline.create(options);
p.apply(TextIO.read().from("/path/to/input"))
    // Your transforms here, for example:
    .apply(MapElements.into(TypeDescriptors.strings()).via((String line) -> line.toUpperCase()))
    .apply(TextIO.write().to("/path/to/output"));
p.run().waitUntilFinish();
Streaming Pipeline
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.transforms.Values;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
SparkPipelineOptions options =
    PipelineOptionsFactory.as(SparkPipelineOptions.class);
options.setRunner(SparkRunner.class);
options.setSparkMaster("local[*]");
options.setStreaming(true);
options.setBatchIntervalMillis(1000L);
options.setCheckpointDir("/tmp/checkpoint");
Pipeline p = Pipeline.create(options);
p.apply(KafkaIO.<String, String>read()
        .withBootstrapServers("localhost:9092")
        .withTopic("input-topic")
        .withKeyDeserializer(StringDeserializer.class)
        .withValueDeserializer(StringDeserializer.class)
        .withoutMetadata())              // PCollection<KV<String, String>>
    .apply(Values.<String>create())      // keep only the message values
    // Your transforms here
    .apply(KafkaIO.<Void, String>write()
        .withBootstrapServers("localhost:9092")
        .withTopic("output-topic")
        .withValueSerializer(StringSerializer.class)
        .values());                      // write values without keys
p.run().waitUntilFinish();
Monitoring and Debugging
Spark UI
Access Spark UI for monitoring:
Master UI: http://master:8080 (standalone mode)
Application UI: http://driver:4040
History Server: http://history-server:18080
View:
Job stages and tasks
Executor metrics
Storage and memory usage
Environment configuration
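Spark also exposes this information as JSON through its monitoring REST API, under /api/v1 on both the application UI (port 4040) and the History Server (port 18080). The following minimal sketch lists applications and picks out those with an unfinished attempt. It assumes each entry has a name and an attempts list whose items carry a boolean completed field. The helper names are hypothetical.

```python
import json
from typing import Any, Dict, List
from urllib.request import urlopen


def list_applications(base_url: str, timeout: float = 5.0) -> List[Dict[str, Any]]:
    """GET <base_url>/api/v1/applications, e.g. base_url='http://history-server:18080'."""
    with urlopen(f"{base_url.rstrip('/')}/api/v1/applications", timeout=timeout) as resp:
        return json.load(resp)


def running_app_names(apps: List[Dict[str, Any]]) -> List[str]:
    """Names of applications with at least one attempt that has not completed."""
    return [app["name"] for app in apps
            if any(not attempt.get("completed", False)
                   for attempt in app.get("attempts", []))]
```

For example, running_app_names(list_applications("http://history-server:18080")) returns the names of pipelines that are still running. Names are set with setAppName.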
Metrics
Beam metrics are available in Spark:
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.metrics.*;
import org.apache.beam.sdk.transforms.DoFn;
public class MyDoFn extends DoFn<String, String> {
  private final Counter counter =
      Metrics.counter(MyDoFn.class, "processed");
  private final Distribution distribution =
      Metrics.distribution(MyDoFn.class, "size");
  @ProcessElement
  public void processElement(ProcessContext c) {
    counter.inc();
    distribution.update(c.element().length());
    c.output(c.element());
  }
}
// Read metrics after execution
PipelineResult result = p.run();
result.waitUntilFinish();
MetricResults metrics = result.metrics();
MetricQueryResults counters = metrics.queryMetrics(
    MetricsFilter.builder()
        .addNameFilter(MetricNameFilter.named(MyDoFn.class, "processed"))
        .build());
for (MetricResult<Long> c : counters.getCounters()) {
  System.out.println(c.getName() + ": " + c.getAttempted());
}
Logging
import org.apache.beam.sdk.transforms.DoFn;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class MyDoFn extends DoFn<String, String> {
  private static final Logger LOG = LoggerFactory.getLogger(MyDoFn.class);
  @ProcessElement
  public void processElement(ProcessContext c) {
    LOG.info("Processing: {}", c.element());
    c.output(c.element());
  }
}
Logs are available:
In Spark executor logs
Through Spark UI
In configured log aggregation system
Parallelism
// Control bundle size (in bytes) used to split bounded sources
options.setBundleSize(64L * 1024 * 1024); // 64 MiB
// Use appropriate Spark parallelism
Map<String, String> conf = new HashMap<>();
conf.put("spark.default.parallelism", "200");
// Applies to Spark SQL/Dataset shuffles; RDD-based execution uses spark.default.parallelism
conf.put("spark.sql.shuffle.partitions", "200");
options.setSparkSubmitConf(conf);
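As a starting point for spark.default.parallelism, Spark's tuning guide recommends about 2-3 tasks per CPU core in the cluster. The following small sketch does that arithmetic, assuming a fixed number of executors. If you use dynamic allocation, substituting maxExecutors is an assumption on our part. suggested_parallelism is a hypothetical helper.

```python
def suggested_parallelism(executor_instances: int, executor_cores: int,
                          tasks_per_core: int = 2) -> int:
    """Total cores times tasks_per_core (the 2-3 tasks per core rule of thumb)."""
    if executor_instances <= 0 or executor_cores <= 0 or tasks_per_core <= 0:
        raise ValueError("all arguments must be positive")
    return executor_instances * executor_cores * tasks_per_core


if __name__ == "__main__":
    # 10 executors x 4 cores = 40 cores -> 80 to 120 tasks
    print(suggested_parallelism(10, 4), suggested_parallelism(10, 4, 3))
```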
Memory Management
Map<String, String> conf = new HashMap<>();
// Executor memory
conf.put("spark.executor.memory", "8g");
conf.put("spark.executor.memoryOverhead", "2g");
// Memory fractions
conf.put("spark.memory.fraction", "0.8");
conf.put("spark.memory.storageFraction", "0.3");
options.setSparkSubmitConf(conf);
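The memory a cluster manager must grant per executor is roughly spark.executor.memory plus spark.executor.memoryOverhead. When the overhead is not set, Spark documents its default as 10% of executor memory, with a 384 MiB minimum. Recent releases make the factor configurable through spark.executor.memoryOverheadFactor.

The following sketch performs that calculation under these assumptions:

* Sizes use only the k/m/g/t suffixes, and no suffix means MiB.
* Off-heap and PySpark memory are ignored.

The helper names are hypothetical.

```python
import re
from typing import Optional

# Multipliers to MiB for the suffixes this sketch accepts ("" means MiB).
_UNITS_MIB = {"": 1, "k": 1 / 1024, "m": 1, "g": 1024, "t": 1024 * 1024}


def to_mib(size: str) -> float:
    """Parse a Spark-style size such as '4g' or '512m' into MiB."""
    match = re.fullmatch(r"(\d+)([kmgt]?)", size.strip().lower())
    if match is None:
        raise ValueError(f"unsupported size: {size!r}")
    return int(match.group(1)) * _UNITS_MIB[match.group(2)]


def executor_container_mib(executor_memory: str,
                           memory_overhead: Optional[str] = None,
                           overhead_factor: float = 0.10) -> int:
    """Heap plus overhead, using Spark's documented default when none is given."""
    heap = to_mib(executor_memory)
    if memory_overhead is not None:
        overhead = to_mib(memory_overhead)
    else:
        overhead = max(384, int(heap * overhead_factor))
    return int(heap + overhead)


if __name__ == "__main__":
    print(executor_container_mib("8g", "2g"))  # 10240
    print(executor_container_mib("4g"))        # 4505
```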
Caching Strategy
// Enable caching for reused PCollections (default)
options.setCacheDisabled(false);
// Or disable if recomputing is faster
options.setCacheDisabled(true);
Shuffle Optimization
Map<String, String> conf = new HashMap<>();
// Shuffle configuration
conf.put("spark.shuffle.compress", "true");
conf.put("spark.shuffle.spill.compress", "true");
conf.put("spark.shuffle.file.buffer", "128k");
options.setSparkSubmitConf(conf);
Best Practices
Resource Allocation
Right-size executors
conf.put("spark.executor.cores", "4");
conf.put("spark.executor.memory", "8g");
conf.put("spark.executor.instances", "10");
Use dynamic allocation
conf.put("spark.dynamicAllocation.enabled", "true");
conf.put("spark.dynamicAllocation.minExecutors", "2");
conf.put("spark.dynamicAllocation.maxExecutors", "50");
Streaming Best Practices
Choose an appropriate batch interval
options.setBatchIntervalMillis(1000L); // Balance latency vs throughput
Enable checkpointing
options.setCheckpointDir("hdfs:///checkpoints");
options.setCheckpointDurationMillis(10000L);
Configure backpressure
conf.put("spark.streaming.backpressure.enabled", "true");
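// spark.streaming.kafka.* limits apply to Spark's own Kafka DStream, not Beam's KafkaIO;
// cap records read per micro-batch with the Beam option instead
options.setMaxRecordsPerBatch(10000L);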
Batch Best Practices
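Partition data appropriately
Use side inputs (which the Spark runner implements with broadcast variables) for small datasets
Avoid wide transformations such as GroupByKey when possible, since they trigger shuffles
Coalesce output partitions (for file sinks, set a fixed shard count)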
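Beam's file sinks let you fix the number of output files, for example with TextIO.write().withNumShards(n). One way to choose n is to divide the expected output size by a target file size, as in the following sketch. The 128 MiB target and the 1000-shard ceiling are illustrative assumptions, not Beam or Spark settings. num_output_shards is a hypothetical helper.

```python
def num_output_shards(total_output_bytes: int,
                      target_file_bytes: int = 128 * 1024 * 1024,
                      max_shards: int = 1000) -> int:
    """Ceiling of total/target, clamped to the range [1, max_shards]."""
    if total_output_bytes < 0 or target_file_bytes <= 0:
        raise ValueError("sizes must be non-negative and the target must be positive")
    shards = -(-total_output_bytes // target_file_bytes)  # ceiling division
    return max(1, min(shards, max_shards))


if __name__ == "__main__":
    print(num_output_shards(1024 ** 3))  # 8 files of about 128 MiB for 1 GiB
```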
Troubleshooting
If the job fails to start, check:
Spark master is running and accessible
JAR contains all dependencies
Spark version compatibility
Network connectivity
For out-of-memory errors:
Increase executor memory
Increase memory overhead
Reduce partition size
Enable off-heap memory
Disable caching if not beneficial
For high streaming latency:
Reduce batch interval
Increase executors
Enable backpressure
Optimize transforms
Check for bottlenecks in UI
For checkpoint failures:
Verify checkpoint directory is accessible
Check HDFS/storage permissions
Ensure sufficient disk space
Monitor checkpoint size
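For the first startup check ("Spark master is running and accessible"), a quick TCP probe tells you whether anything is listening at a standalone master's spark://host:port address. The following is a minimal sketch. It only confirms that the port accepts connections, not that the process behind it is a healthy Spark master. master_reachable is a hypothetical helper.

```python
import socket
from urllib.parse import urlparse


def master_reachable(master_url: str, timeout: float = 3.0) -> bool:
    """Return True if a spark://host:port address accepts TCP connections."""
    parsed = urlparse(master_url)
    if parsed.scheme != "spark" or parsed.hostname is None or parsed.port is None:
        raise ValueError(f"expected spark://host:port, got {master_url!r}")
    try:
        with socket.create_connection((parsed.hostname, parsed.port), timeout=timeout):
            return True
    except OSError:  # refused, timed out, or host not resolvable
        return False
```

For example, master_reachable("spark://localhost:7077") should return True once the local master from the setup section is running.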
Runner Capabilities
Supported Features
✅ Batch processing
✅ Streaming (Spark Streaming micro-batches)
✅ Windowing
✅ Triggers
✅ State and timers
✅ Side inputs
✅ Metrics
Limitations
Limited exactly-once streaming support
Batch interval impacts latency
Some Beam features are not fully supported (see the Beam capability matrix)
Requires Spark cluster management
Integration Examples
Reading from Hive
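import java.util.Collections;
import org.apache.beam.sdk.io.hcatalog.HCatalogIO;
Pipeline p = Pipeline.create(options);
// Beam transforms do not use Spark SQL's Hive support; read through the metastore with HCatalogIO
p.apply(HCatalogIO.read().withDatabase("default").withTable("my_table")
    .withConfigProperties(Collections.singletonMap("hive.metastore.uris", "thrift://metastore-host:9083")));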
Writing to Cassandra
import java.util.Arrays;
import org.apache.beam.sdk.io.cassandra.CassandraIO;
// MyType is a mapped entity class; its @Table annotation names the target table
pipeline
    .apply(/* source */)
    .apply(CassandraIO.<MyType>write()
        .withHosts(Arrays.asList("localhost"))
        .withPort(9042)
        .withKeyspace("my_keyspace")
        .withEntity(MyType.class));
Next Steps
Spark Documentation: Learn more about Apache Spark
FlinkRunner: Alternative for streaming workloads
Performance: Optimize pipeline performance
Monitoring: Monitor Spark applications