The following points:

rdd.toDebugString

is for RDDs only and shows the lineage prior to execution.
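A minimal sketch of its use, reusing the sample file path from the example below:

val lineageRDD = spark.sparkContext.textFile("/FileStore/tables/sample_text.txt")
  .flatMap(_.split(" "))
  .map(word => (word, 1))
  .reduceByKey(_ + _)
// Prints the RDD lineage, including shuffle boundaries; no job is executed.
println(lineageRDD.toDebugString)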
The execution DAG is something you can observe at run time for both RDDs and DataFrames via the Spark Web UI. See the new release: https://spark.apache.org/docs/3.0.0-preview/web-ui.html
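For example, running any action produces a job whose DAG then appears under the Jobs and Stages tabs (assuming the default UI address, http://localhost:4040, in local mode):

// Triggers a job; its DAG then becomes visible in the Spark Web UI.
lineageRDD.count()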
Prior to execution you can run an .explain for DataFrames.
- From a good source:
The Spark SQL EXPLAIN operator provides detailed plan information about a SQL
statement without actually running it. You can use the Spark SQL
EXPLAIN operator to display the actual execution plan that the Spark
execution engine will generate and use while executing any query.
You can use this execution plan to optimize your queries.
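A brief sketch of the SQL form, using a small hypothetical DataFrame registered as a temp view:

import spark.implicits._

// Hypothetical one-column DataFrame, for illustration only.
val demoDF = Seq("Humpty", "Dumpty", "sat").toDF("value")
demoDF.createOrReplaceTempView("words")

// EXPLAIN returns the plan as a result set instead of running the query.
spark.sql("EXPLAIN SELECT value, COUNT(*) FROM words GROUP BY value").show(false)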
A simple example for a DataFrame:
import org.apache.spark.sql.Row
import spark.implicits._ // needed for .toDF on an RDD

val dfsFilename = "/FileStore/tables/sample_text.txt"
val readFileDF = spark.sparkContext.textFile(dfsFilename)
val wordsDF = readFileDF.flatMap(_.split(" ")).toDF

// && (not ||) is required to exclude both words; x != a || x != b is always true.
val wcounts3 = wordsDF.filter(r => (r(0) != "Humpty") && (r(0) != "Dumpty"))
                      .groupBy("Value") // column resolution is case-insensitive by default, so "Value" matches "value"
                      .count()
wcounts3.explain()
You attach the .explain() to the DataFrame/Dataset statement itself, not to an action such as show().
== Physical Plan ==
*(2) HashAggregate(keys=[Value#4], functions=[finalmerge_count(merge count#14L) AS count(1)#8L])
+- Exchange hashpartitioning(Value#4, 200), [id=#61]
+- *(1) HashAggregate(keys=[Value#4], functions=[partial_count(1) AS count#14L])
+- *(1) Filter <function1>.apply
+- *(1) SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String, true], true, false) AS value#4]
+- Scan[obj#3]
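If more detail is needed, explain(true) also prints the parsed, analyzed, and optimized logical plans in addition to the physical plan:

wcounts3.explain(true)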
Regarding your specific question: that is not possible, and possibly not entirely valid, as there is little optimization to consider.