I have a problem with re-balancing Apache Spark job resources on YARN Fair Scheduler queues.
For the tests I've configured Hadoop 2.6 (and also tried 2.7) to run in pseudo-distributed mode with a local HDFS on macOS. For job submission I used the "Pre-built Spark 1.4 for Hadoop 2.6 and later" distribution (also tried 1.5) from Spark's website.
When tested with a basic configuration and Hadoop MapReduce jobs, the Fair Scheduler works as expected: when the jobs' demand exceeds the cluster's capacity, fair shares are calculated and resources for jobs in different queues are preempted and re-balanced based on those calculations.
When the same test is run with Spark jobs, YARN calculates the fair share of each job correctly, but the resources for the Spark containers are not re-balanced.
Here are my conf files:
$HADOOP_HOME/etc/hadoop/yarn-site.xml
<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
  <property>
    <name>yarn.nodemanager.aux-services</name>
    <value>mapreduce_shuffle</value>
  </property>
  <property>
    <name>yarn.nodemanager.aux-services.spark_shuffle.class</name>
    <value>org.apache.spark.network.yarn.YarnShuffleService</value>
  </property>
  <property>
    <name>yarn.resourcemanager.scheduler.class</name>
    <value>org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler</value>
  </property>
  <property>
    <name>yarn.scheduler.fair.preemption</name>
    <value>true</value>
  </property>
</configuration>
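I have not set yarn.scheduler.fair.allocation.file explicitly; as far as I know the FairScheduler picks up fair-scheduler.xml from the Hadoop configuration directory by default. Written out, that property would look like this (shown for completeness only, it is not in my yarn-site.xml):
  <property>
    <name>yarn.scheduler.fair.allocation.file</name>
    <value>fair-scheduler.xml</value>
  </property>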
$HADOOP_HOME/etc/hadoop/fair-scheduler.xml
<?xml version="1.0" encoding="UTF-8"?>
<allocations>
  <defaultQueueSchedulingPolicy>fair</defaultQueueSchedulingPolicy>
  <queue name="prod">
    <weight>40</weight>
    <schedulingPolicy>fifo</schedulingPolicy>
  </queue>
  <queue name="dev">
    <weight>60</weight>
    <queue name="eng" />
    <queue name="science" />
  </queue>
  <queuePlacementPolicy>
    <rule name="specified" create="false" />
    <rule name="primaryGroup" create="false" />
    <rule name="default" queue="dev.eng" />
  </queuePlacementPolicy>
</allocations>
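My allocation file does not set any preemption timeouts. From the Fair Scheduler documentation, a sketch of what those would look like if added directly under the allocations element (the values in seconds are illustrative, not something I have verified):
  <defaultMinSharePreemptionTimeout>30</defaultMinSharePreemptionTimeout>
  <defaultFairSharePreemptionTimeout>30</defaultFairSharePreemptionTimeout>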
$HADOOP_HOME/etc/hadoop/core-site.xml
<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
  <property>
    <name>fs.defaultFS</name>
    <value>hdfs://localhost:9000</value>
  </property>
</configuration>
$HADOOP_HOME/etc/hadoop/hdfs-site.xml
<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
  <property>
    <name>dfs.replication</name>
    <value>1</value>
  </property>
</configuration>
And the test case is:
Run a job on the "prod" queue with weight 40 (it should be allocated 40% of all resources). As expected, the job takes all the free resources it needs (62.5% of the cluster's resources).
./bin/spark-submit --class org.apache.spark.examples.SparkPi \
--master yarn-cluster \
--driver-memory 512M \
--executor-memory 768M \
--executor-cores 1 \
--num-executors 2 \
--queue prod \
lib/spark-examples*.jar 100000
After that, run the same job on the "dev.eng" queue with weight 60, which means the job should be allocated 60% of all resources, shrinking the first job's share to ~40%.
./bin/spark-submit --class org.apache.spark.examples.SparkPi \
--master yarn-cluster \
--driver-memory 512M \
--executor-memory 768M \
--executor-cores 1 \
--num-executors 2 \
--queue dev.eng \
lib/spark-examples*.jar 100000
Unfortunately, the cluster's resource allocation does not change: 62.5% for the first job and 37.5% for the second.
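If it helps, the 62.5% / 37.5% figures line up with the container sizes I would expect, assuming the YARN defaults yarn.nodemanager.resource.memory-mb=8192 and yarn.scheduler.minimum-allocation-mb=1024, plus Spark's default memory overhead of max(384M, 10% of the requested memory):
driver container:   512M + 384M overhead =  896M, rounded up to 1024M
executor container: 768M + 384M overhead = 1152M, rounded up to 2048M
first job:  1024M + 2 x 2048M = 5120M  ->  5120 / 8192 = 62.5%
second job: the remaining 3072M (driver + one executor)  ->  3072 / 8192 = 37.5%
So the second job never even gets its second executor, and nothing is preempted from the first job.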