
In Spark, the groupByKey function transforms a (K,V) pair RDD into a (K,Iterable<V>) pair RDD.

But is this function stable? That is, is the order in the iterable preserved from the original order?

For example, if I originally read a file of the form:

K1;V11
K2;V21
K1;V12

May my iterable for K1 be like (V12, V11) (thus not preserving the original order) or can it only be (V11, V12) (thus preserving the original order)?
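
For reference, a minimal spark-shell sketch of the scenario (the file name `data.txt` and the use of `split` are assumptions for illustration, not from the question):

```scala
// Hypothetical file "data.txt" containing:
// K1;V11
// K2;V21
// K1;V12
val pairs = sc.textFile("data.txt").map { line =>
  val Array(k, v) = line.split(";")
  (k, v)
}
val grouped = pairs.groupByKey()
// grouped is an RDD[(String, Iterable[String])]; the question is whether
// the Iterable for "K1" is guaranteed to be (V11, V12).
```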

Jean Logeart

2 Answers


No, the order is not preserved. Example in spark-shell:

scala> sc.parallelize(Seq(0->1, 0->2), 2).groupByKey.collect
res0: Array[(Int, Iterable[Int])] = Array((0,ArrayBuffer(2, 1)))

The order is timing dependent, so it can vary between runs. (I got the opposite order on my next run.)

What is happening here? groupByKey works by repartitioning the RDD with a HashPartitioner, so that all values for a key end up in the same partition. Then it performs the aggregation locally on each partition.
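
The partition a key ends up in is determined by the key's hash code modulo the number of partitions. A simplified sketch of what Spark's HashPartitioner does (Spark's actual implementation wraps this logic in a `nonNegativeMod` utility):

```scala
// Simplified sketch of HashPartitioner's partition choice.
def getPartition(key: Any, numPartitions: Int): Int = {
  val rawMod = key.hashCode % numPartitions
  // hashCode can be negative on the JVM, so fold negatives back
  // into the range [0, numPartitions)
  rawMod + (if (rawMod < 0) numPartitions else 0)
}

// All pairs with the same key map to the same partition:
// getPartition(0, 2) == 0, so both 0->1 and 0->2 from the
// example above land in partition 0.
```

Note that this determines only *which* partition each pair lands in, not the order in which the shuffled pieces arrive there.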

The repartitioning is also called a "shuffle", because the lines of the RDD are redistributed between nodes. The shuffle files are pulled from the other nodes in parallel. The new partition is built from these pieces in the order that they arrive. The data from the slowest source will be at the end of the new partition, and at the end of the list in groupByKey.

(Data pulled from the worker itself is of course fastest. Since there is no network transfer involved, this data is pulled synchronously and thus arrives in order. It seems to, at least. So to replicate my experiment you need at least 2 Spark workers.)

Source: http://apache-spark-user-list.1001560.n3.nabble.com/Is-shuffle-quot-stable-quot-td7628.html

Daniel Darabos
  • The issue I still have with this is that I feel like `parallelize` is taking the pairs out of order and `groupByKey` is still stable; maybe there is a way to `parallelize` preserving order – aaronman Jun 16 '14 at 15:50
  • `sc.parallelize(Seq(0->1, 0->2), 2).collect` will always return `0->1, 0->2`. – Daniel Darabos Jun 16 '14 at 15:57
  • Well, I don't think that means anything; because of lazy execution, I don't think `parallelize` actually does anything until you perform an action – aaronman Jun 16 '14 at 16:07
  • You said you feel like `parallelize` is "taking pairs out of order". I'm saying it's absolutely not doing that. Can you show an example of where it builds an RDD with out of order rows? – Daniel Darabos Jun 16 '14 at 17:37

Spark (and other map-reduce frameworks) sorts data by partitioning and then merging. Since a merge sort is a stable operation, I would guess that the result is stable. After looking more into the source, I found that if `spark.shuffle.spill` is true it uses an external sort (a merge sort in this case), which is stable. I'm not 100% sure what happens if it's allowed to spill to disk.

From source:

private val externalSorting = SparkEnv.get.conf.getBoolean("spark.shuffle.spill", true)

Partitioning is also a stable operation, because it does no reordering.

aaronman
  • The question's not about sorting, it's about whether the original order (which might not be sorted) is preserved. – Travis Brown Jun 13 '14 at 14:04
  • @TravisBrown that's what stable sort means, "preserving original order". I will admit I'm not 100% sure it uses merge sort all the way, but it certainly seems so from the source – aaronman Jun 13 '14 at 14:08
  • Right, but there's no sorting happening here—just partitioning the data by a key. – Travis Brown Jun 13 '14 at 14:10
  • @TravisBrown the data is partitioned by a key which is stable since it's just linearly going over them, and then a merge is performed on data partitioned to the same "reducer" – aaronman Jun 13 '14 at 14:11
  • How do you know partitioning (or "shuffle") is stable? It definitely does reordering. Try `sc.parallelize(Seq(1->1, 2->2, 3->3), 3).partitionBy(new HashPartitioner(3)).collect`. – Daniel Darabos Jun 14 '14 at 00:16
  • @DanielDarabos it's stable until you do `collect`, because there is no reason for the pairs to come back in any particular order. I think if you did something like `sortByKey` they would come back in stable order. Basically, in your example the `collect` is doing the reordering, not the partitioning step – aaronman Jun 14 '14 at 00:29
  • Nope, `collect` just fetches the contents of the RDD. The point of repartitioning is that the partitioner determines for each key, which partition it goes to. If X was in partition 1 and gets put in partition 2, and Y moves from 2 to 1, now their order is reversed. – Daniel Darabos Jun 14 '14 at 00:32
  • @DanielDarabos the question only asks if after calling groupByKey if the order is stable not after doing collect – aaronman Jun 14 '14 at 00:34
  • Right, if `collect` worked like you say, you would be correct. – Daniel Darabos Jun 14 '14 at 00:36
  • @DanielDarabos not following. Also, I just realized your example shows nothing, since a stable sort and a non-stable sort would order it the same – aaronman Jun 14 '14 at 00:38
  • Let us [continue this discussion in chat](http://chat.stackoverflow.com/rooms/55612/discussion-between-daniel-darabos-and-aaronman). – Daniel Darabos Jun 14 '14 at 00:38
  • Thanks for the discussion in chat! I've posted the question to *spark-user* now. A definitive answer should be just a matter of time! http://apache-spark-user-list.1001560.n3.nabble.com/Is-shuffle-quot-stable-quot-td7628.html – Daniel Darabos Jun 14 '14 at 19:17
  • I've added an answer with an example where the order in `groupByKey` is unstable. It is only unstable when you have more than 1 worker, which is I think why we were misled. – Daniel Darabos Jun 16 '14 at 15:04