
In theory, I think I understand the way that aggregate works, but I can't get past a very simple example.

Notably, the example here seems to have the wrong result. When I run the following example on my machine, I get a different answer:

seqOp = (lambda x, y: (x[0] + y, x[1] + 1))
combOp = (lambda x, y: (x[0] + y[0], x[1] + y[1]))
ag = sc.parallelize([1, 2, 3, 4]).aggregate((1,0), seqOp, combOp)

Then, the result I get is

>>> ag
(12, 4)

But the link I cited says that the result is (19, 4). That author is using a different version of Spark (1.2.0); I'm using 1.5.2. Did the aggregate function change between versions of Spark?

If the answer is no, then it is still baffling how 12 is the first element in that tuple. Examining just the first element of the tuple, we can see that y is added to it for every element in the RDD.

So, starting with (1,0), and since y is 1, 2, 3, 4, respectively, this should result in a series of tuples like: (2,1), (3,1), (4,1), (5,1). Now, when I add the first elements in the series of tuples, I get 14? Is there something obvious I'm missing for how to get 12? Thanks much.

makansij
  • Possible duplicate of [Explain the aggregate functionality in Spark using python](http://stackoverflow.com/questions/28240706/explain-the-aggregate-functionality-in-spark-using-python) – zero323 Nov 17 '15 at 00:54
  • As for the proposition of this being a duplicate, I'm still confused as to how I can run literally the same code and get a different result on my machine? – makansij Nov 17 '15 at 04:44
  • It's pretty much the same as passing a non-commutative / non-associative function to reduce, which can give you different results every time. These are not valid operations from an algebraic point of view. And this is pretty much described by John's answer there. I wouldn't even have answered this one, but I missed that. I've made a small edit to this answer, but there is really nothing more here. – zero323 Nov 17 '15 at 08:59

1 Answer

No, the behavior of the aggregate function hasn't changed.

The problem with the example you link to is that the zero element is not neutral. In practice the zero value is applied once per partition, plus once more when the per-partition results are combined. So you can increment the first element of the tuple just by increasing the number of partitions, without passing any data at all:

sc.parallelize([], 10).aggregate((1,0), seqOp, combOp)
## (11, 0)

sc.parallelize([], 100).aggregate((1,0), seqOp, combOp)
## (101, 0)

sc.parallelize([], 1000).aggregate((1,0), seqOp, combOp)
## (1001, 0)

The takeaway here is that the zero value should be neutral with respect to the operation you perform.

Edit:

What do I mean by a neutral element? It should be an identity element with respect to seqOp / combOp in an algebraic sense. In the case of the operations defined here, a good choice would be (0, 0).

From a developer's perspective, you can think of it this way: the number of times the zero value is added to your data is not part of the contract.
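To make this concrete, here is a rough pure-Python sketch of the behavior described above. It is not Spark's actual code: `simulate_aggregate` and its round-robin partition split are made up for illustration, and it assumes the zero value is used once per partition and once more when merging, which matches the empty-RDD outputs shown earlier.

```python
from functools import reduce

def simulate_aggregate(data, num_partitions, zero_value, seq_op, comb_op):
    # Hypothetical round-robin split; Spark's real partitioning differs,
    # but the totals only depend on how many partitions there are.
    partitions = [data[i::num_partitions] for i in range(num_partitions)]
    # Fold every partition, each one starting from zero_value.
    partials = [reduce(seq_op, part, zero_value) for part in partitions]
    # Merge the per-partition results, again starting from zero_value.
    return reduce(comb_op, partials, zero_value)

seq_op = lambda acc, x: (acc[0] + x, acc[1] + 1)
comb_op = lambda a, b: (a[0] + b[0], a[1] + b[1])
data = [1, 2, 3, 4]

one_partition = simulate_aggregate(data, 1, (1, 0), seq_op, comb_op)
eight_partitions = simulate_aggregate(data, 8, (1, 0), seq_op, comb_op)
neutral_zero = simulate_aggregate(data, 8, (0, 0), seq_op, comb_op)

print(one_partition)     # (12, 4)
print(eight_partitions)  # (19, 4)
print(neutral_zero)      # (10, 4)
```

Under these assumptions, (12, 4) corresponds to one partition and (19, 4) to eight, so the two machines probably just had different default parallelism. With (0, 0) the result no longer depends on the partition count.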

zero323
  • What does it mean for a value to be "neutral"? – makansij Nov 17 '15 at 04:40
  • Still even after your explanation, I can't figure out why some of my examples won't work: `dotproductRDD = sc.parallelize([(4,1),(3,2)])`, aggregate function: `dotproductRDD.aggregate(1, lambda acc, aTuple: aTuple[0]*aTuple[1], lambda a,b: a+b) `, and this results in 7. I don't even use the zerovalue? – makansij Nov 17 '15 at 18:07
  • I am not even sure what you are trying to achieve. Do you mean something like this: `dotproductRDD.map(lambda aTuple: aTuple[0] * aTuple[1]).sum()`? – zero323 Nov 17 '15 at 18:38
  • Well, what I'm trying to achieve is to understand this `aggregate` action. I know there are other ways to compute the dot product of two vectors. The type signature seems correct to me. My zerovalue is of type integer. My seqOp takes in an integer, U, and a tuple, T, and returns an integer: `(U,T) => U`, and my `combOp` takes in two integers, adds them, and returns an integer: `(U,U) => U`. – makansij Nov 17 '15 at 18:45
  • Your functions completely discard accumulator. In practice only one value in the partition will be retained. – zero323 Nov 17 '15 at 18:48
  • Think about it this way. `zeroValue` and `seqOp` are used like `partition.foldLeft(zeroValue)(seqOp)`. `combOp` is used for reduce like this: `Seq(p1, ..., pn).reduce(combOp)` where `p1` .. `pn` is output of previous fold. – zero323 Nov 17 '15 at 18:56
  • Is there a typo in your code snippet? (seqOp) is hanging out by itself. Still confused. my (U,T) => U only yields one value, and that is the value that I want to be retained so I'm not sure what you mean by "only one value ...will be retained". Also, I cannot find a `foldLeft` equivalent in python. – makansij Nov 18 '15 at 07:04
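
On the `foldLeft` question: the closest Python equivalent is `functools.reduce` with an initial value. The sketch below replays the dot product example from the comments in plain Python. It assumes both tuples sit in a single partition, which is only a guess that happens to reproduce the reported 7; the variable names are made up for illustration.

```python
from functools import reduce

pairs = [(4, 1), (3, 2)]
comb_op = lambda a, b: a + b

# seqOp from the comment: it ignores acc, so each step overwrites the result.
discarding_seq_op = lambda acc, t: t[0] * t[1]
partial = reduce(discarding_seq_op, pairs, 1)   # only the last product survives
result = reduce(comb_op, [partial], 1)          # the zero value 1 is added here

# A seqOp that keeps the accumulator, paired with a neutral zero value of 0.
accumulating_seq_op = lambda acc, t: acc + t[0] * t[1]
fixed_partial = reduce(accumulating_seq_op, pairs, 0)
fixed = reduce(comb_op, [fixed_partial], 0)

print(partial, result)  # 6 7
print(fixed)            # 10
```

So under that assumption the zero value is used after all: 7 is the last product (6) plus the zero value (1), not the dot product.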