
I have a silly question involving fold and reduce in PySpark. I understand the difference between these two methods, but, if both require that the applied function is a commutative monoid, I cannot figure out an example in which `fold` cannot be substituted by `reduce`.

Besides, the PySpark implementation of `fold` uses `acc = op(obj, acc)`. Why is this operation order used instead of `acc = op(acc, obj)`? (This second order sounds closer to a `leftFold` to me.)

Cheers

Tomas


1 Answer


Empty RDD

It cannot be substituted when the RDD is empty:

val rdd = sc.emptyRDD[Int]
rdd.reduce(_ + _)
// java.lang.UnsupportedOperationException: empty collection at   
// org.apache.spark.rdd.RDD$$anonfun$reduce$1$$anonfun$apply$ ...

rdd.fold(0)(_ + _)
// Int = 0

You can of course combine reduce with a condition on isEmpty, but it is rather ugly.
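For completeness, a minimal sketch of that guard (assuming Spark 1.3+, where RDD.isEmpty is available):

val result = if (rdd.isEmpty()) 0 else rdd.reduce(_ + _)
// Int = 0 for the empty RDD above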

Mutable buffer

Another use case for fold is aggregation with a mutable buffer. Consider the following RDD:

import breeze.linalg.DenseVector

val rdd = sc.parallelize(Array.fill(100)(DenseVector(1)), 8)

Let's say we want the sum of all elements. A naive solution is to simply reduce with +:

rdd.reduce(_ + _)

Unfortunately, it creates a new vector for each element. Since object creation and subsequent garbage collection are expensive, it could be better to use a mutable object. This is not possible with reduce (immutability of an RDD doesn't imply immutability of its elements), but it can be achieved with fold as follows:

rdd.fold(DenseVector(0))((acc, x) => acc += x)

The zero element is used here as a mutable buffer, initialized once per partition, leaving the actual data untouched.
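A quick way to see that per-partition initialization (a hedged illustration; the arithmetic assumes the four elements land in exactly four partitions, as requested here):

// A non-identity "zero" is applied once per partition and once more
// when the partition results are merged on the driver:
sc.parallelize(1 to 4, 4).fold(1)(_ + _)
// Int = 15 (1 + 2 + 3 + 4 = 10, plus 4 per-partition zeros, plus 1 final zero)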

`acc = op(obj, acc)`, why this operation order is used instead of `acc = op(acc, obj)`

See SPARK-6416 and SPARK-7683.

zero323
  • Thanks for your answer, but could you please elaborate a little more on the mutable buffer example? Is there a similar example in PySpark? – Tomas F. Pena Jan 02 '16 at 19:34
  • Since `zeroElement` is created every time `fold` is called and is not part of the data, it can be safely mutated. PySpark is partially immune to the possible effects of mutating data inside an RDD, so it is hard to find a good Python example. It is an implementation detail, though, not a part of the contract. – zero323 Jan 02 '16 at 19:49