Caching Data
Spark SQL can cache tables using an in-memory columnar format, automatically tuning compression to minimize memory usage and GC pressure.

Caching Configuration
spark.sql.inMemoryColumnarStorage.compressed: When true, Spark SQL automatically selects a compression codec for each column based on statistics of the data.
spark.sql.inMemoryColumnarStorage.batchSize: Controls the size of batches for columnar caching. Larger batch sizes can improve memory utilization and compression, but risk OOMs when caching data.
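As a minimal sketch, the settings above can be applied and a table cached directly in Spark SQL (the table name `sales` is hypothetical):

```sql
-- Tune the columnar cache, then cache a table (table name is hypothetical).
SET spark.sql.inMemoryColumnarStorage.compressed=true;
SET spark.sql.inMemoryColumnarStorage.batchSize=10000;

CACHE TABLE sales;        -- eager by default; use CACHE LAZY TABLE to defer
-- ...run repeated queries against sales...
UNCACHE TABLE sales;
```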
Tuning Partitions
Proper partitioning is crucial for performance. Configure partition settings based on your data size and cluster resources.

File Partition Settings
spark.sql.files.maxPartitionBytes: The maximum number of bytes to pack into a single partition when reading files. Effective for file-based sources such as Parquet, JSON, and ORC.
spark.sql.files.openCostInBytes: The estimated cost, in bytes, of opening a file, used when packing multiple files into a partition. It is better to over-estimate: partitions with small files will then be faster than partitions with bigger files.
spark.sql.shuffle.partitions: Configures the number of partitions to use when shuffling data for joins or aggregations.
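A sketch of how these settings might be tuned for a workload (the values are illustrative, not recommendations):

```sql
-- Illustrative values only; tune to your data size and cluster resources.
SET spark.sql.files.maxPartitionBytes=134217728;   -- 128 MB per read partition
SET spark.sql.files.openCostInBytes=4194304;       -- 4 MB assumed cost per file open
SET spark.sql.shuffle.partitions=200;              -- partitions for joins/aggregations
```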
Partition Hints
You can control partitioning using SQL hints such as REPARTITION, REPARTITION_BY_RANGE, COALESCE, and REBALANCE.
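A sketch of the hint syntax (table and column names are hypothetical):

```sql
-- Repartition the result into 8 partitions by a column (names hypothetical).
SELECT /*+ REPARTITION(8, customer_id) */ *
FROM sales;

-- Reduce the number of partitions without a full shuffle.
SELECT /*+ COALESCE(4) */ *
FROM sales;
```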
Leveraging Statistics

Spark's ability to choose optimal execution plans depends on the statistics available to it:

Types of Statistics
Data Source Statistics
Statistics read directly from data sources (e.g., counts and min/max values in Parquet metadata). Maintained by the underlying data source.
Catalog Statistics
Statistics read from the catalog (e.g., the Hive Metastore), collected or updated when you run ANALYZE TABLE.

Runtime Statistics
Statistics computed by Spark during query execution as part of adaptive query execution.
Collecting Statistics
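Catalog statistics are collected with ANALYZE TABLE; a sketch (table and column names are hypothetical):

```sql
-- Table-level statistics (row count, size in bytes).
ANALYZE TABLE sales COMPUTE STATISTICS;

-- Column-level statistics (min, max, distinct count, null count).
ANALYZE TABLE sales COMPUTE STATISTICS FOR COLUMNS customer_id, amount;
```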
Inspecting Statistics
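Collected statistics can be inspected with DESCRIBE EXTENDED; a sketch (names hypothetical):

```sql
-- Table-level statistics appear in the "Statistics" row of the output.
DESCRIBE EXTENDED sales;

-- Column-level statistics for a single column.
DESCRIBE EXTENDED sales amount;
```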
Optimizing Join Strategy
Broadcast Joins
For small tables, broadcast joins are highly efficient:

spark.sql.autoBroadcastJoinThreshold: Maximum size in bytes for a table that will be broadcast to all worker nodes when performing a join. Set to -1 to disable broadcasting.
spark.sql.broadcastTimeout: Timeout in seconds for the broadcast wait time in broadcast joins.
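A sketch of adjusting these settings (values are illustrative):

```sql
-- Illustrative: raise the broadcast threshold to 50 MB and keep the default timeout.
SET spark.sql.autoBroadcastJoinThreshold=52428800;
SET spark.sql.broadcastTimeout=300;   -- seconds
```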
Join Hints
You can guide Spark's join strategy selection with hints. When different strategy hints are specified on both sides of a join, they are prioritized as: BROADCAST > MERGE > SHUFFLE_HASH > SHUFFLE_REPLICATE_NL.
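A sketch of join hints in practice (table and column names are hypothetical):

```sql
-- Force a broadcast of the smaller side (table names hypothetical).
SELECT /*+ BROADCAST(d) */ f.*, d.region
FROM facts f JOIN dims d ON f.dim_id = d.id;

-- Request a sort-merge join instead.
SELECT /*+ MERGE(f) */ f.*, d.region
FROM facts f JOIN dims d ON f.dim_id = d.id;
```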
Adaptive Query Execution
Adaptive Query Execution (AQE) re-optimizes query plans based on runtime statistics. It is enabled by default since Spark 3.2.0.

spark.sql.adaptive.enabled: Enables adaptive query execution, which re-optimizes the query plan based on runtime statistics.
Coalescing Post-Shuffle Partitions
AQE automatically coalesces shuffle partitions based on runtime statistics:

spark.sql.adaptive.coalescePartitions.enabled: When true (and spark.sql.adaptive.enabled is true), Spark coalesces shuffle partitions according to the target size to avoid too many small tasks.
spark.sql.adaptive.advisoryPartitionSizeInBytes: The advisory size of shuffle partitions during adaptive optimization.
spark.sql.adaptive.coalescePartitions.minPartitionSize: The minimum size of shuffle partitions after coalescing.
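A sketch of the coalescing settings together (values are illustrative):

```sql
-- Illustrative values; AQE is on by default in Spark 3.2.0+.
SET spark.sql.adaptive.enabled=true;
SET spark.sql.adaptive.coalescePartitions.enabled=true;
SET spark.sql.adaptive.advisoryPartitionSizeInBytes=67108864;       -- 64 MB target
SET spark.sql.adaptive.coalescePartitions.minPartitionSize=1048576; -- 1 MB floor
```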
Converting Sort-Merge Join to Broadcast Join
AQE can convert sort-merge joins to broadcast joins at runtime:

spark.sql.adaptive.autoBroadcastJoinThreshold (bytes; default: same as spark.sql.autoBroadcastJoinThreshold): Maximum size for a table to be broadcast in adaptive execution. Set to -1 to disable.
spark.sql.adaptive.localShuffleReader.enabled: When true, Spark uses a local shuffle reader to read shuffle data when shuffle partitioning is not needed, for example after a sort-merge join is converted to a broadcast join.
Optimizing Skewed Joins
AQE dynamically handles data skew in sort-merge joins:

spark.sql.adaptive.skewJoin.enabled: When true, Spark dynamically handles skew in sort-merge joins by splitting skewed partitions.
spark.sql.adaptive.skewJoin.skewedPartitionFactor: A partition is considered skewed if its size is larger than this factor times the median partition size.
spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes: A partition is considered skewed only if its size also exceeds this threshold in bytes; both this check and the factor check must pass.
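A sketch of the skew-join settings together (values are illustrative):

```sql
-- Illustrative values only.
SET spark.sql.adaptive.skewJoin.enabled=true;
SET spark.sql.adaptive.skewJoin.skewedPartitionFactor=5;
SET spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes=268435456; -- 256 MB
```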
Storage Partition Join
Storage Partition Join (SPJ) avoids shuffles by using the existing storage layout:

spark.sql.sources.v2.bucketing.enabled: When true, Spark tries to eliminate shuffles by using the partitioning reported by a V2 data source.
spark.sql.sources.v2.bucketing.pushPartValues.enabled: When true, Spark tries to eliminate the shuffle even if one side of the join has partition values missing on the other side.
Example with Iceberg
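A sketch of SPJ over Iceberg, assuming two hypothetical tables partitioned by the same transform on the join key (config names as above):

```sql
-- Hypothetical Iceberg tables, both partitioned by bucket(16, customer_id).
SET spark.sql.sources.v2.bucketing.enabled=true;
SET spark.sql.sources.v2.bucketing.pushPartValues.enabled=true;
SET spark.sql.autoBroadcastJoinThreshold=-1;  -- keep the sort-merge join so SPJ applies

-- With compatible reported partitioning, this join can run without a shuffle.
SELECT o.order_id, o.amount, c.name
FROM orders o
JOIN customers c ON o.customer_id = c.customer_id;
```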
Storage Partition Join requires V2 data sources that support partitioning, such as Iceberg or Delta Lake.
Best Practices
Cache Frequently Accessed Data
Use caching for data that’s accessed multiple times in iterative algorithms or interactive queries.
Collect Statistics
Run ANALYZE TABLE regularly to help Spark make informed optimization decisions.
Tune Shuffle Partitions
Adjust spark.sql.shuffle.partitions based on your data size. Use AQE to avoid manual tuning.

Use Broadcast Joins
For small tables, broadcast joins eliminate shuffle and significantly improve performance.
Enable AQE
Adaptive Query Execution provides automatic optimizations for most workloads.
Optimize File Format
Use columnar formats like Parquet or ORC for analytical workloads.
Next Steps
Distributed SQL Engine
Learn to use Spark as a distributed query engine
Data Sources
Optimize data source operations
