Because of the in-memory nature of most Spark computations, Spark programs can be bottlenecked by any resource in the cluster: CPU, network bandwidth, or memory. This guide covers key strategies to optimize your Spark application’s performance.

Performance Bottlenecks

Spark applications commonly face these bottlenecks:
  • Network bandwidth: most common when data fits in memory
  • Memory usage: critical for large datasets and caching
  • CPU: important for compute-intensive operations
If your data fits in memory, the bottleneck is usually network bandwidth. Focus on data serialization and reducing shuffle operations.

Data Serialization

Serialization plays a crucial role in the performance of any distributed application. Formats that are slow to serialize or consume many bytes will greatly slow down computation.

Serialization Options

Spark provides two serialization libraries:
By default, Spark uses Java’s ObjectOutputStream framework.
Advantages:
  • Works with any class implementing java.io.Serializable
  • No configuration required
  • Flexible
Disadvantages:
  • Often quite slow
  • Produces large serialized formats
  • Not recommended for production

Enabling Kryo Serialization

Switch to Kryo by configuring your SparkConf:
val conf = new SparkConf()
  .setMaster("spark://host:port")
  .setAppName("MyApp")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
val sc = new SparkContext(conf)
Kryo is not the default because it requires custom registration, but we recommend it for any network-intensive application.

Registering Classes with Kryo

For best performance, register your custom classes with Kryo:
val conf = new SparkConf()
  .setMaster("spark://host:port")
  .setAppName("MyApp")
conf.registerKryoClasses(Array(classOf[MyClass1], classOf[MyClass2]))
val sc = new SparkContext(conf)
Spark automatically includes Kryo serializers for many commonly-used core Scala classes from the Twitter chill library.
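To catch unregistered classes early instead of letting Kryo silently fall back to writing full class names (which wastes space), you can additionally require registration. This is a standard Spark property; shown here as an optional sketch:

```
spark.kryo.registrationRequired=true
```

With this set, Kryo throws an exception whenever it encounters a class you have not registered, which makes missing registrations easy to find during testing.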

Kryo Configuration

If your objects are large, increase the buffer size:
spark.kryoserializer.buffer=64m
This value needs to be large enough to hold the largest object you will serialize.
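A related setting caps how far the buffer may grow when a single record is unusually large; as a sketch, you might raise it alongside the base buffer size:

```
spark.kryoserializer.buffer.max=128m
```

If serialization fails with a "buffer overflow" error, `spark.kryoserializer.buffer.max` is usually the value to increase.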

Memory Tuning

There are three considerations in tuning memory usage: the amount of memory used by your objects, the cost of accessing those objects, and the overhead of garbage collection.

Memory Overhead in Java Objects

Java objects can consume 2-5x more space than the raw data due to:
  1. Object headers: each Java object has about 16 bytes of overhead containing information like a pointer to its class.
  2. String overhead: Java Strings have about 40 bytes of overhead and store each character as two bytes. A 10-character string can consume 60 bytes.
  3. Collection overhead: collections like HashMap and LinkedList use wrapper objects for each entry, adding headers and pointers (typically 8 bytes each).
  4. Boxed primitives: collections of primitive types store them as boxed objects like java.lang.Integer.
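The rules of thumb above can be turned into a back-of-envelope calculation. This is a rough sketch using the approximate numbers from this section (real JVM sizes vary with alignment and compressed pointers):

```scala
// Back-of-envelope size estimates using this section's rules of thumb;
// not exact JVM measurements.
object OverheadEstimate {
  // A Java String: ~40 bytes of overhead plus 2 bytes per character.
  def stringBytes(chars: Int): Int = 40 + 2 * chars

  // A boxed Int in a linked collection: ~16-byte object header plus the
  // 4-byte value (padded), plus an ~8-byte pointer from the entry wrapper.
  val boxedIntBytes: Int = 16 + 8 + 8

  def main(args: Array[String]): Unit = {
    // A 10-character string: ~60 bytes vs 20 bytes of character data.
    println(stringBytes(10))
    // ~32 bytes per element vs 4 bytes per element in an Array[Int].
    println(boxedIntBytes)
  }
}
```

The comparison makes the advice concrete: an `Array[Int]` of a million elements costs about 4 MiB, while the same values boxed in a linked collection can cost roughly eight times that.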

Memory Management Overview

Memory usage in Spark falls under two categories:
  • Execution memory: Used for computation in shuffles, joins, sorts, and aggregations
  • Storage memory: Used for caching and propagating internal data across the cluster
Execution and storage share a unified region (M). When no execution memory is used, storage can acquire all available memory and vice versa.
This design ensures applications that don’t use caching can use the entire space for execution, avoiding unnecessary disk spills.

Key Memory Configuration

  • spark.memory.fraction (default: 0.6): size of M as a fraction of (JVM heap space - 300 MiB). The rest (40%) is reserved for user data structures and safeguarding against OOM errors.
  • spark.memory.storageFraction (default: 0.5): size of R as a fraction of M. R is the storage space within M where cached blocks are immune to being evicted by execution.
The typical user should not need to adjust these values as defaults work well for most workloads.
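To make the fractions concrete, here is a small sketch computing M and R from the formulas above, assuming a hypothetical 4 GiB executor heap:

```scala
object MemoryRegions {
  // M = spark.memory.fraction * (JVM heap - 300 MiB reserved)
  def unifiedRegionMiB(heapMiB: Int, memoryFraction: Double = 0.6): Double =
    (heapMiB - 300) * memoryFraction

  // R = spark.memory.storageFraction * M (cached blocks in R are never evicted)
  def storageRegionMiB(mMiB: Double, storageFraction: Double = 0.5): Double =
    mMiB * storageFraction

  def main(args: Array[String]): Unit = {
    val m = unifiedRegionMiB(4096) // 4 GiB heap -> M ≈ 2277.6 MiB
    val r = storageRegionMiB(m)    // R ≈ 1138.8 MiB
    println(f"M = $m%.1f MiB, R = $r%.1f MiB")
  }
}
```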

Determining Memory Consumption

The best way to size memory consumption for a dataset:
  1. Create an RDD and put it into cache
  2. Look at the “Storage” page in the web UI at http://<driver>:4040
  3. The page shows how much memory the RDD occupies
You can also use SizeEstimator.estimate() for specific objects:
import org.apache.spark.util.SizeEstimator

val myObject = MyClass(...)
val estimatedSize = SizeEstimator.estimate(myObject)
println(s"Estimated size: $estimatedSize bytes")

Tuning Data Structures

Reduce memory consumption by avoiding Java features that add overhead:
Design data structures to prefer arrays of objects and primitive types instead of standard Java or Scala collection classes like HashMap. Consider using the fastutil library for primitive type collections compatible with the Java standard library.
Avoid nested structures with many small objects and pointers when possible. Each object adds overhead.
Consider using numeric IDs or enumeration objects instead of strings for keys. Strings have significant overhead.
If you have less than 32 GiB of RAM, set the JVM flag -XX:+UseCompressedOops to make pointers be four bytes instead of eight.
spark.executor.extraJavaOptions=-XX:+UseCompressedOops
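As an illustration of the numeric-ID suggestion above, the following sketch (all names are hypothetical) interns each distinct string key into a dense Int ID once, so downstream structures hold 4-byte Ints instead of full String objects:

```scala
import scala.collection.mutable

object KeyInterner {
  // Maps each distinct string key to a small, dense Int ID.
  private val ids = mutable.HashMap.empty[String, Int]

  def idFor(key: String): Int =
    ids.getOrElseUpdate(key, ids.size)

  def main(args: Array[String]): Unit = {
    val keys = Seq("user-a", "user-b", "user-a")
    // Downstream code can now key aggregations on Ints rather than Strings.
    println(keys.map(idFor))
  }
}
```

This trades one small lookup table for a large reduction in per-record memory when the same keys repeat many times.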

Serialized RDD Storage

When objects are still too large despite tuning, store them in serialized form:
import org.apache.spark.storage.StorageLevel

val rdd = sc.textFile("hdfs://path/to/file")
rdd.persist(StorageLevel.MEMORY_ONLY_SER)
Spark will store each RDD partition as one large byte array.
The downside of serialized storage is slower access times due to deserializing each object on the fly.
We highly recommend using Kryo if you want to cache data in serialized form, as it leads to much smaller sizes than Java serialization.

Garbage Collection Tuning

JVM garbage collection can be a problem when you have large “churn” in terms of the RDDs stored by your program.

When GC Becomes an Issue

GC is usually not a problem in programs that:
  • Read an RDD once
  • Run many operations on it
GC becomes an issue when:
  • Storing many RDDs
  • Creating many short-lived objects
  • Having high turnover of cached data
The cost of garbage collection is proportional to the number of Java objects. Using data structures with fewer objects (e.g., an array of Ints instead of a LinkedList) greatly lowers this cost.

Measuring GC Impact

Add these flags to Java options to collect GC statistics:
spark.executor.extraJavaOptions=-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps
Next time your job runs, you’ll see messages in the worker logs each time garbage collection occurs.
These logs appear in the stdout files in the workers’ work directories, not in your driver program.
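Note that the Print* flags shown above are for JDK 8; JDK 9 and later (including the JDK 17 that recent Spark releases run on) replaced them with unified logging. A roughly equivalent setting on a modern JDK is:

```
spark.executor.extraJavaOptions=-Xlog:gc*
```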

Understanding JVM Memory Management

  1. Generational regions: Java heap space is divided into Young and Old generations. The Young generation holds short-lived objects, while the Old generation holds objects with longer lifetimes.
  2. Young generation structure: the Young generation is further divided into three regions: Eden, Survivor1, and Survivor2.
  3. GC process: when Eden is full, a minor GC runs and live objects from Eden and Survivor1 are copied to Survivor2. Objects that are old enough, or that don’t fit because Survivor2 is full, are moved to the Old generation. When the Old generation is nearly full, a full GC is invoked.

GC Tuning Strategy

The goal is to ensure only long-lived RDDs are stored in the Old generation and that the Young generation is sufficiently sized for short-lived objects.
If a full GC is invoked multiple times before a task completes, there isn’t enough memory available for executing tasks. Solution: increase executor memory or reduce the amount of cached data.
If there are too many minor collections but not many major GCs, allocating more memory for Eden would help. If the Eden size is determined to be E, set the Young generation size using:
spark.executor.extraJavaOptions=-Xmn<4/3*E>
The 4/3 scaling accounts for space used by survivor regions.
If OldGen is close to being full, reduce memory used for caching:
spark.memory.fraction=0.4
It’s better to cache fewer objects than to slow down task execution.
Since Spark 4.0.0 uses JDK 17 by default, G1GC is the default garbage collector. With large executor heap sizes, increase the G1 region size:
spark.executor.extraJavaOptions=-XX:G1HeapRegionSize=32m

Practical GC Tuning Example

If your task reads data from HDFS:
  1. Estimate memory used by the task using the HDFS block size
  2. Decompressed block size is often 2-3x the block size
  3. For 3-4 tasks’ worth of working space with 128 MiB HDFS blocks:
    Eden size = 4 * 3 * 128 MiB = 1536 MiB
    
  4. Set Young generation size:
spark.executor.extraJavaOptions=-Xmn2048m
    

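The arithmetic in the steps above can be sketched as:

```scala
object EdenSizing {
  // Working-set estimate for a task reading HDFS blocks. The decompression
  // factor (2-3x) and concurrent task count (3-4) are the rough rules of
  // thumb from the text, not measured values.
  def edenMiB(blockMiB: Int, decompressionFactor: Int = 3, concurrentTasks: Int = 4): Int =
    concurrentTasks * decompressionFactor * blockMiB

  // Young generation is scaled by 4/3 to leave room for the survivor regions.
  def youngGenMiB(eden: Int): Int = eden * 4 / 3

  def main(args: Array[String]): Unit = {
    val eden = edenMiB(128)                // 4 * 3 * 128 = 1536 MiB
    println(s"-Xmn${youngGenMiB(eden)}m")  // 1536 * 4/3 = 2048 MiB
  }
}
```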
Other Performance Considerations

Level of Parallelism

Clusters will not be fully utilized unless you set the level of parallelism high enough.
// Specify parallelism explicitly
val data = sc.textFile("hdfs://path/to/file", minPartitions = 100)
val result = data.reduceByKey(_ + _, numPartitions = 100)
Alternatively, set the default:
spark.default.parallelism=200
We recommend 2-3 tasks per CPU core in your cluster.
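As a rough sketch of the 2-3 tasks-per-core guideline (the cluster dimensions here are made-up examples):

```scala
object ParallelismHint {
  // Suggested default parallelism: 2-3 tasks per core across the cluster.
  def suggested(executors: Int, coresPerExecutor: Int, tasksPerCore: Int = 3): Int =
    executors * coresPerExecutor * tasksPerCore

  def main(args: Array[String]): Unit = {
    // e.g. 10 executors x 8 cores -> spark.default.parallelism=240
    println(suggested(executors = 10, coresPerExecutor = 8))
  }
}
```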

Parallel Listing on Input Paths

For jobs with large numbers of input directories, increase directory listing parallelism:
# For Hadoop input formats
spark.hadoop.mapreduce.input.fileinputformat.list-status.num-threads=10

# For Spark SQL file-based sources
spark.sql.sources.parallelPartitionDiscovery.threshold=32
spark.sql.sources.parallelPartitionDiscovery.parallelism=10

Memory Usage of Reduce Tasks

If you get OutOfMemoryError in reduce tasks:
  1. Increase the level of parallelism so each task’s input set is smaller:
val result = data.groupByKey(numPartitions = 200)
  2. Don’t worry about creating short tasks: Spark can efficiently support tasks as short as 200 ms because it reuses one executor JVM across many tasks.

Broadcasting Large Variables

Use broadcast variables to reduce the size of each serialized task:
val broadcastVar = sc.broadcast(Array(1, 2, 3))

val result = data.map { x =>
  // Use broadcastVar.value to access the broadcast value
  x * broadcastVar.value.sum
}
If your tasks use any large object from the driver program (e.g., a static lookup table), consider broadcasting it. Tasks larger than about 20 KiB are worth optimizing.

Data Locality

Data locality can have a major impact on performance. If data and the code that processes it are together, computation tends to be fast.
Locality levels, from closest to farthest:
  1. PROCESS_LOCAL - Data is in the same JVM as the running code (best)
  2. NODE_LOCAL - Data is on the same node (e.g., in HDFS or another executor)
  3. NO_PREF - Data is accessed equally quickly from anywhere
  4. RACK_LOCAL - Data is on the same rack of servers
  5. ANY - Data is elsewhere on the network (worst)
Spark prefers to schedule tasks at the best locality level, but this isn’t always possible. To configure how long Spark waits before falling back to a less-local level:
spark.locality.wait=3s
spark.locality.wait.node=3s
spark.locality.wait.process=3s
spark.locality.wait.rack=3s
Increase these settings if your tasks are long and you see poor locality, but the defaults usually work well.

Performance Tuning Checklist

  1. Enable Kryo serialization: switch from Java serialization to Kryo for better performance.
  2. Persist data appropriately: use serialized storage levels for large datasets that don’t fit in memory.
  3. Tune memory fractions: adjust spark.memory.fraction based on your caching needs.
  4. Configure GC: monitor GC and tune the Young generation size if needed.
  5. Set parallelism: ensure you have 2-3 tasks per CPU core.
  6. Use broadcast variables: broadcast large read-only data structures used in tasks.
  7. Monitor data locality: check the Spark UI for locality levels and adjust timeouts if needed.

Next Steps

  • Hardware Provisioning: learn how to provision hardware for optimal Spark performance
  • Spark Configuration: explore detailed configuration options
