
As per my understanding, there should be one job for each action in Spark, but I often see more than one job triggered for a single action. I tried to test this by doing a simple aggregation on a dataset to get the maximum score from each category (here, the "subject" field).
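
For context, the DataFrame is created roughly as follows (the exact read call isn't shown above, so the header/inferSchema options are assumptions; the path is taken from the physical plan further down):

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder()
      .appName("students")
      .master("local[*]")
      .getOrCreate()

    // Assumed read options: header row present, schema inferred from the data.
    val students = spark.read
      .option("header", "true")
      .option("inferSchema", "true")
      .csv("C:/lab/SparkLab/files/exams/students.csv")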


    students.show(5)

    +----------+--------------+----------+----+-------+-----+-----+
    |student_id|exam_center_id|   subject|year|quarter|score|grade|
    +----------+--------------+----------+----+-------+-----+-----+
    |         1|             1|      Math|2005|      1|   41|    D|
    |         1|             1|   Spanish|2005|      1|   51|    C|
    |         1|             1|    German|2005|      1|   39|    D|
    |         1|             1|   Physics|2005|      1|   35|    D|
    |         1|             1|   Biology|2005|      1|   53|    C|
    |         1|             1|Philosophy|2005|      1|   73|    B|
    +----------+--------------+----------+----+-------+-----+-----+

    // Task : Find Highest Score in each subject
    val highestScores = students.groupBy("subject").max("score")
    highestScores.show(10)

+----------+----------+
|   subject|max(score)|
+----------+----------+
|   Spanish|        98|
|Modern Art|        98|
|    French|        98|
|   Physics|        98|
| Geography|        98|
|   History|        98|
|   English|        98|
|  Classics|        98|
|      Math|        98|
|Philosophy|        98|
+----------+----------+
only showing top 10 rows

While examining the Spark UI, I can see there are 3 "jobs" executed for the groupBy operation, while I was expecting just one.

Can anyone help me understand why there are 3 instead of just 1?

== Physical Plan ==
*(2) HashAggregate(keys=[subject#12], functions=[max(score#15)])
+- Exchange hashpartitioning(subject#12, 1)
   +- *(1) HashAggregate(keys=[subject#12], functions=[partial_max(score#15)])
      +- *(1) FileScan csv [subject#12,score#15] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/C:/lab/SparkLab/files/exams/students.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<subject:string,score:int>
Remis Haroon - رامز

2 Answers


I think only #3 does the actual "job" (it executes a plan, which you'll see if you open Details for the query on the SQL tab). The other two are preparatory steps:

  • #1 queries the NameNode to build the InMemoryFileIndex needed to read your csv, and
  • #2 samples the dataset to execute .groupBy("subject").max("score"), which internally requires a sortByKey (here are more details on that).
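
As a side note, supplying an explicit schema is one way to trim that preparatory work, since Spark then doesn't need to scan the CSV up front. A sketch (it assumes a SparkSession named spark and the file path from the question; whether it actually removes one of the jobs depends on your read options and Spark version):

    import org.apache.spark.sql.types._

    // Column types taken from the sample rows shown in the question.
    val schema = StructType(Seq(
      StructField("student_id",     IntegerType),
      StructField("exam_center_id", IntegerType),
      StructField("subject",        StringType),
      StructField("year",           IntegerType),
      StructField("quarter",        IntegerType),
      StructField("score",          IntegerType),
      StructField("grade",          StringType)
    ))

    // No inference pass over the file: Spark reads with the schema you give it.
    val students = spark.read
      .option("header", "true")
      .schema(schema)
      .csv("C:/lab/SparkLab/files/exams/students.csv")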
mazaneicha

I would suggest checking the physical plan:

highestScores.explain()

You might see something like:

*(2) HashAggregate(keys=[subject#9], functions=[max(score#12)], output=[subject#9, max(score)#51])
+- Exchange hashpartitioning(subject#9, 2)
   +- *(1) HashAggregate(keys=[subject#9], functions=[partial_max(score#12)], output=[subject#9, max#61])
  1. [Map stage] stage#1 performs the local aggregation (partial aggregation), and then the shuffle happens using hashpartitioning(subject). Note that the hash partitioner uses the group-by column.
  2. [Reduce stage] stage#2 merges the output of stage#1 to get the final max(score).
  3. The third one is actually used to print the top 10 records, show(10).
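
If you want to see the job boundaries directly rather than infer them from the plan, you can count onJobStart events with a SparkListener. A sketch (it assumes a SparkSession named spark; listener events arrive asynchronously, so the count is only reliable if nothing else is running):

    import java.util.concurrent.atomic.AtomicInteger
    import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart}

    val jobCount = new AtomicInteger(0)
    spark.sparkContext.addSparkListener(new SparkListener {
      override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
        jobCount.incrementAndGet()   // one event per submitted job
      }
    })

    highestScores.show(10)
    Thread.sleep(1000)               // give the async listener bus time to drain
    println(s"jobs submitted by show(10): ${jobCount.get()}")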
Som
  • Thanks for your answer. Is this behaviour documented somewhere? Where can I learn more about this? – Remis Haroon - رامز Jun 28 '20 at 06:42
  • Also ref: https://jaceklaskowski.gitbooks.io/mastering-spark-sql/spark-sql-SparkPlan-HashAggregateExec.html – Som Jun 28 '20 at 07:05
  • Actually not sure how that is relevant. In general, the codegen stages shown in an execution plan and the jobs submitted do not have to match. – mazaneicha Jun 28 '20 at 09:16
  • Do you mean the two are different? I think we will see a job for each action in the Spark application under execution, no? The above execution plan only shows the stages involved in computing `groupBy...max...` – Som Jun 28 '20 at 13:44