I have an iterative application running on Spark that I simplified to the following code:
var anRDD: org.apache.spark.rdd.RDD[Int] = sc.parallelize(0 to 1000)
var c: Long = Int.MaxValue
var iteration: Int = 0
while (c > 0) {
  iteration += 1
  // Manipulate the RDD and cache the new RDD
  anRDD = anRDD.zipWithIndex.filter(t => t._2 % 2 == 1).map(_._1).cache() //.localCheckpoint()
  // Actually compute the RDD and spawn a new job
  c = anRDD.count()
  println(s"Iteration: $iteration, Values: $c")
}
What happens to the memory allocation across the subsequent jobs?

- Does the current `anRDD` "override" the previous ones, or are all of them kept in memory? In the long run this could lead to an out-of-memory exception.
- Do `localCheckpoint` and `cache` behave differently? If `localCheckpoint` is used in place of `cache`, since `localCheckpoint` truncates the RDD lineage, I would expect the previous RDDs to be overridden (see also the sketch after this list).
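
For reference, here is a sketch of what I imagine "overriding" would look like if it had to be done by hand, i.e. explicitly unpersisting the previously cached RDD once the new one is materialized (the `previous` variable is only illustrative). Is something like this needed, or does Spark reclaim the old cached partitions on its own?

// Sketch only, not what I currently do: keep a handle to the old RDD,
// materialize the new one, then explicitly release the old cached partitions.
val previous = anRDD
anRDD = anRDD.zipWithIndex.filter(t => t._2 % 2 == 1).map(_._1).cache()
c = anRDD.count()    // materializes and caches the new RDD
previous.unpersist() // drops the old RDD's cached blocks from memory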