24

Can anyone please correct my understanding of persisting in Spark?

If we have performed cache() on an RDD, its value is cached only on those nodes where the RDD was actually computed initially. Meaning, if there is a cluster of 100 nodes, and the RDD is computed in partitions on the first and second nodes, then if we cached this RDD, Spark is going to cache its value only in first or second worker nodes. So when this Spark application tries to use this RDD in later stages, the Spark driver has to get the value from the first/second nodes.

Am I correct?

(OR)

Is it that the RDD value is persisted in driver memory and not on the nodes?

meow
  • 307
  • 4
  • 16
Ramesh
  • 1,563
  • 9
  • 25
  • 39

4 Answers

15

Change this:

then Spark is going to cache its value only in first or second worker nodes.

to this:

then Spark is going to cache its value only in first and second worker nodes.

and... yes, correct!

Spark tries to minimize memory usage (and we love it for that!), so it won't make any unnecessary memory loads. It evaluates every statement lazily, i.e. it won't do any actual work on a transformation; it waits for an action to happen, which leaves Spark no choice but to do the actual work (read the file, send the data over the network, do the computation, collect the result back to the driver, for example).

You see, we don't want to cache everything, unless we really can (that is, the memory capacity allows for it; yes, we can ask for more memory in the executors and/or the driver, but sometimes our cluster just doesn't have the resources, which is really common when we handle big data) and it really makes sense, i.e. the cached RDD is going to be used again and again (so caching it will speed up the execution of our job).

That's why you want to unpersist() your RDD when you no longer need it...! :)
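
To make that lifecycle concrete, here is a minimal PySpark sketch (the data and the expensive() function are hypothetical): cache() only marks the RDD, the first action computes and stores the partitions on the executors, later actions reuse them, and unpersist() frees the executor memory:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("cache-lifecycle-demo").getOrCreate()
sc = spark.sparkContext

def expensive(x):
    # stands in for some costly per-record computation
    return x * x

rdd = sc.parallelize(range(1000000)).map(expensive)

rdd.cache()          # lazy: nothing is computed or stored yet
total = rdd.count()  # first action: partitions are computed and cached on the executors
s = rdd.sum()        # second action: reads the cached partitions, no recomputation

rdd.unpersist()      # free the executor memory once the RDD is no longer needed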

Check this image; it is from one of my jobs, where I had requested 100 executors. However, the Executors tab displayed 101, i.e. 100 slaves/workers and one master/driver:

(screenshot of the Spark UI Executors tab showing 101 executors: 100 workers plus the driver)

gsamaras
  • 71,951
  • 46
  • 188
  • 305
  • 5
    I think I have to put my question this way. When we choose RDD.cache()... is Spark going to cache the RDD in driver memory or executor memory? – Ramesh Aug 29 '16 at 15:10
  • 6
    @Ramesh in the executors that use it. Imagine the driver having to cache my RDD, which holds data of 15T..It would be a catastrophe! ;) – gsamaras Aug 29 '16 at 17:18
  • @gsamaras Thanks for the answer. If the RDD is initially computed in 100 executors and we cache it, does it mean those 100 executors are occupied and can no longer be used by any other Spark app? And if we want to allow another app to run tasks in those, we have to do `.unpersist`? Why would holding (i.e. `cache`) some memory resource block other apps from leveraging CPU compute resources? Isn't this an unwise strategy? Thanks – jack Dec 22 '20 at 15:55
0

RDD.cache is a lazy operation. It does nothing until you call an action like count. Once you call an action, the operation will use the cache: the first action computes and caches the data, and subsequent actions just take the data from the cache and do the operation on it.

RDD.cache persists the RDD with the default storage level (MEMORY_ONLY). See the Spark RDD API.

2. Is it something that the RDD value is persisted in driver memory and not on nodes?

An RDD can be persisted to disk as well as to memory. Click the link to the Spark documentation for all the options: Spark RDD Persist.
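
As a hedged sketch of those options (assuming a plain SparkContext and a toy RDD; the data is made up), cache() is just persist() with the default level, and persist() accepts an explicit StorageLevel:

from pyspark import SparkContext, StorageLevel

sc = SparkContext.getOrCreate()
rdd = sc.parallelize(range(1000))

rdd.cache()        # shorthand for persist() with the default level, MEMORY_ONLY

# to use a different level, unpersist first, then persist with an explicit StorageLevel
rdd.unpersist()
rdd.persist(StorageLevel.MEMORY_AND_DISK)   # spills partitions to disk if memory runs out
# other levels include DISK_ONLY, MEMORY_ONLY_2 (replicated), etc.

rdd.count()        # the action that actually materializes and stores the partitions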

Indrajit Swain
  • 1,505
  • 1
  • 15
  • 22
0
# no actual caching at the end of this statement
rdd1 = spark.read.json('myfile.json').rdd.map(lambda row: myfunc(row)).cache()

# again, no actual caching yet, because Spark is lazy and won't evaluate anything until
# an action is called
rdd2 = rdd1.map(mysecondfunc)

# caching happens on this action: the result of rdd1 will be cached in the memory of each worker node
n = rdd1.count()

So to answer your question

If we have performed a cache() on an RDD its value is cached only on those nodes where actually RDD was computed initially

The only place where something can be cached is on the worker nodes, not on the driver node.

The cache function can only be applied to an RDD (refer), and since an RDD only exists in the worker nodes' memory (Resilient Distributed Datasets!), its results are cached in the respective worker nodes' memory. Once you apply an operation like count, which brings the result back to the driver, it's not really an RDD anymore; it's merely the result of a computation done on the RDD by the worker nodes in their respective memories.

Since cache in the above example was called on rdd1, which is still distributed across multiple worker nodes, the caching only happens in the worker nodes' memory.

In the above example, when you run another map-reduce operation on rdd1, it won't read the JSON again, because it was cached.

FYI, I am using the word memory based on the assumption that the caching level is set to MEMORY_ONLY. Of course, if that level is changed, Spark will cache to memory and/or disk based on the setting.
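
If you want to check this yourself, here is a small sketch (continuing from the example above, so rdd1 is assumed to exist): PySpark exposes the requested storage level on the RDD itself, and the Storage tab of the Spark UI shows what has actually been materialized on the executors:

print(rdd1.is_cached)            # True once cache()/persist() has been requested
print(rdd1.getStorageLevel())    # prints the StorageLevel flags (memory / disk / replication)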

meow
  • 307
  • 4
  • 16
-1

Here is an excellent answer on caching

(Why) do we need to call cache or persist on a RDD

Basically, caching stores the RDD in the memory/disk (based on the persistence level set) of that node, so that when this RDD is used again it does not need to recompute its lineage (lineage - the set of prior transformations executed to reach the current state).
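
To see the lineage idea in action, here is a small hypothetical PySpark sketch: toDebugString() prints the chain of transformations the RDD depends on, and once the RDD is cached, later actions read the stored partitions instead of replaying that chain:

from pyspark import SparkContext

sc = SparkContext.getOrCreate()

# a toy lineage: parallelize -> map -> filter
rdd = sc.parallelize(["1", "2", "3", "x"]) \
        .map(lambda s: s.strip()) \
        .filter(lambda s: s.isdigit())

print(rdd.toDebugString())   # shows the lineage (the set of prior transformations)

rdd.cache()
rdd.count()                  # first action: computes and caches the partitions on the executors

rdd.map(int).sum()           # reuses the cached data; the lineage above is not recomputed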

Community
  • 1
  • 1
Krishna Kalyan
  • 1,672
  • 2
  • 20
  • 43
  • The reason for me to ask this question is: if I persist (memory only) an RDD, Spark will cache it in memory, and to do this Spark will take 3 to 4 times as much memory as the RDD size. – Ramesh Aug 29 '16 at 00:58
  • For example, if there is an RDD from a 300MB file and we persist it, Spark will use almost 1GB of memory to persist it. Because of this, it sometimes throws memory exceptions (Java heap size issues). To fix it, I think we need to explicitly allocate appropriate driver memory, that is, more than 1GB. That is the reason I am asking whether caching/persisting has a direct impact on driver memory. – Ramesh Aug 29 '16 at 01:07
  • 1
    When I said "Is it something that the RDD value is persisted in driver memory and not on nodes?", I meant to say "Is Spark going to cache the value in driver memory or worker node memory (RAM)?" – Ramesh Aug 29 '16 at 14:41
  • 5
    I think I can simply put my question this way. When we choose RDD.cache()... is Spark going to cache the RDD in driver memory or executor memory? – Ramesh Aug 29 '16 at 15:09