
I just discovered the RDD.zip() method and I cannot imagine what its contract could possibly be.

I understand what it does, of course. However, it has always been my understanding that

  • the order of elements in an RDD is a meaningless concept
  • the number of partitions and their sizes are an implementation detail, exposed to the user only for performance tuning

In other words, an RDD is a (multi)set, not a sequence (and, of course, in Python, for example, one gets `AttributeError: 'set' object has no attribute 'zip'`)

What is wrong with my understanding above?

What was the rationale behind this method?

Is it legal to use it outside of trivial contexts like `a.map(f).zip(a)`?

EDIT 1:

  • Other crazy methods are `zipWithIndex()` and the various `zipPartitions()` variants.
  • Note that `first()` and `take()` are not crazy, because they are just (non-random) samples of the RDD.
  • `collect()` is also okay: it merely converts a set to a sequence, which is perfectly legitimate.

EDIT 2: The reply says:

when you compute one RDD from another the order of elements in the new RDD may not correspond to that in the old one.

This appears to imply that even the trivial `a.map(f).zip(a)` is not guaranteed to be equivalent to `a.map(x => (f(x),x))`. What is the situation when `zip()` results are reproducible?
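The concern can be made concrete with a plain-Python sketch (a simulation, not Spark code; the partition layout and helper names are mine) that models an RDD as a list of partitions:

```python
# Model an "RDD" as a list of partitions, each a list of elements.
# zip() pairs elements positionally, so it is equivalent to
# map(lambda x: (f(x), x)) only if map() preserved element order.

def rdd_map(partitions, f):
    """Order-preserving per-partition map, like Spark's RDD.map."""
    return [[f(x) for x in part] for part in partitions]

def rdd_zip(left, right):
    """Positional zip, like RDD.zip: pairs i-th elements of i-th partitions."""
    return [list(zip(lp, rp)) for lp, rp in zip(left, right)]

def rdd_collect(partitions):
    """Flatten partitions in order, like RDD.collect."""
    return [x for part in partitions for x in part]

a = [[1, 2], [3, 4, 5]]          # an "RDD" with two partitions
f = lambda x: x * 10

# Because this map preserves per-partition order, the two expressions agree:
zipped = rdd_collect(rdd_zip(rdd_map(a, f), a))
mapped = rdd_collect(rdd_map(a, lambda x: (f(x), x)))
assert zipped == mapped  # [(10, 1), (20, 2), (30, 3), (40, 4), (50, 5)]
```

If anything between the `map` and the `zip` reshuffled elements across or within partitions, the positional pairing above would silently misalign, which is exactly the question being asked.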

sds
  • I have to dig deeper, but it does seem that this is not the safest method (http://www.adamcrume.com/blog/archive/2014/02/19/fixing-sparks-rdd-zip), so I don't know that I would rely on it. Although, in that case, it at least throws now, it seems: https://issues.apache.org/jira/browse/SPARK-1817 – Justin Pihony Mar 25 '15 at 23:49
  • [These comments on `zipWithIndex()` by Matei](https://issues.apache.org/jira/browse/SPARK-3098?focusedCommentId=14117622&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-14117622), the initial author of Spark, may be enlightening. – Nick Chammas Mar 26 '15 at 17:00

2 Answers


It is not true that RDDs are always unordered. An RDD has a guaranteed order if it is the result of a sortBy operation, for example. An RDD is not a set; it can contain duplicates. Partitioning is not opaque to the caller: it can be controlled and queried. Many operations do preserve both partitioning and order, like map. That said, I find it a little easy to accidentally violate the assumptions that zip depends on, since they're somewhat subtle, but it certainly has a purpose.
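The assumptions zip depends on can be stated precisely: both RDDs must have the same number of partitions and the same number of elements in each partition, and since SPARK-1817 (linked in the comments above) Spark raises an error when this doesn't hold. Here is a plain-Python sketch of that precondition (a simulation, not Spark code; the error messages are mine):

```python
# Simulate RDD.zip's precondition check over a list-of-partitions model.

def rdd_zip(left, right):
    if len(left) != len(right):
        raise ValueError("Can only zip RDDs with the same number of partitions")
    out = []
    for lp, rp in zip(left, right):
        if len(lp) != len(rp):
            raise ValueError(
                "Can only zip RDDs with the same number of elements per partition")
        out.append(list(zip(lp, rp)))
    return out

a = [[1, 2], [3, 4]]
b = [[10, 20], [30, 40]]
assert rdd_zip(a, b) == [[(1, 10), (2, 20)], [(3, 30), (4, 40)]]

# A filter changes partition sizes, so zipping the result with the
# original fails rather than silently misaligning:
filtered = [[x for x in p if x != 2] for p in a]
try:
    rdd_zip(filtered, a)
except ValueError as e:
    print(e)
```

This is why `a.map(f).zip(a)` is the canonical safe pattern: map is 1:1 per partition, so both sides trivially satisfy the precondition.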

Sean Owen
  • Is it _guaranteed_ that `a.map(f).zip(a)` is equivalent to `a.map(x => (f(x),x))`? – sds Mar 26 '15 at 16:13
  • Depends on what `a` is, really. If it has a defined ordering, then yes, it should be. – Sean Owen Mar 26 '15 at 16:19
  • Are you saying that an RDD remembers that it went through `sortBy`? For how long? Is that property preserved by `map`? `filter`? `flatMap`? – sds Mar 26 '15 at 16:20
  • Also, could you please provide a canonical use case for `zip`? I.e., a situation where the alternative(s), if any, are clearly worse. (+1, thanks for the excellent answer) – sds Mar 26 '15 at 16:21
  • RDDs are immutable; sorting produces a new RDD that is always sorted. Yes, it is often used for putting back together some 1:1 transform of the original. I try to avoid it if possible, since there is usually another way to do it that doesn't require ordering for correctness. – Sean Owen Mar 26 '15 at 16:25
  • I think one needs to be even more careful here. One might think that using sortBy is enough, but this only seems to be the case if you sortBy a key with no duplicates (e.g., some primary key/id). If there are duplicate keys, the order is again under-specified within those duplicate groups, and the zip may not yield expected results. This comes up when one has an RDD[ComplexObject] where it can be difficult to pick a natural key to sort on that guarantees uniqueness. It is especially problematic when you try to extract generic functions out, as you don't know whether these properties hold. – MGwynne Jul 21 '16 at 11:27
  • Not saying that zip doesn't have some uses, but it seems to be very specific, and the only way to guarantee correctness seems to be to checkpoint the RDD (ideally to HDFS etc. for redundancy) before doing order-dependent operations such as zip, zipWithIndex, randomSplit, etc. It would be good to know if there is a better way. The documentation for zip seems too simple and ends up being misleading unless one already understands the issue (it seems to me). – MGwynne Jul 21 '16 at 11:30

The mental model I use (and recommend) is that the elements of an RDD are ordered, but when you compute one RDD from another the order of elements in the new RDD may not correspond to that in the old one.

For those who want to be aware of partitions, I'd say that:

  1. The partitions of an RDD have an order.
  2. The elements within a partition have an order.
  3. If you think of "concatenating" the partitions (say laying them "end to end" in order) using the order of elements within them, the overall ordering you end up with corresponds to the order of elements if you ignore partitions.

But again, if you compute one RDD from another, all bets about the order relationships of the two RDDs are off.
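The three-point mental model above can be sketched in plain Python (a simulation of the semantics, not Spark code; the partition layout is mine):

```python
# Partitions are ordered, elements within a partition are ordered, and
# concatenating partitions in order yields the overall element order
# (what collect() would return).

partitions = [[0, 1, 2], [3, 4], [5, 6, 7]]

overall = [x for part in partitions for x in part]
assert overall == list(range(8))

# zipWithIndex-style numbering falls out of the same model: an element's
# global index is (count of elements in earlier partitions) + (its offset
# within its own partition).
sizes = [len(p) for p in partitions]
starts = [sum(sizes[:i]) for i in range(len(sizes))]
with_index = [[(x, starts[i] + j) for j, x in enumerate(p)]
              for i, p in enumerate(partitions)]
assert [pair for p in with_index for pair in p] == [(x, x) for x in range(8)]
```

Spark's actual zipWithIndex works the same way in spirit: it first computes per-partition counts, then assigns offsets, which is why it is well defined once you accept this ordering model.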

Several members of the RDD class (I'm referring to the Scala API) strongly suggest an order concept (as does their documentation):

  • `collect()`
  • `first()`
  • `partitions`
  • `take()`
  • `zipWithIndex()`

as does `Partition.index`, as well as `SparkContext.parallelize()` and `SparkContext.makeRDD()` (which both take a `Seq[T]`).

In my experience these ways of "observing" order give results that are consistent with each other, and the ones that translate back and forth between RDDs and ordered Scala collections behave as you would expect: they preserve the overall order of elements. This is why I say that, in practice, RDDs have a meaningful order concept.

Furthermore, while there are obviously many situations where computing an RDD from another must change the order, in my experience order tends to be preserved where it is possible/reasonable to do so. Operations that don't re-partition and don't fundamentally change the set of elements especially tend to preserve order.
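The distinction drawn above can be illustrated in the same list-of-partitions simulation (plain Python, not Spark; the hash-based repartition is my stand-in for a shuffle):

```python
# Per-partition operations like map and filter keep surviving elements in
# their original relative order; a repartition/shuffle generally does not.

partitions = [[5, 1, 4], [2, 3]]
order = lambda parts: [x for p in parts for x in p]

# map and filter operate within each partition, preserving relative order:
mapped   = [[x * 2 for x in p] for p in partitions]
filtered = [[x for x in p if x % 2 == 1] for p in partitions]
assert order(mapped) == [10, 2, 8, 4, 6]
assert order(filtered) == [5, 1, 3]

# A hash repartition reassigns elements to partitions; the overall order
# of the result bears no relation to the input order:
n = 2
repartitioned = [[x for p in partitions for x in p if hash(x) % n == i]
                 for i in range(n)]
assert sorted(order(repartitioned)) == sorted(order(partitions))
```

Note that the last assertion is only about the multiset of elements; the element order after the simulated shuffle is an artifact of the partitioner, which is the behavior that makes zip unsafe across shuffles.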

But this brings me to your question about "contract", and indeed the documentation has a problem in this regard. I have not seen a single place where an operation's effect on element order is made clear. (The OrderedRDDFunctions class doesn't count, because it refers to an ordering based on the data, which may differ from the raw order of elements within the RDD. Likewise the RangePartitioner class.) I can see how this might lead you to conclude that there is no concept of element order, but the examples I've given above make that model unsatisfying to me.

Spiro Michaylov
  • Your first sentence, _elements of an RDD are ordered, but the order may change_, means that there is **no _meaningful_ order**: whatever order is actually observed is just an accidental implementation detail, not anything immanent to the RDD. – sds Mar 26 '15 at 13:49
  • @sds When, as in this case, there's no explicit contract, **everything** can be accused of being an accidental implementation detail. However, what I meant to say is that when you compute one RDD from another, there are no guarantees about the new one having the same order as the old one. Any individual RDD in practice has a stable order: the different ways of observing order give consistent results, both instantaneously and over a period of time. For me, that's meaningful enough, but I'd rather have a contract. – Spiro Michaylov Mar 26 '15 at 14:20
  • Edited the answer because @sds made me realize my reasoning wasn't clear. – Spiro Michaylov Mar 26 '15 at 14:24
  • I added EDIT2 based on your edit. Thanks for your excellent reply; sorry about this "comment/edit discussion". – sds Mar 26 '15 at 14:31
  • No problem: it's an interesting discussion, and one that may be worth taking to the Spark community. I see that questions sent to the Spark user list have a low answer rate, but I was thinking of composing a question that might interest the core developers and posting it on the developers list. This [initially promising thread](http://apache-spark-developers-list.1001551.n3.nabble.com/RDD-order-guarantees-tc10142.html) isn't really relevant. Feel free to beat me to it if so inclined. – Spiro Michaylov Mar 26 '15 at 16:18
  • "For me, that's meaningful enough, but I'd rather have a contract." I don't believe you will get such a contract, because RDDs are akin to SQL tables--i.e. unordered collections. The only thing that guarantees a certain order is an explicit call to one of the ordering transformations. If you have observed any stable ordering otherwise, that is purely an implementation artifact that may change at any time. Rely on it at your own peril. :) – Nick Chammas Mar 26 '15 at 17:09
  • [These comments by the primary author of Spark](https://issues.apache.org/jira/browse/SPARK-3098?focusedCommentId=14117622&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-14117622) might explain more. – Nick Chammas Mar 26 '15 at 17:09
  • Yes, that's a useful link. Thanks! – Spiro Michaylov Mar 26 '15 at 17:56