Spark offers many techniques for tuning the performance of DataFrame and SQL workloads. These techniques include caching data, altering dataset partitioning, selecting optimal join strategies, and providing the optimizer with additional information.

Caching Data

Spark SQL can cache tables using an in-memory columnar format, automatically tuning compression to minimize memory usage and GC pressure:
# Cache a DataFrame
df.cache()

# Or use the catalog
spark.catalog.cacheTable("tableName")

# Uncache when done
df.unpersist()
spark.catalog.uncacheTable("tableName")

Caching Configuration

spark.sql.inMemoryColumnarStorage.compressed (boolean, default: true)
When true, Spark SQL automatically selects a compression codec for each column based on statistics of the data.

spark.sql.inMemoryColumnarStorage.batchSize (int, default: 10000)
Controls the size of batches for columnar caching. Larger batch sizes can improve memory utilization and compression, but risk OOMs when caching data.
Caching is most effective when you’ll access the same data multiple times. Use it for iterative algorithms or interactive queries.

Tuning Partitions

Proper partitioning is crucial for performance. Configure partition settings based on your data size and cluster resources.

File Partition Settings

spark.sql.files.maxPartitionBytes (bytes, default: 134217728 (128 MB))
The maximum number of bytes to pack into a single partition when reading files. Effective only for file-based sources such as Parquet, JSON, and ORC.

spark.sql.files.openCostInBytes (bytes, default: 4194304 (4 MB))
The estimated cost to open a file, measured in bytes, used when packing multiple files into a partition. It is better to over-estimate: partitions with small files will then be scheduled faster than partitions with larger files.

spark.sql.shuffle.partitions (int, default: 200)
Configures the number of partitions to use when shuffling data for joins or aggregations.

Partition Hints

You can control partitioning using SQL hints:
-- Reduce to 3 partitions
SELECT /*+ COALESCE(3) */ * FROM large_table;

-- Repartition by column
SELECT /*+ REPARTITION(dept_id) */ * FROM employees;

-- Repartition with specific count
SELECT /*+ REPARTITION(10, dept_id) */ * FROM employees;

-- Repartition by range
SELECT /*+ REPARTITION_BY_RANGE(salary) */ * FROM employees;

-- Rebalance partitions
SELECT /*+ REBALANCE(dept_id) */ * FROM employees;
# Using the DataFrame API
df.coalesce(3)                   # Reduce to 3 partitions without a full shuffle
df.repartition(10, "dept_id")    # Shuffle into 10 partitions, hashed by dept_id
df.repartitionByRange("salary")  # Range-partition by salary

Leveraging Statistics

Spark’s ability to choose optimal execution plans depends on statistics:

Types of Statistics

  • Data source statistics: read directly from data sources (e.g., row counts and min/max values in Parquet metadata). Maintained by the underlying data source.
  • Catalog statistics: read from the catalog (e.g., the Hive Metastore). Collected or updated when you run ANALYZE TABLE.
  • Runtime statistics: computed by Spark during query execution as part of adaptive query execution.

Collecting Statistics

-- Collect table statistics
ANALYZE TABLE table_name COMPUTE STATISTICS;

-- Collect column statistics
ANALYZE TABLE table_name COMPUTE STATISTICS FOR COLUMNS col1, col2;

-- View statistics
DESCRIBE EXTENDED table_name;

Inspecting Statistics

-- View query plan with cost estimates
EXPLAIN COST SELECT * FROM table_name WHERE col1 > 100;
# View query plan with cost estimates
df.explain(mode="cost")
Missing or inaccurate statistics hinder Spark’s ability to select optimal plans and may lead to poor query performance.

Optimizing Join Strategy

Broadcast Joins

For small tables, broadcast joins are highly efficient:
spark.sql.autoBroadcastJoinThreshold (bytes, default: 10485760 (10 MB))
Maximum size in bytes for a table that will be broadcast to all worker nodes when performing a join. Set to -1 to disable broadcasting.

spark.sql.broadcastTimeout (seconds, default: 300)
Timeout in seconds for the broadcast wait time in broadcast joins.

Join Hints

You can guide Spark’s join strategy selection:
-- Broadcast join hint
SELECT /*+ BROADCAST(small_table) */ *
FROM large_table
JOIN small_table ON large_table.id = small_table.id;

-- Merge join hint
SELECT /*+ MERGE(table1) */ *
FROM table1 JOIN table2 ON table1.id = table2.id;

-- Shuffle hash join hint
SELECT /*+ SHUFFLE_HASH(table1) */ *
FROM table1 JOIN table2 ON table1.id = table2.id;
# Broadcast join using hint
from pyspark.sql.functions import broadcast

df1.join(broadcast(df2), "key").show()
Join strategy hints are prioritized as: BROADCAST > MERGE > SHUFFLE_HASH > SHUFFLE_REPLICATE_NL

Adaptive Query Execution

Adaptive Query Execution (AQE) optimizes query plans based on runtime statistics. It’s enabled by default since Spark 3.2.0.
spark.sql.adaptive.enabled (boolean, default: true)
Enables adaptive query execution, which re-optimizes the query plan based on runtime statistics.

Coalescing Post-Shuffle Partitions

AQE automatically coalesces shuffle partitions based on runtime statistics:
spark.sql.adaptive.coalescePartitions.enabled (boolean, default: true)
When true, Spark coalesces shuffle partitions according to the target size to avoid too many small tasks.

spark.sql.adaptive.advisoryPartitionSizeInBytes (bytes, default: 64 MB)
The advisory size of shuffle partitions during adaptive optimization.

spark.sql.adaptive.coalescePartitions.minPartitionSize (bytes, default: 1 MB)
The minimum size of shuffle partitions after coalescing.
# Set advisory partition size
spark.conf.set("spark.sql.adaptive.advisoryPartitionSizeInBytes", "128MB")

Converting Sort-Merge Join to Broadcast Join

AQE can convert sort-merge joins to broadcast joins at runtime:
spark.sql.adaptive.autoBroadcastJoinThreshold (bytes, default: same as spark.sql.autoBroadcastJoinThreshold)
Maximum size in bytes for a table to be broadcast in adaptive execution. Set to -1 to disable.

spark.sql.adaptive.localShuffleReader.enabled (boolean, default: true)
When true, Spark uses a local shuffle reader to read shuffle data when partitioning is not needed.

Optimizing Skewed Joins

AQE dynamically handles data skew in sort-merge joins:
spark.sql.adaptive.skewJoin.enabled (boolean, default: true)
When true, Spark dynamically handles skew in sort-merge joins by splitting skewed partitions.

spark.sql.adaptive.skewJoin.skewedPartitionFactor (double, default: 5.0)
A partition is considered skewed if its size is larger than this factor times the median partition size.

spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes (bytes, default: 256 MB)
A partition is considered skewed if its size exceeds both this threshold and the factor-based threshold.
AQE is particularly effective for:
  • Queries with multiple stages
  • Queries with joins on skewed data
  • Queries where optimal partition count isn’t known upfront

Storage Partition Join

Storage Partition Join (SPJ) avoids shuffle by using existing storage layout:
spark.sql.sources.v2.bucketing.enabled (boolean, default: true)
When true, Spark tries to eliminate shuffle by using the partitioning reported by a V2 data source.

spark.sql.sources.v2.bucketing.pushPartValues.enabled (boolean, default: true)
When true, Spark tries to eliminate shuffle even if one side of the join has missing partition values.

Example with Iceberg

-- Create partitioned tables
CREATE TABLE prod.db.target (id INT, salary INT, dep STRING)
USING iceberg
PARTITIONED BY (dep, bucket(8, id));

CREATE TABLE prod.db.source (id INT, salary INT, dep STRING)
USING iceberg
PARTITIONED BY (dep, bucket(8, id));

-- Enable Storage Partition Join
SET spark.sql.sources.v2.bucketing.enabled=true;
SET spark.sql.iceberg.planning.preserve-data-grouping=true;
SET spark.sql.sources.v2.bucketing.pushPartValues.enabled=true;
SET spark.sql.requireAllClusterKeysForCoPartition=false;

-- This join will not have Exchange nodes (no shuffle)
SELECT * FROM target t 
INNER JOIN source s ON t.dep = s.dep AND t.id = s.id;
Storage Partition Join requires V2 data sources that support partitioning, such as Iceberg or Delta Lake.

Best Practices

Cache Frequently Accessed Data

Use caching for data that’s accessed multiple times in iterative algorithms or interactive queries.

Collect Statistics

Run ANALYZE TABLE regularly to help Spark make informed optimization decisions.

Tune Shuffle Partitions

Adjust spark.sql.shuffle.partitions based on your data size. Use AQE to avoid manual tuning.

Use Broadcast Joins

For small tables, broadcast joins eliminate shuffle and significantly improve performance.

Enable AQE

Adaptive Query Execution provides automatic optimizations for most workloads.

Optimize File Format

Use columnar formats like Parquet or ORC for analytical workloads.

Next Steps

Distributed SQL Engine

Learn to use Spark as a distributed query engine

Data Sources

Optimize data source operations
