I pass two seqOp functions to aggregate that I expect to return identical results. They don't.

This version works:

rdd.aggregate(0)((acc, article) => (acc + (if (article.mentionsLanguage(lang)) 1 else 0)), _ + _)

This version doesn't work:

def seqOp(acc: Int, article: WikipediaArticle): Int = {
  acc + (if (article.mentionsLanguage(lang)) 1 else 0)
}

rdd.aggregate(0)(seqOp, _ + _)

For some reason the latter version gets stuck doing nothing: it consumes no CPU and gives no errors. For the life of me I cannot see how these functions are different. Am I actually misunderstanding something about lambda syntax?

Atte Juvonen
  • There is a more or less unrelated question with the title "[Spark RDD aggregate action behaves strangely](https://stackoverflow.com/questions/38100918/spark-rdd-aggregate-action-behaves-strangely?rq=1)". Those just aren't very informative titles. I'd suggest updating the title to something less vague. – Andrey Tyukin Apr 23 '19 at 23:20
  • Possibly similar: [Avoid 'Task not Serializable' with nested method in a class](https://stackoverflow.com/questions/23732999/avoid-task-not-serialisable-with-nested-method-in-a-class). [Dataset reduce doesn't support shorthand function](https://stackoverflow.com/questions/51296655/dataset-reduce-doesnt-support-shorthand-function) – Andrey Tyukin Apr 23 '19 at 23:44

2 Answers

I'd guess that seqOp is not a nested function, but actually a method tied to some humongous object. It could be that you are actually trying to send (acc, article) => this.seqOp(acc, article) to the worker nodes, where this is some heavy object tied to an even heavier object graph that resides on your master JVM. This forces your master node to attempt to serialize all the things tied to the object on which the method is defined, and from the outside it looks as if the computation doesn't even start properly, because the master never manages to send the whole object graph to the worker nodes.

When you are using the anonymous function syntax, it desugars into something like this:

rdd.aggregate(0)(
  new Function2[Int, WikipediaArticle, Int] {
    def apply(acc: Int, article: WikipediaArticle) = 
      (acc + (if (article.mentionsLanguage(lang)) 1 else 0))
  }, 
  _ + _
)

Here you can immediately see that the instance of the anonymous local class extending Function2 does not hold a reference to any enclosing object. Assuming lang is a local value rather than a field of some surrounding class, the only thing it captures is that one small value, so there is almost nothing to serialize (essentially all you need to know is the class of this thing; it carries hardly any extra information with it).

But when you define a method seqOp on some VeryLargeObject:

class VeryLargeObject {
  val referencesToMillionOtherObjects: Array[Any]
  def seqOp(acc: Int, article: WikipediaArticle) = ...
}

and then later on attempt to use seqOp in your aggregate call, Spark has to serialize the instance of VeryLargeObject, and with it all its transitive dependencies, and then send it over the network to the worker nodes. This process probably doesn't terminate within a reasonable time, and so the whole application appears to be frozen.
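
The difference can be made visible without a cluster. The following is only a minimal sketch using plain Java serialization rather than Spark itself; `Article`, `Heavy`, `payload`, `viaMethod`, `viaLocal` and `serializedSize` are hypothetical names invented for the illustration, and the 1 MB array merely stands in for "a heavy object graph". It compares the serialized size of a function that calls a method on `this` with one that captures only a local copy of the field it needs:

```scala
import java.io.{ByteArrayOutputStream, ObjectOutputStream}

object ClosureSizeDemo {
  // Hypothetical stand-in for the article type in the question.
  final case class Article(text: String) {
    def mentionsLanguage(lang: String): Boolean = text.contains(lang)
  }

  // Hypothetical "heavy" owner: serializable, but it drags a large payload along.
  class Heavy(val lang: String) extends Serializable {
    val payload: Array[Byte] = new Array[Byte](1000000)

    def seqOp(acc: Int, article: Article): Int =
      acc + (if (article.mentionsLanguage(lang)) 1 else 0)

    // Wrapping the method: the function object holds a reference to `this`.
    def viaMethod: (Int, Article) => Int =
      (acc, article) => this.seqOp(acc, article)

    // Copying the field into a local val: the lambda captures only the String.
    def viaLocal: (Int, Article) => Int = {
      val localLang = lang
      (acc, article) => acc + (if (article.mentionsLanguage(localLang)) 1 else 0)
    }
  }

  // Size in bytes of the Java-serialized form of `obj`.
  def serializedSize(obj: AnyRef): Int = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(obj)
    out.close()
    bytes.size()
  }

  def main(args: Array[String]): Unit = {
    val heavy = new Heavy("Scala")
    println(s"wrapping this.seqOp:   ${serializedSize(heavy.viaMethod)} bytes")
    println(s"capturing a local val: ${serializedSize(heavy.viaLocal)} bytes")
  }
}
```

Under these assumptions the first function should serialize to more than the size of `payload`, while the second stays small. In a Spark job, the same idea would mean copying the needed fields into local vals before building the closure, or moving seqOp into a small standalone object.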

Andrey Tyukin

RDD method aggregate expects a binary operator function as its seqOp parameter:

def aggregate[U](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U

What you've defined below is a method (not a function):

def seqOp(acc: Int, article: WikipediaArticle): Int = {
  acc + (if (article.mentionsLanguage(lang)) 1 else 0)
}

Here's how you would define seqOp as a function:

val seqOp = (acc: Int, article: WikipediaArticle) => {
  acc + (if (article.mentionsLanguage(lang)) 1 else 0)
}
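
For what it's worth, on a single JVM the compiler converts a method into a function automatically wherever a function type is expected, so the two forms are interchangeable there. The sketch below is only an illustration under that assumption: it uses `foldLeft` on a plain `List` instead of `RDD.aggregate`, and `Article`, `seqOpMethod`, `seqOpFunction` and the sample data are hypothetical names made up for the example.

```scala
object MethodVsFunction {
  // Hypothetical stand-in for the article type in the question.
  final case class Article(text: String) {
    def mentionsLanguage(lang: String): Boolean = text.contains(lang)
  }

  val lang = "Scala"

  // A method: not a value by itself, but the compiler can wrap it in a function.
  def seqOpMethod(acc: Int, article: Article): Int =
    acc + (if (article.mentionsLanguage(lang)) 1 else 0)

  // A function value: an object with an `apply` method.
  val seqOpFunction: (Int, Article) => Int =
    (acc, article) => acc + (if (article.mentionsLanguage(lang)) 1 else 0)

  val articles: List[Article] =
    List(Article("Scala and Java"), Article("Haskell"), Article("Scala 3"))

  // foldLeft expects a function (Int, Article) => Int;
  // the method is eta-expanded into one automatically.
  val viaMethod: Int = articles.foldLeft(0)(seqOpMethod)
  val viaFunction: Int = articles.foldLeft(0)(seqOpFunction)

  def main(args: Array[String]): Unit =
    println(s"method: $viaMethod, function: $viaFunction")
}
```

Both folds count the same articles locally; any difference only shows up once the function has to be serialized together with whatever object it references.
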
Leo C
  • I'm somehow skeptical about the eta expansion part. How is that supposed to help? Once it's tied to a particular instance, it cannot function without that instance, no matter whether you wrap it explicitly or not. – Andrey Tyukin Apr 23 '19 at 23:06
  • Thanks for the answer. I've been reading about the difference between functions and methods, and a lot of sources are claiming that methods _are_ functions. Andrey's answer gives some direction on why Spark crawled to a halt when I tried passing a method-function instead of a non-method-function, but I still don't really understand it. – Atte Juvonen Apr 23 '19 at 23:18
  • @AtteJuvonen Functions are special objects with a single method. Methods can be easily converted into a function by wrapping (that is usually done automatically by the compiler). As long as you stay on a single JVM, this essentially doesn't matter, because everything stays in the same place in the memory. But if you are trying to send a function over the wire from your master node to all your worker nodes, it all of a sudden really matters whether your function is a tiny object with a single method, or whether it's a wrapper around a method of an object tied to a gigantic object graph. – Andrey Tyukin Apr 23 '19 at 23:27
  • @Andrey Tyukin, you're right that eta expansion wouldn't help here. And I think what you've described is very likely the actual cause. – Leo C Apr 24 '19 at 00:10