1

Here's a part of a Scala recursion that produces a stack overflow after many recursions. I've tried many things but I've not managed to solve it.

val eRDD: RDD[(Int,Int)] = ... 

val oldRDD: RDD[Int,Int]= ...

val result = **Algorithm**(eRDD,oldRDD)


Algorithm(eRDD: RDD[(Int,Int)] , oldRDD: RDD[(Int,Int)]) : RDD[(Int,Int)]{

    val newRDD = Transformation(eRDD,oldRDD)//only transformations

    if(**Compare**(oldRDD,newRDD)) //Compare has the "take" action!!

          return **Algorithm**(eRDD,newRDD)

    else

         return newRDD
}

The above code is recursive and performs many iterations (until the compare returns false)

After some iterations I get a stack overflow error. Probably the lineage chain has become too long. Is there any way to solve this problem? (persist/unpersist, checkpoint, sc.saveAsObjectFile).

Note1: Only compare function performs Actions on RDDs

Note2: I tried some combinations of persist/unpersist but none of them worked!

I tried checkpointing from spark.streaming. I put a checkpoint at every recursion but still received an overflow error

I also tried using sc.saveAsObjectFile per iteration and then reading from file (sc.objectFile) during the next iteration. Unfortunately I noticed that the folders are created per iteration are increasing while I was expecting from them to have equal size per iteration. Here's how I tried to solve it using "sc.saveAsObjectFile":

Algorithm(eRDD: RDD[(Int,Int)] , recursion:Int) : RDD[(Int,Int)] = {

    val filename="RDD"+recursion.toString+".txt"

    val oldRDD :RDD[(Int,Int)] = sc.objectFile(filename)

    val newRDD=Transform(eRDD,oldRDD)

    if (compare(newRDD, oldRDD)){

          val newfilename="RDD"+(recursion+1).toString+".txt"

          newRDD.saveAsObjectFile(newfilename)

          return Algorithm(eRDD,recursion + 1)
    }

    else

          return newRDD
}

After this try the objects Files have the following increasing trend: RDD1.txt has 1 partition (partition00000) RDD2.txt has 4 partitions (partition00000...partition00003) . . RDD6.txt has 64 partitions(partition00000.... ....partition00063) . . e.t.c

P. Str
  • 580
  • 1
  • 5
  • 18
  • 1
    _noticed that the folders are created per iteration are increasing while I was expecting from them to have equal size per iteration._ - looks like a good place to start looking for the problems... – zero323 Oct 30 '15 at 20:59
  • Thanks for helping me zero323 ... I've added more details to my question. – P. Str Oct 30 '15 at 22:03
  • Yes! You solved my problem! I'm amazed by your detailed answer in the other post. Keep sharing your in-depth knowledge. So, maybe the problem is not too the "long lineage at all"...At every recursion stack loads bigger and bigger amounts of data, right? – P. Str Oct 31 '15 at 07:29
  • At first glance it looks like your code is tail-call optimized so there should be no recursion stack and everything should be converted to loop. You can add `@tailrec` annotation to be sure. Amount of data should depend only on your algorithm and number of partitions doesn't matter, but bookkeeping required to manage the partitions is not cheap. Not to mention that `RDD.partitions` returns an Array so its size limited by `Integer.MAX_VALUE` so if amount of partitions grows exponentially you hit the limit in 30 iterations or so :) – zero323 Oct 31 '15 at 12:15
  • After many tests the problem seems to be solved. spark.default.parallelism defines the number of partitions that are to be saved to disk as input for the next iteration. But in one of my experiments with a 3-million rows dataset I noticed a strange thing: When I set spark.default.parallelism to 1000 the algorithm concludes after 192 iterations but when I set it to 16 (as many as my cores) the algorithm performs more than 700 iterrations!! – P. Str Dec 17 '15 at 17:04
  • I've seen that question but without [MCVE](http://stackoverflow.com /help/mcve) is simply not answerable. – zero323 Dec 17 '15 at 17:26

0 Answers0