Feature engineering is crucial for building effective machine learning models. Spark MLlib provides comprehensive tools for feature extraction, transformation, and selection.
Overview
MLlib’s feature engineering capabilities are divided into:

- Extraction: extracting features from raw data
- Transformation: scaling, converting, or modifying features
- Selection: selecting a subset from a larger set of features
- LSH: locality-sensitive hashing for similarity search
TF-IDF
Term Frequency-Inverse Document Frequency (TF-IDF) is widely used in text mining to reflect term importance.
How it Works
Term Frequency (TF):

$$TF(t, d) = \text{number of times term } t \text{ appears in document } d$$

Inverse Document Frequency (IDF):

$$IDF(t, D) = \log \frac{|D| + 1}{DF(t, D) + 1}$$

where $DF(t, D)$ is the number of documents in corpus $D$ that contain term $t$ (the $+1$ terms smooth the ratio so terms appearing in every document, or in none, stay well defined).

TF-IDF:

$$TFIDF(t, d, D) = TF(t, d) \cdot IDF(t, D)$$
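As a quick sanity check, the formulas above can be computed directly in plain Python (a standalone sketch without Spark; the helper names `tf`, `idf`, and `tfidf` are illustrative, not part of MLlib):

```python
import math

def tf(term, doc):
    # Raw count of the term in a tokenized document
    return doc.count(term)

def idf(term, docs):
    # Smoothed IDF: log((|D| + 1) / (DF(t, D) + 1))
    df = sum(1 for d in docs if term in d)
    return math.log((len(docs) + 1) / (df + 1))

def tfidf(term, doc, docs):
    return tf(term, doc) * idf(term, docs)

docs = [
    "hi i heard about spark".split(),
    "i wish java could use case classes".split(),
    "logistic regression models are neat".split(),
]

# "i" appears once in docs[0] and in 2 of 3 documents,
# so TF-IDF = 1 * log(4/3) ≈ 0.2877
print(round(tfidf("i", docs[0], docs), 4))
```

Note that a term appearing in every document gets $IDF = \log\frac{|D|+1}{|D|+1} = 0$, which is exactly why TF-IDF downweights ubiquitous words.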
Example
```python
from pyspark.ml.feature import HashingTF, IDF, Tokenizer

# Create sample data
sentenceData = spark.createDataFrame([
    (0.0, "Hi I heard about Spark"),
    (0.0, "I wish Java could use case classes"),
    (1.0, "Logistic regression models are neat")
], ["label", "sentence"])

# Tokenize sentences into words
tokenizer = Tokenizer(inputCol="sentence", outputCol="words")
wordsData = tokenizer.transform(sentenceData)

# Compute term frequency
hashingTF = HashingTF(inputCol="words", outputCol="rawFeatures", numFeatures=20)
featurizedData = hashingTF.transform(wordsData)

# Compute IDF and rescale features
idf = IDF(inputCol="rawFeatures", outputCol="features")
idfModel = idf.fit(featurizedData)
rescaledData = idfModel.transform(featurizedData)
rescaledData.select("label", "features").show(truncate=False)
```
Use HashingTF for fast, fixed-size feature vectors, or CountVectorizer when you need an explicit, interpretable vocabulary.
Word2Vec
Word2Vec maps each word to a unique fixed-size vector, capturing semantic relationships:
```python
from pyspark.ml.feature import Word2Vec

# Create document data
documentDF = spark.createDataFrame([
    ("Hi I heard about Spark".split(" "), ),
    ("I wish Java could use case classes".split(" "), ),
    ("Logistic regression models are neat".split(" "), )
], ["text"])

# Train Word2Vec model
word2Vec = Word2Vec(vectorSize=3, minCount=0, inputCol="text", outputCol="result")
model = word2Vec.fit(documentDF)

# Transform documents to vectors (averages the word vectors in each document)
result = model.transform(documentDF)
result.select("result").show(truncate=False)

# Find synonyms (tokens keep their original case, so query "Spark", not "spark")
synonyms = model.findSynonyms("Spark", 2)
synonyms.show()
```
CountVectorizer
CountVectorizer converts text documents to vectors of token counts:
```python
from pyspark.ml.feature import CountVectorizer

# Create sample data
df = spark.createDataFrame([
    (0, "a b c".split(" ")),
    (1, "a b b c a".split(" "))
], ["id", "words"])

# Train CountVectorizer
cv = CountVectorizer(inputCol="words", outputCol="features", vocabSize=3, minDF=1.0)
model = cv.fit(df)

# Transform documents
result = model.transform(df)
result.select("features").show(truncate=False)

# Output:
# +-------------------------+
# |features                 |
# +-------------------------+
# |(3,[0,1,2],[1.0,1.0,1.0])|
# |(3,[0,1,2],[2.0,2.0,1.0])|
# +-------------------------+
```
FeatureHasher
FeatureHasher projects features into a feature vector of specified dimension:
```python
from pyspark.ml.feature import FeatureHasher

# Create dataset with multiple column types
dataset = spark.createDataFrame([
    (2.2, True, "1", "foo"),
    (3.3, False, "2", "bar"),
    (4.4, False, "3", "baz"),
    (5.5, False, "4", "foo")
], ["real", "bool", "stringNum", "string"])

# Create FeatureHasher
hasher = FeatureHasher(
    inputCols=["real", "bool", "stringNum", "string"],
    outputCol="features"
)

# Transform data
featurized = hasher.transform(dataset)
featurized.select("features").show(truncate=False)
```
Tokenizer
Tokenizers split text into individual terms:
```python
from pyspark.ml.feature import Tokenizer, RegexTokenizer

# Create sample data
sentenceDataFrame = spark.createDataFrame([
    (0, "Hi I heard about Spark"),
    (1, "I wish Java could use case classes"),
    (2, "Logistic,regression,models,are,neat")
], ["id", "sentence"])

# Simple tokenizer (splits on whitespace)
tokenizer = Tokenizer(inputCol="sentence", outputCol="words")
wordsDataFrame = tokenizer.transform(sentenceDataFrame)
wordsDataFrame.select("sentence", "words").show(truncate=False)

# Regex tokenizer (splits on a custom pattern; handles the comma-separated sentence)
regexTokenizer = RegexTokenizer(
    inputCol="sentence",
    outputCol="words",
    pattern="\\W"  # Split on non-word characters
)
regexWordsDataFrame = regexTokenizer.transform(sentenceDataFrame)
regexWordsDataFrame.select("sentence", "words").show(truncate=False)
```
StopWordsRemover
Remove common stop words from tokenized text:
```python
from pyspark.ml.feature import StopWordsRemover

# Create sample data
sentenceData = spark.createDataFrame([
    (0, ["I", "saw", "the", "red", "balloon"]),
    (1, ["Mary", "had", "a", "little", "lamb"])
], ["id", "raw"])

# Remove stop words
remover = StopWordsRemover(inputCol="raw", outputCol="filtered")
remover.transform(sentenceData).show(truncate=False)

# Output:
# +---+----------------------------+--------------------+
# |id |raw                         |filtered            |
# +---+----------------------------+--------------------+
# |0  |[I, saw, the, red, balloon] |[saw, red, balloon] |
# |1  |[Mary, had, a, little, lamb]|[Mary, little, lamb]|
# +---+----------------------------+--------------------+
```
n-gram
Create n-grams from sequences of tokens:
```python
from pyspark.ml.feature import NGram

# Create sample data
wordDataFrame = spark.createDataFrame([
    (0, ["Hi", "I", "heard", "about", "Spark"]),
    (1, ["I", "wish", "Java", "could", "use", "case", "classes"]),
], ["id", "words"])

# Create bigrams (2-grams)
ngram = NGram(n=2, inputCol="words", outputCol="ngrams")
ngramDataFrame = ngram.transform(wordDataFrame)
ngramDataFrame.select("ngrams").show(truncate=False)

# Output:
# +------------------------------------------------------------------+
# |ngrams                                                            |
# +------------------------------------------------------------------+
# |[Hi I, I heard, heard about, about Spark]                         |
# |[I wish, wish Java, Java could, could use, use case, case classes]|
# +------------------------------------------------------------------+
```
Binarizer
Threshold numerical features to binary (0/1) features:
```python
from pyspark.ml.feature import Binarizer

# Create sample data
continuousDataFrame = spark.createDataFrame([
    (0, 0.1),
    (1, 0.8),
    (2, 0.2)
], ["id", "feature"])

# Binarize based on threshold
binarizer = Binarizer(threshold=0.5, inputCol="feature", outputCol="binarized_feature")
binarizedDataFrame = binarizer.transform(continuousDataFrame)
binarizedDataFrame.show()

# Output:
# +---+-------+-----------------+
# | id|feature|binarized_feature|
# +---+-------+-----------------+
# |  0|    0.1|              0.0|
# |  1|    0.8|              1.0|
# |  2|    0.2|              0.0|
# +---+-------+-----------------+
```
PCA
Principal Component Analysis projects vectors to a low-dimensional space:
```python
from pyspark.ml.feature import PCA
from pyspark.ml.linalg import Vectors

# Create sample data
data = spark.createDataFrame([
    (Vectors.sparse(5, [(1, 1.0), (3, 7.0)]),),
    (Vectors.dense([2.0, 0.0, 3.0, 4.0, 5.0]),),
    (Vectors.dense([4.0, 0.0, 0.0, 6.0, 7.0]),)
], ["features"])

# Train PCA model (reduce to 3 dimensions)
pca = PCA(k=3, inputCol="features", outputCol="pcaFeatures")
model = pca.fit(data)

# Transform data
result = model.transform(data).select("pcaFeatures")
result.show(truncate=False)
```
StandardScaler
Normalize features to have unit standard deviation and/or zero mean:
```python
from pyspark.ml.feature import StandardScaler

# Load data
data = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")

# Scale features
scaler = StandardScaler(
    inputCol="features",
    outputCol="scaledFeatures",
    withStd=True,   # Scale to unit standard deviation
    withMean=False  # Don't center (preserves sparsity)
)

# Compute summary statistics and fit the scaler model
scalerModel = scaler.fit(data)

# Transform data
scaledData = scalerModel.transform(data)
scaledData.select("features", "scaledFeatures").show(5)
```
MinMaxScaler
Rescale features to a specific range (often [0, 1]):
```python
from pyspark.ml.feature import MinMaxScaler
from pyspark.ml.linalg import Vectors

# Create sample data
data = spark.createDataFrame([
    (0, Vectors.dense([1.0, 0.1, -1.0]),),
    (1, Vectors.dense([2.0, 1.1, 1.0]),),
    (2, Vectors.dense([3.0, 10.1, 3.0]),)
], ["id", "features"])

# Rescale to [0, 1]
scaler = MinMaxScaler(inputCol="features", outputCol="scaledFeatures")
scalerModel = scaler.fit(data)
scaledData = scalerModel.transform(data)
scaledData.select("features", "scaledFeatures").show(truncate=False)
```
Normalizer
Normalize vectors to have unit norm:
```python
from pyspark.ml.feature import Normalizer
from pyspark.ml.linalg import Vectors

# Create sample data
data = spark.createDataFrame([
    (0, Vectors.dense([1.0, 0.5, -1.0]),),
    (1, Vectors.dense([2.0, 1.0, 1.0]),),
    (2, Vectors.dense([4.0, 10.0, 2.0]),)
], ["id", "features"])

# Normalize using L2 norm (default)
normalizer = Normalizer(inputCol="features", outputCol="normFeatures", p=2.0)
l2NormData = normalizer.transform(data)

# Normalize using L1 norm
normalizer.setP(1.0)
l1NormData = normalizer.transform(data)

print("L2 normalized:")
l2NormData.show(truncate=False)
print("L1 normalized:")
l1NormData.show(truncate=False)
```
StringIndexer
Encode string column to indices:
```python
from pyspark.ml.feature import StringIndexer

# Create sample data
df = spark.createDataFrame([
    (0, "a"),
    (1, "b"),
    (2, "c"),
    (3, "a"),
    (4, "a"),
    (5, "c")
], ["id", "category"])

# Index categories by frequency
indexer = StringIndexer(inputCol="category", outputCol="categoryIndex")
indexed = indexer.fit(df).transform(df)
indexed.show()

# Output ("a" appears most, gets index 0):
# +---+--------+-------------+
# | id|category|categoryIndex|
# +---+--------+-------------+
# |  0|       a|          0.0|
# |  1|       b|          2.0|
# |  2|       c|          1.0|
# |  3|       a|          0.0|
# |  4|       a|          0.0|
# |  5|       c|          1.0|
# +---+--------+-------------+
```
OneHotEncoder
Map categorical features to binary vectors:
```python
from pyspark.ml.feature import OneHotEncoder, StringIndexer

# Create sample data
df = spark.createDataFrame([
    (0, "a"),
    (1, "b"),
    (2, "c"),
    (3, "a"),
    (4, "a"),
    (5, "c")
], ["id", "category"])

# First, index the category
indexer = StringIndexer(inputCol="category", outputCol="categoryIndex")
indexed = indexer.fit(df).transform(df)

# Then, one-hot encode (the last index is dropped by default, so "b" becomes the all-zeros vector)
encoder = OneHotEncoder(inputCol="categoryIndex", outputCol="categoryVec")
encoded = encoder.fit(indexed).transform(indexed)
encoded.select("id", "category", "categoryVec").show(truncate=False)

# Output:
# +---+--------+-------------+
# |id |category|categoryVec  |
# +---+--------+-------------+
# |0  |a       |(2,[0],[1.0])|
# |1  |b       |(2,[],[])    |
# |2  |c       |(2,[1],[1.0])|
# |3  |a       |(2,[0],[1.0])|
# |4  |a       |(2,[0],[1.0])|
# |5  |c       |(2,[1],[1.0])|
# +---+--------+-------------+
```
VectorAssembler
Combine multiple columns into a single vector column:
```python
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.linalg import Vectors

# Create sample data
data = spark.createDataFrame([
    (0, 18, 1.0, Vectors.dense([0.0, 10.0, 0.5]), 1.0),
    (1, 20, 0.0, Vectors.dense([1.0, 11.0, 0.6]), 0.0),
], ["id", "age", "gender", "userFeatures", "label"])

# Combine multiple columns into a features vector
assembler = VectorAssembler(
    inputCols=["age", "gender", "userFeatures"],
    outputCol="features"
)
output = assembler.transform(data)
output.select("features", "label").show(truncate=False)

# Output:
# +-----------------------+-----+
# |features               |label|
# +-----------------------+-----+
# |[18.0,1.0,0.0,10.0,0.5]|1.0  |
# |[20.0,0.0,1.0,11.0,0.6]|0.0  |
# +-----------------------+-----+
```
Feature Selection and Indexing
VectorIndexer
Automatically identify categorical features:
```python
from pyspark.ml.feature import VectorIndexer

# Load data
data = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")

# Identify categorical features (with <= 4 distinct values)
indexer = VectorIndexer(inputCol="features", outputCol="indexed", maxCategories=4)
indexerModel = indexer.fit(data)

# Inspect which features were treated as categorical
categoricalFeatures = indexerModel.categoryMaps
print(f"Chose {len(categoricalFeatures)} categorical features: {categoricalFeatures}")

# Transform data
indexedData = indexerModel.transform(data)
indexedData.show()
```
ChiSqSelector
Select features based on Chi-Square test:
```python
from pyspark.ml.feature import ChiSqSelector
from pyspark.ml.linalg import Vectors

# Create sample data
data = spark.createDataFrame([
    (7, Vectors.dense([0.0, 0.0, 18.0, 1.0]), 1.0),
    (8, Vectors.dense([0.0, 1.0, 12.0, 0.0]), 0.0),
    (9, Vectors.dense([1.0, 0.0, 15.0, 0.1]), 0.0),
], ["id", "features", "label"])

# Select top 2 features based on the Chi-Square test
selector = ChiSqSelector(
    numTopFeatures=2,
    featuresCol="features",
    outputCol="selectedFeatures",
    labelCol="label"
)
result = selector.fit(data).transform(data)
result.select("id", "selectedFeatures").show(truncate=False)
```
Complete Pipeline Example
Here’s how to combine multiple feature transformers in a pipeline:
```python
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import HashingTF, IDF, Tokenizer

# Prepare training data
training = spark.createDataFrame([
    (0, "spark is awesome", 1.0),
    (1, "hadoop is old", 0.0),
    (2, "spark ml rocks", 1.0),
    (3, "mapreduce is legacy", 0.0)
], ["id", "text", "label"])

# Configure pipeline stages
tokenizer = Tokenizer(inputCol="text", outputCol="words")
hashingTF = HashingTF(inputCol="words", outputCol="rawFeatures", numFeatures=1000)
idf = IDF(inputCol="rawFeatures", outputCol="features")
lr = LogisticRegression(maxIter=10)

# Create and fit the pipeline
pipeline = Pipeline(stages=[tokenizer, hashingTF, idf, lr])
model = pipeline.fit(training)

# Prepare test data
test = spark.createDataFrame([
    (4, "spark is great"),
    (5, "hadoop old system"),
    (6, "spark machine learning")
], ["id", "text"])

# Make predictions
predictions = model.transform(test)
predictions.select("id", "text", "probability", "prediction").show(truncate=False)
```
Best Practices
Scale features appropriately
Different algorithms require different scaling:

- Tree-based models: usually don’t need scaling
- Linear models: use StandardScaler or MinMaxScaler
- Neural networks: use MinMaxScaler to [0, 1]
- Distance-based algorithms: use StandardScaler or Normalizer
Handle categorical features correctly
Always:

- Use StringIndexer to convert strings to indices
- Use OneHotEncoder for linear models
- Use VectorIndexer for tree-based models
- Set proper handling for unseen categories
Use Pipeline to chain transformations:

- Ensures consistent preprocessing
- Prevents training/serving skew
- Makes code more maintainable
- Simplifies hyperparameter tuning
Cache intermediate results that are reused across multiple fits (for example, during hyperparameter tuning).
Next Steps
- ML Pipelines: combine feature transformers into pipelines
- Classification: apply features to classification models
- Model Tuning: optimize feature parameters with cross-validation
- Clustering: use features for unsupervised learning