
An RDD has a meaningful order (as opposed to some random order imposed by the storage model) if it was processed by sortBy(), as explained in this reply.

Now, which operations preserve that order?

E.g., is it guaranteed that (after a.sortBy())

a.map(f).zip(a) === 
a.map(x => (f(x),x))

How about

a.filter(f).map(g) === 
a.map(x => (x,g(x))).filter(f(_._1)).map(_._2)

What about

a.filter(f).flatMap(g) === 
a.flatMap(x => g(x).map((x,_))).filter(f(_._1)).map(_._2)

Here "equality" === is understood as "functional equivalence", i.e., there is no way to distinguish the outcome using user-level operations (i.e., without reading logs &c).

sds
  • I guess that any operation that changes the elements in an RDD cannot be expected to preserve order, e.g. `intRdd.map(x=>x*-1)`. On RDDs with a key, there are dedicated operations that preserve the order: `pairRDD.mapValues` and `pairRDD.flatMapValues`. Not sure if there's a generalization that could satisfy this question, hence the comment. – maasg Mar 26 '15 at 20:43
  • RDDs are immutable; all operations create new RDDs. – sds Mar 26 '15 at 20:44
  • Look at the last line of the question; I am talking about functional equivalence rather than physical identity. – sds Mar 26 '15 at 20:58
  • @maasg: That's different from how I think this works. I've added an answer, but please let me know if you disagree. Especially if you can provide a counter-example in `spark-shell`. Thanks! – Daniel Darabos Mar 27 '15 at 12:58
  • @DanielDarabos I misinterpreted the question and my comment was me thinking in terms of "collection being sorted" rather than preservation of the element ordering. – maasg Mar 27 '15 at 13:23

2 Answers


All operations preserve the order, except those that explicitly do not. Ordering is always "meaningful", not just after a sortBy. For example, if you read a file (sc.textFile) the lines of the RDD will be in the order that they were in the file.

Without trying to give a complete list: map, filter, and flatMap do preserve the order; sortBy, partitionBy, and join do not.
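
A quick spark-shell sanity check of both claims; a single run, so illustrative rather than a proof:

    val a = sc.parallelize(Seq(3, 1, 4, 1, 5, 9, 2, 6), 4)

    // map and filter keep elements in their existing order:
    a.map(_ * 10).collect()          // Array(30, 10, 40, 10, 50, 90, 20, 60)
    a.filter(_ % 2 == 0).collect()   // Array(4, 2, 6)

    // sortBy imposes a new order instead of preserving the old one:
    a.sortBy(identity).collect()     // Array(1, 1, 2, 3, 4, 5, 6, 9)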

The reason is that most RDD operations work on Iterators inside the partitions. So map or filter just has no way to mess up the order. You can take a look at the code to see for yourself.
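
As a minimal sketch of that point (not the actual Spark source, which does this inside MapPartitionsRDD), map can be expressed through mapPartitions; since Scala's Iterator.map is lazy and order-preserving, the element order within each partition cannot change:

    import scala.reflect.ClassTag
    import org.apache.spark.rdd.RDD

    // Sketch: RDD.map expressed via mapPartitions. Iterator.map just walks
    // the partition in order, so there is no opportunity to reorder elements.
    def mapSketch[T, U: ClassTag](rdd: RDD[T])(f: T => U): RDD[U] =
      rdd.mapPartitions(iter => iter.map(f))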

You may now ask: what if I have an RDD with a HashPartitioner? What happens when I use map to change the keys? Well, they will stay in place, but the RDD is no longer partitioned by the key. You can use partitionBy to restore the partitioning with a shuffle.
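
A spark-shell illustration of that paragraph (partitioner values abbreviated in the comments; mapValues, which cannot touch keys, would keep the partitioner):

    import org.apache.spark.HashPartitioner

    val pairs = sc.parallelize(Seq((1, "a"), (2, "b"), (3, "c")))
    val byKey = pairs.partitionBy(new HashPartitioner(4))
    byKey.partitioner                                  // Some(HashPartitioner)

    // map may rewrite keys, so Spark drops the partitioner; elements stay put:
    val remapped = byKey.map { case (k, v) => (k + 1, v) }
    remapped.partitioner                               // None

    // A shuffle re-establishes key-based partitioning:
    remapped.partitionBy(new HashPartitioner(4)).partitioner  // Some(HashPartitioner)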

Daniel Darabos
  • Daniel, I was expecting something like that as well, where only a shuffle step would break the ordering, but it seems that RDD ordering is coincidental and not contractual. This was a good thread: https://issues.apache.org/jira/browse/SPARK-3098 What I don't understand is this question, after getting that info on a previous question: http://stackoverflow.com/questions/29268210/mind-blown-rdd-zip-method/29281548#29268210 – maasg Mar 27 '15 at 13:20
  • I haven't read SPARK-3098 fully, but it uses `distinct`. `distinct` has to build a hashmap of the lines, so it loses the ordering. In the other question I think Sean is saying the same thing, that RDDs have an ordering. They are not multisets. – Daniel Darabos Mar 27 '15 at 13:30
  • I can confirm that repartition does *not* preserve order, as far as I can tell. If I run `x = sc.textFile('somefile'); y = x.repartition(100); a = x.collect(); b = y.collect()`, then `a==b` returns `False`. – moustachio Sep 29 '15 at 17:20
  • @moustachio: Oops, thanks! You're right. `repartition` calls `coalesce` with `shuffle=true`, so it's obvious it will _shuffle_ the RDD. I've fixed the list. – Daniel Darabos Sep 29 '15 at 20:57
  • The question has a lot more votes than my answer. I wonder if I am missing something that people are looking for. Let me know if my answer did not satisfy you! Thanks. – Daniel Darabos Nov 29 '15 at 17:42
  • I think creating a dataframe from an RDD using `toDF` does not preserve order, just my 2 cents – architectonic Apr 20 '16 at 14:19
  • @architectonic: What makes you think that? [The code](https://github.com/apache/spark/blob/v1.6.1/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala#L31) just does a `map` on an iterator, so it does not change the order. Also a simple `sc.parallelize(1 to 100, 10).toDF.show(100)` test shows the RDD in the original order. – Daniel Darabos Apr 21 '16 at 12:11
  • I am not quite sure it 'preserves' the order with coalesce when you merge multiple partitions; which partition goes into the final partition first? That would matter for the final order. – soMuchToLearnAndShare Sep 13 '16 at 01:36
  • @MinnieShi: If partitions 2 and 3 get coalesced into one partition then it will just chain the iterators from partitions 2 and 3, so the new partition will contain the elements of partition 2 in order followed by the elements of partition 3 in order. Is this unclear in the answer? Or do you know it to be wrong? – Daniel Darabos Sep 13 '16 at 08:59
  • @DanielDarabos I'm missing a general comment about the side effects of the transformations in the RDD. I mean, what is the amortized complexity in a particular executor? Is it more like a hash table, or a tree? It's not a pure sequence/vector, as that would need another data structure and time to keep the order. After many transformations, I'm confused about how you can keep both the order and have a feasible implementation that allows the fast operations required in the executors. – hectorpal Sep 20 '16 at 19:25
  • @hectorpal: No idea what you are speaking about. Take `map` for example. Items come in order, and get transformed one by one. It is very easy to keep the order, no data structure is needed. We would need a data structure to _change_ the order: we would need to put some input items in the data structure and output them later. (This is what `distinct` does, for example.) But if you need more details you can take a look at the source code (this part is very readable) or ask a separate question. – Daniel Darabos Sep 21 '16 at 12:49
  • Ok, let's focus on map and add a filter step. If items are partitioned, they will be processed in order inside each node Mi. Suppose the results, mapped plus filtered, are to be moved to one single node C for a final processing. Nodes Mi have different speeds so the result of map won't arrive in order to C. What data structure would be used in C for preserving the order? Let's assume the items have some sort of index. C is receiving items with an index, but C doesn't know how many will arrive. It could be more efficient to store without ordering. How is the order preserved? yes, source code... – hectorpal Sep 21 '16 at 16:00
  • There is no push in Spark, it's all pull. Machine C will fetch (pull) the blocks in order. – Daniel Darabos Sep 21 '16 at 17:16
  • Bingo. Thank you. It's interesting this could affect memory use in Mi nodes. A faster node could have memory problems if it's producing results too fast. – hectorpal Sep 21 '16 at 18:00
  • I'm not exactly sure how it works out (or even how the code we are talking about looks), but almost certainly it has no memory impact. Either the results are only generated as they are requested, or they are written to disk as they are produced and then served from disk when the fetch comes. – Daniel Darabos Sep 21 '16 at 22:52
  • Hi @Daniel Darabos, in response to your comment (Sep 13 at 8:59). Thank you for the clarification. I did not know that partition 1 will go in before partition 2 etc. And I've tested since, correct, the final order is still correct after using the coalesce. Thank you. – soMuchToLearnAndShare Sep 29 '16 at 01:26
  • Does `select` preserve the order of rows? – Cosmozhang Apr 12 '23 at 02:21

In Spark 2.0.0+, coalesce doesn't guarantee partition order during a merge. DefaultPartitionCoalescer has an optimization algorithm based on partition locality. When a partition contains information about its locality, DefaultPartitionCoalescer tries to merge partitions on the same host. Only when there is no locality information does it simply group partitions based on their index and preserve partition order.
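
A hedged way to check this on a concrete dataset ('somefile' is a hypothetical path; whether the order survives depends on whether locality information is available, as described above):

    val rdd    = sc.textFile("somefile")   // hypothetical input
    val before = rdd.collect()
    val after  = rdd.coalesce(2).collect()
    // May print false on Spark 2.0.0+ when DefaultPartitionCoalescer
    // merges partitions by host rather than by index:
    println(before.sameElements(after))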

UPDATE:

If you load a DataFrame from files, such as Parquet, Spark breaks the order when it plans file splits. You can see it in DataSourceScanExec.scala#L629 or, in Spark 3.x, in FileScan#L152 if you use that. It just sorts the splits by size, and the splits which are smaller than spark.sql.files.maxPartitionBytes end up in the last partitions.

So, if you need to load a sorted dataset from files, you need to implement your own reader.
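
If a custom reader is too much effort, the simplest (if costly) workaround is to re-establish the order after loading; a sketch, where the path and the sort column "key" are hypothetical stand-ins for your dataset:

    // Split planning may have reordered rows, so re-sort explicitly
    // instead of relying on file order:
    val df       = spark.read.parquet("/path/to/sorted.parquet")
    val restored = df.sort("key")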

Avseiytsev Dmitriy
  • Anecdotally I can confirm this is correct. When I switched from Spark 2 to Spark 3, I started noticing that some of my data was occasionally losing its sortedness. The job building that data was doing `df.sort(...).coalesce(...)`, and switching that job to use `df.coalesce(...).sort(...)` seems to have fixed the problem. (Though TBF I could never seem to reproduce the issue in my testing; I just haven't found any unsorted data after making this change.) – 0x5453 Jan 14 '22 at 19:36