
I'm trying to understand how Spark's cache works.

Here is my naive understanding, please let me know if I'm missing something:

val rdd1 = sc.textFile("some data")
rdd1.cache() //marks rdd1 as cached
val rdd2 = rdd1.filter(...)
val rdd3 = rdd1.map(...)
rdd2.saveAsTextFile("...")
rdd3.saveAsTextFile("...")

In the above, rdd1 will be loaded from disk (e.g. HDFS) only once (when rdd2 is saved, I assume), and then read from the cache (assuming there is enough RAM) when rdd3 is saved.
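
For what it's worth, here is roughly how I would check that (just a sketch; the predicate, the mapping and the output paths below are placeholders I made up, not my real job):

val rdd1 = sc.textFile("some data")
rdd1.cache()                         // only marks rdd1 for caching
val rdd2 = rdd1.filter(_.nonEmpty)   // placeholder predicate
val rdd3 = rdd1.map(_.toUpperCase)   // placeholder mapping

rdd2.saveAsTextFile("out2")          // first action: rdd1 is read from HDFS and its blocks are stored
println(rdd1.toDebugString)          // the lineage string should now report cached partitions
rdd3.saveAsTextFile("out3")          // second action: rdd1's partitions come from the cache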

Now here is my question. Let's say I want to cache rdd2 and rdd3 as they will both be used later on, but I don't need rdd1 after creating them.

Basically there is duplication, isn't there? Since once rdd2 and rdd3 are calculated I don't need rdd1 anymore, I should probably unpersist it, right? The question is: when?

Will this work? (Option A)

val rdd1 = sc.textFile("some data")
rdd1.cache()   // marks rdd as cached
val rdd2 = rdd1.filter(...)
val rdd3 = rdd1.map(...)
rdd2.cache()
rdd3.cache()
rdd1.unpersist()

Does Spark add the unpersist call to the DAG, or is it executed immediately? If it's done immediately, then rdd1 will effectively not be cached anymore when I read from rdd2 and rdd3, right?

Should I do it this way instead (Option B)?

val rdd1 = sc.textFile("some data")
rdd1.cache()   // marks rdd as cached
val rdd2 = rdd1.filter(...)
val rdd3 = rdd1.map(...)

rdd2.cache()
rdd3.cache()

rdd2.saveAsTextFile("...")
rdd3.saveAsTextFile("...")

rdd1.unpersist()

So the question is this: Is Option A good enough? i.e. will rdd1 still load the file only once? Or do I need to go with Option B?

– Eran Medan

3 Answers


It would seem that Option B is required. The reason is related to how persist/cache and unpersist are executed by Spark. Since RDD transformations merely build DAG descriptions without execution, in Option A by the time you call unpersist, you still only have job descriptions and not a running execution.

This is relevant because a cache or persist call just adds the RDD to a Map of RDDs that marked themselves to be persisted during job execution. However, unpersist directly tells the blockManager to evict the RDD from storage and removes the reference in the Map of persistent RDDs.

persist function

unpersist function

So you would need to call unpersist after Spark actually executed and stored the RDD with the block manager.
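
A quick way to see that unpersist takes effect eagerly, rather than being scheduled as part of the job (this is just a sketch assuming a plain spark-shell session where sc is in scope; the input path is a placeholder):

val rdd1 = sc.textFile("some data")
rdd1.cache()                         // only registers rdd1 in the map of persistent RDDs
println(rdd1.getStorageLevel)        // MEMORY_ONLY (the default for cache), even though no job has run
println(sc.getPersistentRDDs.size)   // 1, assuming nothing else is cached in this context

rdd1.unpersist()                     // removes the registration (and any stored blocks) immediately
println(rdd1.getStorageLevel)        // back to NONE, before any action was ever executed

So by the time the saves in Option A trigger a job, rdd1 is no longer marked for caching.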

The comments for the RDD.persist method hint towards this: rdd.persist

– Rich
  • Yep, seems you are on it. This is a little unfortunate; I wish "cache" had been converted to a DAG operation rather than just adding the RDD ID to a map... there are plenty of cases where you want to cache something intermediate, create a new RDD, and then drop the old one. Perhaps there are good theoretical reasons why this is not a good idea though... in any case, the LRU (I assume) ordering of the cache means that the old, unused rdd1 will be evicted if rdd2 and rdd3 need that space for caching... – Eran Medan Apr 27 '15 at 20:25
  • So I mostly just looked into what persist/cache and unpersist are doing, but there is still room for considering what Spark is doing when you derive an RDD from another one and how it might optimize anyway. I'm not sure that `rdd1` even needs to be cached, it may be checkpointed by `rdd2` and `rdd3` when they are cached or when the DAG is pipelined. This is more of a gray area for me though. – Rich Apr 27 '15 at 20:56
  • Did a little bit more investigation and tracing through the debugger. `rdd2` and `rdd3` will reference `rdd1` as a dependency. `rdd1` will load its data into partitions once, on the first action executed. At that point `rdd2` and `rdd3` both apply their transformations to the data already loaded by `rdd1` into the partitions. I believe caching provides value if you run multiple actions on the same exact RDD, but in this case of newly branching RDDs I don't think you run into the same issue, because I believe Spark is aware that `rdd1` is still a dependency for `rdd3` after the first save. – Rich Apr 27 '15 at 21:35

Option B is close to optimal, with one small tweak: use a less expensive action to materialize the caches. In your code, saveAsTextFile is an expensive operation; replace it with count.

The idea here is to drop the big rdd1 from the cache once it is no longer relevant for further computation (i.e. after rdd2 and rdd3 have been materialized).

Updated approach based on your code:

val rdd1 = sc.textFile("some data").cache()
val rdd2 = rdd1.filter(...).cache()
val rdd3 = rdd1.map(...).cache()

rdd2.count   // cheap action: materializes (and caches) rdd2, reading rdd1 from HDFS once
rdd3.count   // materializes rdd3, reusing the already-cached rdd1

rdd1.unpersist()   // safe now: rdd2 and rdd3 no longer need rdd1's cached blocks
– Shivaprasad
  • Isn't it an ineffective practice to call an action JUST to make the transformations really occur? @Shivaprasad – SarahData Feb 21 '19 at 09:53
  • @SarahData I agree, this seems contrary to Spark's lazy evaluation ethos, but I think the point is that we need to trigger an action in order for the data to be moved into memory. Otherwise you're undoing the cache/persist before any data has been moved, thus negating any benefits of caching, I think. – Adam Conrad Jun 08 '23 at 15:35
  • This is a clear demonstration of the need for an action for caching to take place: https://kb.databricks.com/scala/best-practice-cache-count-take – Adam Conrad Jun 08 '23 at 16:58

In Option A, you have not shown when you are calling the action (the call to save):

val rdd1 = sc.textFile("some data")
rdd1.cache() //marks rdd1 as cached
val rdd2 = rdd1.filter(...)
val rdd3 = rdd1.map(...)
rdd2.cache()
rdd3.cache()
rdd1.unpersist()
rdd2.saveAsTextFile("...")
rdd3.saveAsTextFile("...")

If the sequence is as above, Option A should use the cached version of rdd1 for computing both rdd2 and rdd3.
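
One way to check whether rdd1 is still marked for caching at that point is to ask for its storage level right before the saves (a sketch; the transformations and output paths are placeholders):

val rdd1 = sc.textFile("some data")
rdd1.cache()
val rdd2 = rdd1.filter(_.nonEmpty)   // placeholder predicate
val rdd3 = rdd1.map(_.toUpperCase)   // placeholder mapping
rdd2.cache()
rdd3.cache()
rdd1.unpersist()

// if this already prints NONE, the cache mark was removed before either job ran
println(rdd1.getStorageLevel)
rdd2.saveAsTextFile("out2")
rdd3.saveAsTextFile("out3")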

– ayan guha
  • It should, I agree, but would it? I think it wouldn't, as when you call rdd2.saveAsTextFile etc., rdd1 is already marked as not persisted. The persist/unpersist is not on the DAG. – Eran Medan Apr 28 '15 at 16:53
  • Until you call saveAsTextFile, nothing **really** happens... so my point is that the order of the rdd1.unpersist call does not matter if rdd2 is already cached. – ayan guha Apr 29 '15 at 00:45