
I want an RDD action that works like reduce but doesn't require the operator to be commutative, i.e. I want the result in the following to always be "123456789".

scala> val rdd = sc.parallelize(1 to 9 map (_.toString))
rdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[24] at parallelize at <console>:24

scala> val result = rdd.someAction{ _+_ }

First, I found fold. The doc of RDD#fold says:

def fold(zeroValue: T)(op: (T, T) ⇒ T): T

Aggregate the elements of each partition, and then the results for all the partitions, using a given associative function and a neutral "zero value".

Note that the doc does not say the operator must be commutative. However, the result is not as expected:

scala> rdd.fold(""){ _+_ }
res10: String = 312456879
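
To see how the input is split across partitions, glom helps. A sketch of the output (the actual boundaries depend on the default parallelism; this assumes 4 partitions):

scala> rdd.glom.collect
res11: Array[Array[String]] = Array(Array(1, 2), Array(3, 4), Array(5, 6), Array(7, 8, 9))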

EDIT: I have tried the approach mentioned by @dk14, with no luck:

scala> val rdd = sc.parallelize(1 to 9 map (_.toString))
rdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[48] at parallelize at <console>:24

scala> rdd.zipWithIndex.map(x => (x._2, x._1)).sortByKey().map(_._2).fold(""){ _+_ }
res22: String = 341276895

scala> rdd.zipWithIndex.map(x => (x._2, x._1)).sortByKey().map(_._2).fold(""){ _+_ }
res23: String = 914856273

scala> rdd.zipWithIndex.map(x => (x._2, x._1)).sortByKey().map(_._2).fold(""){ _+_ }
res24: String = 742539618

scala> rdd.zipWithIndex.map(x => (x._2, x._1)).sortByKey().map(_._2).fold(""){ _+_ }
res25: String = 271468359
Eastsun
  • You missed the next section of the docs, which describes what you're seeing: *"This behaves somewhat differently from fold operations implemented for non-distributed collections in functional languages like Scala. This fold operation may be applied to partitions individually, and then fold those results into the final result, rather than apply the fold to each element sequentially in some defined ordering. For functions that are not commutative, the result may differ from that of a fold applied to a non-distributed collection."* – Yuval Itzchakov Aug 05 '16 at 06:46
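
To unpack the behavior that comment describes, here is a plain-Scala sketch of fold's semantics (not Spark's actual implementation; the partition boundaries are made up): each partition is folded locally and in order, and the per-partition results are then merged in whatever order the tasks happen to complete.

val partitions = Seq(Seq("1", "2"), Seq("3", "4"), Seq("5", "6"), Seq("7", "8", "9"))
// Step 1: fold each partition locally, preserving in-partition order.
val perPartition = partitions.map(_.foldLeft("")(_ + _))  // Seq("12", "34", "56", "789")
// Step 2: merge the partition results in task-completion order, which is arbitrary.
val merged = scala.util.Random.shuffle(perPartition).foldLeft("")(_ + _)
// e.g. "347891256": each partition's digits stay together, but the partitions are reordered.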

2 Answers


There is no built-in reducing action in Spark that satisfies this criterion, but you can easily implement your own by combining mapPartitions, collect and a local reduction:

import scala.reflect.ClassTag
import org.apache.spark.rdd.RDD

def orderedFold[T : ClassTag](rdd: RDD[T])(zero: T)(f: (T, T) => T): T = {
  // Fold each partition locally (preserving in-partition order), then
  // collect the per-partition results to the driver in partition order
  // and reduce them sequentially.
  rdd.mapPartitions(iter => Iterator(iter.foldLeft(zero)(f))).collect.reduce(f)
}

Using a combination of collect and reduce for the merge, instead of the asynchronous and unordered merging used by fold, ensures that the global order is preserved.
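
A sketch of the expected usage, with the rdd from the question:

scala> orderedFold(rdd)("") { _ + _ }
res0: String = 123456789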

This of course comes with some additional cost, including:

  • a slightly higher memory footprint on the driver;
  • significantly higher latency, since we explicitly wait for all tasks to finish before starting the local reduction.
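
As a side note, the zipWithIndex / sortByKey attempt from the question can be combined with orderedFold. A sketch: sortByKey range-partitions the data so that the partitions themselves are in key order, and orderedFold then merges them in partition order rather than completion order:

val ordered = rdd.zipWithIndex.map(x => (x._2, x._1)).sortByKey().map(_._2)
orderedFold(ordered)("") { _ + _ }  // expected: "123456789"
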
zero323
  • Thanks for your help. Does this mean that every partition **is always a contiguous subsequence** of the whole RDD? Is there any documentation that mentions that? – Eastsun Aug 06 '16 at 03:31
  • Regarding docs, none that I am aware of. It is more or less constrained by the model and the contracts of some ordered methods, though. The real problem in Spark is how to determine the overall sequence. In general, there are two cases where you can reason about order: a) when you use an explicit sort (by contract); b) when the input generates deterministic, ordered splits and there are no shuffles or other data movements between the input and the current point. – zero323 Aug 06 '16 at 10:44

As pointed out by @YuvalItzchakov, fold does not preserve ordering when combining the results of a partitioned RDD. To illustrate this, consider coalescing the original RDD to a single partition:

scala> val rdd = sc.parallelize(1 to 9 map (_.toString)).coalesce(1)
rdd: org.apache.spark.rdd.RDD[String] = CoalescedRDD[27] at coalesce at <console>:27

scala> rdd.zipWithIndex.map(x => (x._2, x._1)).sortByKey().map(_._2).fold(""){ _+_ }
res4: String = 123456789

scala> rdd.zipWithIndex.map(x => (x._2, x._1)).sortByKey().map(_._2).fold(""){ _+_ }
res5: String = 123456789

scala> rdd.zipWithIndex.map(x => (x._2, x._1)).sortByKey().map(_._2).fold(""){ _+_ }
res6: String = 123456789
elm