ML Pipelines provide a uniform set of high-level APIs built on top of DataFrames that help you create and tune practical machine learning pipelines.

Overview

MLlib standardizes APIs for machine learning algorithms to make it easier to combine multiple algorithms into a single pipeline or workflow. The pipeline concept is inspired by scikit-learn.

Main Concepts

DataFrame

Machine learning can be applied to a wide variety of data types. The Pipeline API uses DataFrames from Spark SQL to support various data types including:
  • Basic types (Double, String, Array, etc.)
  • Structured types from Spark SQL
  • ML Vector types for feature vectors
Columns in a DataFrame are referred to by name, e.g. “text”, “features”, “label”, and “prediction”.

Pipeline Components

Transformer

A Transformer is an algorithm that transforms one DataFrame into another DataFrame. It implements a transform() method. Examples:
  • A feature transformer reads a column (e.g., text), maps it to a new column (e.g., feature vectors), and outputs a new DataFrame
  • A learning model reads features, predicts labels, and outputs a DataFrame with predictions
from pyspark.ml.feature import Tokenizer

# Tokenizer is a Transformer
tokenizer = Tokenizer(inputCol="text", outputCol="words")

# sentenceDF is a DataFrame with a "text" string column
wordsDF = tokenizer.transform(sentenceDF)

Estimator

An Estimator is an algorithm that can be fit on a DataFrame to produce a Transformer. It implements a fit() method. Example:
  • LogisticRegression is an Estimator that trains on data
  • Calling fit() produces a LogisticRegressionModel (a Transformer)
from pyspark.ml.classification import LogisticRegression

# LogisticRegression is an Estimator
lr = LogisticRegression()

# Fit returns a Transformer (LogisticRegressionModel)
model = lr.fit(trainingDF)

# Model can transform data
predictions = model.transform(testDF)
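The Estimator-produces-a-Transformer contract can be sketched without Spark. `MeanImputer` below is a hypothetical illustration of the pattern, not a pyspark class:

```python
# Spark-free sketch of the Estimator/Transformer contract.
# MeanImputer is a hypothetical example, not part of pyspark.

class MeanImputerModel:
    """Transformer: fills missing values with a precomputed mean."""
    def __init__(self, mean):
        self.mean = mean

    def transform(self, rows):
        return [self.mean if v is None else v for v in rows]

class MeanImputer:
    """Estimator: learns the column mean, then returns a Transformer."""
    def fit(self, rows):
        observed = [v for v in rows if v is not None]
        return MeanImputerModel(sum(observed) / len(observed))

# fit() on training data yields a model that can transform new data
model = MeanImputer().fit([1.0, None, 3.0])  # mean of observed values = 2.0
print(model.transform([None, 5.0]))          # [2.0, 5.0]
```

The key point mirrors `LogisticRegression`: the Estimator itself never transforms data; fitting produces a separate model object that does.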

Pipeline

A Pipeline chains multiple Transformers and Estimators together to specify an ML workflow. A Pipeline is itself an Estimator that produces a PipelineModel (a Transformer) when fit.
from pyspark.ml import Pipeline

# tokenizer, hashingTF, and lr are the stages to chain, in order
pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])
model = pipeline.fit(trainingDF)  # returns a PipelineModel

Parameter

All Transformers and Estimators share a common API for specifying parameters:
# Set parameters directly
lr = LogisticRegression(maxIter=10, regParam=0.01)

# Or use setter methods
lr.setMaxIter(10)
lr.setRegParam(0.01)
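Because parameters belong to instances, each object carries its own param map, and setters return the object itself so calls can chain. A Spark-free sketch of that pattern (`ParamsSketch` is a hypothetical class, not part of pyspark):

```python
# Spark-free sketch of the shared parameter API: each instance owns its
# param map, and setters return self so calls can chain.
# ParamsSketch is a hypothetical class, not part of pyspark.

class ParamsSketch:
    def __init__(self, **kwargs):
        self._params = dict(kwargs)      # constructor-style parameters

    def set(self, name, value):
        self._params[name] = value
        return self                      # chaining, like pyspark setters

    def get(self, name):
        return self._params[name]

est = ParamsSketch(maxIter=10, regParam=0.01)   # set at construction
est.set("maxIter", 20).set("regParam", 0.1)     # or via setters
print(est.get("maxIter"), est.get("regParam"))  # 20 0.1
```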

How Pipelines Work

Training Time

When you call pipeline.fit() on training data:
  1. Stages are run in the order they were specified
  2. For a Transformer stage, its transform() method is called on the DataFrame
  3. For an Estimator stage, its fit() method is called to produce a Transformer; if the Pipeline has more stages, that Transformer’s transform() is also called to pass the data downstream
  4. The fitted Transformers are assembled into the resulting PipelineModel
[Figure: ML Pipeline at training time]
Example workflow (stages above, outputs below):
Raw Data  → Tokenizer → HashingTF   → LogisticRegression.fit() → PipelineModel
DataFrame → words     → rawFeatures → LogisticRegressionModel

Prediction Time

When you call model.transform() on test data:
  1. Data flows through the fitted pipeline in order
  2. Each stage’s transform() method is called
  3. Final predictions are produced
[Figure: ML Pipeline at prediction time]
Pipelines ensure that training and test data go through identical feature processing steps, preventing training/serving skew.
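Both flows boil down to one loop over the stages. A Spark-free sketch, duck-typed so that any stage exposing fit() is treated as an Estimator (class names are illustrative, not pyspark internals):

```python
# Spark-free sketch of Pipeline.fit() / PipelineModel.transform().
# A stage exposing fit() is treated as an Estimator, otherwise as a
# Transformer. Illustrative only, not pyspark's implementation.

class PipelineModelSketch:
    """After fitting, every stage is a Transformer; prediction chains them."""
    def __init__(self, stages):
        self.stages = stages

    def transform(self, data):
        for stage in self.stages:
            data = stage.transform(data)
        return data

class PipelineSketch:
    def __init__(self, stages):
        self.stages = stages

    def fit(self, data):
        fitted = []
        for stage in self.stages:
            if hasattr(stage, "fit"):      # Estimator: fit it, keep the model
                stage = stage.fit(data)
            fitted.append(stage)
            data = stage.transform(data)   # feed the next stage
        return PipelineModelSketch(fitted)

# Tiny demo stages
class Upper:                               # a Transformer
    def transform(self, data):
        return [s.upper() for s in data]

class Exclaimer:                           # an Estimator
    def fit(self, data):
        class Model:
            def transform(self, data):
                return [s + "!" for s in data]
        return Model()

model = PipelineSketch([Upper(), Exclaimer()]).fit(["a b", "c"])
print(model.transform(["spark"]))          # ['SPARK!']
```

Because the fitted model replays exactly the same transform() chain at prediction time, training and test data cannot diverge in their feature processing.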

Complete Pipeline Example

Here’s a complete example that demonstrates building and using a Pipeline:
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import HashingTF, Tokenizer

# Prepare training documents
training = spark.createDataFrame([
    (0, "a b c d e spark", 1.0),
    (1, "b d", 0.0),
    (2, "spark f g h", 1.0),
    (3, "hadoop mapreduce", 0.0)
], ["id", "text", "label"])

# Configure ML pipeline with three stages
tokenizer = Tokenizer(inputCol="text", outputCol="words")
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
lr = LogisticRegression(maxIter=10, regParam=0.001)
pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])

# Fit the pipeline to training documents
model = pipeline.fit(training)

# Prepare test documents
test = spark.createDataFrame([
    (4, "spark i j k"),
    (5, "l m n"),
    (6, "spark hadoop spark"),
    (7, "apache hadoop")
], ["id", "text"])

# Make predictions
prediction = model.transform(test)
selected = prediction.select("id", "text", "probability", "prediction")
for row in selected.collect():
    rid, text, prob, pred = row  # avoid shadowing the prediction DataFrame
    print(f"({rid}, {text}) --> prob={prob}, prediction={pred}")

Pipeline Parameters

You can specify parameters for individual stages in a Pipeline, for example when building a parameter grid for model tuning:
from pyspark.ml.tuning import ParamGridBuilder

# Create parameter grid
paramGrid = ParamGridBuilder() \
    .addGrid(hashingTF.numFeatures, [10, 100, 1000]) \
    .addGrid(lr.regParam, [0.1, 0.01]) \
    .build()
Parameters belong to specific instances of Estimators and Transformers. This allows you to have multiple instances with different parameters in the same Pipeline.
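ParamGridBuilder ultimately produces the cartesian product of the listed values (here 3 numFeatures × 2 regParam = 6 parameter maps). A plain-Python sketch of that expansion, not pyspark's actual implementation:

```python
# Cartesian-product expansion behind a parameter grid, sketched with
# itertools (not pyspark's actual implementation).
from itertools import product

grids = {
    "numFeatures": [10, 100, 1000],
    "regParam": [0.1, 0.01],
}
param_maps = [dict(zip(grids, combo)) for combo in product(*grids.values())]

print(len(param_maps))   # 6
print(param_maps[0])     # {'numFeatures': 10, 'regParam': 0.1}
```

A tuning tool such as cross-validation then fits the Pipeline once per parameter map and keeps the best model.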

DAG Pipelines

While most examples show linear Pipelines, you can create non-linear Pipelines as long as the data flow forms a Directed Acyclic Graph (DAG). The graph is specified implicitly based on input and output column names.
If the data flow graph is non-linear, the stages must still be supplied to the Pipeline in topological order.
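With input/output column names standing in for graph edges, checking topological order is straightforward. A Spark-free sketch (the stage tuples are hypothetical, not pyspark's representation):

```python
# Spark-free sketch: verify that pipeline stages are listed in
# topological order, i.e. every stage's input columns already exist
# when it runs. Stage tuples are (name, inputCols, outputCol).

def in_topological_order(stages, initial_cols):
    available = set(initial_cols)
    for name, input_cols, output_col in stages:
        if not set(input_cols) <= available:
            return False            # an input is produced later (or never)
        available.add(output_col)
    return True

stages = [
    ("tokenizer", ["text"], "words"),
    ("hashingTF", ["words"], "features"),
]
print(in_topological_order(stages, ["text", "label"]))         # True
print(in_topological_order(list(reversed(stages)), ["text"]))  # False
```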

Runtime Checking

Since Pipelines operate on DataFrames with varied types, they use runtime checking instead of compile-time type checking. Type checking is done using the DataFrame schema before running the Pipeline.
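That pre-run check can be sketched without Spark: each stage declares the input column and type it requires and the output column it adds, and a walk over the schema catches mismatches before any data is processed (the stage tuples below are hypothetical, not pyspark's representation):

```python
# Spark-free sketch of pre-run schema validation. Each stage is a tuple
# (name, inputCol, inputType, outputCol, outputType); hypothetical
# representation, not pyspark internals.

def validate(schema, stages):
    schema = dict(schema)                  # column name -> type string
    for name, in_col, in_type, out_col, out_type in stages:
        actual = schema.get(in_col)
        if actual != in_type:
            raise TypeError(f"{name}: expected {in_col}: {in_type}, got {actual}")
        schema[out_col] = out_type         # stage adds its output column
    return schema

stages = [
    ("tokenizer", "text", "string", "words", "array<string>"),
    ("hashingTF", "words", "array<string>", "features", "vector"),
]
print(validate({"text": "string", "label": "double"}, stages))
```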

Pipeline Persistence

You can save and load Pipelines and PipelineModels:
# Save pipeline model
model.save("path/to/pipeline-model")

# Load pipeline model
from pyspark.ml import PipelineModel
loadedModel = PipelineModel.load("path/to/pipeline-model")

# Use loaded model
predictions = loadedModel.transform(testData)
ML persistence works across Scala, Java, and Python, but R uses a modified format. Models saved in R can only be loaded back in R.

Backwards Compatibility

MLlib maintains backwards compatibility for ML persistence:
  • Minor/patch versions: full backwards compatibility; a model saved with one release can be loaded by any later minor or patch release within the same major version
  • Major versions: Best-effort compatibility, but no guarantees
Model behavior:
  • Minor/patch versions: Identical behavior (except bug fixes)
  • Major versions: Best-effort, but no guarantees

Best Practices

Each Pipeline stage should be a unique instance. Don’t insert the same instance (e.g., myHashingTF) twice, as stages must have unique IDs.
Use Pipelines to ensure training and test data undergo identical feature transformations, preventing subtle bugs.
Break complex ML workflows into clear stages. This makes your code more maintainable and easier to debug.
Save the entire PipelineModel rather than individual stages. This ensures you can reproduce predictions exactly.

Next Steps

Model Tuning

Learn how to tune hyperparameters with cross-validation

Classification & Regression

Explore ML algorithms for supervised learning

Feature Engineering

Transform and prepare data with feature transformers

Estimator & Transformer

Deep dive into the core abstractions
