At a high level, every Spark application consists of a driver program that runs the user’s main function and executes various parallel operations on a cluster. The main abstraction Spark provides is a resilient distributed dataset (RDD), which is a collection of elements partitioned across the nodes of the cluster that can be operated on in parallel.

What are RDDs?

RDDs are created by starting with a file in the Hadoop file system (or any other Hadoop-supported file system), or an existing collection in the driver program, and transforming it. Two properties make RDDs useful:
  • Users can persist an RDD in memory, allowing it to be reused efficiently across parallel operations
  • RDDs automatically recover from node failures, making them resilient to failures in the cluster

Initializing Spark

The first thing a Spark program must do is create a SparkContext object, which tells Spark how to access a cluster.
from pyspark import SparkContext, SparkConf

conf = SparkConf().setAppName(appName).setMaster(master)
sc = SparkContext(conf=conf)
The appName parameter is a name for your application to show on the cluster UI. The master parameter is a Spark or YARN cluster URL, or a special “local” string to run in local mode.
In practice, when running on a cluster, you will not want to hardcode master in the program; instead, launch the application with spark-submit and receive the master there. For local testing and unit tests, you can pass “local” to run Spark in-process.

Creating RDDs

There are two ways to create RDDs: parallelizing an existing collection in your driver program, or referencing a dataset in an external storage system.

Parallelized Collections

Parallelized collections are created by calling parallelize on an existing collection:
data = [1, 2, 3, 4, 5]
distData = sc.parallelize(data)

# Now you can operate on it in parallel
total = distData.reduce(lambda a, b: a + b)
One important parameter for parallel collections is the number of partitions to cut the dataset into. Spark will run one task for each partition of the dataset. Typically you want 2-4 partitions for each CPU in your cluster.
You can set the number of partitions manually:
sc.parallelize(data, 10)  # 10 partitions

External Datasets

Spark can create distributed datasets from any storage source supported by Hadoop, including your local file system, HDFS, Cassandra, HBase, and Amazon S3.
distFile = sc.textFile("data.txt")

# Count total characters in all lines
lineCharCount = distFile.map(lambda s: len(s)).reduce(lambda a, b: a + b)
If using a path on the local filesystem, the file must also be accessible at the same path on worker nodes. Either copy the file to all workers or use a network-mounted shared file system.

File Reading Notes

  • All file-based input methods support running on directories, compressed files, and wildcards (e.g., textFile("/my/directory/*.txt"))
  • The textFile method takes an optional second argument for controlling the number of partitions
  • By default, Spark creates one partition for each block of the file (blocks being 128MB by default in HDFS)

RDD Operations

RDDs support two types of operations:
  • Transformations - Create a new dataset from an existing one (lazy evaluation)
  • Actions - Return a value to the driver program after running a computation
All transformations in Spark are lazy, in that they do not compute their results right away. Instead, they just remember the transformations applied to some base dataset. The transformations are only computed when an action requires a result to be returned to the driver program.

Basic Example

lines = sc.textFile("data.txt")
lineLengths = lines.map(lambda s: len(s))
totalLength = lineLengths.reduce(lambda a, b: a + b)

# To reuse lineLengths, persist it
lineLengths.persist()

Common Transformations

  • map(func): Return a new distributed dataset formed by passing each element through a function.
  • filter(func): Return a new dataset formed by selecting those elements on which func returns true.
  • flatMap(func): Similar to map, but each input item can be mapped to 0 or more output items.
  • groupByKey(): When called on a dataset of (K, V) pairs, returns a dataset of (K, Iterable<V>) pairs.
  • reduceByKey(func): When called on a dataset of (K, V) pairs, returns a dataset of (K, V) pairs where values for each key are aggregated.
  • sortByKey(): When called on a dataset of (K, V) pairs where K implements Ordered, returns a dataset sorted by keys.
  • join(otherDataset): When called on datasets of type (K, V) and (K, W), returns a dataset of (K, (V, W)) pairs.
  • distinct(): Return a new dataset that contains the distinct elements of the source dataset.

Common Actions

  • reduce(func): Aggregate the elements of the dataset using a function (which takes two arguments and returns one).
  • collect(): Return all elements of the dataset as an array at the driver program.
  • count(): Return the number of elements in the dataset.
  • first(): Return the first element of the dataset.
  • take(n): Return an array with the first n elements of the dataset.
  • saveAsTextFile(path): Write the elements of the dataset as a text file in a given directory.
  • foreach(func): Run a function on each element of the dataset.

Working with Key-Value Pairs

Many Spark operations work on RDDs of key-value pairs:
lines = sc.textFile("data.txt")
pairs = lines.map(lambda s: (s, 1))
counts = pairs.reduceByKey(lambda a, b: a + b)

# Sort by key
sorted_counts = counts.sortByKey()

# Collect results
results = sorted_counts.collect()

RDD Persistence

One of the most important capabilities in Spark is persisting (or caching) a dataset in memory across operations.
# Mark RDD to be persisted
lineLengths.persist()

# Or use cache() for default storage level
lineLengths.cache()

Storage Levels

  • MEMORY_ONLY: Store RDD as deserialized Java objects in the JVM. If the RDD does not fit in memory, some partitions will not be cached. This is the default level.
  • MEMORY_AND_DISK: Store RDD as deserialized Java objects in the JVM. If the RDD does not fit in memory, store the partitions that don’t fit on disk.
  • MEMORY_ONLY_SER: Store RDD as serialized Java objects (one byte array per partition). More space-efficient but more CPU-intensive to read.
  • DISK_ONLY: Store the RDD partitions only on disk.
  • MEMORY_ONLY_2: Same as MEMORY_ONLY but replicate each partition on two cluster nodes.
If your RDDs fit comfortably with the default storage level (MEMORY_ONLY), leave them that way. This is the most CPU-efficient option, allowing operations on the RDDs to run as fast as possible.

Shared Variables

Spark provides two types of shared variables:

Broadcast Variables

Broadcast variables allow you to keep a read-only variable cached on each machine rather than shipping a copy of it with tasks.
broadcastVar = sc.broadcast([1, 2, 3])
broadcastVar.value  # [1, 2, 3]

Accumulators

Accumulators are variables that are only “added” to through an associative and commutative operation. They can be used to implement counters or sums.
accum = sc.accumulator(0)
sc.parallelize([1, 2, 3, 4]).foreach(lambda x: accum.add(x))
accum.value  # 10
Only the driver program can read the accumulator’s value. Tasks running on the cluster can only add to it.

Next Steps

  • Cluster Overview: Understand Spark’s cluster deployment modes
  • Job Scheduling: Learn about resource allocation and scheduling
