
In Spark, how many tasks are executed in parallel at a time? Discussions can be found in How are stages split into tasks in Spark? and How DAG works under the covers in RDD?

But I could not find a clear conclusion there.

Consider the following scenarios (assume spark.task.cpus = 1, and ignore vcore concept for simplicity):

  • 10 executors (2 cores/executor), 10 partitions => I think the number of concurrent tasks at a time is 10
  • 10 executors (2 cores/executor), 2 partitions => I think the number of concurrent tasks at a time is 2
  • 10 executors (2 cores/executor), 20 partitions => I think the number of concurrent tasks at a time is 20
  • 10 executors (1 core/executor), 20 partitions => I think the number of concurrent tasks at a time is 10

Am I correct? Regarding the 3rd case, will it be 20, considering multi-threading inside one executor (i.e. 2 threads because there are 2 cores)?
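To make my assumption concrete, here is how I would compute it (plain Scala arithmetic, not Spark API; the variable names are mine):

```scala
// My assumption: concurrency is capped by both the total task slots and the partition count.
val numExecutors     = 10
val coresPerExecutor = 2
val taskCpus         = 1                      // spark.task.cpus
val numPartitions    = 20                     // the 3rd case

val totalSlots      = numExecutors * coresPerExecutor / taskCpus
val concurrentTasks = math.min(totalSlots, numPartitions)   // 20, if my understanding is right
```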


UPDATE1

If the 3rd case is correct, it means:

  • when idle cores are available inside an executor, Spark may automatically run multiple task threads in that executor
  • when there is only one core in the executor, multithreading won't happen in that executor.

If this is true, isn't the behavior of Spark inside an executor a bit uncertain (single-threaded vs. multi-threaded)?

Note that the code shipped from the driver to the executors may not have been written with atomicity in mind (e.g. using the synchronized keyword).
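For illustration, a hypothetical example of the kind of code I am worried about (the names and numbers are made up):

```scala
import org.apache.spark.sql.SparkSession

// A Scala `object` is a per-JVM singleton, so on an executor with 2 cores,
// 2 task threads could update it at the same time without any synchronization.
object Counter {
  var total = 0L                              // mutable shared state, not thread-safe
}

val spark = SparkSession.builder().appName("race-concern").getOrCreate()
val rdd   = spark.sparkContext.parallelize(1 to 1000, numSlices = 20)

rdd.foreach { x =>
  Counter.total += x                          // potentially racy inside one executor JVM
}
```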

How is this handled by Spark?

jack
  • You are correct (including the 3rd case). One other factor to consider is vcores. On Hortonworks/Cloudera, there is an option for an admin to split a single physical core into a set of virtual cores. Each virtual core can execute a task on a partition. – Sai Aug 10 '20 at 23:43
  • Correct for most common cases. There is however a `spark.task.cpus` setting which can alter that. https://stackoverflow.com/questions/36671832/number-of-cpus-per-task-in-spark – mazaneicha Aug 11 '20 at 02:54
  • @mazaneicha Thanks for the comment. But I think `spark.task.cpus` is about nb. of cpus **per task**, while my question is about nb. of tasks, which is different, but your link is also interesting to know. :) – jack Aug 11 '20 at 21:19
  • If you assign 2 cpus per task, then an executor with 4 cores will only be able to run 2 tasks concurrently, whereas normally it would run 4. – mazaneicha Aug 11 '20 at 21:52
  • @mazaneicha Makes sense, I will put the assumption `spark.task.cpus = 1` in the question. But here you're talking about multithreading inside one task (1 task has 2 threads), while the question is more about multithreading among tasks (1 task has 1 thread, but 2 tasks running concurrently in one executor) – jack Aug 11 '20 at 22:35
  • A nice related [talk](https://www.youtube.com/watch?v=vfiJQ7wg81Y&ab_channel=SparkSummit) – jack Sep 28 '20 at 11:57

2 Answers


I think you are right; this depends on the number of executors and the number of cores per executor. One partition creates one task, and each task runs on one core.

  • Thanks. Does it mean, if the nb. of partitions is larger than the nb. of nodes (assume 1 core per node), it normally doesn't make sense? Consider the 4th case.. – jack Aug 11 '20 at 21:37

I think all 4 cases are correct, and the 4th case makes sense in reality ("overbooking" cores). We should normally use a factor of 2 to 4 for the number of partitions, i.e. the number of partitions should be 2 to 4 times the total number of CPU cores in the cluster.
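For example, something along these lines (the numbers and the input path are just placeholders):

```scala
import org.apache.spark.sql.SparkSession

// 10 executors x 2 cores/executor = 20 total cores => aim for roughly 40-80 partitions.
val spark = SparkSession.builder().appName("partition-sizing").getOrCreate()

val totalCores       = 10 * 2
val targetPartitions = totalCores * 3                   // somewhere between 2x and 4x

val rdd = spark.sparkContext
  .textFile("hdfs:///path/to/input")                    // placeholder path
  .repartition(targetPartitions)
```

Alternatively, a similar effect can often be achieved up front via `spark.default.parallelism` instead of repartitioning.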

Regarding threading, 2 tasks running concurrently in one executor should not have multi-threading issues, as each task handles its own partition of the RDD.

If spark.task.cpus = 2 is set, which means 2 CPU cores per task, then IMO there might be race-condition problems (if there are vars), but usually we handle immutable values like RDDs, so there should hardly be any issues either.
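A minimal sketch of the usual safe pattern, assuming mutable state stays inside the task closure (names are mine):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("task-local-state").getOrCreate()
val rdd   = spark.sparkContext.parallelize(1 to 1000, numSlices = 40)

// mapPartitions runs the closure once per task, so `localSum` is created per task thread
// and never shared, even when 2 tasks run concurrently in the same executor.
val perPartitionSums = rdd.mapPartitions { iter =>
  var localSum = 0L
  iter.foreach(x => localSum += x)
  Iterator(localSum)
}

println(perPartitionSums.collect().sum)       // 500500, regardless of how tasks are scheduled
```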

jack