
Given a Spark application

  1. What factors decide the number of executors in standalone mode? In Mesos and YARN, according to the documentation, we can specify the number of executors, cores, and memory.

  2. Once a number of executors are started, does Spark assign the tasks in a round-robin fashion, or is it smart enough to see whether some of the executors are idle or busy and schedule the tasks accordingly?

  3. Also, how does Spark decide on the number of tasks? I wrote a simple max-temperature program with a small dataset, and Spark spawned two tasks in a single executor. This is in Spark standalone mode.

Praveen Sripati

4 Answers


Answering your questions:

  1. Standalone mode uses the same configuration variables as the Mesos and YARN modes to set the number of executors. The variable spark.cores.max defines the maximum number of cores used by the SparkContext. The default value is infinity, so Spark will use all the cores in the cluster. The spark.task.cpus variable defines how many CPUs Spark allocates to a single task; the default value is 1. With these two variables you can define the maximum number of parallel tasks in your cluster (see the sketch after this list).

  2. When you create an RDD subclass you can define on which machines to run your tasks. This is defined in the getPreferredLocations method. But as the method name suggests, this is only a preference, so if Spark detects that a machine is not busy, it will launch the task on that idle machine. However, I don't know the mechanism Spark uses to determine which machines are idle. To achieve locality, we (Stratio) decided to make each partition smaller so that each task takes less time.

  3. The number of tasks for each of Spark's operations is defined by the length of the RDD's partitions array. This array is the result of the getPartitions method, which you have to override if you want to develop a new RDD subclass. This method returns how an RDD is split, where the data is, and what the partitions are. When you combine two or more RDDs using, for example, a join operation, the number of tasks of the resulting RDD is by default the maximum number of tasks of the RDDs involved in the operation (a union, by contrast, simply concatenates the partitions of its parents). For example: if you join RDD1, which has 100 tasks, with RDD2, which has 1000 tasks, the next operation on the resulting RDD will have 1000 tasks. Note that a high number of partitions is not necessarily a synonym for more data.
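
To make points 1 and 3 concrete, here is a minimal Scala sketch of my own (the partition counts, dataset sizes and app name are made up for illustration; the master URL is expected to come from spark-submit). It caps cluster-wide parallelism with spark.cores.max and spark.task.cpus, then checks how many partitions, and therefore tasks, a join produces:

  import org.apache.spark.{SparkConf, SparkContext}

  val conf = new SparkConf()
    .setAppName("parallelism-sketch")
    .set("spark.cores.max", "8")    // at most 8 cores across the cluster
    .set("spark.task.cpus", "1")    // 1 CPU per task => at most 8 tasks run in parallel
  val sc = new SparkContext(conf)   // master URL supplied via spark-submit

  val small = sc.parallelize(1 to 100, numSlices = 4).map(i => (i % 10, i))
  val large = sc.parallelize(1 to 10000, numSlices = 16).map(i => (i % 10, i))

  // With no explicit partitioner and spark.default.parallelism unset, the join
  // defaults to the larger parent's partition count, so the next stage runs 16 tasks.
  val joined = small.join(large)
  println(joined.partitions.length)   // 16

  sc.stop()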

I hope this will help.

jlopezmat

I agree with @jlopezmat about how Spark chooses its configuration. With respect to your test code, you are seeing two tasks due to the way textFile is implemented. From SparkContext.scala:

  /**
   * Read a text file from HDFS, a local file system (available on all nodes), or any
   * Hadoop-supported file system URI, and return it as an RDD of Strings.
   */
  def textFile(path: String, minPartitions: Int = defaultMinPartitions): RDD[String] = {
    hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text],
      minPartitions).map(pair => pair._2.toString)
  }

and if we check the value of defaultMinPartitions:

  /** Default min number of partitions for Hadoop RDDs when not given by user */
  def defaultMinPartitions: Int = math.min(defaultParallelism, 2)
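
So a small file ends up with two partitions, and hence two tasks. As a hedged illustration (the HDFS paths are assumed, sc is an existing SparkContext, and exact counts depend on the input format), you can check and override this yourself:

  val small = sc.textFile("hdfs:///tmp/temperatures.txt")          // tiny file: defaults to 2 partitions
  println(small.getNumPartitions)                                  // 2, hence 2 tasks

  val wider = sc.textFile("hdfs:///tmp/temperatures.txt", minPartitions = 8)
  println(wider.getNumPartitions)                                  // at least 8

  small.coalesce(1).saveAsTextFile("hdfs:///tmp/out")              // single output part file
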
Daniel H.
    Thanks Daniel - When I loaded the small file from HDFS, the number of partitions were default two (RDD.getNumPartitions()) as you mentioned. So, two tasks were started one for each partition and so two files in HDFS. I did a coalesce(1) and the number of files in the output was 1. – Praveen Sripati Sep 25 '14 at 03:32

Spark chooses the number of tasks based on the number of partitions in the original data set. If you are using HDFS as your data source, then the number of partitions will, by default, be equal to the number of HDFS blocks. You can change the number of partitions in a number of ways; the top two: pass an extra argument to the SparkContext.textFile method, or call the RDD.repartition method.
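
For example, as a rough sketch (the file path and numbers are assumptions, sc is an existing SparkContext):

  // Ask for at least 10 input partitions up front, then reshuffle into 40.
  val data  = sc.textFile("hdfs:///data/measurements.csv", minPartitions = 10)
  val wider = data.repartition(40)        // full shuffle into exactly 40 partitions
  println(wider.partitions.length)        // 40, so the next stage runs 40 tasks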

David
  • I put in a file of 5 blocks and see 5 tasks in Spark. It looks like for a single block two partitions are created, while for more than one block the number of partitions equals the number of blocks. – Praveen Sripati Sep 25 '14 at 04:22

Answering some points that were not addressed in previous answers:

  • in Standalone mode, you need to play with --executor-cores and --total-executor-cores to set the number of executors that will be launched (granted that you have enough memory to fit that number if you also specify --executor-memory); see the sketch after this list

  • Spark does not allocate tasks in a round-robin manner; it uses a mechanism called "Delay Scheduling", a pull-based technique in which each executor offers its availability to the master, which then decides whether or not to send it a task.
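
As a sketch of the programmatic equivalents of those flags for a standalone cluster (the master URL, core counts and memory size below are assumptions):

  import org.apache.spark.{SparkConf, SparkContext}

  val conf = new SparkConf()
    .setAppName("executor-sizing-sketch")
    .setMaster("spark://master:7077")     // assumed standalone master URL
    .set("spark.executor.cores", "2")     // ~ --executor-cores
    .set("spark.cores.max", "8")          // ~ --total-executor-cores: 8 / 2 => up to 4 executors
    .set("spark.executor.memory", "4g")   // ~ --executor-memory
  val sc = new SparkContext(conf)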

Bacon