Collaborative filtering is commonly used for recommender systems. These techniques aim to fill in the missing entries of a user-item association matrix by learning from observed interactions.
Overview
Spark MLlib implements model-based collaborative filtering using the Alternating Least Squares (ALS) algorithm. In this approach:
Users and items are described by a small set of latent factors
These factors can be used to predict missing entries
The algorithm alternates between fixing user factors and item factors
ALS Parameters
The ALS implementation has several important parameters:
rank: number of latent factors in the model
maxIter: maximum number of iterations to run
regParam: regularization parameter to prevent overfitting
numBlocks: number of blocks used to parallelize computation
implicitPrefs: whether to use explicit or implicit feedback
alpha: baseline confidence in implicit feedback (implicit mode only)
nonnegative: whether to apply nonnegativity constraints to the least-squares subproblems
The DataFrame-based API for ALS currently only supports integers for user and item IDs. The IDs must be within the integer value range.
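If your raw IDs are strings, or 64-bit values that overflow the integer range, map them to dense integers before calling ALS; Spark's StringIndexer can do this, or you can build a lookup table yourself. A minimal pure-Python sketch of such a mapping (the helper name is illustrative, not part of any API):

```python
# Map arbitrary hashable IDs to dense integer indices suitable for ALS.
# Illustrative helper, not part of the MLlib API.
def build_id_map(ids):
    # dict.fromkeys deduplicates while preserving first-seen order
    return {v: i for i, v in enumerate(dict.fromkeys(ids))}

user_map = build_id_map(["alice", "bob", "alice", "carol"])
print(user_map)  # {'alice': 0, 'bob': 1, 'carol': 2}

# All mapped values fit comfortably in the 32-bit integer range ALS requires
assert all(0 <= i <= 2**31 - 1 for i in user_map.values())
```

Keep the reverse mapping around so you can translate recommendation output back to the original IDs.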
Basic Example
Here’s a complete example using the MovieLens dataset:
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.sql import Row

# Load data
lines = spark.read.text("data/mllib/als/sample_movielens_ratings.txt").rdd
parts = lines.map(lambda row: row.value.split("::"))
ratingsRDD = parts.map(lambda p: Row(
    userId=int(p[0]),
    movieId=int(p[1]),
    rating=float(p[2]),
    timestamp=int(p[3])
))
ratings = spark.createDataFrame(ratingsRDD)

# Split data into training and test sets
(training, test) = ratings.randomSplit([0.8, 0.2])

# Build the recommendation model using ALS
als = ALS(
    maxIter=5,
    regParam=0.01,
    userCol="userId",
    itemCol="movieId",
    ratingCol="rating",
    coldStartStrategy="drop"  # Drop rows with NaN predictions
)
model = als.fit(training)

# Evaluate the model on test data
predictions = model.transform(test)
evaluator = RegressionEvaluator(
    metricName="rmse",
    labelCol="rating",
    predictionCol="prediction"
)
rmse = evaluator.evaluate(predictions)
print(f"Root-mean-square error = {rmse}")
Making Recommendations
Once you have a trained model, you can generate recommendations:
Top N Recommendations for All Users
# Generate top 10 movie recommendations for each user
userRecs = model.recommendForAllUsers(10)
userRecs.show(truncate=False)
# Output:
# +------+--------------------------------------------------------------------+
# |userId|recommendations |
# +------+--------------------------------------------------------------------+
# |1 |[(63, 5.2), (99, 5.1), (42, 5.0), ...] |
# |2 |[(87, 4.9), (12, 4.8), (34, 4.7), ...] |
# +------+--------------------------------------------------------------------+
Top N Recommendations for All Items
# Generate top 10 user recommendations for each movie
movieRecs = model.recommendForAllItems(10)
movieRecs.show(truncate=False)
Recommendations for Specific Users/Items
# Generate recommendations for a subset of users
users = ratings.select(als.getUserCol()).distinct().limit(3)
userSubsetRecs = model.recommendForUserSubset(users, 10)
userSubsetRecs.show()

# Generate recommendations for a subset of movies
movies = ratings.select(als.getItemCol()).distinct().limit(3)
movieSubsetRecs = model.recommendForItemSubset(movies, 10)
movieSubsetRecs.show()
Explicit vs. Implicit Feedback
Explicit Feedback
The standard approach treats entries in the user-item matrix as explicit preferences (e.g., ratings):
als = ALS(
    maxIter=5,
    regParam=0.01,
    implicitPrefs=False,  # Use explicit feedback
    userCol="userId",
    itemCol="movieId",
    ratingCol="rating"
)
Implicit Feedback
For implicit feedback (views, clicks, purchases), the data represents the strength of observations rather than explicit ratings:
als = ALS(
    maxIter=5,
    regParam=0.01,
    implicitPrefs=True,  # Use implicit feedback
    alpha=1.0,  # Confidence parameter
    userCol="userId",
    itemCol="movieId",
    ratingCol="rating"  # Now represents observation strength
)
With implicit feedback, the algorithm models confidence levels rather than explicit ratings. Higher values indicate stronger confidence in user preferences.
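Concretely, the implicit-feedback formulation (based on Hu, Koren, and Volinsky's model) derives a binary preference and a confidence weight from each observation strength r, with confidence growing linearly as 1 + alpha * r. A pure-Python sketch of that mapping (the function name is ours):

```python
# Confidence weighting used in implicit-feedback ALS:
# preference p = 1 if r > 0 else 0, confidence c = 1 + alpha * r
def implicit_confidence(r, alpha=1.0):
    preference = 1.0 if r > 0 else 0.0
    confidence = 1.0 + alpha * r
    return preference, confidence

print(implicit_confidence(0.0))              # (0.0, 1.0)  unobserved: low confidence
print(implicit_confidence(10.0))             # (1.0, 11.0) e.g. 10 clicks: high confidence
print(implicit_confidence(10.0, alpha=40.0)) # (1.0, 401.0)
```

Raising alpha makes the model weight observed interactions much more heavily relative to the sea of unobserved (zero) entries.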
Regularization Scaling
ALS uses a technique known as ALS-WR (ALS with weighted-λ-regularization) that scales the regularization parameter in each least-squares subproblem:
When updating user factors: Scale by number of ratings the user made
When updating item factors: Scale by number of ratings the item received
This makes regParam less dependent on dataset scale, allowing you to:
Learn optimal parameters on a sample
Apply them to the full dataset
Expect similar performance
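Written out, the weighted-λ scheme multiplies the ridge term in each least-squares subproblem by that row's rating count. A single user-factor update under this scheme can be sketched in NumPy (illustrative, not MLlib's code; the function name is ours):

```python
import numpy as np

# ALS-WR: the ridge term in user u's normal equations is regParam * n_u * I,
# where n_u is the number of ratings user u made.
def user_update_wr(V_obs, r_obs, reg_param):
    """V_obs: item factors for u's rated items; r_obs: u's ratings."""
    n_u = len(r_obs)  # number of ratings this user made
    k = V_obs.shape[1]
    A = V_obs.T @ V_obs + reg_param * n_u * np.eye(k)
    return np.linalg.solve(A, V_obs.T @ r_obs)
```

Because the penalty grows with each user's (or item's) rating count, the relative strength of regularization stays comparable as the dataset grows, which is why regParam tuned on a sample tends to carry over.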
Cold Start Strategy
The cold start problem occurs when making predictions for users or items not seen during training.
Available Strategies
nan (default)
Assigns NaN predictions for unknown users/items: als = ALS(coldStartStrategy="nan")
Use this in production systems where you want to detect new users/items, or when you have a fallback strategy for handling NaN predictions.
drop
Drops rows with NaN predictions: als = ALS(coldStartStrategy="drop")
Use this during cross-validation to get valid metrics, or when you only care about predictions for known users/items.
During cross-validation, use coldStartStrategy="drop" to avoid NaN values in your evaluation metrics.
Example with Cold Start
# Train model
als = ALS(
    maxIter=5,
    regParam=0.01,
    userCol="userId",
    itemCol="movieId",
    ratingCol="rating",
    coldStartStrategy="drop"  # Essential for cross-validation
)
model = als.fit(training)

# Make predictions (rows with unknown users/items are dropped)
predictions = model.transform(test)

# Evaluate (no NaN values)
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating")
rmse = evaluator.evaluate(predictions)
print(f"RMSE = {rmse}")
Practical Example: Movie Recommendations
Here’s a complete example showing how to build a movie recommendation system:
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator

# Create sample movie ratings data
ratings_data = [
    (1, 1, 5.0),  # User 1 rated Movie 1: 5 stars
    (1, 2, 3.0),
    (1, 3, 4.0),
    (2, 1, 4.0),
    (2, 2, 2.0),
    (2, 4, 5.0),
    (3, 1, 3.0),
    (3, 3, 5.0),
    (3, 4, 4.0),
]
ratings = spark.createDataFrame(ratings_data, ["userId", "movieId", "rating"])

# Split data
train, test = ratings.randomSplit([0.8, 0.2], seed=42)

# Build ALS model
als = ALS(
    maxIter=10,
    regParam=0.1,
    rank=10,
    userCol="userId",
    itemCol="movieId",
    ratingCol="rating",
    coldStartStrategy="drop"
)
model = als.fit(train)

# Evaluate
predictions = model.transform(test)
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating")
rmse = evaluator.evaluate(predictions)
print(f"RMSE = {rmse}")

# Get recommendations
userRecs = model.recommendForAllUsers(5)
print("Top 5 movie recommendations for each user:")
userRecs.show(truncate=False)

movieRecs = model.recommendForAllItems(3)
print("Top 3 users who might like each movie:")
movieRecs.show(truncate=False)
Model Persistence
Save and load trained models:
# Save model
model.save("path/to/als-model")

# Load model
from pyspark.ml.recommendation import ALSModel
loadedModel = ALSModel.load("path/to/als-model")

# Use loaded model
predictions = loadedModel.transform(testData)
Hyperparameter Tuning
Use cross-validation to find optimal parameters:
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import RegressionEvaluator

# Create ALS model
als = ALS(userCol="userId", itemCol="movieId", ratingCol="rating", coldStartStrategy="drop")

# Build parameter grid
paramGrid = ParamGridBuilder() \
    .addGrid(als.rank, [5, 10, 15]) \
    .addGrid(als.maxIter, [5, 10, 20]) \
    .addGrid(als.regParam, [0.01, 0.05, 0.1]) \
    .build()

# Create evaluator
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating")

# Create cross validator
cv = CrossValidator(
    estimator=als,
    estimatorParamMaps=paramGrid,
    evaluator=evaluator,
    numFolds=3
)

# Train
cvModel = cv.fit(training)

# Get best model
bestModel = cvModel.bestModel
print(f"Best rank: {bestModel.rank}")
print(f"Best regParam: {bestModel._java_obj.parent().getRegParam()}")
Best Practices
Choose appropriate feedback type
Use explicit feedback when you have actual ratings (1-5 stars)
Use implicit feedback for binary data (clicks, views, purchases)
Set alpha higher for implicit feedback to increase confidence in positive examples
Handle cold start properly
Use coldStartStrategy="drop" during model evaluation
Use coldStartStrategy="nan" in production with fallback strategies
Consider content-based recommendations for new users/items
Tune hyperparameters systematically
Start with rank=10, regParam=0.1, maxIter=10
Increase rank for complex patterns (but watch for overfitting)
Adjust regParam if you see over- or underfitting
Use cross-validation to find optimal values
Optimize for performance
Increase numBlocks for large datasets (roughly matching the number of cores)
Cache DataFrames when reusing them
Partition data appropriately for your cluster size
Evaluation Metrics
Common metrics for recommendation systems:
from pyspark.ml.evaluation import RegressionEvaluator

evaluator = RegressionEvaluator(labelCol="rating", predictionCol="prediction")

# Root Mean Squared Error
rmse = evaluator.evaluate(predictions, {evaluator.metricName: "rmse"})
print(f"RMSE = {rmse}")

# Mean Squared Error
mse = evaluator.evaluate(predictions, {evaluator.metricName: "mse"})
print(f"MSE = {mse}")

# Mean Absolute Error
mae = evaluator.evaluate(predictions, {evaluator.metricName: "mae"})
print(f"MAE = {mae}")

# R-squared
r2 = evaluator.evaluate(predictions, {evaluator.metricName: "r2"})
print(f"R2 = {r2}")
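The numbers the evaluator reports can be reproduced by hand, which makes the definitions concrete (a pure-Python sketch, not the MLlib implementation):

```python
import math

# Recompute RMSE / MSE / MAE by hand for a few (rating, prediction) pairs
pairs = [(5.0, 4.5), (3.0, 3.5), (4.0, 4.0)]
errors = [r - p for r, p in pairs]

mse = sum(e * e for e in errors) / len(errors)   # mean of squared errors
rmse = math.sqrt(mse)                            # square root of MSE
mae = sum(abs(e) for e in errors) / len(errors)  # mean of absolute errors

print(f"MSE = {mse:.4f}")   # 0.1667
print(f"RMSE = {rmse:.4f}") # 0.4082
print(f"MAE = {mae:.4f}")   # 0.3333
```

RMSE penalizes large errors more heavily than MAE, which is why it is the usual default for rating prediction.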
Next Steps
Model Tuning: learn advanced techniques for hyperparameter optimization
Clustering: explore user segmentation with clustering
ML Pipelines: build end-to-end recommendation pipelines
Feature Engineering: enhance recommendations with feature transformations