4

I saw this example in the book “Learning Spark: Lightning-Fast Big Data Analysis”:

import org.apache.spark.rdd.RDD

class SearchFunctions(val query: String) {
  // more methods here
  def getMatchesNoReference(rdd: RDD[String]): RDD[Array[String]] = {
    // Safe: extract just the field we need into a local variable
    val query_ = this.query
    // split returns Array[String], so the result is an RDD[Array[String]]
    rdd.map(x => x.split(query_))
  }
}

My question is about the comment, which says: "Safe: extract just the field we need into a local variable".

Why is extracting the field into a local variable safer than using the field (defined as a val) itself?

igx

3 Answers

4

The "Passing Functions to Spark" section of the Spark programming guide is really helpful and has the answer to your question.

The idea is that you want only the query to be sent to the workers that need it, not the whole object (the SearchFunctions instance).

If you didn't do it that way (that is, if you used the field in your map() instead of the local variable), then, quoting that guide:

...sending the object that contains that class along with the method. In a similar way, accessing fields of the outer object will reference the whole object
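
To see this capture outside of Spark, here is a minimal sketch using plain Java serialization, which Spark also relies on when it ships closures. BigSearchFunctions, splitWithField, splitWithLocal and ClosureSize are hypothetical names made up for illustration (they are not from the book), and the sketch assumes Scala 2.12 or later, where function literals are serializable:

```scala
import java.io.{ByteArrayOutputStream, ObjectOutputStream}

// Hypothetical variant of SearchFunctions: Serializable, with a large
// field that the closures below never use.
class BigSearchFunctions(val query: String) extends Serializable {
  val unrelated: Array[Byte] = new Array[Byte](1 << 20) // 1 MiB of unused state

  // `query` really means `this.query`, so the closure keeps a reference to `this`.
  def splitWithField: String => Array[String] = x => x.split(query)

  // Only the String held in the local variable is captured.
  def splitWithLocal: String => Array[String] = {
    val query_ = this.query
    x => x.split(query_)
  }
}

object ClosureSize {
  // Size in bytes of an object under plain Java serialization.
  def serializedSize(obj: AnyRef): Int = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(obj)
    out.close()
    bytes.size()
  }

  def main(args: Array[String]): Unit = {
    val s = new BigSearchFunctions(",")
    println(s"field reference: ${serializedSize(s.splitWithField)} bytes")
    println(s"local variable:  ${serializedSize(s.splitWithLocal)} bytes")
  }
}
```

The first closure should serialize to more than 1 MiB, because the whole BigSearchFunctions instance travels with it, while the second should need only a few hundred bytes.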


Note that this is also safer, not just more efficient, because it minimizes memory usage.

You see, when handling really big data, your job will run up against its memory limits, and if it exceeds them, it will be killed by the resource manager (for example YARN). So we want to use as little memory as possible, to make sure our job completes instead of failing!

Moreover, a big object results in larger communication overhead. The TCP connection may be reset by the peer when the payload is too big, which adds unnecessary overhead that we want to avoid, because failed communication is also a reason for a job to fail.

gsamaras
  • There is a typo in "functions" but I cannot correct it. –  Aug 31 '16 at 18:43
  • Oh @LostInOverflow, you have posted an answer as well! I didn't see it. Seems we agree, don't we? Good to be confirmed, here is my upvote. I know, and I have fixed it! Thanks! :) – gsamaras Aug 31 '16 at 18:44
  • 1
    We definitely do :) –  Aug 31 '16 at 18:47
  • 1
    So, IIUC, doing rdd.map(x => x.split(query)) will create a reference to the whole class instance? Anyway, why is it safer? How can it be unsafe? – igx Sep 01 '16 at 04:26
2

Because when you extract it, only query_ has to be serialized and sent to the workers.

If you didn't extract it, a complete instance of SearchFunctions would be sent.

gsamaras
  • 1
    So I understand that it is more efficient, since we serialize just the necessary part, but why is it "safer"? – igx Sep 01 '16 at 04:27
  • Safer because there is no unwanted state that can be carried around, including objects that are not serializable. –  Sep 01 '16 at 12:45
2

Since the other answers don't mention it: another reason it's safer is that the class whose fields/methods you are referencing may not be serializable. Since Java doesn't allow checking this at compile time, you'll get a runtime failure. There are many examples of such problems on Stack Overflow; the first few I found are "Task not serializable: java.io.NotSerializableException when calling function outside closure only on classes not objects", "SparkContext not serializable inside a companion object", "Enriching SparkContext without incurring in serialization issues", and "Spark serialization error". Searching for "spark NotSerializableException" should give you many more, and not just on Stack Overflow, of course.

Or it may be serializable now, but seemingly unrelated changes (such as adding a field your lambda doesn't use) can break your code by making the class non-serializable, or can significantly degrade performance.
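
As a rough illustration of that runtime failure, here is a sketch without Spark that serializes two closures with plain Java serialization. Searcher and ClosureCheck are hypothetical names invented for this example; Searcher deliberately does not extend Serializable, and Scala 2.12 or later is assumed:

```scala
import java.io.{ByteArrayOutputStream, ObjectOutputStream}
import scala.util.Try

// Hypothetical stand-in for SearchFunctions; deliberately NOT Serializable.
class Searcher(val query: String) {
  // Captures `this`, because `query` is really `this.query`.
  def splitWithField: String => Array[String] = x => x.split(query)

  // Captures only the String copied into the local variable.
  def splitWithLocal: String => Array[String] = {
    val query_ = this.query
    x => x.split(query_)
  }
}

object ClosureCheck {
  // Tries to serialize a closure; a Failure carries the exception thrown.
  def serializedSize(f: AnyRef): Try[Int] = Try {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(f)
    out.close()
    bytes.size()
  }

  def main(args: Array[String]): Unit = {
    val s = new Searcher(",")
    // Expected to be a Failure wrapping java.io.NotSerializableException.
    println(serializedSize(s.splitWithField))
    // Expected to be a Success, since only a String is captured.
    println(serializedSize(s.splitWithLocal))
  }
}
```

Both methods compile without complaint; the difference only shows up when the closure is actually serialized, which is why this kind of bug tends to appear at runtime on the cluster.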

Alexey Romanov