This page covers algorithms for classification and regression, including linear methods, tree-based algorithms, and ensemble methods.
Classification
Classification algorithms predict categorical outcomes. Spark MLlib provides several classification algorithms:
Logistic Regression
Logistic regression is a popular method for predicting categorical responses. In spark.ml, you can use it for binary or multiclass classification.
Binary Classification
For binary classification, logistic regression predicts the probability that an observation belongs to the positive class:
from pyspark.ml.classification import LogisticRegression
# Load training data (assumes an active SparkSession bound to the name spark)
training = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")
# Create logistic regression model with elastic net regularization
lr = LogisticRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8)
# Fit the model
lrModel = lr.fit(training)
# Print coefficients and intercept
print(f"Coefficients: {lrModel.coefficients}")
print(f"Intercept: {lrModel.intercept}")
The elasticNetParam corresponds to α and regParam corresponds to λ in the elastic net penalty λ(α‖w‖₁ + (1 - α)/2 · ‖w‖₂²): elasticNetParam=1.0 gives pure L1 (lasso) regularization and elasticNetParam=0.0 gives pure L2 (ridge).
Multinomial Classification
For multiclass problems, use multinomial logistic regression:
# Create multinomial logistic regression
mlr = LogisticRegression(
    maxIter=10,
    regParam=0.3,
    elasticNetParam=0.8,
    family="multinomial"
)
# Fit the model
mlrModel = mlr.fit(training)
# Print coefficients matrix and intercepts vector
print(f"Multinomial coefficients: {mlrModel.coefficientMatrix}")
print(f"Multinomial intercepts: {mlrModel.interceptVector}")
Model Summary
Extract training summary statistics:
# Get training summary
trainingSummary = lrModel.summary
# Print metrics
print(f"Accuracy: {trainingSummary.accuracy}")
print(f"F-measure: {trainingSummary.fMeasureByLabel()}")
print(f"Precision: {trainingSummary.precisionByLabel}")
print(f"Recall: {trainingSummary.recallByLabel}")
# For binary classification, access ROC curve
if hasattr(trainingSummary, 'roc'):
    trainingSummary.roc.show()
    print(f"Area under ROC: {trainingSummary.areaUnderROC}")
Decision Tree Classifier
Decision trees are popular for their interpretability and ability to handle non-linear relationships:
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.feature import StringIndexer, VectorIndexer
# Load data
data = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")
# Index labels
labelIndexer = StringIndexer(inputCol="label", outputCol="indexedLabel").fit(data)
# Index categorical features
featureIndexer = VectorIndexer(
    inputCol="features",
    outputCol="indexedFeatures",
    maxCategories=4
).fit(data)
# Apply both indexers so the indexed columns exist before training
data = featureIndexer.transform(labelIndexer.transform(data))
# Split data
(trainingData, testData) = data.randomSplit([0.7, 0.3])
# Train Decision Tree
dt = DecisionTreeClassifier(labelCol="indexedLabel", featuresCol="indexedFeatures")
model = dt.fit(trainingData)
# Make predictions
predictions = model.transform(testData)
predictions.select("prediction", "indexedLabel", "features").show(5)
Random Forest Classifier
Random forests are ensemble methods that combine multiple decision trees:
from pyspark.ml.classification import RandomForestClassifier
# Train Random Forest
rf = RandomForestClassifier(
    labelCol="indexedLabel",
    featuresCol="indexedFeatures",
    numTrees=10
)
model = rf.fit(trainingData)
# Make predictions
predictions = model.transform(testData)
# Evaluate accuracy
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
evaluator = MulticlassClassificationEvaluator(
    labelCol="indexedLabel",
    predictionCol="prediction",
    metricName="accuracy"
)
accuracy = evaluator.evaluate(predictions)
print(f"Test Accuracy = {accuracy}")
print(f"Test Error = {1.0 - accuracy}")
Gradient-Boosted Tree Classifier
Gradient-boosted trees (GBTs) build an ensemble of trees sequentially:
from pyspark.ml.classification import GBTClassifier
# Train GBT model
gbt = GBTClassifier(
    labelCol="indexedLabel",
    featuresCol="indexedFeatures",
    maxIter=10
)
model = gbt.fit(trainingData)
# Make predictions
predictions = model.transform(testData)
# Evaluate with the same evaluator as above
accuracy = evaluator.evaluate(predictions)
print(f"Test Accuracy = {accuracy}")
Multilayer Perceptron Classifier
Multilayer perceptron classifier (MLPC) is a neural network-based classifier:
from pyspark.ml.classification import MultilayerPerceptronClassifier
# Load data
data = spark.read.format("libsvm").load("data/mllib/sample_multiclass_classification_data.txt")
# Split data
train, test = data.randomSplit([0.6, 0.4], seed=1234)
# Specify layers: input layer of size 4, two intermediate layers, output layer of size 3
layers = [4, 5, 4, 3]
# Create trainer
trainer = MultilayerPerceptronClassifier(maxIter=100, layers=layers, blockSize=128, seed=1234)
# Train model
model = trainer.fit(train)
# Compute accuracy
result = model.transform(test)
predictionAndLabels = result.select("prediction", "label")
evaluator = MulticlassClassificationEvaluator(metricName="accuracy")
print(f"Test set accuracy = {evaluator.evaluate(predictionAndLabels)}")
Linear Support Vector Machine
Linear SVM constructs a hyperplane for binary classification:
from pyspark.ml.classification import LinearSVC
# Load training data
training = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")
# Create LinearSVC model
lsvc = LinearSVC(maxIter=10, regParam=0.1)
# Fit the model
model = lsvc.fit(training)
# Print coefficients and intercept
print(f"Coefficients: {model.coefficients}")
print(f"Intercept: {model.intercept}")
Naive Bayes
Naive Bayes classifiers are simple probabilistic classifiers that apply Bayes' theorem with a strong (naive) independence assumption between features:
from pyspark.ml.classification import NaiveBayes
# Load data
data = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")
# Split data
train, test = data.randomSplit([0.6, 0.4], seed=1234)
# Create Naive Bayes model
nb = NaiveBayes(smoothing=1.0, modelType="multinomial")
model = nb.fit(train)
# Make predictions
predictions = model.transform(test)
# Evaluate
evaluator = MulticlassClassificationEvaluator(
    labelCol="label",
    predictionCol="prediction",
    metricName="accuracy"
)
accuracy = evaluator.evaluate(predictions)
print(f"Test set accuracy = {accuracy}")
Regression
Regression algorithms predict continuous outcomes.
Linear Regression
Linear regression models the relationship between features and a continuous target:
from pyspark.ml.regression import LinearRegression
# Load training data
training = spark.read.format("libsvm").load("data/mllib/sample_linear_regression_data.txt")
# Create linear regression model
lr = LinearRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8)
# Fit the model
lrModel = lr.fit(training)
# Print coefficients and intercept
print(f"Coefficients: {lrModel.coefficients}")
print(f"Intercept: {lrModel.intercept}")
# Summarize the model
trainingSummary = lrModel.summary
print(f"RMSE: {trainingSummary.rootMeanSquaredError}")
print(f"R2: {trainingSummary.r2}")
Generalized Linear Regression
Generalized linear models (GLMs) extend linear regression to different response distributions:
from pyspark.ml.regression import GeneralizedLinearRegression
# Load data
data = spark.read.format("libsvm").load("data/mllib/sample_linear_regression_data.txt")
# Create GLM with Gaussian family and identity link
glr = GeneralizedLinearRegression(family="gaussian", link="identity", maxIter=10, regParam=0.3)
# Fit the model
model = glr.fit(data)
# Print coefficients and intercept
print(f"Coefficients: {model.coefficients}")
print(f"Intercept: {model.intercept}")
# Get summary
summary = model.summary
print(f"Coefficient Standard Errors: {summary.coefficientStandardErrors}")
print(f"T Values: {summary.tValues}")
print(f"P Values: {summary.pValues}")
Available families include Gaussian, Binomial, Poisson, Gamma, and Tweedie. Each family has a canonical link function (identity for Gaussian, logit for Binomial, log for Poisson), and Spark also supports several alternative links per family.
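For example, switching to a Poisson regression with its canonical log link only changes the constructor arguments; a sketch (appropriate for non-negative count labels rather than this page's sample data):
# Poisson GLM with the canonical log link
poisson_glr = GeneralizedLinearRegression(family="poisson", link="log", maxIter=10, regParam=0.3)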
Decision Tree Regression
from pyspark.ml.regression import DecisionTreeRegressor
# Load data
data = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")
# Split data
train, test = data.randomSplit([0.7, 0.3])
# Train Decision Tree
dt = DecisionTreeRegressor(featuresCol="features")
model = dt.fit(train)
# Make predictions
predictions = model.transform(test)
predictions.select("prediction", "label", "features").show(5)
# Evaluate
from pyspark.ml.evaluation import RegressionEvaluator
evaluator = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)
print(f"Root Mean Squared Error (RMSE) = {rmse}")
Random Forest Regression
from pyspark.ml.regression import RandomForestRegressor
# Train Random Forest
rf = RandomForestRegressor(featuresCol="features", numTrees=10)
model = rf.fit(train)
# Make predictions
predictions = model.transform(test)
# Evaluate with the RMSE evaluator from above
rmse = evaluator.evaluate(predictions)
print(f"Root Mean Squared Error (RMSE) = {rmse}")
Gradient-Boosted Tree Regression
from pyspark.ml.regression import GBTRegressor
# Train GBT model
gbt = GBTRegressor(featuresCol="features", maxIter=10)
model = gbt.fit(train)
# Make predictions
predictions = model.transform(test)
# Evaluate
rmse = evaluator.evaluate(predictions)
print(f"Root Mean Squared Error (RMSE) = {rmse}")
AFT Survival Regression
The Accelerated Failure Time (AFT) model is a parametric survival regression model that handles censored data:
from pyspark.ml.regression import AFTSurvivalRegression
from pyspark.ml.linalg import Vectors
# Create training data: (label, features, censor), where censor == 1.0 means
# the event occurred and 0.0 means the observation is censored
training = spark.createDataFrame([
    (1.0, Vectors.dense(1.0, 2.0), 1.0),
    (0.5, Vectors.dense(1.5, 1.0), 0.0),
    (1.5, Vectors.dense(2.0, 3.0), 1.0),
], ["label", "features", "censor"])
# Create AFT model (reads the "censor" column by default)
aft = AFTSurvivalRegression()
model = aft.fit(training)
# Print coefficients, intercept, and scale parameter
print(f"Coefficients: {model.coefficients}")
print(f"Intercept: {model.intercept}")
print(f"Scale: {model.scale}")
Isotonic Regression
Isotonic regression fits a free-form, monotonically non-decreasing step function to the data (set isotonic=False to fit a non-increasing one):
from pyspark.ml.regression import IsotonicRegression
# Load data
data = spark.read.format("libsvm").load("data/mllib/sample_isotonic_regression_libsvm_data.txt")
# Create isotonic regression model
ir = IsotonicRegression()
model = ir.fit(data)
# Print boundaries and predictions
print(f"Boundaries: {model.boundaries}")
print(f"Predictions: {model.predictions}")
Factorization Machines
Factorization machines handle feature interactions for classification and regression:
from pyspark.ml.classification import FMClassifier
from pyspark.ml.feature import MinMaxScaler
# Load data
data = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")
# Scale features between 0 and 1
scaler = MinMaxScaler(inputCol="features", outputCol="scaledFeatures")
scalerModel = scaler.fit(data)
scaledData = scalerModel.transform(data)
# Split data
train, test = scaledData.randomSplit([0.8, 0.2], seed=12345)
# Train FM classifier
fm = FMClassifier(featuresCol="scaledFeatures", stepSize=0.001)
model = fm.fit(train)
# Make predictions
predictions = model.transform(test)
# Evaluate
evaluator = MulticlassClassificationEvaluator(
    labelCol="label",
    predictionCol="prediction",
    metricName="accuracy"
)
accuracy = evaluator.evaluate(predictions)
print(f"Test set accuracy = {accuracy}")
When using factorization machines, scale continuous features to [0, 1], or bin them and one-hot encode the bins, to prevent exploding gradients during training.
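A factorization machine regressor is available under the same feature-scaling caveat. A minimal sketch reusing the scaled split from above (purely illustrative here, since those labels are binary):
from pyspark.ml.regression import FMRegressor
# Same scaled features and step size as the classifier example
fm_reg = FMRegressor(featuresCol="scaledFeatures", stepSize=0.001)
fm_reg.fit(train).transform(test).select("prediction", "label").show(5)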
Model Evaluation
Classification Metrics
from pyspark.ml.evaluation import MulticlassClassificationEvaluator, BinaryClassificationEvaluator
# Multiclass metrics
multiEvaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction")
accuracy = multiEvaluator.evaluate(predictions, {multiEvaluator.metricName: "accuracy"})
f1 = multiEvaluator.evaluate(predictions, {multiEvaluator.metricName: "f1"})
print(f"Accuracy = {accuracy}")
print(f"F1 Score = {f1}")
# Binary classification metrics (require a rawPrediction column from a binary model)
binaryEvaluator = BinaryClassificationEvaluator(labelCol="label", rawPredictionCol="rawPrediction")
auroc = binaryEvaluator.evaluate(predictions, {binaryEvaluator.metricName: "areaUnderROC"})
aupr = binaryEvaluator.evaluate(predictions, {binaryEvaluator.metricName: "areaUnderPR"})
print(f"Area under ROC = {auroc}")
print(f"Area under PR = {aupr}")
Regression Metrics
from pyspark.ml.evaluation import RegressionEvaluator
evaluator = RegressionEvaluator(labelCol="label", predictionCol="prediction")
rmse = evaluator.evaluate(predictions, {evaluator.metricName: "rmse"})
mse = evaluator.evaluate(predictions, {evaluator.metricName: "mse"})
mae = evaluator.evaluate(predictions, {evaluator.metricName: "mae"})
r2 = evaluator.evaluate(predictions, {evaluator.metricName: "r2"})
print(f"RMSE = {rmse}")
print(f"MSE = {mse}")
print(f"MAE = {mae}")
print(f"R2 = {r2}")
Next Steps
Model Tuning: Optimize hyperparameters with cross-validation
Feature Engineering: Transform and prepare features for better models
Clustering: Explore unsupervised learning algorithms
ML Pipelines: Build end-to-end ML workflows