This guide covers migration for Spark Core, including RDD APIs, scheduling, storage, and deployment configurations. These changes affect all Spark applications regardless of the higher-level APIs used.
Upgrading from Core 4.1 to 4.2
Kubernetes Executor Allocation Batch Size
Spark 4.2 increases the Kubernetes executor allocation batch size from 10 to 20 for better scaling performance.
Migration:
# Restore previous batch size if needed
--conf spark.kubernetes.allocation.batch.size=10
Network Policy for Kubernetes
Spark 4.2 configures a NetworkPolicy by default to restrict executor pod traffic to only the driver and peer executors.
This may break applications that require executors to communicate with external services. Ensure your network policies allow required traffic.
Migration:
# Disable network policy enforcement
--conf spark.kubernetes.driver.pod.excludedFeatureSteps=org.apache.spark.deploy.k8s.features.NetworkPolicyFeatureStep
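To see what the default policy actually restricts before deciding to disable it, inspect the objects Spark created; a quick kubectl check (the namespace and policy name below are examples):
# List NetworkPolicies in the namespace running Spark pods
kubectl get networkpolicy -n spark-jobs
# Show the ingress/egress rules of a specific policy
kubectl describe networkpolicy spark-executor-policy -n spark-jobs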
Upgrading from Core 4.0 to 4.1
Master REST API Enabled by Default
The Spark Master daemon now provides a REST API by default for programmatic cluster management.
Migration:
# Disable REST API if not needed
--conf spark.master.rest.enabled=false
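If you keep the API enabled, it can be driven over plain HTTP; a minimal sketch against the standalone REST endpoint (host and submission ID are examples; 6066 is the default REST port):
# Check the status of a previously submitted driver
curl http://master-host:6066/v1/submissions/status/driver-20250101000000-0000
# Kill a running driver
curl -X POST http://master-host:6066/v1/submissions/kill/driver-20250101000000-0000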
RDD Checkpoint Compression
Spark 4.1 compresses RDD checkpoints by default to reduce storage overhead.
Before (Spark 4.0):
# Checkpoints stored uncompressed
sc.setCheckpointDir("hdfs://checkpoint")
rdd.checkpoint()
After (Spark 4.1):
# Checkpoints compressed by default
sc.setCheckpointDir("hdfs://checkpoint")
rdd.checkpoint() # Automatically compressed
# Disable compression if needed
spark.conf.set("spark.checkpoint.compress", "false")
S3 Magic Committer
Apache Hadoop Magic Committer is now used for all S3 buckets by default, providing better performance and reliability.
The Magic Committer requires Hadoop 3.3.1+ and provides atomic, high-performance commits to S3.
Migration:
# Spark 4.1: Magic Committer enabled by default
# No action needed for most applications
# Restore previous behavior if needed
spark.conf.set("spark.hadoop.fs.s3a.committer.magic.enabled", "false")
Ignore Corrupt Files Behavior
java.lang.InternalError during file reading no longer fails tasks when ignoreCorruptFiles is enabled.
# Spark 4.1: InternalError is ignored
spark.conf.set("spark.sql.files.ignoreCorruptFiles", "true")
df = spark.read.parquet("s3://bucket/data") # Skips files with InternalError
LZF Compression Parallelization
LZF compression now uses multiple threads to compress data in parallel.
# Disable parallel LZF if needed
spark.conf.set("spark.io.compression.lzf.parallel.enabled", "false")
Native Netty IO Mode
Spark uses native Netty IO mode by default for better network performance.
# Restore NIO mode if needed
spark.conf.set("spark.io.mode.default", "NIO")
Upgrading from Core 3.5 to 4.0
Servlet API Migration (javax to jakarta)
Spark 4.0 migrates from javax.servlet to jakarta.servlet namespace.
If your application uses servlet APIs directly or depends on libraries that use servlets, ensure they support Jakarta EE 9+.
Migration:
// Before (Spark 3.5)
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
// After (Spark 4.0)
import jakarta.servlet.http.HttpServlet;
import jakarta.servlet.http.HttpServletRequest;
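If your build compiles against the servlet API directly, update the dependency coordinates as well; a minimal Maven sketch (the version shown is illustrative):
<dependency>
    <groupId>jakarta.servlet</groupId>
    <artifactId>jakarta.servlet-api</artifactId>
    <version>5.0.0</version>
    <scope>provided</scope>
</dependency>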
Event Log Rolling and Compression
Spark 4.0 enables event log rolling and compression by default.
Benefits:
- Reduced storage requirements
- Incremental archiving for long-running applications
- Better History Server performance
Migration:
# Disable event log rolling
--conf spark.eventLog.rolling.enabled=false
# Disable event log compression
--conf spark.eventLog.compress=false
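Rolling can also be tuned rather than disabled; for example, the size at which event logs roll over (128m is the default):
# Roll event logs once they reach this size
--conf spark.eventLog.rolling.maxFileSize=256m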
Worker Cleanup
Spark workers now periodically clean up worker and stopped application directories.
# Configure cleanup interval (default: 1800 seconds)
--conf spark.worker.cleanup.interval=3600
# Disable cleanup if needed
--conf spark.worker.cleanup.enabled=false
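The retention window for stopped application data is controlled separately; adjusting it (the default is 7 days, expressed in seconds):
# How long stopped application directories are retained
--conf spark.worker.cleanup.appDataTtl=604800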
Shuffle Service Storage Backend
The default shuffle service storage backend changed from LevelDB to RocksDB.
RocksDB provides better performance and stability for large-scale shuffle operations.
Migration:
# Restore LevelDB backend
--conf spark.shuffle.service.db.backend=LEVELDB
Mesos Support Removed
Apache Mesos support has been completely removed in Spark 4.0. Migrate to YARN, Kubernetes, or Standalone mode.
Migration Paths:
- Kubernetes (Recommended):
spark-submit \
  --master k8s://https://kubernetes.example.com:443 \
  --deploy-mode cluster \
  --conf spark.kubernetes.container.image=spark:4.0 \
  --conf spark.executor.instances=10 \
  my-app.jar
- YARN:
spark-submit \
  --master yarn \
  --deploy-mode cluster \
  --num-executors 10 \
  my-app.jar
- Standalone:
# Start master
./sbin/start-master.sh
# Start workers
./sbin/start-worker.sh spark://master:7077
# Submit application
spark-submit \
  --master spark://master:7077 \
  my-app.jar
Kubernetes Configuration Changes
Executor Batch Size
Default batch size increased from 5 to 10:
# Restore previous behavior
--conf spark.kubernetes.allocation.batch.size=5
Persistent Volume Claims
Access mode changed from ReadWriteOnce to ReadWriteOncePod:
# Spark 4.0 uses ReadWriteOncePod by default
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: spark-pvc
spec:
  accessModes:
    - ReadWriteOncePod  # More restrictive
# Restore ReadWriteOnce if needed
--conf spark.kubernetes.legacy.useReadWriteOnceAccessMode=true
Executor Pod Status Check
Spark now checks all containers in executor pods:
# Restore legacy behavior (check only main container)
--conf spark.kubernetes.executor.checkAllContainers=false
Ivy Directory Change
Default Ivy user directory changed from ~/.ivy2 to ~/.ivy2.5.2.
Before (Spark 3.5):
# Dependencies downloaded to ~/.ivy2
After (Spark 4.0):
# Dependencies downloaded to ~/.ivy2.5.2
# Restore previous location
--conf spark.jars.ivy=~/.ivy2
External Shuffle Service Enhancements
External shuffle service now deletes shuffle blocks when executors are deallocated.
# Disable automatic shuffle deletion
--conf spark.shuffle.service.removeShuffle=false
MDC Logging Key Change
The MDC key for task names changed from mdc.taskName to task_name.
Before (Spark 3.5):
<!-- log4j2.xml -->
<Pattern>%d{yyyy-MM-dd HH:mm:ss} [%X{mdc.taskName}] %p %c - %m%n</Pattern>
After (Spark 4.0):
<!-- log4j2.xml -->
<Pattern>%d{yyyy-MM-dd HH:mm:ss} [%X{task_name}] %p %c - %m%n</Pattern>
<!-- Or restore legacy key -->
--conf spark.log.legacyTaskNameMdc.enabled=true
Speculative Execution Changes
Speculative execution is now less aggressive:
Before (Spark 3.5):
spark.speculation.multiplier=1.5
spark.speculation.quantile=0.75
After (Spark 4.0):
spark.speculation.multiplier=3
spark.speculation.quantile=0.9
Migration:
# Restore aggressive speculation
--conf spark.speculation.multiplier=1.5 \
--conf spark.speculation.quantile=0.75
File Reading Exception Handling
AccessControlException and BlockMissingException now always fail tasks, even with ignoreCorruptFiles enabled.
This prevents silent data loss from permission or block availability issues.
# These exceptions now always fail the task
try:
    df = spark.read.parquet("protected_data")
except Exception as e:
    if "AccessControlException" in str(e):
        # Handle permission errors explicitly (log, alert, etc.)
        raise
    raise  # other failures still propagate
Upgrading from Core 3.4 to 3.5
Configuration Deprecations
YARN-specific configs are deprecated in favor of general configs:
# Deprecated
--conf spark.yarn.executor.failuresValidityInterval=1h
--conf spark.yarn.max.executor.failures=10
# Use instead
--conf spark.executor.failuresValidityInterval=1h
--conf spark.executor.maxNumFailures=10
Upgrading from Core 3.3 to 3.4
Dynamic Allocation Enhancements
Spark 3.4 enables shuffle tracking for dynamic allocation without external shuffle service.
# Spark 3.4: Shuffle tracking enabled by default
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .config("spark.dynamicAllocation.enabled", "true") \
    .config("spark.dynamicAllocation.shuffleTracking.enabled", "true") \
    .getOrCreate()
# Disable shuffle tracking if needed
spark.conf.set("spark.dynamicAllocation.shuffleTracking.enabled", "false")
Storage Decommissioning
When decommissioning is enabled, Spark migrates cached RDD blocks and shuffle blocks off executors before they are removed.
# Enable full decommissioning
--conf spark.decommission.enabled=true \
--conf spark.storage.decommission.enabled=true \
--conf spark.storage.decommission.rddBlocks.enabled=true \
--conf spark.storage.decommission.shuffleBlocks.enabled=true
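When shuffle block decommissioning is enabled, blocks that cannot be migrated to a peer executor can optionally spill to a fallback location; a sketch (the path is an example):
# Optional fallback storage for migrated blocks
--conf spark.storage.decommission.fallbackStorage.path=s3a://my-bucket/spark-fallback/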
Upgrading from Core 3.2 to 3.3
Log4j 2 Migration
Spark 3.3 migrates from Log4j 1.x to Log4j 2.x.
Log4j 1.x reached end-of-life in August 2015. You must migrate your logging configuration to Log4j 2 format.
Before (log4j.properties):
# log4j.properties (Log4j 1.x)
log4j.rootCategory=INFO, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n
After (log4j2.properties):
# log4j2.properties (Log4j 2.x)
rootLogger.level=INFO
rootLogger.appenderRef.stdout.ref=console
appender.console.type=Console
appender.console.name=console
appender.console.target=SYSTEM_OUT
appender.console.layout.type=PatternLayout
appender.console.layout.pattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n
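Per-logger overrides translate similarly; for example, quieting one chatty logger in the new format (the logger name is illustrative):
# Raise the threshold for a single logger without touching the root level
logger.jetty.name=org.sparkproject.jetty
logger.jetty.level=WARN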
Upgrading from Core 3.1 to 3.2
Proactive Storage Replication
Spark 3.2 enables proactive replication of cached RDD blocks by default.
# Automatically replenishes lost replicas
spark.conf.set("spark.storage.replication.proactive", "true") # Default in 3.2+
# Disable if needed
spark.conf.set("spark.storage.replication.proactive", "false")
Empty Split Handling
Empty input splits are ignored by default:
# Spark 3.2: No empty partitions created
rdd = sc.textFile("data.txt") # Empty splits ignored
# Restore previous behavior
spark.conf.set("spark.hadoopRDD.ignoreEmptySplits", "false")
Best Practices
Configuration Management
Maintain configuration profiles for different Spark versions:
# config_manager.py
from pyspark.sql import SparkSession

def get_spark_config(spark_version):
    """Return version-appropriate configuration."""
    config = {}
    if spark_version.startswith("4."):
        config.update({
            "spark.eventLog.rolling.enabled": "true",
            "spark.eventLog.compress": "true",
            "spark.shuffle.service.db.backend": "ROCKSDB"
        })
    elif spark_version.startswith("3."):
        config.update({
            "spark.sql.adaptive.enabled": "true",
            "spark.storage.replication.proactive": "true"
        })
    return config

# Use in application
spark_version = "4.0"  # the version you are deploying against
builder = SparkSession.builder.appName("MyApp")
for key, value in get_spark_config(spark_version).items():
    builder = builder.config(key, value)
spark = builder.getOrCreate()
Monitoring Version-Specific Behavior
import logging

logger = logging.getLogger(__name__)

# Log version and critical configs ("unset" when a config has no value)
logger.info(f"Spark version: {spark.version}")
logger.info(f"Event log compression: {spark.conf.get('spark.eventLog.compress', 'unset')}")
logger.info(f"Shuffle backend: {spark.conf.get('spark.shuffle.service.db.backend', 'unset')}")
See Also