6

I am a little confused about the caching mechanism of Spark.

Let's say I have a Spark application with only one action at the end of multiple transformations. Suppose I have a dataframe A and I apply 2-3 transformations on it, creating multiple dataframes which eventually help create a final dataframe that is going to be saved to disk.

Example:

val A = spark.read() // large size
val B = A.map()
val C = A.map()
.
.
.
val D = B.join(C)
D.save()

So do I need to cache dataframe A for performance enhancement?

Thanks in advance.

thebluephantom
ganesh_patil
  • I believe caching is not needed for single action. You will not gain any benefit since caching will happen when any action is performed. Since you have one action you will never use that cache. – Ramdev Sharma Dec 04 '19 at 18:22
  • @RamdevSharma Belief is not always factual... – thebluephantom Dec 04 '19 at 19:51
  • @thebluephantom , per my work on Spark, caching didn't help for single action. You will get advantage in further actions if any. Sometimes we do call count function on dataframe to let it cache where count is an action. This gives advantage to final action that will be executing with various transformation. – Ramdev Sharma Dec 04 '19 at 21:48
  • I do not agree, here there is advantage. See start of question here, https://stackoverflow.com/questions/29903675/understanding-sparks-caching – thebluephantom Dec 04 '19 at 22:22
  • @thebluephantom This is the same thing I have mentioned caching will help to subsequent action only but not if only one action involved. Since caching will happen on first action. In thread, it is clearly mentioned that cache will happen on first save action. Current question is whether it is good idea to use caching when only single action. So in short, It will not help. – Ramdev Sharma Dec 04 '19 at 22:52
  • @RamdevSharma I find it odd, I seem to have read things elsewhere differently. – thebluephantom Dec 05 '19 at 15:03
  • Actually I am right, you are wrong, I am reposting the answer. @RamdevSharma – thebluephantom Dec 05 '19 at 16:54
  • May be an idea to select an answer... – thebluephantom Dec 08 '19 at 08:48

2 Answers

3

I think thebluephantom's answer is right.

I had faced the same situation as you until today, and I also found only answers saying that Spark cache() does not work on a single query. My Spark job executing a single query also seemed not to be caching.

Because of them, I also doubted his answer.
But he showed evidence that the cache is working (the green box) even when he executes a single query.

So, I tested 3 cases with a dataframe (not an RDD) as below, and the results suggest he is right.
The execution plan also changes (it becomes simpler and uses InMemoryRelation, please see below).

  1. without cache
  2. using cache
  3. using cache with calling unpersist before action

without cache

example

val A = spark.read.format().load()

val B = A.where(cond1).select(columns1)
val C = A.where(cond2).select(columns2)

val D = B.join(C)

D.save()

DAG for my case

[DAG screenshot: without cache]

This is a bit more complicated than the example.

This DAG is messy even though the execution is not complicated, and you can see the scan occurs 4 times.

[screenshot: the scan occurs 4 times]
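If you cannot open the Spark UI, the repeated scan can also be seen by printing the physical plan. Below is a minimal sketch run in spark-shell; the Parquet path and the id/value/flag columns are placeholder assumptions, not part of the original example.

import org.apache.spark.sql.functions.col

val A = spark.read.parquet("/tmp/a.parquet")                     // hypothetical path

val B = A.where(col("id") > 0).select(col("id"), col("value"))   // stands in for cond1/columns1
val C = A.where(col("id") < 100).select(col("id"), col("flag"))  // stands in for cond2/columns2

val D = B.join(C, "id")

// Without cache, the printed plan contains a separate FileScan of A for each branch.
D.explain()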

with cache

example

val A = spark.read.format().load().cache()  // the cache will take effect here

val B = A.where(cond1).select(columns1)
val C = A.where(cond2).select(columns2)

val D = B.join(C)

D.save()

This will cache A, even with a single query.
You can see in the DAG that InMemoryTableScan is read twice.

DAG for my case

[DAG screenshot: with cache, InMemoryTableScan read twice]
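As a quick check without the UI, a sketch reusing the same placeholder names as above: the cached read shows up directly in the plan and in the storage level.

import org.apache.spark.sql.functions.col

val A = spark.read.parquet("/tmp/a.parquet").cache()             // hypothetical path

val B = A.where(col("id") > 0).select(col("id"), col("value"))
val C = A.where(col("id") < 100).select(col("id"), col("flag"))
val D = B.join(C, "id")

// With cache, the plan shows InMemoryRelation / InMemoryTableScan instead of a second FileScan of A.
D.explain()

// Dataset.storageLevel (available since Spark 2.1) reflects that A is marked for caching.
println(A.storageLevel)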

with cache and unpersist before action

val A = spark.read.format().load().cache()

val B = A.where(cond1).select(columns1)
val C = A.where(cond2).select(columns2)

/* I thought A will not be needed anymore */
A.unpersist()

val D = B.join(C)

D.save()

This code will not cache dataframe A, because its cache flag was unset before the action (D.save()) started. So this results in exactly the same behavior as the first case (without cache).

The important thing is that unpersist() must be called after the action (after D.save()). But when I asked some people in my company, many of them used it like case 3 and didn't know about this.

I think that's why many people mistakenly believe the cache is not working for a single query.

cache and unpersist should be used like below:

val A = spark.read.format().load().cache()

val B = A.where(cond1).select(columns1)
val C = A.where(cond2).select(columns2)

val D = B.join(C)

D.save()

/* unpersist must be after action */
A.unpersist()

This results in exactly the same behavior as case 2 (with cache), but with unpersist called after D.save().
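The difference between case 3 and this ordering can also be seen without reading the DAG. A small sketch (the path is a placeholder; Dataset.storageLevel is available since Spark 2.1):

val A = spark.read.format("parquet").load("/tmp/a.parquet").cache()  // hypothetical path

println(A.storageLevel.useMemory)   // true: A is marked for caching

A.unpersist()                       // case 3: the flag is cleared before any action has run

println(A.storageLevel.useMemory)   // false: the later D.save() can no longer use the cache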

So, I suggest trying cache as in thebluephantom's answer.
If I have presented anything incorrect, please let me know.

Thanks to thebluephantom for solving my problem.

Insung Park
  • I think Insung Park cleared my doubts with his dag. also Thanks to @thebluephantom for helping me on this. – ganesh_patil Feb 25 '20 at 09:30
  • Insung Park : Any reason why spark has to scan the data 4 times when we are not using Cache? From the example it seems like for each of the dataframes (A,B,C,D) that is getting scanned? it is scanning source data all again? @thebluephantom – Despicable me Dec 21 '21 at 06:39
  • 1
    In my opinion, If spark caches by default, then data on same path will not change even data was changed(overwritten) on REPL or Notebooks like zeppelin. But I cannot say that is reason. – Insung Park Dec 21 '21 at 07:52
  • It does makes sense not to apply cache by default. However scanning behavior is either weird or under documented!! – Despicable me Dec 21 '21 at 09:05
  • An addition to this answer is that if you need `D` later on in your code, you shouldn't unpersist `A` after saving `D`. If you do, Spark will recompute `D` the next time you will need it (yes, even if you cached it, which may be counter-intuitive at first, see [here](https://stackoverflow.com/questions/69254276/in-spark-is-there-any-way-to-unpersist-a-dataframe-rdd-in-the-middle-of-executi)). – leleogere Jul 27 '22 at 12:18
2

Yes, you are correct.

You should cache A as it is used as input for both B and C. The DAG visualization would show the extent of reuse, or of going back to the source (in this case). If you have a noisy cluster, some spilling to disk could occur.

See also the top answer here: (Why) do we need to call cache or persist on a RDD

However, I was looking for skipped stages, silly me. But something else shows up, as per below.

The following code is akin to your own code:

// toggle .cache on and off to compare the two DAGs below
val aa = spark.sparkContext.textFile("/FileStore/tables/filter_words.txt") //.cache
val a = aa.flatMap(x => x.split(" ")).map(_.trim)
val b = a.map(x => (x, 1))
val c = a.map(x => (x, 2))
val d = b.join(c)
d.count

Looking at UI with .cache

[DAG screenshot: with .cache]

and without .cache

[DAG screenshot: without .cache]

QED: so, .cache has a benefit; it would not make sense otherwise. Also, 2 reads of the source could lead to different results in some cases.
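For completeness, a DataFrame version of the same toggle can be sketched as below (the column names "value", "word" and "n" are assumptions; the file path is the one from the RDD example above). Toggling .cache() on and off and comparing the DAG / SQL tab should show the same InMemoryTableScan reuse.

import org.apache.spark.sql.functions.{col, explode, lit, split, trim}

// Toggle .cache() on and off and compare the query plans for d.
val aa = spark.read.text("/FileStore/tables/filter_words.txt") //.cache()
val a = aa.select(explode(split(trim(col("value")), " ")).as("word"))
val b = a.withColumn("n", lit(1))
val c = a.withColumn("n", lit(2))
val d = b.join(c, "word")
d.count()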

thebluephantom
  • It is not very clear about benefit that you got when executing count action. Did you see any difference in execution time in result? – Ramdev Sharma Dec 05 '19 at 18:43
  • Sample too small but think large spin up emr cluster and try – thebluephantom Dec 05 '19 at 18:55
  • its not about the count, but the maps – thebluephantom Dec 06 '19 at 17:32
  • thanks for your thoughts , but I'm still confused which one is true , so cant accept your answer yet. – ganesh_patil Dec 13 '19 at 10:15
  • Well, the other answer was removed. And I show 2 options. – thebluephantom Dec 13 '19 at 10:49
  • And there is no other answer disproving. – thebluephantom Dec 13 '19 at 12:54
  • Less confused now? – thebluephantom Dec 26 '19 at 11:21
  • Hi, I think the caching does not help with only one action in code. I tried to run it with caching and without caching and there is no significant time difference, I think when a single action called spark reads the file in memory and apply the transformation on it in parallel manner , in our case its two transformation means it will create two dataframe and then store the join result. If out program has a second action and it uses the same dataframe , then it will read again from the file. Whats your thoughts on this. – ganesh_patil Jan 02 '20 at 09:08
  • Believe what you will. The proof is shown in green. If you think Spark just does things for the hell of it, fine. Good luck in 2020 – thebluephantom Jan 02 '20 at 09:42
  • Can not get why to cache DF when it fits neither in memory nor on disk. E.g. 2 executors with 200 RAM and 500gb disck each And 100Tb dataset. – 0script0 Dec 30 '21 at 17:10