SparkR is deprecated as of Apache Spark 4.0.0 and will be removed in a future release.
SparkR is an R package that provides a light-weight frontend to use Apache Spark from R, enabling distributed data processing with familiar R syntax.
API Documentation
Access the complete SparkR API documentation at:
Spark R API (Roxygen2)
Overview
SparkR provides a distributed DataFrame implementation that supports operations similar to R data frames and dplyr, but on large datasets. It also includes support for distributed machine learning using MLlib.
Core Components
SparkSession
The entry point for SparkR functionality.
Key Functions:
sparkR.session() - Create or get existing SparkSession
read.df() - Read data into a SparkDataFrame
sql() - Execute SQL queries
SparkDataFrame
A distributed collection of data organized into named columns.
Key Operations:
select() - Select columns
filter() / where() - Filter rows
groupBy() - Group data
agg() - Aggregate data
join() - Join DataFrames
arrange() - Sort data
mutate() - Add or modify columns
Quick Start Example
Here’s a basic example using SparkR:
library(SparkR)
# Initialize SparkSession
sparkR.session(master = "local[*]")
# Create a SparkDataFrame
df <- createDataFrame(data.frame(
name = c("Alice", "Bob", "Charlie"),
age = c(25, 30, 35),
department = c("Engineering", "Sales", "Engineering")
))
# Perform operations: filter, group, aggregate, then sort
# (SparkR functions take the DataFrame as their first argument)
adults <- filter(df, df$age > 25)
by_dept <- agg(groupBy(adults, "department"), avg_age = avg(adults$age))
result <- arrange(by_dept, desc(by_dept$avg_age))
# Show results
showDF(result)
# Stop SparkSession
sparkR.session.stop()
Starting SparkR
From the Shell
Start the SparkR shell:
./bin/sparkR --master "local[*]"
From RStudio
Connect to Spark from RStudio:
# Set SPARK_HOME if needed
if (nchar(Sys.getenv("SPARK_HOME")) < 1) {
Sys.setenv(SPARK_HOME = "/path/to/spark")
}
# Load SparkR library
library(SparkR, lib.loc = c(file.path(Sys.getenv("SPARK_HOME"), "R", "lib")))
# Create SparkSession with configuration
sparkR.session(
master = "local[*]",
sparkConfig = list(
spark.driver.memory = "2g",
spark.executor.memory = "2g"
)
)
Reading Data
Read data from various sources:
# CSV
df <- read.df(
"data/people.csv",
source = "csv",
header = "true",
inferSchema = "true"
)
# JSON
df <- read.json("data/people.json")
# Parquet
df <- read.parquet("data/people.parquet")
# JDBC
df <- read.jdbc(
url = "jdbc:postgresql://localhost/test",
tableName = "people",
user = "username",
password = "password"
)
DataFrame Operations
Common transformations:
# Select columns
select(df, "name", "age")
select(df, df$name, df$age)
# Filter rows
filter(df, df$age > 25)
where(df, df$age > 25) # Alternative
# Add/modify columns
withColumn(df, "age_plus_10", df$age + 10)
withColumnRenamed(df, "name", "full_name")
# Conditional logic
withColumn(
df,
"age_group",
ifelse(df$age < 18, "minor",
ifelse(df$age < 65, "adult", "senior")
)
)
# Aggregations
agg(
groupBy(df, "department"),
avg_age = avg(df$age),
count = n(df$name)
)
# Joins
join(df1, df2, df1$id == df2$id, "inner")
SQL Queries
Execute SQL queries on DataFrames:
# Register as temporary view
createOrReplaceTempView(df, "people")
# Run SQL query
result <- sql("
SELECT department, AVG(age) as avg_age
FROM people
WHERE age > 25
GROUP BY department
ORDER BY avg_age DESC
")
showDF(result)
Machine Learning
Use MLlib algorithms:
# Linear regression
model <- spark.glm(
df,
formula = price ~ bedrooms + bathrooms,
family = "gaussian"
)
# Make predictions
predictions <- predict(model, test_df)
# Logistic regression
logistic_model <- spark.logit(
df,
formula = label ~ feature1 + feature2
)
# K-means clustering
kmeans_model <- spark.kmeans(
df,
formula = ~ feature1 + feature2,
k = 3
)
User-Defined Functions (UDFs)
Apply custom R functions to a SparkDataFrame. SparkR does not register R functions as SQL UDFs; instead, use dapply() to run an R function over each partition, supplying the schema of the result:
# The function receives each partition as an R data.frame and must return one
add_age_category <- function(partition) {
  partition$age_category <- ifelse(partition$age < 18, "minor",
                                   ifelse(partition$age < 65, "adult", "senior"))
  partition
}
# Output schema: the input columns plus the new column
schema <- structType(
  structField("name", "string"),
  structField("age", "double"),
  structField("department", "string"),
  structField("age_category", "string")
)
result <- dapply(df, add_age_category, schema)
showDF(result)
Converting Between R and Spark
Move data between R and Spark:
# R data.frame to SparkDataFrame
r_df <- data.frame(
name = c("Alice", "Bob"),
age = c(25, 30)
)
spark_df <- createDataFrame(r_df)
# SparkDataFrame to R data.frame
r_df <- collect(spark_df)
# Bring only the first N rows to the driver
local_head <- take(spark_df, 100) # Returns an R data.frame with the first 100 rows
Writing Data
Save DataFrames to various formats:
# CSV
write.df(
df,
path = "output/people.csv",
source = "csv",
mode = "overwrite",
header = "true"
)
# Parquet
write.parquet(df, "output/people.parquet")
# JSON
write.json(df, "output/people.json")
# JDBC
write.jdbc(
df,
url = "jdbc:postgresql://localhost/test",
tableName = "people",
mode = "append"
)
Key Functions Reference
Data Operations
select() - Select columns
filter(), where() - Filter rows
groupBy() - Group data
agg(), summarize() - Aggregate
join(), merge() - Join DataFrames
arrange(), orderBy() - Sort
mutate(), withColumn() - Transform columns
distinct() - Remove duplicates
union(), unionAll() - Combine DataFrames
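The set-style operations above compose naturally. A small sketch, assuming two SparkDataFrames with identical schemas (union matches columns by position):

```r
# Stack two DataFrames with the same schema, then drop duplicate rows
df_2023 <- createDataFrame(data.frame(name = c("Alice", "Bob"), age = c(25, 30)))
df_2024 <- createDataFrame(data.frame(name = c("Bob", "Carol"), age = c(30, 28)))

combined <- distinct(union(df_2023, df_2024))
showDF(combined) # Three distinct rows: the duplicated Bob row appears once
```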
Data I/O
read.df(), read.json(), read.parquet() - Read data
write.df(), write.json(), write.parquet() - Write data
createDataFrame() - Create from R data.frame
collect() - Convert to R data.frame
Aggregation Functions
avg(), mean() - Average
sum() - Sum
min(), max() - Min/Max
count(), n() - Count
sd() - Standard deviation
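Several of these can be combined in a single agg() call. A sketch using the df from the quick-start example (note that sd() yields null for single-row groups):

```r
stats <- agg(
  groupBy(df, "department"),
  avg_age = avg(df$age),
  sd_age = sd(df$age),
  total_age = sum(df$age),
  headcount = count(df$name)
)
showDF(stats)
```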
Configuration
Set Spark configuration:
sparkR.session(
sparkConfig = list(
spark.driver.memory = "4g",
spark.executor.memory = "4g",
spark.executor.cores = "2",
spark.sql.shuffle.partitions = "200"
)
)
Deprecation Notice
As SparkR is deprecated in Spark 4.0.0, consider migrating to:
- PySpark - The Python API for Spark, with the broadest feature support
- sparklyr - An alternative R interface to Spark, maintained by Posit (formerly RStudio)
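For comparison, the quick-start aggregation might look like this in sparklyr, which exposes Spark through dplyr verbs (a sketch; assumes the sparklyr and dplyr packages are installed and Spark is available locally):

```r
library(sparklyr)
library(dplyr)

sc <- spark_connect(master = "local")

people <- data.frame(
  name = c("Alice", "Bob", "Charlie"),
  age = c(25, 30, 35),
  department = c("Engineering", "Sales", "Engineering")
)
people_tbl <- copy_to(sc, people)

people_tbl %>%
  filter(age > 25) %>%
  group_by(department) %>%
  summarise(avg_age = mean(age)) %>%
  arrange(desc(avg_age))

spark_disconnect(sc)
```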
Limitations
SparkR exposes only the DataFrame API; the typed Dataset API and low-level RDD operations are not available from R.