2

NESTED PARALLELIZATIONS?

Let's say I am trying to do the equivalent of nested for loops in Spark. In a regular language, say I have a routine in the inner loop that estimates Pi the way the Pi Average Spark example does (see Estimating Pi):

int iLimit = 1000; int jLimit = 1000000; double counter = 0.0;   // jLimit = 10^6

for (int i = 0; i < iLimit; i++)
    for (int j = 0; j < jLimit; j++)
        counter += PiEstimator();

double estimateOfAllAverages = counter / iLimit;

Can I nest parallelize calls in Spark? I am trying to and have not worked out the kinks yet. I would be happy to post errors and code, but I think I am asking a more conceptual question about whether this is the right approach in Spark.

I can already parallelize a single Spark example / Pi estimate; now I want to do that 1000 times to see if it converges on Pi. (This relates to a larger problem we are trying to solve; if something closer to an MCVE is needed, I'd be happy to add it.)

BOTTOM LINE QUESTION, which I just need someone to answer directly: is using nested parallelize calls the right approach? If not, please advise something specific, thanks! Here's pseudo-code for the approach I have in mind:

// use accumulator to keep track of each Pi Estimate result

sparkContext.parallelize(arrayOf1000, slices).map { _ =>
    // function call for each of the 1000 outer elements

    sparkContext.parallelize(arrayOf10ToThe6, slices).map { _ =>
        // do the 10^6 thing here and update the accumulator with each result
    }
}

// take average of accumulator to see if all 1000 Pi estimates converge on Pi

BACKGROUND: I had asked this question before and got a general answer, but it did not lead to a solution. After some waffling I decided to post a new question with a different characterization. I also tried asking this on the Spark user mailing list, but no dice there either. Thanks in advance for any help.

JimLohse
  • I am not hearing back from the two who provided answers; those answers did clarify that no, my approach was wrong, but did not provide usable alternatives. I did hear back from someone in the local community and have posted an answer based on that. – JimLohse Dec 20 '15 at 16:14

3 Answers

5

This is not even possible, as SparkContext is not serializable. If you want a nested for loop, then your best option is to use cartesian:

val nestedForRDD = rdd1.cartesian(rdd2)
nestedForRDD.map { case (rdd1TypeVal, rdd2TypeVal) =>
  // Do your inner-nested evaluation code here
}

Keep in mind that, just as with a double for loop, this comes at a size cost: the cartesian product has rdd1.count() * rdd2.count() elements.

Justin Pihony
  • Thanks, I had not thought of the fact that SparkContext is not serializable; very helpful! I had thought of cartesian, but I don't actually need to build the cartesian set; I just need to collect the results from each Pi estimation. Wouldn't cartesian have a big overhead? And yes, our ultimate analysis is in genetics and will be on the order of 10^13 calculations. – JimLohse Dec 17 '15 at 16:54
  • Also, I am just learning, if I have this right, that Spark RDDs effectively can't have more than MAXINT (about 2.147 x 10^9) elements, which would preclude a cartesian that leads to an RDD with more elements than MAXINT. I am still nailing that limit down, but it appears to be whatever a Scala collection can hold. I will update this if I learn differently. – JimLohse Dec 20 '15 at 23:42
2

In the Pi example, you can get the same answer as the nested for loop by running through the process in a single loop i * j times, summing over all of the results, and then dividing by j at the end. If you have steps that you want to apply in the outer loop, do them within that single loop, but create different groups by assigning a specific key to each inner-loop group. Without knowing what kinds of things you want to do in the outer loop, it's hard to give an example here.

For the simple case of just averaging to improve convergence, it's relatively easy. Instead of doing the nested loop, just make an RDD with i * j elements and then apply the function to each element.

In pySpark this might look like the following (f is whatever function you want to apply; remember that each element in the RDD will be passed to it, so define your f with an input argument even if you don't use it in your function):

from operator import add
from pyspark.mllib.random import RandomRDDs  # needed for uniformRDD

x = RandomRDDs.uniformRDD(sc, i * j)
function_values = x.map(f)

sum_of_values = function_values.reduce(add)
averaged_value = sum_of_values / j  # if you are only averaging over the outer loop

If you want to perform actions in the outer loop, I'd assign an index (zipWithIndex) and then create a key using the index modulo j. Then each different key would be a single virtual inner-loop cycle, and you can use operators like aggregateByKey, foldByKey, or reduceByKey to perform actions only on those records. This will probably take a bit of a performance hit if the different keys are distributed to different partitions.
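
To make that concrete, here is a minimal pySpark sketch of the modulo-j keying; sc, i, j, and f are the same placeholders used in the code above, not real API names, and with this keying each of the j keys ends up holding i of the i * j elements:

from operator import add
from pyspark.mllib.random import RandomRDDs

x = RandomRDDs.uniformRDD(sc, i * j)                              # same i * j random elements as above
pairs = x.zipWithIndex().map(lambda vi: (vi[1] % j, f(vi[0])))    # key = index modulo j
group_sums = pairs.reduceByKey(add)                               # one summed result per key
group_means = group_sums.mapValues(lambda s: s / float(i))        # each key collects exactly i elements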

An alternative would be to repartition the RDD onto j partitions and then use foreachPartition to apply a function to each partition.
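
A rough sketch of that partition-based variant follows, using mapPartitions instead of foreachPartition so the per-partition results come back as an RDD (foreachPartition is side-effect only); sc, i, j, and f are again just the placeholders from above, and j partitions may be far more than you want in practice:

from pyspark.mllib.random import RandomRDDs

def per_partition(iterator):
    # apply f to every element in this partition and yield one aggregated value
    values = [f(v) for v in iterator]
    if values:
        yield sum(values) / len(values)

x = RandomRDDs.uniformRDD(sc, i * j).repartition(j)   # j partitions, each acting as one virtual inner-loop group
partition_results = x.mapPartitions(per_partition)    # one aggregated value per non-empty partition
print(partition_results.take(5))                      # peek at a few of the per-partition results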

A third option would be to run the inner loop j times in parallel, concatenate the results into one distributed file, and then do the outer loop operations after reading this into Spark.

JimLohse
1

No. You can't.

SparkContext is only accessible from the Spark driver node. The inner parallelize() calls would try to use the SparkContext from the worker nodes, which do not have access to it.
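
To make the restriction concrete, the pattern that does work is to keep every parallelize() call in driver-side code, e.g. a plain loop on the driver that submits one Spark job per outer iteration. A rough pySpark sketch, with all names (sc, n_estimates, samples_per_estimate, inside) chosen purely for illustration:

import random
from pyspark import SparkContext

sc = SparkContext(appName="DriverSidePiLoop")

n_estimates = 1000            # the question's outer loop
samples_per_estimate = 10**6  # the question's inner loop

def inside(_):
    # one Monte Carlo dart: 1.0 if it lands inside the unit quarter-circle
    x, y = random.random(), random.random()
    return 1.0 if x * x + y * y < 1.0 else 0.0

estimates = []
for _ in range(n_estimates):                              # plain Python loop, runs on the driver
    hits = sc.parallelize(range(samples_per_estimate)).map(inside).sum()
    estimates.append(4.0 * hits / samples_per_estimate)   # one Pi estimate per Spark job

print(sum(estimates) / len(estimates))                    # average of all the Pi estimates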

Mario
  • This clarification is really helpful and perhaps so obvious that it's not spelled out anywhere I have read. Thanks, I was not seeing that. I am still looking for a way to do this; the other answer suggested cartesian, but I don't need the full set, just the results of each PiAverage, to see if they converge. Any ideas on how I accomplish what I want? – JimLohse Dec 17 '15 at 17:52