
Suppose we start from some data and get some intermediate result df_intermediate. Along the pipeline from the source data to df_intermediate, all transformations are lazy and nothing is actually computed.

Then I would like to perform two different transformations on df_intermediate. For example, I would like to calculate df_intermediate.agg({"col": "max"}) and df_intermediate.approxQuantile("col", [0.1, 0.2, 0.3], 0.01) using two separate commands.

I wonder, in this scenario, does Spark need to recompute df_intermediate when it performs the second transformation? In other words, do the calculations for the above two transformations both start from the raw data, without storing the intermediate result? Obviously I can cache the intermediate result, but I'm just wondering whether Spark does this kind of optimization internally.
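For concreteness, a minimal sketch of what I mean (in Scala, though the same applies in PySpark; the source path and filter are just illustrative), with the explicit caching workaround included:

    import org.apache.spark.sql.functions.{col, max}

    // Lazy pipeline from source to the intermediate result; nothing runs yet.
    val df_intermediate = spark.read.parquet("/path/to/source")  // hypothetical source
      .filter(col("col") > 0)                                    // hypothetical transformation

    // The manual workaround: persist so the second Action reuses the result
    // instead of going back to the source data.
    df_intermediate.cache()

    val maxResult = df_intermediate.agg(max("col")).collect()                               // Action 1
    val quantiles = df_intermediate.stat.approxQuantile("col", Array(0.1, 0.2, 0.3), 0.01)  // Action 2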

DiveIntoML

1 Answer


The answer is somewhat disappointing. But first you need to see it in terms of Actions. I will leave caching aside here.

If you do the following, there will be optimization for sure: the chained transformations stay within one plan, and Catalyst collapses them.

val df1 = df0.withColumn(...)
val df2 = df1.withColumn(...)
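A quick way to see this collapsing (a sketch; column names are made up) is to print the optimized plan:

    import org.apache.spark.sql.functions.{col, lit}

    val df0 = spark.range(1, 100).toDF("c1")
    val df1 = df0.withColumn("c2", col("c1") + lit(1))
    val df2 = df1.withColumn("c3", col("c2") * lit(2))

    // explain(true) prints the optimized logical plan: Catalyst's
    // CollapseProject rule merges the two withColumn projections into one.
    df2.explain(true)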

Your example needs an Action such as count to trigger execution. But your two statements are separate Actions with diverging plans, so no processing is skipped; there is no sharing of the intermediate result.

In general, Action = Job is the correct way to look at it. For DFs the Catalyst Optimizer can kick a Job off even though you may not realize this. For RDDs (legacy) this was a little different.

This does not get optimized either:

import org.apache.spark.sql.functions._

val df = spark.range(1, 10000).toDF("c1")
val df_intermediate = df.withColumn("c2", col("c1") + 100)
val x = df_intermediate.agg(max("c2"))
val y = df_intermediate.agg(min("c2"))
val z = x.union(y).count  // single Action, yet both branches recompute df_intermediate

x and y both go back to the source. One would have thought this case would be easy to share, since it is also a single Action here. You need to run .explain to confirm it, but the idea with lazy evaluation is to leave the planning to Spark, and Spark does not reuse the common sub-plan across the two branches.
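If sharing is required, caching is the explicit mechanism. A sketch of the same example with persistence (assuming the default storage level; the comment reflects what .explain typically shows):

    import org.apache.spark.sql.functions._

    val df = spark.range(1, 10000).toDF("c1")
    val df_intermediate = df.withColumn("c2", col("c1") + 100).cache()

    val x = df_intermediate.agg(max("c2"))
    val y = df_intermediate.agg(min("c2"))

    // Both union branches now show InMemoryTableScan in the physical plan
    // instead of scanning the range source twice.
    val z = x.union(y).count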

As an aside, see also: Is it efficient to cache a dataframe for a single Action Spark application in which that dataframe is referenced more than once? and In which situations are the stages of DAG skipped?

thebluephantom