
Several sources describe RDDs as ephemeral by default (e.g., this s/o answer) -- meaning that they do not stay in memory unless we call cache() or persist() on them.

So let's say our program involves an ephemeral (not explicitly cached by the user) RDD that is used in a few operations that cause the RDD to materialize. My question is: does Spark discard the materialized ephemeral RDD immediately -- or is it possible that the RDD stays in memory for other operations, even if we never asked for it to be cached?

Also, if an ephemeral RDD stays in memory, is it always only because some LRU policy has not yet kicked it out -- or can it also be because of scheduling optimizations?

I've tried to figure that out with the code below -- run in a Jupyter notebook with Python 3.5 and Spark 1.6.0, on a 4-core machine -- but I would appreciate an answer from someone who knows for sure.

import pyspark
sc = pyspark.SparkContext()
N = 1000000   # size of dataset
THRESHOLD = 100  # some constant

def f():
    """ do not chache """
    rdd = sc.parallelize(range(N))
    for i in range(10):
        print(rdd.filter(lambda x: x > i * THRESHOLD).count())

def g():
    """ cache """
    rdd = sc.parallelize(range(N)).cache()
    for i in range(10):
        print(rdd.filter(lambda x: x > i * THRESHOLD).count())

Of the two functions above, f() does not ask the RDD to persist, but g() does, at the beginning. When I time the two functions, I get very comparable performance, as if the cache() call has made no difference. (In fact, the one that uses caching is slower.)

%%timeit
f()
> 1 loops, best of 3: 2.19 s per loop

%%timeit
g()
> 1 loops, best of 3: 2.7 s per loop

Actually, even modifying f() to call unpersist() on the RDD does not change things.

def ff():
    """ modified f() with explicit call to unpersist() """
    rdd = sc.parallelize(range(N))
    for i in range(10):
        rdd.unpersist()
        print(rdd.filter(lambda x: x > i * THRESHOLD).count())

%%timeit
ff()
> 1 loops, best of 3: 2.25 s per loop

The documentation for unpersist() states that it "mark[s] the RDD as non-persistent, and remove[s] all blocks for it from memory and disk." Is this really so, though - or does Spark ignore the call to unpersist when it knows it's going to use the RDD down the road?

michalis

1 Answer

There is simply no value in caching here. Creating an RDD from a range is extremely cheap (each partition needs only two integers to get going), and the action you apply cannot really benefit from caching. persist is applied to the Java object, not the Python one, and your code doesn't perform any work between RDD creation and the first transformation.

Even if you ignore all of that, this is a very simple task with tiny data. The total cost is most likely driven more by scheduling and communication than by anything else.

If you want to see caching in action, consider the following example:

from pyspark import SparkContext
import time

def f(x):
    time.sleep(1)
    return x

sc = SparkContext("local[5]")
rdd = sc.parallelize(range(50), 5).map(f)
rdd.cache()

%time rdd.count()   # First run, no data cached ~10 s
## CPU times: user 16 ms, sys: 4 ms, total: 20 ms
## Wall time: 11.4 s
## 50

%time rdd.count()  # Second time, task results fetched from cache
## CPU times: user 12 ms, sys: 0 ns, total: 12 ms
## Wall time: 114 ms
## 50

rdd.unpersist()  # Data unpersisted

%time rdd.count()  #  Results recomputed ~10s
## CPU times: user 16 ms, sys: 0 ns, total: 16 ms 
## Wall time: 10.1 s
## 50

While persisting behavior is predictable in simple cases like this one, in general caching should be considered a hint, not a contract. Task output may or may not be persisted depending on available resources, and blocks can be evicted from the cache without any user intervention.
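To make the eviction part concrete: when memory runs short, Spark drops cached blocks following a least-recently-used policy. Below is a minimal pure-Python sketch of that policy only -- the `LRUBlockStore` class, its capacity, and the block ids are invented for illustration and have nothing to do with Spark's actual `BlockManager` internals:

```python
from collections import OrderedDict

class LRUBlockStore:
    """Toy LRU store: illustrates the eviction policy only,
    not Spark's actual block management."""

    def __init__(self, capacity):
        self.capacity = capacity
        self.blocks = OrderedDict()  # insertion/access order = recency

    def put(self, block_id, data):
        # Inserting may silently evict the least recently used block.
        self.blocks[block_id] = data
        self.blocks.move_to_end(block_id)
        if len(self.blocks) > self.capacity:
            evicted_id, _ = self.blocks.popitem(last=False)
            return evicted_id  # the caller is never asked for permission
        return None

    def get(self, block_id):
        # A hit refreshes the block's recency; a miss means recompute.
        if block_id not in self.blocks:
            return None
        self.blocks.move_to_end(block_id)
        return self.blocks[block_id]

store = LRUBlockStore(capacity=2)
store.put("rdd_0_part0", [1, 2, 3])
store.put("rdd_0_part1", [4, 5, 6])
store.get("rdd_0_part0")              # touch part0 so it is most recent
store.put("rdd_0_part2", [7, 8, 9])   # evicts rdd_0_part1, the LRU block
print(store.get("rdd_0_part1"))       # None: gone without user intervention
```

This is the sense in which a cached block "stays in memory only until the LRU policy kicks it out": a later `get` for an evicted block misses, and the corresponding partition has to be recomputed from its lineage.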

zero323
  • Hey, thanks for the reply. To clarify, are you saying that spark does discard the ephemeral RDDs -- but the operation I apply is so cheap that I see no difference? – michalis Mar 19 '16 at 13:29
  • There is no such thing as an ephemeral RDD, because an RDD exists only as a small object on the driver. Task results are discarded when they go out of scope, using standard language tools. But ignoring technicalities, there is nothing to cache here. At the time you call it, your data is basically n `range` objects, where n is the number of partitions. – zero323 Mar 19 '16 at 13:35
  • Hi - I'm using the term 'ephemeral' with a particular meaning here - i.e. rdds that the user does not ask to persist. I'm not interested only in the particular example, that's for illustrating my question. My question is when rdds are discarded from the cluster node memory, if we do not ask them to persist. – michalis Mar 19 '16 at 13:47
  • See for example https://stackoverflow.com/questions/34117469/spark-why-do-i-have-to-explicitly-tell-what-to-cache/34117788#34117788 An RDD is not data (although we tend to think of it that way). Workers see only standard iterators. Shuffle files can be preserved, though (see https://stackoverflow.com/questions/34580662/what-does-stage-skipped-mean-in-apache-spark-web-ui/34581152#34581152). But once again, in your code __there is nothing to cache__. – zero323 Mar 19 '16 at 13:52
  • Thanks. I'm probably missing something, but right now I don't see a direct answer to the title question. I'll take a closer look at the posts you link to and try to understand better. – michalis Mar 19 '16 at 14:26
  • I do not believe you answered the question. – thebluephantom Apr 28 '19 at 07:49