3

I have an iterative application running on Spark that I simplified to the following code:

var anRDD: org.apache.spark.rdd.RDD[Int] = sc.parallelize((0 to 1000))
var c: Long = Int.MaxValue 
var iteration: Int = 0
while (c > 0) {
    iteration += 1
    // Manipulate the RDD and cache the new RDD
    anRDD = anRDD.zipWithIndex.filter(t => t._2 % 2 == 1).map(_._1).cache() //.localCheckpoint()
    // Actually compute the RDD and spawn a new job
    c = anRDD.count()
    println(s"Iteration: $iteration, Values: $c")
}

What happens to the memory allocation within consequent jobs?

  • Does the current anRDD "override" the previous ones or are they all kept into memory? In the long run, this can throw some memory exception
  • Do localCheckpoint and cache have different behaviors? If localCheckpoint is used in place of cache, as localCheckpoint truncates the RDD lineage, then I would expect the previous RDDs to be overridden
w4bo
  • 855
  • 7
  • 14

2 Answers2

3

Unfortunately seems that Spark is not good for things like that.

Your original implementation is not viable because on each iteration the newer RDD will have an internal reference to the older one so all RDDs pile up in memory.

localCheckpoint is an approximation of what you are trying to achieve. It does truncate RDD's lineage but you lose fault tolerance. It's clearly stated in the documentation for this method.

checkpoint is also an option. It is safe but it would dump the data to hdfs on each iteration.

Consider redesigning the approach. Such hacks could bite sooner or later.

simpadjo
  • 3,947
  • 1
  • 13
  • 38
2
  1. RDDs are immutable so each transformation will return a new RDD. All anRDD will be kept in memory. See below(running two iteration for your code), id will be different for all the RDDs enter image description here

    So yes, In the long run, this can throw some memory exception. And you should unpersist rdd after you are done processing on it.

  2. localCheckpoint has different use case than cache. It is used to truncate the lineage of RDD. It doesn't store RDD to disk/local It improves performance but decreases fault tolerance in turn.

wypul
  • 807
  • 6
  • 9