I want to log the number of rows in an RDD halfway between the start and end transformations. My code currently looks like this:

val transformation1 = firstTransformation(inputdata).cache  // Is this cache recommended or can I remove it?
log("Transformation1 count: " + transformation1.count)
val transformation2 = secondTransformation(transformation1).cache
val finalX = transformation2.filter(row => row.contains("x"))
val finalY = transformation2.filter(row => row.contains("y"))

My problem is that transformation1 is a huge RDD and takes up a lot of memory (it fits in memory but causes memory problems later on). However, since I perform two different operations on transformation1 (.count() and secondTransformation()), the usual recommendation is to cache it.

This type of scenario is probably very common, so what is the recommended way of dealing with it? Should you always cache an RDD before an intermediate count, or can I remove the .cache() on transformation1?

B. Smith

1 Answer

If you are having memory problems, unpersist each RDD as soon as it is no longer needed. You can also persist to disk instead of memory.

import org.apache.spark.storage.StorageLevel

val transformation1 = firstTransformation(inputdata).persist(StorageLevel.DISK_ONLY)  // stored on disk instead of in memory
log("Transformation1 count: " + transformation1.count)
val transformation2 = secondTransformation(transformation1).persist(StorageLevel.DISK_ONLY)
val finalX = transformation2.filter(row => row.contains("x"))
val finalY = transformation2.filter(row => row.contains("y"))
// Once all the actions on finalX and finalY are done
transformation1.unpersist()
transformation2.unpersist()

If you can unpersist before the memory problems occur, it is better to cache in memory than to persist on disk.
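To make "as soon as possible" concrete, here is a rough sketch of one way to release transformation1 earlier. It assumes that transformation1 is only needed for the count and as input to transformation2, so it can be dropped once an action has materialized transformation2. The two transformation bodies, the sample input, the `run` helper, and the `MEMORY_AND_DISK` default are placeholders of mine, not taken from the question.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel

object EarlyUnpersistSketch {
  // Hypothetical stand-ins for the question's transformations.
  def firstTransformation(input: RDD[String]): RDD[String] = input.map(_.trim)
  def secondTransformation(input: RDD[String]): RDD[String] = input.filter(_.nonEmpty)

  // Returns (transformation1 count, transformation2 count, x count, y count).
  def run(sc: SparkContext,
          level: StorageLevel = StorageLevel.MEMORY_AND_DISK): (Long, Long, Long, Long) = {
    val inputdata = sc.parallelize(Seq(" x1 ", "y1", "", "xy")) // made-up sample data

    val transformation1 = firstTransformation(inputdata).persist(level)
    val count1 = transformation1.count() // first use of transformation1

    val transformation2 = secondTransformation(transformation1).persist(level)
    // An action forces transformation2 to be computed and stored,
    // which is the second (and last) use of transformation1.
    val count2 = transformation2.count()
    transformation1.unpersist() // release it before the later, memory-hungry steps

    // Filters are lazy, so these counts are the actions that read transformation2.
    val countX = transformation2.filter(row => row.contains("x")).count()
    val countY = transformation2.filter(row => row.contains("y")).count()
    transformation2.unpersist()

    (count1, count2, countX, countY)
  }

  def main(args: Array[String]): Unit = {
    // local[*] is only for trying the sketch out on one machine.
    val sc = new SparkContext(new SparkConf().setAppName("early-unpersist").setMaster("local[*]"))
    try println(run(sc)) finally sc.stop()
  }
}
```

The extra count on transformation2 is only there to force it to be computed while transformation1 is still stored. If you do not need that number, any other action on transformation2 would serve the same purpose.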

Mikel San Vicente
  • When unpersisting, Spark crashes and says `Exception in thread "main" org.apache.spark.rpc.RpcTimeoutException: Futures timed out after [120 seconds]. This timeout is controlled by spark.rpc.askTimeout` – B. Smith Nov 21 '17 at 22:52
  • That's a different issue, take a look at https://stackoverflow.com/questions/41123846/scala-spark-dataframes-join-java-util-concurrent-timeoutexception-futures-t – Mikel San Vicente Nov 21 '17 at 22:54