Below is my Spark program, written in Scala, to find the anagrams of a given word. The program fails when executed from a test case.

import org.apache.spark.rdd.RDD

class Anagram {

  def collectAnagrams(name: String,rdd : RDD[String]): RDD[String] = {

    return rdd.flatMap(line => line.split("\\s+")).filter(x=>verifyAnagrams(x,name));
  }
  def verifyAnagrams(str1 : String, str2 : String): Boolean = {
    if(str1.length != str2.length) {
      return false;
    }
    val letters = Array.fill[Int](256)(0);
    for(i <- 0 until str1.length) {

      letters(str1.charAt(i).toInt)+=1;
      letters(str2.charAt(i).toInt)-=1;
    }

    for(i <-0 until 256) {
      if(letters(i) != 0) {
        return false;
      }
    }
    return true;
  }

}

import org.apache.spark.{SparkConf, SparkContext}
import org.scalatest.{BeforeAndAfter, FunSuite}

class AnagramTest extends FunSuite with BeforeAndAfter {

  var sc: SparkContext = _
  before {
    val conf = new SparkConf().setMaster("local").setAppName("anagarm of string")
    sc = new SparkContext(conf)
  }

  test("Anagram string check in a file") {
    val anagramToken : String = "Tunring"
    // @@ SETUP
    val Anagram = new Anagram()

    // @@ EXERCISE
    val anagrams =  Anagram.collectAnagrams(anagramToken,sc.textFile(getClass.getClassLoader.getResource("word_count_input.txt").getPath))

    // @@ VERIFY
    assert(anagrams.collect().toSet.size == 1)
  }
}

When the above test case is executed, the following exception occurs:

Task not serializable
org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:1623)
    at org.apache.spark.rdd.RDD.filter(RDD.scala:303)
    at Anagram.collectAnagrams(Anagram.scala:10)

assert(anagrams.collect().toSet == Set("Tunring","Tunring"))

I would like to know the exact root cause, as well as the following:

  1. Does every class used from the Spark context need to be serializable?
  2. Does every method I define need to be serialized to bytes and sent across the nodes?
  3. Does the enclosing class of the RDD calls need to be serialized?
  4. As I understand it, transformation functions are sent across the nodes, so those functions must be serializable. What about new methods that I write myself?

Any help appreciated.

Vadim Martynov
nagendra
  • Possible duplicate of [Task not serializable: java.io.NotSerializableException when calling function outside closure only on classes not objects](http://stackoverflow.com/questions/22592811/task-not-serializable-java-io-notserializableexception-when-calling-function-ou) – sgvd Feb 20 '16 at 10:21

2 Answers


You can make the Anagram class serializable, or move the verifyAnagrams function to an object. Functions declared inside objects in Scala are equivalent to Java static methods, so there is no need to serialize them.

Read also my blog post about using non-serializable objects in Spark applications.
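
To see why either option helps, here is a minimal sketch that needs no Spark at all. It assumes Scala 2.12 or later, and every name in it (`AnagramUtil`, `AnagramInClass`, `AnagramViaObject`, `SerializableAnagram`, `ClosureDemo`) is hypothetical. The `canSerialize` helper only approximates Spark's closure check by attempting plain Java serialization. The lambda in `AnagramInClass` calls an instance method, so it captures `this`, just as the lambda passed to `filter` does in the question.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical helper object: its methods behave like Java statics,
// so a lambda that calls them does not capture any instance.
object AnagramUtil {
  def verifyAnagrams(a: String, b: String): Boolean =
    a.length == b.length && a.sorted == b.sorted
}

// Mirrors the question: the lambda calls an instance method,
// so it captures `this`, and this class is not Serializable.
class AnagramInClass {
  def verifyAnagrams(a: String, b: String): Boolean =
    AnagramUtil.verifyAnagrams(a, b)

  def predicate(name: String): String => Boolean =
    x => verifyAnagrams(x, name)
}

// Option 1: same shape, but the captured instance is Serializable.
class SerializableAnagram extends Serializable {
  def verifyAnagrams(a: String, b: String): Boolean =
    AnagramUtil.verifyAnagrams(a, b)

  def predicate(name: String): String => Boolean =
    x => verifyAnagrams(x, name)
}

// Option 2: the lambda calls the object's method and captures only `name`.
class AnagramViaObject {
  def predicate(name: String): String => Boolean =
    x => AnagramUtil.verifyAnagrams(x, name)
}

object ClosureDemo {
  // Attempts Java serialization of the function value and reports success.
  def canSerialize(f: AnyRef): Boolean =
    try {
      new ObjectOutputStream(new ByteArrayOutputStream()).writeObject(f)
      true
    } catch {
      case _: NotSerializableException => false
    }

  def main(args: Array[String]): Unit = {
    println("instance method, plain class:        " + canSerialize(new AnagramInClass().predicate("Tunring")))
    println("instance method, Serializable class: " + canSerialize(new SerializableAnagram().predicate("Tunring")))
    println("method on an object:                 " + canSerialize(new AnagramViaObject().predicate("Tunring")))
  }
}
```

Under these assumptions, only the first predicate should fail to serialize. In the question's code, the equivalent change is to declare `verifyAnagrams` in an `object` and call it from the lambda, or to declare `class Anagram extends Serializable`.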

Nicola Ferraro
  • Thanks Nicola. I would also like to know whether an object in Scala is serialized by default, and whether there is any way to serialize only the functions defined in a class. – nagendra Mar 02 '16 at 03:10
  • Objects do not need to be serialized, because functions declared inside objects are treated as static Java methods. To use them, the JVM class corresponding to the object must be available on the classpath of the worker machine, nothing more. – Nicola Ferraro Mar 02 '16 at 22:46
  • A function declared at the top level of a `class` cannot be used without serializing the declaring object. If you want to define a function inside a class and use it in a task without serializing the enclosing object, you can, for example, declare the function inside another function (this is possible in Scala). – Nicola Ferraro Mar 02 '16 at 22:49

Move your verifyAnagrams function into an object rather than a class.

Pankaj Arora