
After reading the documentation at http://spark.apache.org/docs/0.8.0/cluster-overview.html, I have some questions that I want to clarify.

Take this example from Spark:

JavaSparkContext spark = new JavaSparkContext(
  new SparkConf().setJars("...").setSparkHome....);
JavaRDD<String> file = spark.textFile("hdfs://...");

// step1
JavaRDD<String> words =
  file.flatMap(new FlatMapFunction<String, String>() {
    public Iterable<String> call(String s) {
      return Arrays.asList(s.split(" "));
    }
  });

// step2
JavaPairRDD<String, Integer> pairs =
  words.map(new PairFunction<String, String, Integer>() {
    public Tuple2<String, Integer> call(String s) {
      return new Tuple2<String, Integer>(s, 1);
    }
  });

// step3
JavaPairRDD<String, Integer> counts =
  pairs.reduceByKey(new Function2<Integer, Integer, Integer>() {
    public Integer call(Integer a, Integer b) {
      return a + b;
    }
  });

counts.saveAsTextFile("hdfs://...");

So let's say I have a 3-node cluster, with node 1 running as the master, and the above driver program has been properly jarred (say, application-test.jar). So now I'm running this code on the master node, and I believe that right after the SparkContext is created, the application-test.jar file will be copied to the worker nodes (and each worker will create a directory for that application).

So now my question: are step1, step2 and step3 in the example tasks that get sent over to the workers? If yes, then how does the worker execute them? Like `java -cp "application-test.jar" step1` and so on?

EdwinGuo

2 Answers


When you create the SparkContext, each worker starts an executor. This is a separate process (JVM), and it loads your jar too. The executors connect back to your driver program. Now the driver can send them commands, like flatMap, map and reduceByKey in your example. When the driver quits, the executors shut down.
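As a rough sketch of how the jar typically reaches those executor JVMs (the app name, master URL and jar path below are placeholders, not taken from the question): the jars listed on the SparkConf are shipped to the workers when the SparkContext starts, and each executor adds them to its classpath.

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

SparkConf conf = new SparkConf()
  .setAppName("word-count")
  .setMaster("spark://master:7077")
  .setJars(new String[] { "/path/to/application-test.jar" });

// Creating the context launches one executor JVM per worker; each
// executor fetches the listed jar and adds it to its own classpath.
JavaSparkContext spark = new JavaSparkContext(conf);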

RDDs are sort of like big arrays that are split into partitions, and each executor can hold some of these partitions.

A task is a command sent from the driver to an executor by serializing your Function object. The executor deserializes the command (this is possible because it has loaded your jar), and executes it on a partition.
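To make that concrete, here is a minimal sketch of a single transformation (the `upper` variable is illustrative; it reuses `words` from the question's snippet). The anonymous Function object below is exactly the kind of object that gets serialized into a task; the executor can deserialize it because your jar is on its classpath.

import org.apache.spark.api.java.function.Function;

// The function object passed to map must be serializable: the driver
// serializes it as part of each task, and the executor deserializes
// it and applies it to one partition at a time.
JavaRDD<String> upper = words.map(new Function<String, String>() {
  public String call(String s) {
    return s.toUpperCase();
  }
});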

(This is a conceptual overview. I am glossing over some details, but I hope it is helpful.)


To answer your specific question: No, a new process is not started for each step. A new process is started on each worker when the SparkContext is constructed.

Daniel Darabos
  • Thanks Daniel! Really appreciate your response, very helpful! A couple of things I want to clarify: 1) so when a task is executed on the worker's executor, does it use the jar as a dependency to execute? 2) when you say stage, can I understand it as step1, step2, ... in my example? 3) is a task equal to a function that calls the RDD transformation functions, like map and flatMap? – EdwinGuo Aug 13 '14 at 13:23
  • 1) Yes. 2) Yes. I've fixed my answer to say _step_ instead of _stage_. 3) Yes, pretty much. If you run `rdd.map(myFunc)`, a task will be created for each partition. The executors pick up and execute the tasks. Each task in this case runs `myFunc` on one partition. Some operations, like `reduceByKey` are more complex, but this is the basic idea. – Daniel Darabos Aug 13 '14 at 14:02
  • There is one important twist. Everything happens in a _lazy_ way. So `rdd.map` does not do anything until it needs to. If you do `rdd.filter(...).map(...).collect()`, the `filter` and `map` functions only run on the workers when you call `collect`. But most of the time you do not need to think about this. (A small sketch of this appears right after these comments.) – Daniel Darabos Aug 13 '14 at 14:05
  • So the executors actually queue up the RDD transformation tasks and don't execute them unless an action is called on the RDD? Is that why it's called resilient? It records the transformation tasks, and when something fails, the executor just picks the task up and runs it again? Thanks – EdwinGuo Aug 13 '14 at 14:14
  • Exactly like you say. – Daniel Darabos Aug 13 '14 at 14:16
  • I recently found out that I must define anonymous functions within the RDD transformation functions, or define a static method in a global singleton object. So I came back here and took a second look at what you told me, when you said "This is a separate process (JVM), and it loads your jar too. The executors connect back to your driver program", – EdwinGuo Sep 05 '14 at 02:14
  • so is the jar file actually shipped to the worker nodes? Or does it stay on the driver application's node, and when the driver sends a task to a worker, does the worker come back to the driver and reference the jar as a dependency to execute the command? So during the whole computation, is there only one copy of the jar file, and does it stay only on the driver node? Thanks – EdwinGuo Sep 05 '14 at 02:14
  • The jar you specify with `SparkContext.addJar` will be copied to all the worker nodes. – Daniel Darabos Sep 05 '14 at 11:08
  • This is such a ... beautiful ... narration of the concepts behind Spark that it drove me almost to tears. Maybe what is misleading is the concept that RDDs are like big arrays. They are a container of instructions on how to materialize those arrays, and how to partition them, not the arrays themselves. – YoYo Apr 09 '15 at 17:03
  • Hey Daniel, does the code that reads from a text file (or S3, a network socket) also run inside the Executor? – Ranjit Iyer Apr 23 '15 at 18:55
  • @RanjitIyer: Yes. If you use `sc.textFile`, the driver application will first get the list of "splits" (pieces of the file) from the file system. Then the executor threads each load one split at a time in parallel. – Daniel Darabos Apr 24 '15 at 11:16
  • @DanielDarabos When DataFrameWriter.save(String an-s3-path) is invoked, is that run inside the executors? If not, then the master will probably spend a lot of time writing the file by itself if the path is a network path... before proceeding. Is that right? If yes, will executors not attempt to overwrite each other's data? – user2023507 Jan 13 '16 at 18:17
  • @user2023507: `DataFrameWriter.save` will cause the executors to write out the data into the given directory. Each partition is written as a separate file, so there is no collision issue. The corresponding `load` method can load from a directory, so the fact that each partition is a separate file is mostly hidden. – Daniel Darabos Jan 13 '16 at 18:50
  • Hi @DanielDarabos, is there only 1 task at a time per stage per node? Will multiple threads run within a given stage on a node (meaning, concurrent tasks)? – jack Aug 10 '20 at 22:23
  • Yes, there will be one task per core. If you look at the Spark UI you can see how it goes down. Each executor knows how many cores it can use and will pick up that many tasks and do them in parallel. – Daniel Darabos Aug 11 '20 at 19:20
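A small sketch of the laziness discussed in the comments above (reusing `spark` and the HDFS-path placeholder from the question; the filter/length pipeline itself is hypothetical):

JavaRDD<String> lines = spark.textFile("hdfs://...");

// These calls only record transformations in the RDD graph;
// nothing is sent to the executors yet.
JavaRDD<String> nonEmpty = lines.filter(new Function<String, Boolean>() {
  public Boolean call(String s) {
    return !s.isEmpty();
  }
});
JavaRDD<Integer> lengths = nonEmpty.map(new Function<String, Integer>() {
  public Integer call(String s) {
    return s.length();
  }
});

// Only this action triggers a job: now tasks are created, serialized,
// and executed by the executors, one task per partition.
java.util.List<Integer> result = lengths.collect();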

To get a clear picture of how tasks are created and scheduled, we must understand how the execution model works in Spark. Briefly, an application in Spark is executed in three steps:

  1. Create RDD graph
  2. Create execution plan according to the RDD graph. Stages are created in this step
  3. Generate tasks based on the plan and get them scheduled across workers

In your word-count example, the RDD graph is rather simple; it's something like the following:

file -> lines -> words -> per-word count -> global word count -> output

Based on this graph, two stages are created. The stage-creation rule is to pipeline as many narrow transformations as possible. In your example, the narrow transformations finish at the per-word count. Therefore, you get two stages:

  1. file -> lines -> words -> per-word count
  2. global word count -> output
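One rough way to see this two-stage split from the driver (a sketch, reusing the counts RDD from the question) is to print the RDD's lineage; the shuffle introduced by reduceByKey is where Spark cuts the stage boundary.

// toDebugString() prints the chain of RDDs this RDD depends on; the
// indentation step in its output marks the shuffle dependency, i.e.
// the boundary between the two stages listed above.
System.out.println(counts.toDebugString());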

Once the stages are figured out, Spark will generate tasks from the stages. The first stage will create ShuffleMapTasks, and the last stage will create ResultTasks, because the last stage includes an action operation to produce results.

The number of tasks to be generated depends on how your files are distributed. Suppose you have three different files on three different nodes; then the first stage will generate 3 tasks: one task per partition.

Therefore, you should not map your steps to tasks directly. A task belongs to a stage, and is related to a partition.
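If you want to check this on your own data, a quick sketch (reusing the `file` RDD from the question; very old versions of the Java API named this method slightly differently) is to ask the RDD how many partitions it has, which is the number of tasks the first stage will run:

// One task is generated per input partition for the first stage.
int numPartitions = file.partitions().size();
System.out.println("stage-1 tasks = partitions = " + numPartitions);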

Usually, the number of tasks run for a stage is exactly the number of partitions of the final RDD, but since RDDs can be shared (and hence so can ShuffleMapStages), their number varies depending on the RDD/stage sharing. Please refer to How DAG works under the covers in RDD?

Hui Wang
  • Thanks Hui. It makes sense now why I have close to 20 tasks for a single stage: my RDD has 20 partitions across the cluster. One question, though: how can I force each executor to work just on local data? I see executor logs saying "storage.BlockManager: Found block rdd_2_2 remotely". Most blocks seem to be found locally, but some are marked as remote. When I observe the steps, each one is either NODE_LOCAL or PROCESS_LOCAL. – nir Aug 12 '15 at 22:08
  • @nir, I don't know all the details about how the executor works, but what you are referring to may be the case where data has to be shuffled: moved from one node to another node. If so, you can't force it, because it does need that remote data. – Hui Wang Aug 13 '15 at 12:51
  • I think by RDD graph you mean the DAG graph. – abbas Mar 08 '17 at 04:53