Feature engineering is crucial for building effective machine learning models. Spark MLlib provides comprehensive tools for feature extraction, transformation, and selection.

Overview

MLlib’s feature engineering capabilities are divided into:
  • Extraction: Extracting features from raw data
  • Transformation: Scaling, converting, or modifying features
  • Selection: Selecting a subset from a larger set of features
  • LSH: Locality-sensitive hashing for similarity search

Feature Extractors

TF-IDF

Term Frequency-Inverse Document Frequency (TF-IDF) is widely used in text mining to reflect term importance.

How it Works

Term Frequency (TF):

$$TF(t, d) = \text{number of times term } t \text{ appears in document } d$$

Inverse Document Frequency (IDF):

$$IDF(t, D) = \log \frac{|D| + 1}{DF(t, D) + 1}$$

TF-IDF:

$$TFIDF(t, d, D) = TF(t, d) \cdot IDF(t, D)$$
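These formulas can be traced by hand in a few lines of plain Python (an illustrative sketch independent of Spark, using toy documents; math.log is the natural log, matching MLlib's IDF):

```python
import math

docs = [
    ["hi", "i", "heard", "about", "spark"],
    ["i", "wish", "java", "could", "use", "case", "classes"],
]

def tf(term, doc):
    # TF(t, d): raw count of term t in document d
    return doc.count(term)

def idf(term, docs):
    # IDF(t, D) = log((|D| + 1) / (DF(t, D) + 1))
    df = sum(1 for d in docs if term in d)
    return math.log((len(docs) + 1) / (df + 1))

def tf_idf(term, doc, docs):
    return tf(term, doc) * idf(term, docs)

# "i" appears in every document, so its TF-IDF is 0; "spark" is rarer
print(tf_idf("i", docs[0], docs))      # log(3/3) = 0.0
print(tf_idf("spark", docs[0], docs))  # 1 * log(3/2) ≈ 0.405
```

Terms that occur in every document are down-weighted to zero, which is exactly the behavior the IDF factor is there to produce.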

Example

from pyspark.ml.feature import HashingTF, IDF, Tokenizer

# Create sample data
sentenceData = spark.createDataFrame([
    (0.0, "Hi I heard about Spark"),
    (0.0, "I wish Java could use case classes"),
    (1.0, "Logistic regression models are neat")
], ["label", "sentence"])

# Tokenize sentences into words
tokenizer = Tokenizer(inputCol="sentence", outputCol="words")
wordsData = tokenizer.transform(sentenceData)

# Compute term frequency
hashingTF = HashingTF(inputCol="words", outputCol="rawFeatures", numFeatures=20)
featurizedData = hashingTF.transform(wordsData)

# Compute IDF and rescale features
idf = IDF(inputCol="rawFeatures", outputCol="features")
idfModel = idf.fit(featurizedData)
rescaledData = idfModel.transform(featurizedData)

rescaledData.select("label", "features").show(truncate=False)
Use HashingTF when you want fast, fixed-size feature vectors and can tolerate hash collisions; use CountVectorizer when you need an inspectable vocabulary that maps indices back to terms.
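The hashing trick behind HashingTF can be sketched with a deterministic hash (a simplified illustration only; Spark's implementation uses MurmurHash3, and zlib.crc32 here is just a stand-in):

```python
import zlib

def hashing_tf(words, num_features=20):
    # Map each term to a bucket via a deterministic hash, then count.
    # Different terms may collide in the same bucket.
    vec = [0.0] * num_features
    for w in words:
        idx = zlib.crc32(w.encode()) % num_features
        vec[idx] += 1.0
    return vec

v = hashing_tf("hi i heard about spark".split(" "))
print(sum(v))  # 5.0: one count per token, regardless of collisions
```

Because the mapping is a hash rather than a lookup table, no vocabulary is stored, which is why HashingTF cannot recover the original term for a given index.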

Word2Vec

Word2Vec maps each word to a unique fixed-size vector, capturing semantic relationships:
from pyspark.ml.feature import Word2Vec

# Create document data
documentDF = spark.createDataFrame([
    ("Hi I heard about Spark".split(" "), ),
    ("I wish Java could use case classes".split(" "), ),
    ("Logistic regression models are neat".split(" "), )
], ["text"])

# Train Word2Vec model
word2Vec = Word2Vec(vectorSize=3, minCount=0, inputCol="text", outputCol="result")
model = word2Vec.fit(documentDF)

# Transform documents to vectors
result = model.transform(documentDF)
result.select("result").show(truncate=False)

# Find synonyms (Word2Vec is case-sensitive; "Spark" is the token in the training data)
synonyms = model.findSynonyms("Spark", 2)
synonyms.show()

CountVectorizer

CountVectorizer converts text documents to vectors of token counts:
from pyspark.ml.feature import CountVectorizer

# Create sample data
df = spark.createDataFrame([
    (0, "a b c".split(" ")),
    (1, "a b b c a".split(" "))
], ["id", "words"])

# Train CountVectorizer
cv = CountVectorizer(inputCol="words", outputCol="features", vocabSize=3, minDF=1.0)
model = cv.fit(df)

# Transform documents
result = model.transform(df)
result.select("features").show(truncate=False)

# Output:
# +-------------------------+
# |features                 |
# +-------------------------+
# |(3,[0,1,2],[1.0,1.0,1.0])|
# |(3,[0,1,2],[2.0,2.0,1.0])|
# +-------------------------+

FeatureHasher

FeatureHasher projects features into a feature vector of specified dimension:
from pyspark.ml.feature import FeatureHasher

# Create dataset with multiple column types
dataset = spark.createDataFrame([
    (2.2, True, "1", "foo"),
    (3.3, False, "2", "bar"),
    (4.4, False, "3", "baz"),
    (5.5, False, "4", "foo")
], ["real", "bool", "stringNum", "string"])

# Create FeatureHasher
hasher = FeatureHasher(
    inputCols=["real", "bool", "stringNum", "string"],
    outputCol="features"
)

# Transform data
featurized = hasher.transform(dataset)
featurized.select("features").show(truncate=False)

Feature Transformers

Tokenizer

Tokenizers split text into individual terms:
from pyspark.ml.feature import Tokenizer, RegexTokenizer

# Create sample data
sentenceDataFrame = spark.createDataFrame([
    (0, "Hi I heard about Spark"),
    (1, "I wish Java could use case classes"),
    (2, "Logistic,regression,models,are,neat")
], ["id", "sentence"])

# Simple tokenizer (splits on whitespace)
tokenizer = Tokenizer(inputCol="sentence", outputCol="words")
wordsDataFrame = tokenizer.transform(sentenceDataFrame)

# Regex tokenizer (splits on custom pattern)
regexTokenizer = RegexTokenizer(
    inputCol="sentence", 
    outputCol="words", 
    pattern="\\W"  # Split on non-word characters
)
regexWordsDataFrame = regexTokenizer.transform(sentenceDataFrame)

wordsDataFrame.select("sentence", "words").show(truncate=False)

StopWordsRemover

Remove common stop words from tokenized text:
from pyspark.ml.feature import StopWordsRemover

# Create sample data
sentenceData = spark.createDataFrame([
    (0, ["I", "saw", "the", "red", "balloon"]),
    (1, ["Mary", "had", "a", "little", "lamb"])
], ["id", "raw"])

# Remove stop words
remover = StopWordsRemover(inputCol="raw", outputCol="filtered")
remover.transform(sentenceData).show(truncate=False)

# Output:
# +---+----------------------------+--------------------+
# |id |raw                         |filtered            |
# +---+----------------------------+--------------------+
# |0  |[I, saw, the, red, balloon] |[saw, red, balloon] |
# |1  |[Mary, had, a, little, lamb]|[Mary, little, lamb]|
# +---+----------------------------+--------------------+

n-gram

Create n-grams from sequences of tokens:
from pyspark.ml.feature import NGram

# Create sample data
wordDataFrame = spark.createDataFrame([
    (0, ["Hi", "I", "heard", "about", "Spark"]),
    (1, ["I", "wish", "Java", "could", "use", "case", "classes"]),
], ["id", "words"])

# Create bigrams (2-grams)
ngram = NGram(n=2, inputCol="words", outputCol="ngrams")
ngramDataFrame = ngram.transform(wordDataFrame)

ngramDataFrame.select("ngrams").show(truncate=False)

# Output:
# +------------------------------------------------------------------+
# |ngrams                                                            |
# +------------------------------------------------------------------+
# |[Hi I, I heard, heard about, about Spark]                         |
# |[I wish, wish Java, Java could, could use, use case, case classes]|
# +------------------------------------------------------------------+
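Conceptually, NGram slides a window of size n over the token sequence and joins each window with spaces; in plain Python:

```python
def ngrams(words, n=2):
    # Join each length-n window of consecutive tokens with a space
    return [" ".join(words[i:i + n]) for i in range(len(words) - n + 1)]

print(ngrams(["Hi", "I", "heard", "about", "Spark"]))
# ['Hi I', 'I heard', 'heard about', 'about Spark']
```

Sequences shorter than n yield an empty list, which matches NGram returning an empty array for under-length inputs.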

Binarizer

Threshold numerical features to binary (0/1) features:
from pyspark.ml.feature import Binarizer

# Create sample data
continuousDataFrame = spark.createDataFrame([
    (0, 0.1),
    (1, 0.8),
    (2, 0.2)
], ["id", "feature"])

# Binarize based on threshold
binarizer = Binarizer(threshold=0.5, inputCol="feature", outputCol="binarized_feature")
binarizedDataFrame = binarizer.transform(continuousDataFrame)

binarizedDataFrame.show()

# Output:
# +---+-------+------------------+
# | id|feature| binarized_feature|
# +---+-------+------------------+
# |  0|    0.1|               0.0|
# |  1|    0.8|               1.0|
# |  2|    0.2|               0.0|
# +---+-------+------------------+

PCA

Principal Component Analysis projects vectors to a low-dimensional space:
from pyspark.ml.feature import PCA
from pyspark.ml.linalg import Vectors

# Create sample data
data = spark.createDataFrame([
    (Vectors.sparse(5, [(1, 1.0), (3, 7.0)]),),
    (Vectors.dense([2.0, 0.0, 3.0, 4.0, 5.0]),),
    (Vectors.dense([4.0, 0.0, 0.0, 6.0, 7.0]),)
], ["features"])

# Train PCA model (reduce to 3 dimensions)
pca = PCA(k=3, inputCol="features", outputCol="pcaFeatures")
model = pca.fit(data)

# Transform data
result = model.transform(data).select("pcaFeatures")
result.show(truncate=False)

StandardScaler

Normalize features to have unit standard deviation and/or zero mean:
from pyspark.ml.feature import StandardScaler

# Load data
data = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")

# Scale features
scaler = StandardScaler(
    inputCol="features", 
    outputCol="scaledFeatures",
    withStd=True,  # Scale to unit std dev
    withMean=False  # Don't center (keeps sparsity)
)

# Compute summary statistics and generate scaler model
scalerModel = scaler.fit(data)

# Transform data
scaledData = scalerModel.transform(data)
scaledData.select("features", "scaledFeatures").show(5)

MinMaxScaler

Rescale features to a specific range (often [0, 1]):
from pyspark.ml.feature import MinMaxScaler
from pyspark.ml.linalg import Vectors

# Create sample data
data = spark.createDataFrame([
    (0, Vectors.dense([1.0, 0.1, -1.0]),),
    (1, Vectors.dense([2.0, 1.1, 1.0]),),
    (2, Vectors.dense([3.0, 10.1, 3.0]),)
], ["id", "features"])

# Rescale to [0, 1]
scaler = MinMaxScaler(inputCol="features", outputCol="scaledFeatures")
scalerModel = scaler.fit(data)
scaledData = scalerModel.transform(data)

scaledData.select("features", "scaledFeatures").show(truncate=False)
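Per feature, the rescaling is (x - min) / (max - min), stretched onto the target range; a plain-Python sketch for one column:

```python
def min_max_scale(values, lo=0.0, hi=1.0):
    # Rescale one feature column into [lo, hi]: (x - min) / (max - min)
    mn, mx = min(values), max(values)
    if mx == mn:
        # Constant columns map to the midpoint of the target range,
        # mirroring MinMaxScaler's handling of max == min
        return [(lo + hi) / 2.0] * len(values)
    return [lo + (v - mn) / (mx - mn) * (hi - lo) for v in values]

print(min_max_scale([1.0, 2.0, 3.0]))  # [0.0, 0.5, 1.0]
```

Note that because min and max fully determine the mapping, a single outlier compresses every other value toward one end of the range.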

Normalizer

Normalize vectors to have unit norm:
from pyspark.ml.feature import Normalizer
from pyspark.ml.linalg import Vectors

# Create sample data
data = spark.createDataFrame([
    (0, Vectors.dense([1.0, 0.5, -1.0]),),
    (1, Vectors.dense([2.0, 1.0, 1.0]),),
    (2, Vectors.dense([4.0, 10.0, 2.0]),)
], ["id", "features"])

# Normalize using L2 norm (default)
normalizer = Normalizer(inputCol="features", outputCol="normFeatures", p=2.0)
l2NormData = normalizer.transform(data)

# Normalize using L1 norm
normalizer.setP(1.0)
l1NormData = normalizer.transform(data)

print("L2 normalized:")
l2NormData.show(truncate=False)
print("L1 normalized:")
l1NormData.show(truncate=False)
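The p-norm scaling itself is simple arithmetic: divide each component by the vector's p-norm. A plain-Python sketch for one vector:

```python
def normalize(vec, p=2.0):
    # Divide each component by the vector's p-norm:
    # ||v||_p = (sum(|x|^p))^(1/p)
    norm = sum(abs(x) ** p for x in vec) ** (1.0 / p)
    return [x / norm for x in vec]

v = normalize([3.0, 4.0])          # L2 norm is 5.0
print(v)                           # [0.6, 0.8]
print(sum(x ** 2 for x in v))      # ≈ 1.0: unit L2 norm
```

This is a per-row operation with no statistics to learn, which is why Normalizer is a plain Transformer with no fit step.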

StringIndexer

Encode string column to indices:
from pyspark.ml.feature import StringIndexer

# Create sample data
df = spark.createDataFrame([
    (0, "a"),
    (1, "b"),
    (2, "c"),
    (3, "a"),
    (4, "a"),
    (5, "c")
], ["id", "category"])

# Index categories by frequency
indexer = StringIndexer(inputCol="category", outputCol="categoryIndex")
indexed = indexer.fit(df).transform(df)

indexed.show()

# Output ("a" appears most, gets index 0):
# +---+--------+-------------+
# | id|category|categoryIndex|
# +---+--------+-------------+
# |  0|       a|          0.0|
# |  1|       b|          2.0|
# |  2|       c|          1.0|
# |  3|       a|          0.0|
# |  4|       a|          0.0|
# |  5|       c|          1.0|
# +---+--------+-------------+

OneHotEncoder

Map categorical features to binary vectors:
from pyspark.ml.feature import OneHotEncoder, StringIndexer

# Create sample data
df = spark.createDataFrame([
    (0, "a"),
    (1, "b"),
    (2, "c"),
    (3, "a"),
    (4, "a"),
    (5, "c")
], ["id", "category"])

# First, index the category
indexer = StringIndexer(inputCol="category", outputCol="categoryIndex")
indexed = indexer.fit(df).transform(df)

# Then, one-hot encode
encoder = OneHotEncoder(inputCol="categoryIndex", outputCol="categoryVec")
encoded = encoder.fit(indexed).transform(indexed)

encoded.select("id", "category", "categoryVec").show(truncate=False)

# Output (with the default dropLast=True, the last-indexed category "b" maps to the all-zeros vector):
# +---+--------+-------------+
# |id |category|categoryVec  |
# +---+--------+-------------+
# |0  |a       |(2,[0],[1.0])|
# |1  |b       |(2,[],[])    |
# |2  |c       |(2,[1],[1.0])|
# |3  |a       |(2,[0],[1.0])|
# |4  |a       |(2,[0],[1.0])|
# |5  |c       |(2,[1],[1.0])|
# +---+--------+-------------+

VectorAssembler

Combine multiple columns into a single vector column:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.linalg import Vectors

# Create sample data
data = spark.createDataFrame([
    (0, 18, 1.0, Vectors.dense([0.0, 10.0, 0.5]), 1.0),
    (1, 20, 0.0, Vectors.dense([1.0, 11.0, 0.6]), 0.0),
], ["id", "age", "gender", "userFeatures", "label"])

# Combine multiple columns into features vector
assembler = VectorAssembler(
    inputCols=["age", "gender", "userFeatures"],
    outputCol="features"
)

output = assembler.transform(data)
output.select("features", "label").show(truncate=False)

# Output:
# +-----------------------+-----+
# |features               |label|
# +-----------------------+-----+
# |[18.0,1.0,0.0,10.0,0.5]|1.0  |
# |[20.0,0.0,1.0,11.0,0.6]|0.0  |
# +-----------------------+-----+

Feature Selectors

VectorIndexer

VectorIndexer automatically identifies categorical features in a vector column (by counting distinct values) and indexes them:
from pyspark.ml.feature import VectorIndexer

# Load data
data = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")

# Identify categorical features (with <= 4 distinct values)
indexer = VectorIndexer(inputCol="features", outputCol="indexed", maxCategories=4)
indexerModel = indexer.fit(data)

# Transform data
categoricalFeatures = indexerModel.categoryMaps
print(f"Chose {len(categoricalFeatures)} categorical features: {categoricalFeatures}")

indexedData = indexerModel.transform(data)
indexedData.show()

ChiSqSelector

Select features based on the Chi-Square test of independence between each feature and the label (note that newer Spark releases deprecate ChiSqSelector in favor of UnivariateFeatureSelector):
from pyspark.ml.feature import ChiSqSelector
from pyspark.ml.linalg import Vectors

# Create sample data
data = spark.createDataFrame([
    (7, Vectors.dense([0.0, 0.0, 18.0, 1.0]), 1.0,),
    (8, Vectors.dense([0.0, 1.0, 12.0, 0.0]), 0.0,),
    (9, Vectors.dense([1.0, 0.0, 15.0, 0.1]), 0.0,),
], ["id", "features", "label"])

# Select top 2 features based on Chi-Square test
selector = ChiSqSelector(
    numTopFeatures=2, 
    featuresCol="features",
    outputCol="selectedFeatures", 
    labelCol="label"
)

result = selector.fit(data).transform(data)
result.select("id", "selectedFeatures").show(truncate=False)
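The statistic behind the selection can be sketched in plain Python: for each feature, build a contingency table of feature value vs. label and compute Pearson's sum of (observed - expected)^2 / expected; features with the largest statistics are kept. This is an illustrative version for categorical data, not MLlib's implementation:

```python
from collections import Counter

def chi_square(feature, labels):
    # Pearson chi-square of one categorical feature against the label
    n = len(labels)
    pair_counts = Counter(zip(feature, labels))
    f_counts = Counter(feature)
    l_counts = Counter(labels)
    stat = 0.0
    for f in f_counts:
        for l in l_counts:
            observed = pair_counts.get((f, l), 0)
            expected = f_counts[f] * l_counts[l] / n
            stat += (observed - expected) ** 2 / expected
    return stat

# A feature perfectly aligned with the label scores high;
# a feature independent of the label scores 0
labels = [1, 1, 0, 0]
print(chi_square([1, 1, 0, 0], labels))  # 4.0
print(chi_square([1, 0, 1, 0], labels))  # 0.0
```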

Complete Pipeline Example

Here’s how to combine multiple feature transformers in a pipeline:
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import HashingTF, Tokenizer, IDF, StringIndexer

# Prepare training data
training = spark.createDataFrame([
    (0, "spark is awesome", 1.0),
    (1, "hadoop is old", 0.0),
    (2, "spark ml rocks", 1.0),
    (3, "mapreduce is legacy", 0.0)
], ["id", "text", "label"])

# Configure pipeline stages
tokenizer = Tokenizer(inputCol="text", outputCol="words")
hashingTF = HashingTF(inputCol="words", outputCol="rawFeatures", numFeatures=1000)
idf = IDF(inputCol="rawFeatures", outputCol="features")
lr = LogisticRegression(maxIter=10)

# Create pipeline
pipeline = Pipeline(stages=[tokenizer, hashingTF, idf, lr])

# Fit pipeline
model = pipeline.fit(training)

# Prepare test data
test = spark.createDataFrame([
    (4, "spark is great"),
    (5, "hadoop old system"),
    (6, "spark machine learning")
], ["id", "text"])

# Make predictions
predictions = model.transform(test)
predictions.select("id", "text", "probability", "prediction").show(truncate=False)

Best Practices

Different algorithms require different scaling:
  • Tree-based: Usually don’t need scaling
  • Linear models: Use StandardScaler or MinMaxScaler
  • Neural networks: Use MinMaxScaler to [0, 1]
  • Distance-based: Use StandardScaler or Normalizer
Always:
  1. Use StringIndexer to convert strings to indices
  2. Use OneHotEncoder for linear models
  3. Use VectorIndexer for tree-based models
  4. Set handleInvalid (e.g. "keep" or "skip") to control what happens to categories unseen at fit time
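The index-then-encode convention above can be mimicked in plain Python (a sketch of the default behavior: indices assigned by descending frequency with ties broken alphabetically, and the last index dropped from the one-hot vector):

```python
from collections import Counter

def string_index(values):
    # Most frequent value gets index 0, ties broken alphabetically
    freq = Counter(values)
    ordered = sorted(freq, key=lambda v: (-freq[v], v))
    mapping = {v: float(i) for i, v in enumerate(ordered)}
    return [mapping[v] for v in values], mapping

def one_hot(index, num_categories, drop_last=True):
    # Last category becomes the all-zeros vector when drop_last is True
    size = num_categories - 1 if drop_last else num_categories
    vec = [0.0] * size
    if int(index) < size:
        vec[int(index)] = 1.0
    return vec

indexed, mapping = string_index(["a", "b", "c", "a", "a", "c"])
print(mapping)                    # {'a': 0.0, 'c': 1.0, 'b': 2.0}
print(one_hot(mapping["b"], 3))   # [0.0, 0.0] -- dropped last category
```

Dropping the last category keeps the encoded columns linearly independent, which matters for linear models with an intercept.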
Use Pipeline to chain transformations:
  • Ensures consistent preprocessing
  • Prevents training/serving skew
  • Makes code more maintainable
  • Simplifies hyperparameter tuning
When iterating on features:
processedData = pipeline.fit(data).transform(data)
processedData.cache()  # Cache for reuse

Next Steps

ML Pipelines

Combine feature transformers into pipelines

Classification

Apply features to classification models

Model Tuning

Optimize feature parameters with cross-validation

Clustering

Use features for unsupervised learning
