Here's part of a Scala recursion in Spark that produces a stack overflow after many iterations. I've tried many things but haven't managed to solve it.
val eRDD: RDD[(Int, Int)] = ...
val oldRDD: RDD[(Int, Int)] = ...
val result = Algorithm(eRDD, oldRDD)

def Algorithm(eRDD: RDD[(Int, Int)], oldRDD: RDD[(Int, Int)]): RDD[(Int, Int)] = {
  val newRDD = Transformation(eRDD, oldRDD) // only transformations
  if (Compare(oldRDD, newRDD))              // Compare has the "take" action!!
    Algorithm(eRDD, newRDD)
  else
    newRDD
}
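For what it's worth, the same logic can be written as a plain loop instead of a recursion, which keeps the driver's call stack flat no matter how many iterations run. This is only a sketch using the `Transformation` and `Compare` names from above; note that if the StackOverflowError actually comes from serializing an ever-longer lineage rather than from the driver recursion itself, this alone will not fix it and lineage truncation is still needed:

```scala
import org.apache.spark.rdd.RDD

// Iterative equivalent of the recursive Algorithm: same stopping
// condition, but no driver-side stack growth per iteration.
def algorithmLoop(eRDD: RDD[(Int, Int)], initial: RDD[(Int, Int)]): RDD[(Int, Int)] = {
  var current = initial
  var changed = true
  while (changed) {
    val next = Transformation(eRDD, current) // only transformations
    changed = Compare(current, next)         // the only action
    current = next
  }
  current
}
```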
The above code is recursive and performs many iterations (until Compare returns false). After some iterations I get a StackOverflowError, probably because the lineage chain has become too long. Is there any way to solve this problem (persist/unpersist, checkpoint, saveAsObjectFile)?
Note 1: Only the Compare function performs actions on the RDDs.
Note 2: I tried several combinations of persist/unpersist, but none of them worked!
I tried checkpointing from spark.streaming, putting a checkpoint at every recursion, but I still received the overflow error.
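As an aside, the spark.streaming checkpoint is a separate mechanism; core RDD checkpointing requires a checkpoint directory to be set and, crucially, an action to run afterwards, because `checkpoint()` only marks the RDD and nothing is written (or truncated from the lineage) until the first action. A sketch of what one iteration would look like under that assumption, using the names from the question (the directory path is hypothetical):

```scala
sc.setCheckpointDir("/tmp/rdd-checkpoints") // hypothetical path; do this once at startup

val newRDD = Transformation(eRDD, oldRDD)
newRDD.persist()    // cache so the checkpoint action does not recompute the RDD
newRDD.checkpoint() // only marks the RDD for checkpointing
newRDD.count()      // first action materializes the checkpoint and truncates the lineage
oldRDD.unpersist()  // previous iteration's data is no longer needed
```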
I also tried saveAsObjectFile per iteration and then reading back from the file (sc.objectFile) during the next iteration. Unfortunately, I noticed that the folders created per iteration keep growing, while I expected them to have equal size per iteration.
Here's how I tried to solve it using saveAsObjectFile:
def Algorithm(eRDD: RDD[(Int, Int)], recursion: Int): RDD[(Int, Int)] = {
  val filename = "RDD" + recursion.toString + ".txt"
  val oldRDD: RDD[(Int, Int)] = sc.objectFile(filename)
  val newRDD = Transform(eRDD, oldRDD)
  if (compare(newRDD, oldRDD)) {
    val newfilename = "RDD" + (recursion + 1).toString + ".txt"
    newRDD.saveAsObjectFile(newfilename)
    Algorithm(eRDD, recursion + 1)
  } else
    newRDD
}
After this attempt, the object files show the following increasing trend:

RDD1.txt has 1 partition (partition00000)
RDD2.txt has 4 partitions (partition00000 ... partition00003)
...
RDD6.txt has 64 partitions (partition00000 ... partition00063)
...
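One possible explanation, stated as an assumption: the partition count of each saved RDD compounds, because each iteration's output is derived from the previous one (for example, a union of an n-partition RDD with an m-partition RDD yields n + m partitions, and joins can widen partitioning too, depending on what Transform does). Fixing the partition count before saving would keep every RDD&lt;k&gt;.txt the same shape per iteration. A sketch of the save step from the code above, with a hypothetical fixed target:

```scala
val numPartitions = 4 // hypothetical fixed target; choose to match your data/cluster

// coalesce narrows partitions without a shuffle; use repartition(numPartitions)
// instead if the data also needs to be rebalanced across partitions.
newRDD.coalesce(numPartitions).saveAsObjectFile(newfilename)
```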