I'd guess that seqOp
is not a nested function, but a method tied to some humongous object. In that case you are effectively trying to send (acc, article) => this.seqOp(acc, article)
to the worker nodes, where this
is a heavy object tied to an even heavier object graph that lives on your master JVM. The master then has to serialize everything reachable from the object the method is defined on, and from the outside it looks as if the computation never starts properly: the master simply never manages to ship the whole object graph to the worker nodes.
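To make that concrete, here is a minimal sketch of the failure mode. The Analysis class, its hugeLookupTable field, and the simplified WikipediaArticle are hypothetical stand-ins, not your actual code:

import org.apache.spark.rdd.RDD

case class WikipediaArticle(text: String) {
  def mentionsLanguage(lang: String): Boolean = text.contains(lang)
}

class Analysis(lang: String, val hugeLookupTable: Map[String, Int]) extends Serializable {
  // A method, not a standalone function: referring to it below drags `this` along.
  def seqOp(acc: Int, article: WikipediaArticle): Int =
    acc + (if (article.mentionsLanguage(lang)) 1 else 0)

  def countMentions(rdd: RDD[WikipediaArticle]): Int =
    // Eta-expansion turns seqOp into (acc, a) => this.seqOp(acc, a), so Spark
    // has to serialize the whole Analysis instance, hugeLookupTable included.
    rdd.aggregate(0)(seqOp, _ + _)
}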
When you are using the anonymous function syntax, it desugars into something like this:
rdd.aggregate(0)(
  new Function2[Int, WikipediaArticle, Int] {
    def apply(acc: Int, article: WikipediaArticle) =
      acc + (if (article.mentionsLanguage(lang)) 1 else 0)
  },
  _ + _
)
Here you can immediately see that the instance of the anonymous local class extending Function2
holds no reference to the enclosing object. The only state it carries is the captured lang string (assuming lang is a local variable), so there is essentially nothing to serialize: the class of this thing plus one small string.
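If you want to convince yourself of that, you can serialize the function value by hand with plain Java serialization; Scala 2.12+ lambdas are serializable, which is exactly what Spark relies on. A sketch, reusing the toy WikipediaArticle from above:

import java.io.{ByteArrayOutputStream, ObjectOutputStream}

val lang = "Scala"
val seq: (Int, WikipediaArticle) => Int =
  (acc, article) => acc + (if (article.mentionsLanguage(lang)) 1 else 0)

val bytes = new ByteArrayOutputStream()
val out = new ObjectOutputStream(bytes)
out.writeObject(seq) // succeeds: the only captured state is the small `lang` string
out.close()
println(s"serialized closure: ${bytes.size} bytes")

Trying the same writeObject on the method-based (acc, article) => this.seqOp(acc, article) closure would instead try to pull in the entire enclosing instance, and fail with a NotSerializableException if that instance isn't serializable.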
But when you define a method seqOp
on some VeryLargeObject
class VeryLargeObject {
  val referencesToMillionOtherObjects: Array[Any] = ... // a huge object graph
  def seqOp(acc: Int, article: WikipediaArticle) = ...
}
and then later attempt to use seqOp
in your aggregate
call, Spark has to serialize the VeryLargeObject
instance together with all its transitive dependencies and send it over the network to the worker nodes. This process probably doesn't finish within a reasonable time, so the whole application appears to be frozen.
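The usual fix is to make whatever you pass to aggregate self-contained: either write the anonymous function inline as above, or copy the few values the function needs into local vals first, so the closure captures only those. A sketch, again using the hypothetical names from above:

class Analysis(lang: String, val hugeLookupTable: Map[String, Int]) {
  def countMentions(rdd: RDD[WikipediaArticle]): Int = {
    val l = lang // local copy: the closure captures only this String, not `this`
    rdd.aggregate(0)(
      (acc, article) => acc + (if (article.mentionsLanguage(l)) 1 else 0),
      _ + _
    )
  }
}

Note that with this version Analysis no longer needs to extend Serializable at all, because `this` never ends up in the closure.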