Launching Spark on YARN
Apache Hadoop does not support Java 17 as of 3.4.1, while Apache Spark requires at least Java 17 since 4.0.0, so a different JDK should be configured for Spark applications.
HADOOP_CONF_DIR or YARN_CONF_DIR points to the directory which contains the (client side) configuration files for the Hadoop cluster. These configs are used to write to HDFS and connect to the YARN ResourceManager.
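For example, before submitting an application you would point Spark at the cluster's client-side configuration. The path below is an assumption; use your installation's actual Hadoop configuration directory:

```shell
# Point Spark at the Hadoop/YARN client-side configuration files.
# /etc/hadoop/conf is an illustrative path; adjust for your installation.
export HADOOP_CONF_DIR=/etc/hadoop/conf
```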
Deploy Modes
There are two deploy modes that can be used to launch Spark applications on YARN:

- Cluster Mode
- Client Mode

In cluster mode, the Spark driver runs inside an application master process which is managed by YARN on the cluster, and the client can go away after initiating the application. In client mode, the driver runs in the client process, and the application master is only used for requesting resources from YARN.

In YARN mode, the ResourceManager's address is picked up from the Hadoop configuration. Thus, the --master parameter is yarn.

Launching a Spark application in cluster mode starts a YARN client program which starts the default Application Master. The submitted application (such as SparkPi) then runs as a child thread of the Application Master. The client will periodically poll the Application Master for status updates and display them in the console.

Adding Other JARs
In cluster mode, the driver runs on a different machine than the client, so SparkContext.addJar won't work out of the box with files that are local to the client. To make files on the client available to SparkContext.addJar, include them with the --jars option in the launch command:
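A sketch of such a launch command (the class name and jar names below are placeholders, not real artifacts):

```shell
# Submit in cluster mode; my-main-jar.jar and the --jars entries are placeholders.
./bin/spark-submit --class my.main.Class \
    --master yarn \
    --deploy-mode cluster \
    --jars my-other-jar.jar,my-other-other-jar.jar \
    my-main-jar.jar \
    app_arg1 app_arg2
```

The jars listed in --jars are shipped to the cluster and become available to SparkContext.addJar on the driver.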
Preparations
Running Spark on YARN requires a binary distribution of Spark which is built with YARN support. Binary distributions can be downloaded from the downloads page of the project website.

Spark Distribution Variants

There are two variants of Spark binary distributions:

with-hadoop Spark Distribution
This distribution is pre-built with a certain version of Apache Hadoop and contains a built-in Hadoop runtime. By default, when a job is submitted to a Hadoop YARN cluster, to prevent jar conflicts, it will not populate YARN's classpath into Spark. To override this behavior, set:
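The property being referred to here is, assuming the standard Spark YARN configuration, spark.yarn.populateHadoopClasspath. The override would then look like:

```properties
# In spark-defaults.conf (assumes the spark.yarn.populateHadoopClasspath property):
spark.yarn.populateHadoopClasspath  true
```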
no-hadoop Spark Distribution
This distribution is pre-built with user-provided Hadoop. Since it doesn’t contain a built-in Hadoop runtime, it’s smaller, but users have to provide a Hadoop installation separately. Spark will populate YARN’s classpath by default in order to get Hadoop runtime.
To make Spark runtime jars accessible from the YARN side, you can specify spark.yarn.archive or spark.yarn.jars. If neither is specified, Spark will create a zip file with all jars under $SPARK_HOME/jars and upload it to the distributed cache.
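A sketch of pre-staging the jars as an archive (the HDFS path is an assumption; choose a location appropriate for your cluster):

```shell
# Package the Spark jars and upload them to HDFS (paths are illustrative).
jar cv0f spark-libs.jar -C $SPARK_HOME/jars/ .
hdfs dfs -mkdir -p /spark/jars
hdfs dfs -put spark-libs.jar /spark/jars/
```

The application would then set spark.yarn.archive to hdfs:///spark/jars/spark-libs.jar so executors read the cached copy instead of re-uploading jars on every submission.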
Debugging Your Application
In YARN terminology, executors and application masters run inside “containers”. YARN has two modes for handling container logs after an application has completed.

Log Aggregation
If log aggregation is turned on (with the yarn.log-aggregation-enable config), container logs are copied to HDFS and deleted on the local machine. These logs can be viewed from anywhere on the cluster with the yarn logs command:
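For example (the application ID below is a placeholder):

```shell
# View aggregated container logs for a finished application.
yarn logs -applicationId application_1234567890123_0001
```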
The logs are also available on the Spark Web UI under the Executors Tab. You need to have both the Spark history server and the MapReduce history server running and configure yarn.log.server.url in yarn-site.xml properly.

Without Log Aggregation
When log aggregation isn’t turned on, logs are retained locally on each machine under YARN_APP_LOGS_DIR, which is usually configured to /tmp/logs or $HADOOP_HOME/logs/userlogs depending on the Hadoop version and installation.
Viewing logs for a container requires going to the host that contains them and looking in this directory. Subdirectories organize log files by application ID and container ID.
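As an illustration only (the exact layout depends on the Hadoop version and configuration, and the IDs below are placeholders):

```shell
# On the NodeManager host that ran the container:
ls $HADOOP_HOME/logs/userlogs/application_1234567890123_0001/container_1234567890123_0001_01_000002/
# Typical files here are stdout, stderr, and syslog.
```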
Spark Properties
Here are some key YARN-specific Spark properties:

Application Master Configuration
| Property Name | Default | Meaning |
|---|---|---|
| spark.yarn.am.memory | 512m | Amount of memory to use for the YARN Application Master in client mode. In cluster mode, use spark.driver.memory instead |
| spark.yarn.am.cores | 1 | Number of cores to use for the YARN Application Master in client mode. In cluster mode, use spark.driver.cores instead |
Executor Configuration
| Property Name | Default | Meaning |
|---|---|---|
| spark.executor.instances | 2 | The number of executors for static allocation. With spark.dynamicAllocation.enabled, the initial set of executors will be at least this large |
| spark.yarn.executor.memoryOverhead | executor memory * 0.10, with minimum of 384 | Amount of additional memory to be allocated per executor process, in MiB |
File Distribution
| Property Name | Default | Meaning |
|---|---|---|
| spark.yarn.archive | (none) | An archive containing needed Spark jars for distribution to the YARN cache. If set, this configuration replaces spark.yarn.jars |
| spark.yarn.jars | (none) | List of libraries containing Spark code to distribute to YARN containers |
| spark.yarn.dist.files | (none) | Comma-separated list of files to be placed in the working directory of each executor |
| spark.yarn.dist.archives | (none) | Comma-separated list of archives to be extracted into the working directory of each executor |
Submission Configuration
| Property Name | Default | Meaning |
|---|---|---|
| spark.yarn.submit.file.replication | The default HDFS replication (usually 3) | HDFS replication level for the files uploaded into HDFS for the application |
| spark.yarn.stagingDir | Current user’s home directory in the filesystem | Staging directory used while submitting applications |
| spark.yarn.queue | default | The name of the YARN queue to which the application is submitted |
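As a sketch, these submission properties can be set per job on the spark-submit command line (the class, queue name, jar, and paths below are illustrative placeholders):

```shell
./bin/spark-submit --class my.main.Class \
    --master yarn \
    --deploy-mode cluster \
    --queue thequeue \
    --conf spark.yarn.submit.file.replication=2 \
    --conf spark.yarn.stagingDir=hdfs:///user/alice/staging \
    my-app.jar
```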
Configuring the External Shuffle Service
To start the Spark Shuffle Service on each NodeManager in your YARN cluster, follow these instructions:
Locate the shuffle JAR

Locate the spark-<version>-yarn-shuffle.jar. This should be under $SPARK_HOME/common/network-yarn/target/scala-<version> if you are building Spark yourself, and under yarn if you are using a distribution.

Configure yarn-site.xml
In the yarn-site.xml on each node:

- Add spark_shuffle to yarn.nodemanager.aux-services
- Set yarn.nodemanager.aux-services.spark_shuffle.class to org.apache.spark.network.yarn.YarnShuffleService
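The two settings above might look like this in yarn-site.xml (if you already have other aux-services configured, such as mapreduce_shuffle, list them alongside spark_shuffle in the value):

```xml
<property>
  <name>yarn.nodemanager.aux-services</name>
  <value>spark_shuffle</value>
</property>
<property>
  <name>yarn.nodemanager.aux-services.spark_shuffle.class</name>
  <value>org.apache.spark.network.yarn.YarnShuffleService</value>
</property>
```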
Increase NodeManager heap size
Increase the NodeManager's heap size by setting YARN_HEAPSIZE (1000 by default) in etc/hadoop/yarn-env.sh to avoid garbage collection issues during shuffle.

Shuffle Service Properties
| Property Name | Default | Meaning |
|---|---|---|
| spark.yarn.shuffle.stopOnFailure | false | Whether to stop the NodeManager when there’s a failure in the Spark Shuffle Service’s initialization |
| spark.shuffle.service.db.backend | ROCKSDB | When work-preserving restart is enabled in YARN, this specifies the disk-based store used for the shuffle service state |
Kerberos
Standard Kerberos support in Spark is covered in the Security page. In YARN mode, when accessing Hadoop file systems, aside from the default file system in the Hadoop configuration, Spark will also automatically obtain delegation tokens for the service hosting the staging directory of the Spark application.

YARN-specific Kerberos Configuration
| Property Name | Default | Meaning |
|---|---|---|
| spark.kerberos.keytab | (none) | The full path to the file that contains the keytab for the principal. This keytab will be copied to the node running the YARN Application Master via the YARN Distributed Cache |
| spark.kerberos.principal | (none) | Principal to be used to login to KDC, while running on secure clusters |
| spark.yarn.kerberos.relogin.period | 1m | How often to check whether the Kerberos TGT should be renewed |
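A sketch of submitting with a keytab, using spark-submit's --principal and --keytab options (the principal, keytab path, class, and jar below are placeholders):

```shell
./bin/spark-submit --class my.main.Class \
    --master yarn \
    --deploy-mode cluster \
    --principal user@EXAMPLE.COM \
    --keytab /path/to/user.keytab \
    my-app.jar
```

These options populate spark.kerberos.principal and spark.kerberos.keytab for the application.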
Important Notes
- Whether core requests are honored in scheduling decisions depends on which scheduler is in use and how it is configured.
- In cluster mode, the local directories used by the Spark executors and the Spark driver will be the local directories configured for YARN (Hadoop YARN config yarn.nodemanager.local-dirs).
- In client mode, the Spark executors will use the local directories configured for YARN while the Spark driver will use those defined in spark.local.dir.
- The --files and --archives options support specifying file names with the # symbol, similar to Hadoop. For example, --files localtest.txt#appSees.txt uploads the file locally named localtest.txt into HDFS, but it is linked to by the name appSees.txt.
