This guide covers migration for Spark MLlib, including machine learning algorithms, pipelines, and model persistence. Because the DataFrame-based MLlib APIs build on Spark SQL, many SQL migration items also apply: review both this guide and the SQL Migration Guide.

Upgrading from MLlib 3.5 to 4.0

Breaking Changes

There are no breaking changes in MLlib 4.0.

PMML Schema Version Upgrade

The PMML XML schema version for exported models has been upgraded from PMML-4_3 to PMML-4_4.
Models exported in Spark 4.0 may not be compatible with systems expecting PMML 4.3 format.
Migration:
from pyspark.ml.classification import LogisticRegressionModel

# Export a model to PMML in Spark 4.0
model = LogisticRegressionModel.load("path/to/model")
model.write().format("pmml").save("path/to/export")  # Emits PMML-4_4

# Ensure consuming systems support PMML 4.4

Upgrading from MLlib 2.4 to 3.0

Breaking Changes

OneHotEncoder Renamed

OneHotEncoderEstimator has been renamed to OneHotEncoder, and the deprecated non-estimator OneHotEncoder has been removed.
Before (Spark 2.4):
from pyspark.ml.feature import OneHotEncoderEstimator

# Use OneHotEncoderEstimator
encoder = OneHotEncoderEstimator(inputCols=["category"], outputCols=["categoryVec"])
model = encoder.fit(df)
After (Spark 3.0):
from pyspark.ml.feature import OneHotEncoder

# OneHotEncoderEstimator renamed to OneHotEncoder
encoder = OneHotEncoder(inputCols=["category"], outputCols=["categoryVec"])
model = encoder.fit(df)

Image Schema Reading

ImageSchema.readImages has been removed in favor of the built-in image data source.
Before (Spark 2.4):
from pyspark.ml.image import ImageSchema

df = ImageSchema.readImages("path/to/images")
After (Spark 3.0):
# Use built-in image data source
df = spark.read.format("image").load("path/to/images")

Removed Deprecated Algorithms

Several deprecated SGD-based algorithms have been removed:
  • LogisticRegressionWithSGD → Use LogisticRegression or LogisticRegressionWithLBFGS
  • LinearRegressionWithSGD → Use LinearRegression
  • RidgeRegressionWithSGD → Use LinearRegression with elasticNetParam=0.0
  • LassoWithSGD → Use LinearRegression with elasticNetParam=1.0
Migration Example:
# Before (Spark 2.4) - Using deprecated API
from pyspark.mllib.classification import LogisticRegressionWithSGD

model = LogisticRegressionWithSGD.train(rdd, iterations=100)

# After (Spark 3.0) - Using new API
from pyspark.ml.classification import LogisticRegression

lr = LogisticRegression(maxIter=100)
model = lr.fit(df)

KMeans Runs Parameter Removed

The runs parameter in KMeans.train() has been removed as it has had no effect since Spark 2.0.
# Before (Spark 2.4)
from pyspark.mllib.clustering import KMeans
model = KMeans.train(rdd, k=2, runs=10)  # runs parameter ignored

# After (Spark 3.0)
model = KMeans.train(rdd, k=2)  # runs parameter removed

Behavior Changes

StringIndexer Ordering

When using frequencyDesc or frequencyAsc ordering, strings with equal frequency are now sorted alphabetically for consistency.
Before (Spark 2.4):
from pyspark.ml.feature import StringIndexer

# Equal frequency strings have undefined order
indexer = StringIndexer(inputCol="category", outputCol="categoryIndex", 
                        stringOrderType="frequencyDesc")
# ["a", "b", "c"] with equal frequency might be indexed as [0, 1, 2] or [2, 1, 0]
After (Spark 3.0):
# Equal frequency strings are sorted alphabetically
indexer = StringIndexer(inputCol="category", outputCol="categoryIndex",
                        stringOrderType="frequencyDesc")
# ["a", "b", "c"] with equal frequency indexed as [0, 1, 2] (alphabetically)
StringIndexer now supports encoding multiple columns simultaneously.

Imputer Numeric Type Support

Imputer now handles all numeric types, not just Double and Float.
Before (Spark 2.4):
from pyspark.ml.feature import Imputer

# Only works with Double/Float columns
imputer = Imputer(inputCols=["double_col"], outputCols=["double_out"])

# Integer columns require casting first
from pyspark.sql.functions import col
df_cast = df.withColumn("int_col", col("int_col").cast("double"))
After (Spark 3.0):
# Works with all numeric types
imputer = Imputer(inputCols=["int_col", "double_col"], 
                  outputCols=["int_out", "double_out"])
model = imputer.fit(df)  # No casting needed

HashingTF Hash Function

HashingTF now uses a corrected implementation of the MurmurHash3 function.
The new hash function maps elements to different vector positions than in Spark 2.x. Models trained in Spark 2.x and loaded in Spark 3.0 will still use the old hash function.
Migration:
from pyspark.ml.feature import HashingTF

# Spark 3.0 uses corrected hash function
hashingTF = HashingTF(inputCol="words", outputCol="features", numFeatures=10000)

# If you need consistent results:
# 1. Retrain models with Spark 3.0, or
# 2. Load a HashingTF saved from Spark 2.x (it keeps the old hash function)
hashingTF_old = HashingTF.load("spark2_model")  # Uses old hash function

MultilayerPerceptronClassificationModel Changes

The model now exposes training parameters through MultilayerPerceptronParams.
Before (Spark 2.4):
from pyspark.ml.classification import MultilayerPerceptronClassificationModel

model = MultilayerPerceptronClassificationModel.load("path")
layers = model.layers  # Direct array access
After (Spark 3.0):
# Use getLayers() method
model = MultilayerPerceptronClassificationModel.load("path")
layers = model.getLayers()  # Method call instead of direct access

ALS Error Handling

ALS now throws clearer exceptions when fitting on nondeterministic input data.
Before (Spark 2.4):
# ArrayIndexOutOfBoundsException on rerun with nondeterministic input
als = ALS(rank=10)
model = als.fit(df)  # May fail with unclear error
After (Spark 3.0):
# Clear SparkException with descriptive message
als = ALS(rank=10)
model = als.fit(df)  # Fails with: "Detected mismatch between In/Out blocks"
Ensure your ALS input data is deterministic by caching it before training.
# Best practice: Cache input data
df_cached = df.cache()
als = ALS(rank=10)
model = als.fit(df_cached)

Upgrading from MLlib 2.2 to 2.3

Logistic Regression Summary Changes

The class hierarchy for logistic regression summaries has changed.
Before (Spark 2.2):
from pyspark.ml.classification import LogisticRegression

lr = LogisticRegression()
model = lr.fit(df)

# summary is already a BinaryLogisticRegressionTrainingSummary
summary = model.summary
binary_summary = summary  # No downcast needed
After (Spark 2.3):
# model.summary now returns the base LogisticRegressionTrainingSummary;
# for binary problems it is a BinaryLogisticRegressionTrainingSummary
model = lr.fit(df)
summary = model.summary
binary_summary = summary if model.numClasses == 2 else None

# Multinomial metrics are available on the base summary
multi_summary = model.summary

OneHotEncoder Deprecation

The original OneHotEncoder has been deprecated in favor of OneHotEncoderEstimator (later renamed to OneHotEncoder in Spark 3.0).
# Spark 2.3: Use OneHotEncoderEstimator
from pyspark.ml.feature import OneHotEncoderEstimator

encoder = OneHotEncoderEstimator(
    inputCols=["categoryIndex1", "categoryIndex2"],
    outputCols=["categoryVec1", "categoryVec2"]
)
model = encoder.fit(df)

OneVsRest Parallelism

The default parallelism in OneVsRest changed from using the default thread pool to serial execution (parallelism=1).
from pyspark.ml.classification import OneVsRest, LogisticRegression

# Spark 2.3+: Explicitly set parallelism for parallel training
ovr = OneVsRest(
    classifier=LogisticRegression(),
    parallelism=4  # Set explicitly for parallel execution
)

Best Practices

Model Versioning

Maintain clear versioning for your ML models:
from datetime import datetime
from pyspark.ml.classification import LogisticRegression

# Include Spark version and timestamp in model path
spark_version = spark.version
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
model_path = f"models/lr_spark{spark_version}_{timestamp}"

lr = LogisticRegression()
model = lr.fit(train_df)
model.write().overwrite().save(model_path)

# Save metadata
metadata = {
    "spark_version": spark_version,
    "model_type": "LogisticRegression",
    "training_date": timestamp,
    "features": train_df.columns
}
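The metadata dict above is built but never persisted. One minimal way to write it alongside the model is plain JSON (the file name and values here are illustrative; in practice use spark.version and the real timestamp):

```python
import json

# Illustrative metadata; mirror the fields from the snippet above
metadata = {
    "spark_version": "4.0.0",
    "model_type": "LogisticRegression",
    "training_date": "20250101_000000",
    "features": ["feature1", "feature2"],
}

# Write the metadata next to the saved model so the two travel together
with open("model_metadata.json", "w") as f:
    json.dump(metadata, f, indent=2)
```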

Testing Across Versions

Create comprehensive tests for version migration:
import pytest
from pyspark.ml.classification import LogisticRegressionModel

def test_model_compatibility():
    """Test model works after version upgrade"""
    # Load with the *Model class (not the LogisticRegression estimator)
    old_model = LogisticRegressionModel.load("models/lr_spark2.4")
    
    # Test predictions still work
    predictions = old_model.transform(test_df)
    
    # Verify prediction column exists
    assert "prediction" in predictions.columns
    
    # Check metrics
    from pyspark.ml.evaluation import BinaryClassificationEvaluator
    evaluator = BinaryClassificationEvaluator()
    auc = evaluator.evaluate(predictions)
    
    # Ensure metrics are reasonable
    assert auc > 0.7, f"AUC dropped significantly: {auc}"

Gradual Algorithm Migration

When moving from deprecated to new algorithms:
# Step 1: Run both algorithms side-by-side
from pyspark.ml.classification import LogisticRegression

# New algorithm
new_lr = LogisticRegression(maxIter=100, regParam=0.01)
new_model = new_lr.fit(train_df)
new_predictions = new_model.transform(test_df)

# Step 2: Compare metrics
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

evaluator = MulticlassClassificationEvaluator(metricName="accuracy")
new_accuracy = evaluator.evaluate(new_predictions)

print(f"New model accuracy: {new_accuracy}")

# Step 3: Use new model if metrics are acceptable
if new_accuracy >= 0.8:
    new_model.write().overwrite().save("production_model")

Pipeline Compatibility

Ensure entire pipelines are compatible after upgrades:
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml.classification import RandomForestClassifier

# Create pipeline with version-compatible components
indexer = StringIndexer(inputCol="category", outputCol="categoryIndex")
encoder = OneHotEncoder(inputCols=["categoryIndex"], outputCols=["categoryVec"])
assembler = VectorAssembler(
    inputCols=["categoryVec", "feature1", "feature2"],
    outputCol="features"
)
rf = RandomForestClassifier(featuresCol="features", labelCol="label")

# Build and save pipeline
pipeline = Pipeline(stages=[indexer, encoder, assembler, rf])
model = pipeline.fit(train_df)
model.write().overwrite().save("pipeline_model")

# Load the fitted pipeline back with PipelineModel (not Pipeline)
from pyspark.ml import PipelineModel
loaded_model = PipelineModel.load("pipeline_model")
predictions = loaded_model.transform(test_df)
