Spark provides several facilities for scheduling resources between computations. Understanding how Spark scheduling works helps you optimize application performance and resource utilization.

Overview

Spark schedules resources at two levels:
  1. Across applications - At the level of the cluster manager
  2. Within applications - When multiple jobs run concurrently in the same SparkContext
Within each Spark application, multiple “jobs” (Spark actions) may be running concurrently if they were submitted by different threads. This is common if your application is serving requests over the network.

Scheduling Across Applications

When running on a cluster, each Spark application gets an independent set of executor JVMs that only run tasks and store data for that application.

Static Partitioning

The simplest option, available on all cluster managers, is static partitioning of resources. With this approach, each application is given a maximum amount of resources it can use and holds onto them for its whole duration.
In standalone mode, applications run in FIFO (first-in, first-out) order by default, and each application tries to use all available nodes.
# Limit cores per application
./bin/spark-submit \
  --master spark://host:7077 \
  --conf spark.cores.max=20 \
  --conf spark.executor.memory=4g \
  myapp.jar
Configuration:
  • spark.cores.max - Maximum cores for this application
  • spark.deploy.defaultCores - Default cores for applications that don’t set spark.cores.max
  • spark.executor.memory - Memory per executor
Currently, none of the cluster manager modes provide memory sharing across applications. If you would like to share data this way, we recommend running a single server application that can serve multiple requests by querying the same RDDs.

Dynamic Resource Allocation

Spark provides a mechanism to dynamically adjust the resources your application occupies based on the workload.
Your application may give resources back to the cluster if they are no longer used and request them again later when there is demand. This feature is particularly useful if multiple applications share resources in your Spark cluster.

Enabling Dynamic Allocation

To use this feature, your application must set spark.dynamicAllocation.enabled to true. Additionally, you must configure one of the following:
  1. Set up an external shuffle service and enable spark.shuffle.service.enabled
  2. Enable shuffle tracking with spark.dynamicAllocation.shuffleTracking.enabled
  3. Enable decommission with both spark.decommission.enabled and spark.storage.decommission.shuffleBlocks.enabled
# Enable dynamic allocation
spark.dynamicAllocation.enabled=true

# Option 1: Use external shuffle service
spark.shuffle.service.enabled=true

# Option 2: Use shuffle tracking (simpler)
spark.dynamicAllocation.shuffleTracking.enabled=true

Configuration

# Dynamic allocation settings
spark.dynamicAllocation.enabled=true
spark.dynamicAllocation.shuffleTracking.enabled=true

# Initial executors
spark.dynamicAllocation.initialExecutors=2

# Minimum and maximum executors
spark.dynamicAllocation.minExecutors=1
spark.dynamicAllocation.maxExecutors=20

# Timing thresholds
spark.dynamicAllocation.schedulerBacklogTimeout=1s
spark.dynamicAllocation.sustainedSchedulerBacklogTimeout=5s
spark.dynamicAllocation.executorIdleTimeout=60s

Resource Allocation Policy

Request Policy

Spark requests additional executors when it has pending tasks waiting to be scheduled.
  1. Initial request - When there have been pending tasks for spark.dynamicAllocation.schedulerBacklogTimeout seconds, Spark requests 1 additional executor.
  2. Exponential growth - Subsequent requests are triggered every spark.dynamicAllocation.sustainedSchedulerBacklogTimeout seconds thereafter, requesting 2, 4, 8, and so on executors.
  3. Maximum limit - Requests stop once spark.dynamicAllocation.maxExecutors is reached.
The exponential increase policy allows applications to request executors cautiously at first, but ramp up quickly if many executors are actually needed.
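As a back-of-the-envelope illustration of this policy, here is a simplified model (not Spark's internal code) of the running total of requested executors: the batch size doubles each sustained-backlog round until spark.dynamicAllocation.maxExecutors caps the total.

```python
def request_schedule(max_executors, rounds=10):
    """Simplified model of the exponential request policy: 1 executor
    on the first trigger, then a doubling batch each sustained-backlog
    round, capped at max_executors in total."""
    total, batch, totals = 0, 1, []
    for _ in range(rounds):
        granted = min(batch, max_executors - total)
        if granted <= 0:
            break
        total += granted
        totals.append(total)
        batch *= 2
    return totals

# With spark.dynamicAllocation.maxExecutors=20:
print(request_schedule(20))
# → [1, 3, 7, 15, 20]
```

Note how the total stays small for the first few rounds but reaches the cap within a handful of sustained-backlog intervals.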

Remove Policy

Spark removes an executor when it has been idle for more than spark.dynamicAllocation.executorIdleTimeout seconds.
By default, executors containing cached data are never removed. You can configure this behavior with spark.dynamicAllocation.cachedExecutorIdleTimeout.
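For example, to allow executors holding cached data to be reclaimed after 30 minutes of idleness (the value shown is illustrative; the default keeps such executors indefinitely):

```
# Remove executors with cached data after 30 minutes idle
spark.dynamicAllocation.cachedExecutorIdleTimeout=30min
```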

External Shuffle Service

The external shuffle service is a long-running process that preserves shuffle files written by executors beyond the executors' lifetime. Setup depends on the cluster manager:
  • Standalone: Start workers with spark.shuffle.service.enabled set to true
  • YARN: Follow the YARN-specific configuration instructions
  • Kubernetes: Currently not supported on K8s; use shuffle tracking instead
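For example, on a standalone cluster each worker might carry the following configuration (the port shown is Spark's default and is included only for illustration):

```
# conf/spark-defaults.conf on each standalone worker
spark.shuffle.service.enabled=true
spark.shuffle.service.port=7337
```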

Scheduling Within an Application

Inside a given Spark application (SparkContext instance), multiple parallel jobs can run simultaneously if they were submitted from separate threads.

FIFO Scheduler (Default)

By default, Spark’s scheduler runs jobs in FIFO fashion:
  1. Each job is divided into “stages” (e.g., map and reduce phases)
  2. The first job gets priority on all available resources
  3. If the jobs at the head of the queue don’t need the whole cluster, later jobs can start right away; if they are large, later jobs may be delayed significantly
val conf = new SparkConf().setMaster(...).setAppName(...)
val sc = new SparkContext(conf)
// Jobs run in FIFO order by default

Fair Scheduler

Under fair sharing, Spark assigns tasks between jobs in a “round robin” fashion, so that all jobs get a roughly equal share of cluster resources.
The fair scheduler is best suited to multi-user settings: short jobs submitted while a long job is running start receiving resources right away and still achieve good response times.

Enable Fair Scheduler

val conf = new SparkConf().setMaster(...).setAppName(...)
conf.set("spark.scheduler.mode", "FAIR")
val sc = new SparkContext(conf)

Fair Scheduler Pools

The fair scheduler supports grouping jobs into pools and setting different scheduling options for each pool.

Assigning Jobs to Pools

Set the spark.scheduler.pool local property in the thread submitting jobs:
// Assuming sc is your SparkContext variable
sc.setLocalProperty("spark.scheduler.pool", "pool1")

// All jobs submitted in this thread will use pool1
val result = rdd.count()

// Clear the pool assignment
sc.setLocalProperty("spark.scheduler.pool", null)
The setting is per-thread, making it easy to have a thread run multiple jobs on behalf of the same user.

Configuring Pool Properties

Create an XML configuration file to define pool properties:
fairscheduler.xml
<?xml version="1.0"?>
<allocations>
  <pool name="production">
    <schedulingMode>FAIR</schedulingMode>
    <weight>1</weight>
    <minShare>2</minShare>
  </pool>
  <pool name="test">
    <schedulingMode>FIFO</schedulingMode>
    <weight>2</weight>
    <minShare>3</minShare>
  </pool>
</allocations>
Pool Properties:
  • schedulingMode - FIFO or FAIR. Controls whether jobs within the pool queue up behind each other (FIFO, the default) or share the pool’s resources fairly.
  • weight - Controls the pool’s share of the cluster relative to other pools. By default, all pools have a weight of 1; setting a weight of 2 gives a pool twice the resources of other active pools.
  • minShare - The minimum share (as a number of CPU cores) the administrator would like the pool to have. The fair scheduler always attempts to meet all active pools’ minimum shares before redistributing extra resources.
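To make the interaction between weight and minShare concrete, here is a simplified model (not Spark's actual scheduler code) that satisfies every pool's minShare first and then splits the remaining cores by weight. It assumes the cluster has enough cores to cover all minimum shares, and uses the production/test pools from the XML example:

```python
def fair_shares(total_cores, pools):
    """Simplified model of fair-scheduler pool shares: satisfy each
    pool's minShare first, then split remaining cores by weight.
    Assumes total_cores >= sum of all minShares."""
    shares = {name: p["minShare"] for name, p in pools.items()}
    remaining = total_cores - sum(shares.values())
    total_weight = sum(p["weight"] for p in pools.values())
    for name, p in pools.items():
        shares[name] += remaining * p["weight"] / total_weight
    return shares

# Pools from the fairscheduler.xml example, on a 20-core cluster
print(fair_shares(20, {
    "production": {"weight": 1, "minShare": 2},
    "test": {"weight": 2, "minShare": 3},
}))
# → {'production': 7.0, 'test': 13.0}
```

In this sketch the test pool ends up with roughly twice the extra cores of the production pool, on top of both pools' guaranteed minimum shares.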

Load Configuration File

// Local file
conf.set("spark.scheduler.allocation.file", "file:///path/to/fairscheduler.xml")

// HDFS file
conf.set("spark.scheduler.allocation.file", "hdfs:///path/to/fairscheduler.xml")
Place a file named fairscheduler.xml on the classpath, or set the spark.scheduler.allocation.file property to specify a custom location.

Default Pool Behavior

Without any intervention, newly submitted jobs go into a default pool:
  • Each pool gets an equal share of the cluster
  • Jobs within the default pool run in FIFO order
  • If you create one pool per user, each user gets an equal share of the cluster

Scheduling for JDBC Connections

For JDBC client sessions, you can set the Fair Scheduler pool:
SET spark.sql.thriftserver.scheduler.pool=accounting;

Concurrent Jobs in PySpark

PySpark does not, by default, synchronize Python threads with JVM threads: launching jobs from multiple Python threads does not guarantee that each job runs in its corresponding JVM thread, so per-thread state such as local properties may not propagate as expected.
Use pyspark.InheritableThread for a Python thread to inherit inheritable attributes such as local properties in a JVM thread:
from pyspark import InheritableThread

# Assumes `sc` is an existing SparkContext and `rdd` an existing RDD
def run_job():
    # This thread inherits the job group and local properties set below
    result = rdd.count()

sc.setJobGroup("job_group_1", "Job description")
thread = InheritableThread(target=run_job)
thread.start()
thread.join()

Best Practices

  • Use FIFO for batch processing and single-user applications
  • Use Fair scheduler for multi-user environments and interactive workloads
  • Enable dynamic allocation on shared clusters to improve resource utilization; use shuffle tracking for a simpler setup without an external shuffle service
  • Configure pool weights based on priority: high-priority pools should have higher weights (e.g., 1000) so they receive resources first
  • Use the Spark UI to monitor job execution and resource allocation, and to identify bottlenecks

Example: Multi-Tenant Application

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("Multi-Tenant App")
  .set("spark.scheduler.mode", "FAIR")
  .set("spark.scheduler.allocation.file", "/path/to/fairscheduler.xml")
  .set("spark.dynamicAllocation.enabled", "true")
  .set("spark.dynamicAllocation.shuffleTracking.enabled", "true")

val sc = new SparkContext(conf)

// Serve user requests with fair scheduling
def handleUserRequest(userId: String, query: String): Unit = {
  sc.setLocalProperty("spark.scheduler.pool", userId)
  // Run user query
  val result = sc.parallelize(1 to 1000).count()
  sc.setLocalProperty("spark.scheduler.pool", null)
}

Next Steps

Cluster Overview

Learn about cluster deployment modes

Performance Tuning

Optimize Spark application performance
