
I found a similar topic: Understanding Spark's caching

but it still doesn't answer my question exactly. Let's consider the snippets of code below.

Option A:

rdd1 = sc.textFile(...)
rdd1.cache()
rdd2 = rdd1.map(...).partitionBy(...)
rdd3 = rdd1.reduceByKey(...).map(...)
rdd2.cache()
rdd1.unpersist()
data = rdd2.collect()

Option B:

rdd1 = sc.textFile(...)
rdd1.cache()
rdd2 = rdd1.map(...).partitionBy(...)
rdd3 = rdd1.reduceByKey(...).map(...)
rdd2.cache()
data = rdd2.collect()
rdd1.unpersist()

Which option should I choose to prevent recomputing rdd1? At first glance, option A looks fine, but keeping in mind that operations in Spark are lazy, I think that calling unpersist before running an action on rdd2 may force rdd1 to be recomputed. On the other hand, keeping rdd1 cached until after the action, as in option B, may leave no free space to cache rdd2. Please help me choose which option I should use.
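
To illustrate the laziness concern, here is a toy sketch of mine (not from the linked topic): it uses a local SparkContext and an accumulator to count how many times elements are actually computed. Accumulator updates inside transformations are not guaranteed to be exact, so the counts are only indicative.

from pyspark import SparkContext

sc = SparkContext("local[2]", "cache-demo")
reads = sc.accumulator(0)

def tag(x):
    reads.add(1)  # incremented every time this element is (re)computed
    return x

rdd1 = sc.parallelize(range(100)).map(tag)
rdd1.cache()
rdd1.count()        # first action materializes the cache; reads.value == 100
rdd1.unpersist()    # clears the caching flag
rdd1.count()        # no cache any more, so the whole lineage runs again
print(reads.value)  # expected 200: rdd1 was computed twice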

Michocio

1 Answer


Both options are, strictly speaking, incorrect.

The first one, as you suspect, removes the caching flag before the data is actually collected.

The second one actually triggers the cache, but since you never evaluate rdd3, the cached rdd1 is persisted and then discarded right away without ever being reused. Removing rdd1.cache() should actually improve performance. Also, rdd2.cache() seems to be obsolete, since its result is never reused.
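
You can also inspect the caching flag directly. A minimal sketch, assuming an existing SparkContext sc; note that getStorageLevel() only reports the flag, and nothing is materialized until an action runs:

rdd = sc.parallelize(range(10))
rdd.cache()
print(rdd.getStorageLevel())  # a memory-backed storage level: the flag is set
rdd.unpersist()
print(rdd.getStorageLevel())  # no storage: any later action recomputes rdd from its lineage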

If textFile loads data from expensive storage, you could structure your code like this:

rdd1 = sc.textFile(...)
rdd1.cache()

rdd2 = rdd1.map(...).partitionBy(...)
rdd3 = rdd1.reduceByKey(...).map(...)

rdd2.someAction()
rdd3.someAction()

rdd1.unpersist()

where someAction is the action you want to perform on a particular RDD.
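
For example, a hypothetical concrete instantiation of that pattern (the path, key extraction, partition count, and actions are illustrative placeholders, not prescribed by the answer):

rdd1 = sc.textFile("hdfs:///data/events.txt")
rdd1.cache()

rdd2 = rdd1.map(lambda line: (line.split(",")[0], 1)).partitionBy(8)
rdd3 = rdd1.map(lambda line: (line.split(",")[0], 1)).reduceByKey(lambda a, b: a + b)

n_rows = rdd2.count()    # first action: materializes rdd1's cache
totals = rdd3.collect()  # second action: reuses the cached rdd1
rdd1.unpersist()         # both consumers have run, so the cache can be dropped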

Alper t. Turker