1

Using pyspark/Delta lakes on Databricks, I have the following scenario:

sdf = spark.read.format("delta").table("...")
result = sdf.filter(...).groupBy(...).agg(...)

analysis_1 = result.groupBy(...).count() # transformation performed here
analysis_2 = result.groupBy(...).count() # transformation performed here

As I understand Spark with Delta lakes, due to chained execution, result is not actually computed upon declaration, but rather when it is used.

However, in this example, it is used multiple times, and hence the most expensive transformation is computed multiple times.

Is it possible to force execution at some point in the code, e.g.

sdf = spark.read.format("delta").table("...")
result = sdf.filter(...).groupBy(...).agg(...)
result.force() # transformation performed here??

analysis_1 = result.groupBy(...).count() # quick smaller transformation??
analysis_2 = result.groupBy(...).count() # quick smaller transformation??
thebluephantom
  • 16,458
  • 8
  • 40
  • 83
casparjespersen
  • 3,460
  • 5
  • 38
  • 63
  • [When to cache a DataFrame?](https://stackoverflow.com/questions/44156365/when-to-cache-a-dataframe) and [What is Lineage In Spark?](https://stackoverflow.com/questions/45751113/what-is-lineage-in-spark) – pault Nov 01 '19 at 14:46
  • The question title is flawed. – thebluephantom Nov 01 '19 at 20:53

1 Answers1

0

The question in my view is all over the place, or not clear. But if you are new to Spark then, this can be the case.

So:

For use of .force, see https://blog.knoldus.com/getting-lazy-with-scala/ .force will not work on a Dataset or Dataframe.

Is this anything to do with pyspark or Delta Lake approach? No, no.

analysis_1 = result.groupBy(...).count() # quick smaller transformation?? 
  • This is in fact an Action with Transformations preceding leading to shuffling most likely.

So, I think you mean as our esteemed pault states, the following:

  • .cache or .persist

You would need I suspect:

result.cache 

This would mean your 2nd Action analysis_2 would not need to recompute all the way back to source a shown here

(2) Spark Jobs
Job 16 View(Stages: 3/3)
Stage 43: 
8/8
succeeded / total tasks 
Stage 44: 
200/200
succeeded / total tasks 
Stage 45:   
1/1
succeeded / total tasks 
Job 17 View(Stages: 2/2, 1 skipped)
Stage 46: 
0/8
succeeded / total tasks skipped
Stage 47: 
200/200
succeeded / total tasks 
Stage 48:   
1/1
succeeded / total tasks 

With improvements that are made to Spark, shuffle partitions are kept resulting in skipped stages in some cases as well and in particular for RDDs. For dataframes, caching is required to get the skipped stages effect I observe.

thebluephantom
  • 16,458
  • 8
  • 40
  • 83
  • I am indeed new to Spark, so I'm trying to grasp the various concepts. It is, however, progressing :-) Thank you for the explanation. As @pault hinted, it was the concept of `cache` that I was looking for. Thank you all. – casparjespersen Nov 02 '19 at 20:03