I would like to create an RDD to collect the results of an iterative calculation.

How can I use a loop (or any alternative) to replace the following code:

import org.apache.spark.mllib.random.RandomRDDs._    

val n = 10 

val step1 = normalRDD(sc, n, seed = 1 ) 
val step2 = normalRDD(sc, n, seed = (step1.max).toLong ) 
val result1 = step1.zip(step2) 
val step3 = normalRDD(sc, n, seed = (step2.max).toLong ) 
val result2 = result1.zip(step3) 

...

val step50 = normalRDD(sc, n, seed = (step49.max).toLong ) 
val result49 = result48.zip(step50) 

(Creating the N step RDDs and zipping them together at the end would also be OK, as long as the 50 RDDs are created iteratively to respect the seed = step(n-1).max condition.)
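One way to meet this requirement is to thread each step into the next with `Iterator.iterate`. The sketch below is a local, Spark-free model, assuming a plain `Vector[Double]` can stand in for each RDD. The hypothetical helper `normalStep` draws `n` Gaussian values from a `scala.util.Random` seeded with `seed`, in place of `normalRDD(sc, n, seed = seed)`, and `transpose` plays the role of the final chain of `zip` calls.

```scala
import scala.util.Random

object IterativeSteps {
  // Hypothetical local stand-in for normalRDD(sc, n, seed = seed):
  // n standard-normal draws from a generator seeded with `seed`.
  def normalStep(n: Int, seed: Long): Vector[Double] = {
    val rng = new Random(seed)
    Vector.fill(n)(rng.nextGaussian())
  }

  // Builds numSteps steps: step 1 uses seed 1, and every later step
  // is seeded with the previous step's max truncated to a Long.
  def buildSteps(n: Int, numSteps: Int): Vector[Vector[Double]] =
    Iterator
      .iterate(normalStep(n, 1L))(prev => normalStep(n, prev.max.toLong))
      .take(numSteps)
      .toVector

  // Row i holds the i-th value of every step, like zipping all steps together.
  def zipAll(steps: Vector[Vector[Double]]): Vector[Vector[Double]] =
    steps.transpose

  def main(args: Array[String]): Unit = {
    val steps = buildSteps(n = 10, numSteps = 50)
    val rows = zipAll(steps)
    println(s"${rows.size} rows of ${rows.head.size} values each")
  }
}
```

In Spark, the same `Iterator.iterate(...).take(50)` pattern should work with `normalRDD` in place of `normalStep` (each `.max` runs one job), but the final step would have to be a chain of `RDD.zip` calls, since RDDs have no `transpose`. Note also that the max of ten standard-normal draws is usually a small positive number, so `.toLong` maps it to only a few distinct seeds (typically 0, 1 or 2). Because both generators are deterministic for a given seed, later steps will repeat earlier ones exactly.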

zero323
ulrich
  • I'd use `Stream.unfold` from scalaz to generate a stream of steps, and then zip it with itself and/or `scanRight`. – Reactormonk Feb 18 '16 at 10:39
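This suggestion can be approximated with the standard library alone. Here `scanLeft` replaces scalaz's `Stream.unfold`/`scanRight`; it is a swapped-in technique, not the commenter's code. It is a local model, assuming a `Vector` stands in for each RDD and that the hypothetical `normalStep` stands in for `normalRDD`. The scan state pairs the latest step (needed for the next seed) with the rows zipped so far, so the output contains every intermediate result, mirroring the question's `result1`, `result2`, and so on.

```scala
import scala.util.Random

object ScanSteps {
  // Hypothetical local stand-in for normalRDD(sc, n, seed = seed).
  def normalStep(n: Int, seed: Long): Vector[Double] = {
    val rng = new Random(seed)
    Vector.fill(n)(rng.nextGaussian())
  }

  // Assumes numSteps >= 1. Element k holds steps 1..k+1 zipped together,
  // stored row-wise: element 0 is step 1 alone, element 1 matches result1.
  def partialResults(n: Int, numSteps: Int): Vector[Vector[Vector[Double]]] = {
    val first = normalStep(n, 1L)
    // Scan state: (latest step, rows zipped so far)
    val states = (2 to numSteps).scanLeft((first, first.map(Vector(_)))) {
      case ((prev, rows), _) =>
        // Seed the next step with the previous step's max
        val next = normalStep(n, prev.max.toLong)
        // Append the new step's i-th value to row i
        (next, rows.zip(next).map { case (row, x) => row :+ x })
    }
    states.map(_._2).toVector
  }
}
```

`ScanSteps.partialResults(10, 50).last` corresponds to the question's `result49`, with the nested tuples flattened into 50-element rows.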


A recursive function would work:

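import org.apache.spark.SparkContext
import org.apache.spark.mllib.random.RandomRDDs.normalRDD
import org.apache.spark.rdd.RDD

/**
 * Row i of the result holds the i-th value of every step, in step order.
 * The return type is an Option to handle the case of a user specifying
 * a non-positive number of steps.
 */
def createZippedNormal(sc : SparkContext,
                       size : Long,
                       numSteps : Int) : Option[RDD[Vector[Double]]] = {

  @scala.annotation.tailrec
  def accum(stepsLeft : Int,
            currRDD : RDD[Vector[Double]],
            nextSeed : Long) : RDD[Vector[Double]] = {
    if(stepsLeft <= 0) currRDD
    else {
      // normalRDD's 2nd argument is the RDD size, so the seed is passed by name
      val newRDD = normalRDD(sc, size, seed = nextSeed)
      // zip yields pairs, so append the new value to each accumulated row
      val zipped = currRDD.zip(newRDD).map { case (row, x) => row :+ x }
      // The next step is seeded with this step's max, as in the question
      accum(stepsLeft - 1, zipped, newRDD.max.toLong)
    }
  }

  if(numSteps <= 0) None
  else {
    // Start from step 1 (seed = 1); zipping with sc.emptyRDD would fail
    // because the partition counts differ
    val first = normalRDD(sc, size, seed = 1L)
    Some(accum(numSteps - 1, first.map(Vector(_)), first.max.toLong))
  }
}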
Ramón J Romero y Vigil
  • Tail recursion won't protect you from RDD lineage blowing the stack :) – zero323 Feb 20 '16 at 15:43
  • @zero323 Agreed. However, this issue is inherent with the requirements of the question. Any answer would suffer a similar problem. – Ramón J Romero y Vigil Feb 20 '16 at 15:46
  • Just wanted to point out that you're building a recursive data structure behind the scenes which won't be tail optimized. Nothing more :) And actually you can solve it and avoid the problem by using checkpoints. It is even solvable without a single zip :) – zero323 Feb 20 '16 at 15:59
  • Nah, I've already answered one similar question by OP ;) Regarding lineage see http://stackoverflow.com/a/34462261/1560062. +1 for this one. – zero323 Feb 20 '16 at 16:07