
I have read the Cluster Mode Overview and I still can't understand the different processes in the Spark standalone cluster and the parallelism.

Is the worker a JVM process or not? I ran bin/start-slave.sh and found that it spawned a worker, which is in fact a JVM.

As per the above link, an executor is a process launched for an application on a worker node that runs tasks. An executor is also a JVM.

These are my questions:

  1. Executors are per application. Then what is the role of a worker? Does it coordinate with the executor and communicate the result back to the driver? Or does the driver talk to the executor directly? If so, what is the worker's purpose?

  2. How do I control the number of executors for an application?

  3. Can tasks be made to run in parallel inside an executor? If so, how do I configure the number of threads for an executor?

  4. What is the relationship between a worker, executors, and executor cores (--total-executor-cores)?

  5. What does it mean to have more workers per node?

Updated

Let's take some examples to understand this better.

Example 1: A standalone cluster with 5 worker nodes (each node having 8 cores), where I start an application with the default settings.

Example 2: Same cluster config as example 1, but I run an application with the following settings: --executor-cores 10 --total-executor-cores 10.

Example 3: Same cluster config as example 1, but I run an application with the following settings: --executor-cores 10 --total-executor-cores 50.

Example 4: Same cluster config as example 1, but I run an application with the following settings: --executor-cores 50 --total-executor-cores 50.

Example 5: Same cluster config as example 1, but I run an application with the following settings: --executor-cores 50 --total-executor-cores 10.

In each of these examples, how many executors are there? How many threads per executor? How many cores? How is the number of executors decided per application? Is it always the same as the number of workers?


2 Answers


[Figure: Spark's master/slave architecture: a central driver coordinating distributed executors on worker nodes]

Spark uses a master/slave architecture. As you can see in the figure, it has one central coordinator (Driver) that communicates with many distributed workers (executors). The driver and each of the executors run in their own Java processes.

DRIVER

The driver is the process where the main method runs. First, it converts the user program into tasks, and then it schedules those tasks on the executors.
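A minimal sketch of such a driver program (the object name, application name, and input path are only illustrative):

```scala
import org.apache.spark.{SparkConf, SparkContext}

// The JVM that runs this main method is the driver.
object WordCount {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("word-count")
    val sc   = new SparkContext(conf)          // connects to the cluster manager

    // Transformations only build the lineage; the action below is what the
    // driver turns into tasks and schedules on the executors.
    val counts = sc.textFile("hdfs:///tmp/input.txt")   // illustrative path
      .flatMap(_.split("\\s+"))
      .map(word => (word, 1))
      .reduceByKey(_ + _)

    println(counts.count())                    // action: triggers task scheduling

    sc.stop()                                  // releases executors and cluster resources
  }
}
```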

EXECUTORS

Executors are processes on the worker nodes, in charge of running individual tasks in a given Spark job. They are launched at the beginning of a Spark application and typically run for its entire lifetime. Once they have run a task, they send the results to the driver. They also provide in-memory storage for RDDs that are cached by user programs, through the Block Manager.
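For example (a small sketch assuming the SparkContext `sc` from the driver example above, with an illustrative input path), caching is what makes the executors keep partitions in memory via their Block Managers:

```scala
// Assumes an existing SparkContext `sc`; the path is illustrative.
val errors = sc.textFile("hdfs:///tmp/app.log")
  .filter(_.contains("ERROR"))
  .cache()        // ask the executors to keep the computed partitions in memory

errors.count()    // first action: partitions are computed and stored on the executors
errors.count()    // second action: served from executor memory, no recomputation
```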

APPLICATION EXECUTION FLOW

With this in mind, when you submit an application to the cluster with spark-submit this is what happens internally:

  1. A standalone application starts and instantiates a SparkContext instance (and only then can you call the application a driver).
  2. The driver program asks the cluster manager for resources to launch executors.
  3. The cluster manager launches executors.
  4. The driver process runs through the user application. Depending on the actions and transformations over RDDs, tasks are sent to the executors.
  5. Executors run the tasks and save the results.
  6. If any worker crashes, its tasks will be sent to different executors to be processed again. In the book "Learning Spark: Lightning-Fast Big Data Analysis" the authors talk about Spark and fault tolerance (see the configuration sketch after this list for the speculative execution it mentions):

Spark automatically deals with failed or slow machines by re-executing failed or slow tasks. For example, if the node running a partition of a map() operation crashes, Spark will rerun it on another node; and even if the node does not crash but is simply much slower than other nodes, Spark can preemptively launch a “speculative” copy of the task on another node, and take its result if that finishes.

  7. When SparkContext.stop() is called from the driver, or if the main method exits or crashes, all the executors will be terminated and the cluster resources will be released by the cluster manager.
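The speculative execution mentioned in the quote is disabled by default and can be switched on through configuration. A small sketch (the application name is illustrative; the multiplier and quantile values are shown only as an example):

```scala
import org.apache.spark.SparkConf

// Sketch: enabling speculative re-execution of slow tasks (off by default).
val conf = new SparkConf()
  .setAppName("speculation-demo")                // illustrative name
  .set("spark.speculation", "true")              // launch speculative copies of slow tasks
  .set("spark.speculation.multiplier", "1.5")    // how much slower than the median a task must be
  .set("spark.speculation.quantile", "0.75")     // fraction of tasks that must finish before checking
```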

YOUR QUESTIONS

  1. When executors are started, they register themselves with the driver and from then on they communicate with it directly. The workers are in charge of informing the cluster manager about the availability of their resources.

  2. In a YARN cluster you can do that with --num-executors. In a standalone cluster you will get one executor per worker unless you play with spark.executor.cores and a worker has enough cores to hold more than one executor. (As @JacekLaskowski pointed out, --num-executors is no longer in use in YARN https://github.com/apache/spark/commit/16b6d18613e150c7038c613992d80a7828413e66)

  3. You can assign the number of cores per executor with --executor-cores.

  4. --total-executor-cores is the max number of executor cores per application (see the configuration sketch after this list).

  5. As Sean Owen said in this thread: "there's not a good reason to run more than one worker per machine". You would have many JVMs sitting on one machine, for instance.
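To illustrate items 2 to 4, the same knobs can also be set programmatically. This is only a sketch (the numbers are made up for workers with 8 cores each); in standalone mode `spark.executor.cores` corresponds to `--executor-cores` and `spark.cores.max` to `--total-executor-cores`:

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Programmatic equivalents of the spark-submit flags discussed above.
val conf = new SparkConf()
  .setAppName("resource-demo")           // illustrative name
  .set("spark.executor.cores", "4")      // cores per executor      (--executor-cores)
  .set("spark.cores.max", "16")          // total cores for the app (--total-executor-cores)
  .set("spark.executor.memory", "2g")    // heap per executor

val sc = new SparkContext(conf)
// With 4 cores per executor and 16 cores in total, the standalone master can
// launch at most 16 / 4 = 4 executors for this application.
```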

UPDATE

I haven't been able to test these scenarios, but according to the documentation:

EXAMPLE 1: Spark will greedily acquire as many cores and executors as are offered by the scheduler. So in the end you will get 5 executors with 8 cores each.

EXAMPLES 2 to 5: Spark won't be able to allocate as many cores as requested within a single worker, hence no executors will be launched.
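A back-of-the-envelope check of that reasoning (plain Scala, not Spark API code; it just replays the arithmetic for 5 workers with 8 cores each):

```scala
// Replays the arithmetic for the examples above; not Spark API code.
val coresPerWorker = 8
val workers        = 5

def executorsLaunched(executorCores: Int, totalExecutorCores: Int): Int = {
  val perWorker = coresPerWorker / executorCores               // executors that fit on one worker
  math.min(workers * perWorker, totalExecutorCores / executorCores)
}

executorsLaunched(10, 10)  // example 2: 8 / 10 = 0 executors fit on a worker -> 0
executorsLaunched(10, 50)  // example 3: 0
executorsLaunched(50, 50)  // example 4: 0
executorsLaunched(50, 10)  // example 5: 0
// Example 1 (defaults): one executor per worker using all 8 cores -> 5 executors, 40 cores in total.
```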

Marco
  • Thanks @Marco. So typically, one should not worry about the heap memory on the worker as it just manages the node resources? – Manikandan Kannan Sep 17 '15 at 14:29
  • You should work with spark.executor.memory and make sure it fits within your Spark workers' memory – Marco Sep 17 '15 at 14:44
  • What a great answer! Thanks @Marco. As per https://github.com/apache/spark/commit/16b6d18613e150c7038c613992d80a7828413e66, `--num-executors` is no longer in use in YARN. – Jacek Laskowski Sep 19 '15 at 09:00
  • @Marco thanks for the great answer. Can you expand on the ongoing role of the cluster manager while the driver runs? ... It must handle the case where the driver or workers or both crash or stop responding, to know what resources are available. – Iain Dec 03 '15 at 22:28
  • @Iain the driver contacts the cluster manager for the allocation of resources and also requests the cluster manager to launch the executors – Aravind Yarram Jan 12 '16 at 01:02
  • @JacekLaskowski I created new subject: http://stackoverflow.com/questions/37682068/spark-executors-loading-querying-data-very-low-performance –  Jun 07 '16 at 14:26
  • @JacekLaskowski Are **Executors** multi-threaded processes? Am taking the cue from [here](https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ExecutorService.html). – CᴴᴀZ Jul 12 '17 at 10:51
  • Great answer. Confused about the terminology between node/worker/slave/worker_node: are they all the same, i.e. just a single physical machine, or not? – astro_asz Feb 01 '18 at 09:27
  • @astro_asz a node is a machine that usually has just one worker (there's no good reason to have more than one worker per machine... but it is possible). A worker has many executors. – Marco Feb 05 '18 at 09:54
  • @Marco What about slave? Is it a worker or node? What about worker_node? I encountered all of these terminologies in different contexts. – astro_asz Feb 05 '18 at 09:57
  • @astro_asz A worker node is any node that can run application code in the cluster. A slave is a worker (JVM) – Marco Feb 05 '18 at 21:50
  • Are there multiple drivers if more than one application is submitted? – Brad Ellis Nov 29 '18 at 01:31
  • Great answer. You can find detailed information about Spark internals here: https://github.com/JerryLead/SparkInternals/blob/master/EnglishVersion/1-Overview.md – Amar Gajbhiye Dec 07 '18 at 11:27

This is how Apache Spark works internally:

[Figure: diagram of how Apache Spark works internally]

Sharhabeel Hamdan