
I am searching for a way to obtain the DAG of a Scala Spark application that uses RDDs, including its stages and tasks.

I have tried rdd.toDebugString, but it only shows the RDD lineage, not the DAG I'm looking for.
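
For reference, a minimal sketch of what I tried (the input path is just a placeholder):

val rdd = spark.sparkContext.textFile("/path/to/sample_text.txt") // placeholder path
val counts = rdd.flatMap(_.split(" "))
                .map(word => (word, 1))
                .reduceByKey(_ + _)

// Prints the RDD lineage (the chain of parent RDDs),
// but not the stages and tasks of the execution DAG.
println(counts.toDebugString)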

I know the web UI displays the DAG, but I want to extract the DAG from the code, in a similar fashion to what the explain function does for DataFrames.

Mia Tran

2 Answers


Some points to note:

  • rdd.toDebugString is for RDDs and shows the lineage prior to execution ONLY.

  • The execution DAG is something you can observe at run time, for both RDDs and DataFrames, via the Spark Web UI. See the docs for the new release: https://spark.apache.org/docs/3.0.0-preview/web-ui.html

  • Prior to execution you can run .explain on a DataFrame.

  • From a good source:

The Spark SQL EXPLAIN operator provides detailed plan information about a SQL statement without actually running it. You can use the Spark SQL EXPLAIN operator to display the actual execution plan that the Spark execution engine will generate and use while executing any query. You can use this execution plan to optimize your queries.

A simple example for a DataFrame:

import spark.implicits._ // needed for .toDF on an RDD

val dfsFilename = "/FileStore/tables/sample_text.txt"
val readFileDF = spark.sparkContext.textFile(dfsFilename)
val wordsDF = readFileDF.flatMap(_.split(" ")).toDF
val wcounts3 = wordsDF.filter(r => (r(0) != "Humpty") && (r(0) != "Dumpty")) // && so that both words are excluded
                      .groupBy("Value") // the column created by toDF is named "value"; resolution is case-insensitive
                      .count().explain()

You attach the .explain() to the statement itself, not to a show(), for a DataFrame/Dataset.

== Physical Plan ==
*(2) HashAggregate(keys=[Value#4], functions=[finalmerge_count(merge count#14L) AS count(1)#8L])
+- Exchange hashpartitioning(Value#4, 200), [id=#61]
   +- *(1) HashAggregate(keys=[Value#4], functions=[partial_count(1) AS count#14L])
      +- *(1) Filter <function1>.apply
         +- *(1) SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String, true], true, false) AS value#4]
            +- Scan[obj#3]
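
Note that explain also accepts a boolean extended flag; a minimal sketch, reusing wordsDF from the example above:

// true = print the parsed, analyzed, optimized and physical plans,
// rather than only the physical plan shown above.
wordsDF.groupBy("value").count().explain(true)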

Your specific question: not possible, and arguably not entirely valid, as there is little optimization to consider for RDDs.

thebluephantom
  • Unfortunately, I'm looking at RDDs instead of DataFrames. In addition, I want to obtain the DAG not from the web UI, but from the code. I will clarify that in my question. – Mia Tran Apr 06 '20 at 14:52
  • @MiaTran were you able to figure out some way to visualize the DAG from code? – Akash Patel Oct 02 '20 at 17:17
  • @thebluephantom how can we do some query optimization from the physical plan? Could you give me some examples? – Akash Patel Oct 02 '20 at 17:19
  • @AkashPatel that is a new question, but see https://medium.com/datalex/sparks-logical-and-physical-plans-when-why-how-and-beyond-8cd1947b605a. You have a selected plan, but the adaptive plan can differ. You can rewrite your query to influence the optimizer, of course. – thebluephantom Oct 04 '20 at 10:01

In addition to the explain function, you can also view the web UI while executing your application - if you run locally, it should be at http://localhost:4040/ (as described in the documentation here: https://spark.apache.org/docs/latest/monitoring.html). It provides a list of jobs, a visualization of the DAG for each job, configurations, etc.
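
If you want that information from code rather than from the browser, the same monitoring page documents a REST API served by the UI; a minimal sketch, assuming a locally running application on the default port 4040:

import scala.io.Source

// Lists the running application(s) as JSON (endpoint from the monitoring docs).
val appsJson = Source.fromURL("http://localhost:4040/api/v1/applications", "UTF-8").mkString
println(appsJson)

// With the application id from that response you can drill further down, e.g.:
//   http://localhost:4040/api/v1/applications/[app-id]/jobs
//   http://localhost:4040/api/v1/applications/[app-id]/stages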

Hope this helps!

danielcahall
  • 2,672
  • 8
  • 14
  • I'm using RDDs, so the explain function can't be used. I was also looking at the web UI, but I'm looking for a way to get the DAG from the code instead, in a similar fashion to the explain function. – Mia Tran Apr 06 '20 at 14:54
  • Unfortunately, that is not possible, except for toDebugString. That said, RDDs are not so optimizable, so I am not sure the point is valid. – thebluephantom Apr 06 '20 at 15:34
  • You could probably run a .toDF() at the end, and that may give you the answer you need. – thebluephantom Apr 06 '20 at 15:39
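
A minimal sketch of that last .toDF() suggestion (assuming a SparkSession named spark is in scope; import spark.implicits._ is needed for .toDF, and the path is a placeholder):

import spark.implicits._

// Convert the RDD to a DataFrame purely to gain access to explain();
// the printed plan describes the DataFrame query, not the original RDD stages.
val pairs = spark.sparkContext
  .textFile("/path/to/sample_text.txt") // placeholder path
  .flatMap(_.split(" "))
  .map(word => (word, 1))

pairs.toDF("word", "one").explain()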