This page covers algorithms for classification and regression, including linear methods, tree-based algorithms, and ensemble methods.

Classification

Classification algorithms predict categorical outcomes. Spark MLlib provides several classification algorithms, covered in the sections below.

Logistic Regression

Logistic regression is a popular method for predicting categorical responses. In spark.ml, you can use it for binary or multiclass classification.

Binary Classification

For binary classification, logistic regression predicts the probability of the positive class:
from pyspark.ml.classification import LogisticRegression

# Load training data
training = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")

# Create logistic regression model with elastic net regularization
lr = LogisticRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8)

# Fit the model
lrModel = lr.fit(training)

# Print coefficients and intercept
print("Coefficients: " + str(lrModel.coefficients))
print("Intercept: " + str(lrModel.intercept))
Here elasticNetParam corresponds to α (the mixing between L1 and L2 penalties) and regParam corresponds to λ (the overall regularization strength) in the elastic net regularization formula.
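For reference, the penalty these two parameters control can be written (in LaTeX notation, matching spark.ml's λ/α parameterization) as:

\lambda \left( \alpha \, \lVert \mathbf{w} \rVert_1 + \frac{1 - \alpha}{2} \, \lVert \mathbf{w} \rVert_2^2 \right), \qquad \alpha \in [0, 1], \; \lambda \ge 0

so α = 0 gives pure L2 (ridge) regularization and α = 1 gives pure L1 (lasso).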

Multinomial Classification

For multiclass problems, use multinomial logistic regression:
# Create multinomial logistic regression
mlr = LogisticRegression(
    maxIter=10, 
    regParam=0.3, 
    elasticNetParam=0.8, 
    family="multinomial"
)

# Fit the model
mlrModel = mlr.fit(training)

# Print coefficients matrix and intercepts vector
print("Multinomial coefficients: " + str(mlrModel.coefficientMatrix))
print("Multinomial intercepts: " + str(mlrModel.interceptVector))

Model Summary

Extract training summary statistics:
# Get training summary
trainingSummary = lrModel.summary

# Print metrics
print(f"Accuracy: {trainingSummary.accuracy}")
print(f"F-measure: {trainingSummary.fMeasureByLabel()}")
print(f"Precision: {trainingSummary.precisionByLabel}")
print(f"Recall: {trainingSummary.recallByLabel}")

# For binary classification, access ROC curve
if hasattr(trainingSummary, 'roc'):
    trainingSummary.roc.show()
    print(f"Area under ROC: {trainingSummary.areaUnderROC}")

Decision Tree Classifier

Decision trees are popular for their interpretability and ability to handle non-linear relationships:
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.feature import StringIndexer, VectorIndexer

# Load data
data = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")

# Index labels
labelIndexer = StringIndexer(inputCol="label", outputCol="indexedLabel").fit(data)

# Index categorical features
featureIndexer = VectorIndexer(
    inputCol="features", 
    outputCol="indexedFeatures", 
    maxCategories=4
).fit(data)

# Apply both indexers so the data has indexedLabel and indexedFeatures columns, then split
data = featureIndexer.transform(labelIndexer.transform(data))
(trainingData, testData) = data.randomSplit([0.7, 0.3])

# Train Decision Tree
dt = DecisionTreeClassifier(labelCol="indexedLabel", featuresCol="indexedFeatures")
model = dt.fit(trainingData)

# Make predictions
predictions = model.transform(testData)
predictions.select("prediction", "indexedLabel", "features").show(5)
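Since interpretability is a key advantage of decision trees, it can be useful to inspect the learned splits; the fitted model's toDebugString property prints the tree structure:
# Print the learned tree structure
print(model.toDebugString)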

Random Forest Classifier

Random forests are ensembles of decision trees; each tree is trained on a random subset of the data and features, which reduces the risk of overfitting compared to a single tree:
from pyspark.ml.classification import RandomForestClassifier

# Train Random Forest
rf = RandomForestClassifier(
    labelCol="indexedLabel", 
    featuresCol="indexedFeatures", 
    numTrees=10
)
model = rf.fit(trainingData)

# Make predictions
predictions = model.transform(testData)

# Evaluate accuracy
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
evaluator = MulticlassClassificationEvaluator(
    labelCol="indexedLabel", 
    predictionCol="prediction", 
    metricName="accuracy"
)
accuracy = evaluator.evaluate(predictions)
print(f"Test Accuracy = {accuracy}")
print(f"Test Error = {1.0 - accuracy}")

Gradient-Boosted Tree Classifier

Gradient-boosted trees (GBTs) build an ensemble of trees sequentially, with each new tree fitted to correct the errors of the current ensemble. Note that GBTClassifier supports binary classification:
from pyspark.ml.classification import GBTClassifier

# Train GBT model
gbt = GBTClassifier(
    labelCol="indexedLabel", 
    featuresCol="indexedFeatures", 
    maxIter=10
)
model = gbt.fit(trainingData)

# Make predictions
predictions = model.transform(testData)

# Evaluate
accuracy = evaluator.evaluate(predictions)
print(f"Test Accuracy = {accuracy}")

Multilayer Perceptron Classifier

Multilayer perceptron classifier (MLPC) is a classifier based on a feedforward artificial neural network:
from pyspark.ml.classification import MultilayerPerceptronClassifier

# Load data
data = spark.read.format("libsvm").load("data/mllib/sample_multiclass_classification_data.txt")

# Split data
train, test = data.randomSplit([0.6, 0.4], seed=1234)

# Specify layers: input layer of size 4 (features), two hidden layers of sizes 5 and 4, output layer of size 3 (classes)
layers = [4, 5, 4, 3]

# Create trainer
trainer = MultilayerPerceptronClassifier(maxIter=100, layers=layers, blockSize=128, seed=1234)

# Train model
model = trainer.fit(train)

# Compute accuracy
result = model.transform(test)
predictionAndLabels = result.select("prediction", "label")
evaluator = MulticlassClassificationEvaluator(metricName="accuracy")
print(f"Test set accuracy = {evaluator.evaluate(predictionAndLabels)}")

Linear Support Vector Machine

Linear SVM constructs a maximum-margin hyperplane; spark.ml's LinearSVC supports binary classification:
from pyspark.ml.classification import LinearSVC

# Load training data
training = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")

# Create LinearSVC model
lsvc = LinearSVC(maxIter=10, regParam=0.1)

# Fit the model
model = lsvc.fit(training)

# Print coefficients and intercept
print(f"Coefficients: {model.coefficients}")
print(f"Intercept: {model.intercept}")

Naive Bayes

Naive Bayes classifiers are simple probabilistic classifiers that apply Bayes' theorem with a strong (naive) independence assumption between features:
from pyspark.ml.classification import NaiveBayes

# Load data
data = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")

# Split data
train, test = data.randomSplit([0.6, 0.4], seed=1234)

# Create Naive Bayes model
nb = NaiveBayes(smoothing=1.0, modelType="multinomial")
model = nb.fit(train)

# Make predictions
predictions = model.transform(test)

# Evaluate
evaluator = MulticlassClassificationEvaluator(
    labelCol="label", 
    predictionCol="prediction", 
    metricName="accuracy"
)
accuracy = evaluator.evaluate(predictions)
print(f"Test set accuracy = {accuracy}")

Regression

Regression algorithms predict continuous outcomes.

Linear Regression

Linear regression models the relationship between features and a continuous target:
from pyspark.ml.regression import LinearRegression

# Load training data
training = spark.read.format("libsvm").load("data/mllib/sample_linear_regression_data.txt")

# Create linear regression model
lr = LinearRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8)

# Fit the model
lrModel = lr.fit(training)

# Print coefficients and intercept
print(f"Coefficients: {lrModel.coefficients}")
print(f"Intercept: {lrModel.intercept}")

# Summarize the model
trainingSummary = lrModel.summary
print(f"RMSE: {trainingSummary.rootMeanSquaredError}")
print(f"R2: {trainingSummary.r2}")

Generalized Linear Regression

Generalized linear models (GLMs) extend linear regression to different response distributions:
from pyspark.ml.regression import GeneralizedLinearRegression

# Load data
data = spark.read.format("libsvm").load("data/mllib/sample_linear_regression_data.txt")

# Create GLM with Gaussian family and identity link
glr = GeneralizedLinearRegression(family="gaussian", link="identity", maxIter=10, regParam=0.3)

# Fit the model
model = glr.fit(data)

# Print coefficients and intercept
print(f"Coefficients: {model.coefficients}")
print(f"Intercept: {model.intercept}")

# Get summary
summary = model.summary
print(f"Coefficient Standard Errors: {summary.coefficientStandardErrors}")
print(f"T Values: {summary.tValues}")
print(f"P Values: {summary.pValues}")
Available families are gaussian, binomial, poisson, gamma, and tweedie (passed as the family parameter). Each has a canonical link function: identity for gaussian, logit for binomial, log for poisson, and inverse for gamma; the tweedie family uses a power link controlled by linkPower.
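As an illustration of a non-Gaussian family, the sketch below fits a Poisson GLM with its canonical log link. It assumes a dataset with non-negative count labels (the Gaussian sample data loaded above would not be valid input for a Poisson model); the file path is a placeholder:
# Hypothetical count-valued dataset (placeholder path); Poisson regression with the canonical log link
countData = spark.read.format("libsvm").load("path/to/count_data.txt")
poissonGlr = GeneralizedLinearRegression(family="poisson", link="log", maxIter=10, regParam=0.3)
poissonModel = poissonGlr.fit(countData)
print(f"Poisson coefficients: {poissonModel.coefficients}")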

Decision Tree Regression

Decision trees can also be used for regression:
from pyspark.ml.regression import DecisionTreeRegressor

# Load data
data = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")

# Split data
train, test = data.randomSplit([0.7, 0.3])

# Train Decision Tree
dt = DecisionTreeRegressor(featuresCol="features")
model = dt.fit(train)

# Make predictions
predictions = model.transform(test)
predictions.select("prediction", "label", "features").show(5)

# Evaluate
from pyspark.ml.evaluation import RegressionEvaluator
evaluator = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)
print(f"Root Mean Squared Error (RMSE) = {rmse}")

Random Forest Regression

Random forests support regression as well:
from pyspark.ml.regression import RandomForestRegressor

# Train Random Forest
rf = RandomForestRegressor(featuresCol="features", numTrees=10)
model = rf.fit(train)

# Make predictions
predictions = model.transform(test)

# Evaluate
rmse = evaluator.evaluate(predictions)
print(f"Root Mean Squared Error (RMSE) = {rmse}")

Gradient-Boosted Tree Regression

Gradient-boosted trees likewise support regression:
from pyspark.ml.regression import GBTRegressor

# Train GBT model
gbt = GBTRegressor(featuresCol="features", maxIter=10)
model = gbt.fit(train)

# Make predictions
predictions = model.transform(test)

# Evaluate
rmse = evaluator.evaluate(predictions)
print(f"Root Mean Squared Error (RMSE) = {rmse}")

AFT Survival Regression

The Accelerated Failure Time (AFT) model is a parametric model for survival analysis on censored data; the censor column indicates whether the event was observed (1.0) or censored (0.0):
from pyspark.ml.regression import AFTSurvivalRegression
from pyspark.ml.linalg import Vectors

# Create training data
training = spark.createDataFrame([
    (1.0, Vectors.dense(1.0, 2.0), 1.0),
    (0.5, Vectors.dense(1.5, 1.0), 0.0),
    (1.5, Vectors.dense(2.0, 3.0), 1.0),
], ["label", "features", "censor"])

# Create AFT model
aft = AFTSurvivalRegression()
model = aft.fit(training)

# Print coefficients and intercept
print(f"Coefficients: {model.coefficients}")
print(f"Intercept: {model.intercept}")
print(f"Scale: {model.scale}")

Isotonic Regression

Isotonic regression fits a free-form, monotonically non-decreasing function to the data (set the isotonic parameter to False to fit a non-increasing function instead):
from pyspark.ml.regression import IsotonicRegression

# Load data
data = spark.read.format("libsvm").load("data/mllib/sample_isotonic_regression_libsvm_data.txt")

# Create isotonic regression model
ir = IsotonicRegression()
model = ir.fit(data)

# Print boundaries and predictions
print(f"Boundaries: {model.boundaries}")
print(f"Predictions: {model.predictions}")

Factorization Machines

Factorization machines model pairwise feature interactions through low-rank factorization and are available for both classification (FMClassifier) and regression (FMRegressor):
from pyspark.ml.classification import FMClassifier
from pyspark.ml.feature import MinMaxScaler

# Load data
data = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")

# Scale features between 0 and 1
scaler = MinMaxScaler(inputCol="features", outputCol="scaledFeatures")
scalerModel = scaler.fit(data)
scaledData = scalerModel.transform(data)

# Split data
train, test = scaledData.randomSplit([0.8, 0.2], seed=12345)

# Train FM classifier
fm = FMClassifier(featuresCol="scaledFeatures", stepSize=0.001)
model = fm.fit(train)

# Make predictions
predictions = model.transform(test)

# Evaluate
evaluator = MulticlassClassificationEvaluator(
    labelCol="label", 
    predictionCol="prediction", 
    metricName="accuracy"
)
accuracy = evaluator.evaluate(predictions)
print(f"Test set accuracy = {accuracy}")
When using factorization machines, scale continuous features into [0, 1], or bin them and one-hot encode the bins, to prevent exploding gradients.
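Since factorization machines also support regression, here is a minimal sketch with FMRegressor, reusing the scaled data and split from the classifier example (the binary labels there make this purely illustrative):
from pyspark.ml.regression import FMRegressor

# FM regression on the same scaled features
fmr = FMRegressor(featuresCol="scaledFeatures", stepSize=0.001)
fmrModel = fmr.fit(train)
fmrModel.transform(test).select("label", "prediction").show(5)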

Model Evaluation

Classification Metrics

from pyspark.ml.evaluation import MulticlassClassificationEvaluator, BinaryClassificationEvaluator

# Multiclass metrics
multiEvaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction")
accuracy = multiEvaluator.evaluate(predictions, {multiEvaluator.metricName: "accuracy"})
f1 = multiEvaluator.evaluate(predictions, {multiEvaluator.metricName: "f1"})

print(f"Accuracy = {accuracy}")
print(f"F1 Score = {f1}")

# Binary classification metrics
binaryEvaluator = BinaryClassificationEvaluator(labelCol="label", rawPredictionCol="rawPrediction")
auroc = binaryEvaluator.evaluate(predictions, {binaryEvaluator.metricName: "areaUnderROC"})
aupr = binaryEvaluator.evaluate(predictions, {binaryEvaluator.metricName: "areaUnderPR"})

print(f"Area under ROC = {auroc}")
print(f"Area under PR = {aupr}")

Regression Metrics

from pyspark.ml.evaluation import RegressionEvaluator

evaluator = RegressionEvaluator(labelCol="label", predictionCol="prediction")

rmse = evaluator.evaluate(predictions, {evaluator.metricName: "rmse"})
mse = evaluator.evaluate(predictions, {evaluator.metricName: "mse"})
mae = evaluator.evaluate(predictions, {evaluator.metricName: "mae"})
r2 = evaluator.evaluate(predictions, {evaluator.metricName: "r2"})

print(f"RMSE = {rmse}")
print(f"MSE = {mse}")
print(f"MAE = {mae}")
print(f"R2 = {r2}")

Next Steps

Model Tuning: Optimize hyperparameters with cross-validation.

Feature Engineering: Transform and prepare features for better models.

Clustering: Explore unsupervised learning algorithms.

ML Pipelines: Build end-to-end ML workflows.
