
Below I have a Scala example of a Spark fold action:

val rdd1 = sc.parallelize(List(1,2,3,4,5), 3)
rdd1.fold(5)(_ + _)

This produces the output 35. Can somebody explain in detail how this output gets computed?

thedevd
  • You know.... The documentation explicitly specifies it as `zeroValue` and not `initialValue`. And `5` does not look like `zero`. – sarveshseri Jan 20 '18 at 18:14

3 Answers


Taken from the Scaladocs here (emphasis mine):

@param zeroValue the initial value for the accumulated result of each partition for the op operator, and also the initial value for the combine results from different partitions for the op operator - this will typically be the neutral element (e.g. Nil for list concatenation or 0 for summation)

In your case, the zeroValue is added four times (once for each of the three partitions, plus once when combining the per-partition results). So the result is:

(5 + 1) + (5 + 2 + 3) + (5 + 4 + 5) + 5 // (extra one for combining results)
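The same computation can be traced in plain Scala, no Spark needed; the partition split below matches what glom shows for this RDD:

```scala
// Simulate RDD.fold locally over the three partitions that
// parallelize(List(1,2,3,4,5), 3) produces.
val partitions = List(List(1), List(2, 3), List(4, 5))
val zeroValue = 5

// Step 1: each partition is folded starting from zeroValue.
val perPartition = partitions.map(_.foldLeft(zeroValue)(_ + _))
// perPartition == List(6, 10, 14)

// Step 2: the per-partition results are combined, again starting
// from zeroValue -- this is the "extra" 5.
val total = perPartition.foldLeft(zeroValue)(_ + _)
// total == 35
```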
SCouto
  • I got it (extra one for combining results), thanks for the answer. But what is the significance of the fold action? I mean, under which circumstances should I go for fold rather than reduce? – thedevd Jan 20 '18 at 18:14
  • Well, basically fold gives you the option of providing an initialValue while reduce does not. – SCouto Jan 20 '18 at 18:37
  • @thedevd [Why is the fold action necessary in Spark?](https://stackoverflow.com/q/34529953/8371915) – Alper t. Turker Jan 20 '18 at 20:27

zeroValue is added once for each partition and should be a neutral element - in the case of + it should be 0. With a non-neutral value the exact result will depend on the number of partitions, but the computation is equivalent to:

rdd1.mapPartitions(iter => Iterator(iter.foldLeft(zeroValue)(_ + _))).reduce(_ + _)

so:

val rdd1 = sc.parallelize(List(1,2,3,4,5),3)

distributes data as:

scala> rdd1.glom.collect
res1: Array[Array[Int]] = Array(Array(1), Array(2, 3), Array(4, 5))

and the whole expression is equivalent to:

(5 + 1) + (5 + 2 + 3) + (5 + 4 + 5)

plus 5 for the jobResult (the final combine on the driver).
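To see why the neutral element matters, here is a small local sketch in plain Scala (no Spark; `simulateFold` is a helper made up for illustration, mirroring fold's two phases):

```scala
// Fold each partition from `zero`, then fold the partition
// results from `zero` again -- the shape of RDD.fold.
def simulateFold(parts: List[List[Int]], zero: Int): Int =
  parts.map(_.foldLeft(zero)(_ + _)).foldLeft(zero)(_ + _)

// The split observed via glom, and an alternative 2-way split:
val threeParts = List(List(1), List(2, 3), List(4, 5))
val twoParts   = List(List(1, 2), List(3, 4, 5))

// With the neutral element 0, the partitioning does not matter:
val a = simulateFold(threeParts, 0) // 15
val b = simulateFold(twoParts, 0)   // 15

// With the non-neutral value 5, it does:
val c = simulateFold(threeParts, 5) // 35
val d = simulateFold(twoParts, 5)   // 30
```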

Alper t. Turker

You know that Spark RDDs perform distributed computations.

So, this line here,

val rdd1 = sc.parallelize(List(1,2,3,4,5), 3)

tells Spark to split this RDD into 3 partitions, which enables it to run the computation as 3 independent tasks in parallel.

Now, this line here,

rdd1.fold(5)(_ + _)

tells Spark to fold each of those partitions using 5 as the initial value, and then fold the 3 partition results again with 5 as the initial value.

A plain Scala equivalent can be written as:

val list = List(1, 2, 3, 4, 5)
val listOfList = list.grouped(2).toList
val listOfFolds = listOfList.map(l => l.fold(5)(_ + _))
val fold = listOfFolds.fold(5)(_ + _)

So... if you are using fold on RDDs, you need to provide a zero value.

But then you will ask - why or when would someone use fold instead of reduce?

Your confusion lies in your perception of the zero value. The thing is that the zero value for RDD[T] does not depend only on our type T but also on the nature of the computation. So your zero value does not need to be 0.

Let's consider a simple example where we want to calculate "the largest number greater than 15, or else 15" in our RDD:

Can we do that using reduce? The answer is NO. But we can do it using fold.

val n15GT15 = rdd1.fold(15)({ case (acc, i) => Math.max(acc, i) })
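The same idea can be checked in plain Scala, since List.fold has the same zeroValue semantics for this computation:

```scala
// zeroValue = 15 acts as a floor: if no element exceeds 15,
// the fold simply returns 15.
val maxOrFloor = List(1, 2, 3, 4, 5).fold(15)((a, b) => math.max(a, b))
// maxOrFloor == 15

// An element above the floor wins:
val maxOrFloor2 = List(1, 42, 7).fold(15)((a, b) => math.max(a, b))
// maxOrFloor2 == 42
```

With reduce there is no place to put the floor, which is exactly why the answer reaches for fold here.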
sarveshseri
  • If I understand your example correctly, dismissing `reduce` isn't entirely correct: `list.reduce{ (acc, x) => val m = math.max(acc, x); if (m > 15) m else 15 }` – Leo C Jan 20 '18 at 20:48
  • Yes. But then you are doing that extra `if` check in your `combine` function which could have been avoided if you used `fold`. – sarveshseri Jan 20 '18 at 20:53
  • Agreed that `fold` is a better choice in your example. My point is that it can be done with `reduce`. – Leo C Jan 20 '18 at 20:59
  • Not always, it just worked out in this example. In some cases it may be doable but the cost will be significantly higher, and in some cases it will not be possible. Every `reduce` can be done with `fold`, but not the other way around. – sarveshseri Jan 20 '18 at 21:25
  • Maybe I wasn't clear. I simply wanted to point out that your example isn't a good case for justifying your `Can we do that with reduce? NO` statement. – Leo C Jan 20 '18 at 22:45