Below is my Spark program, written in Scala, that finds the anagrams of a given word in a text file, together with its test. The program fails when it is executed from the test case.
```scala
import org.apache.spark.rdd.RDD

class Anagram {
  // Splits each line into words and keeps the words that are anagrams of `name`.
  def collectAnagrams(name: String, rdd: RDD[String]): RDD[String] =
    rdd.flatMap(line => line.split("\\s+")).filter(x => verifyAnagrams(x, name))

  // Counts each character of str1 up and each character of str2 down;
  // two anagrams leave every counter at zero.
  // Note: assumes every character code is below 256.
  def verifyAnagrams(str1: String, str2: String): Boolean =
    str1.length == str2.length && {
      val letters = Array.fill[Int](256)(0)
      for (i <- 0 until str1.length) {
        letters(str1.charAt(i).toInt) += 1
        letters(str2.charAt(i).toInt) -= 1
      }
      letters.forall(_ == 0)
    }
}
```
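The character-counting check in `verifyAnagrams` does not depend on Spark, so it can be exercised on its own. Below is a minimal plain-Scala sketch of the same count-up/count-down idea; it swaps the 256-slot array for a `Map`, so characters above U+00FF (which would make the array version throw `ArrayIndexOutOfBoundsException`) are handled too. The object name `AnagramCheck` is hypothetical and not part of the original program.

```scala
import scala.collection.mutable

// Hypothetical standalone helper, not part of the original program.
object AnagramCheck {
  // Case-sensitive, like verifyAnagrams: each character of str1 counts up,
  // each character of str2 counts down; anagrams leave every count at zero.
  def isAnagram(str1: String, str2: String): Boolean =
    str1.length == str2.length && {
      val counts = mutable.Map.empty[Char, Int].withDefaultValue(0)
      for (i <- 0 until str1.length) {
        counts(str1(i)) += 1
        counts(str2(i)) -= 1
      }
      counts.values.forall(_ == 0)
    }

  def main(args: Array[String]): Unit = {
    println(isAnagram("listen", "silent"))   // true
    println(isAnagram("Tunring", "turning")) // false: 'T' and 't' differ
    println(isAnagram("\u0101b", "b\u0101")) // true: no 256-slot limit
  }
}
```

Because the comparison is case-sensitive, a capitalised token such as `Tunring` only matches words with the same capital letter.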
```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.scalatest.{BeforeAndAfter, FunSuite}

class AnagramTest extends FunSuite with BeforeAndAfter {
  var sc: SparkContext = _

  before {
    val conf = new SparkConf().setMaster("local").setAppName("anagram of string")
    sc = new SparkContext(conf)
  }

  after {
    // Stop the context so that the next test can create a fresh one.
    if (sc != null) sc.stop()
  }

  test("Anagram string check in a file") {
    val anagramToken: String = "Tunring"
    // @@ SETUP
    val anagram = new Anagram()
    // @@ EXERCISE
    val input = getClass.getClassLoader.getResource("word_count_input.txt").getPath
    val anagrams = anagram.collectAnagrams(anagramToken, sc.textFile(input))
    // @@ VERIFY
    assert(anagrams.collect().toSet.size == 1)
  }
}
```
When the above test case is executed, the following exception occurs:

```
Task not serializable
org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:1623)
    at org.apache.spark.rdd.RDD.filter(RDD.scala:303)
    at Anagram.collectAnagrams(Anagram.scala:10)
```

`assert(anagrams.collect().toSet == Set("Tunring","Tunring"))`
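The trace shows Spark failing in `ClosureCleaner.ensureSerializable` while preparing the function passed to `RDD.filter`. The Spark-free sketch below reproduces the likely cause, assuming the closure is checked with plain Java serialization (Spark's default closure serializer): `x => verifyAnagrams(x, name)` calls an instance method, so the function object must hold a reference to the enclosing instance, and that instance's class does not extend `Serializable`. `PlainAnagram`, `ClosureCaptureDemo` and `isSerializable` are hypothetical names; `verifyAnagrams` here compares sorted strings instead of using the counting array, only to keep the sketch short.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Stand-in for the question's Anagram class: it does NOT extend Serializable.
class PlainAnagram {
  def verifyAnagrams(str1: String, str2: String): Boolean =
    str1.sorted == str2.sorted

  // Same shape as the filter in collectAnagrams: the lambda calls an
  // instance method, so it has to capture `this` (the PlainAnagram).
  def anagramFilter(name: String): String => Boolean =
    x => verifyAnagrams(x, name)
}

object ClosureCaptureDemo {
  // Approximates the check: can the object be written with Java serialization?
  def isSerializable(obj: AnyRef): Boolean =
    try {
      val out = new ObjectOutputStream(new ByteArrayOutputStream())
      try out.writeObject(obj) finally out.close()
      true
    } catch {
      case _: NotSerializableException => false
    }

  def main(args: Array[String]): Unit = {
    val filter = new PlainAnagram().anagramFilter("Tunring")
    // prints "filter serializable: false"
    println(s"filter serializable: ${isSerializable(filter)}")
  }
}
```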
I would like to know the exact root cause, as well as the answers to the following:
- Does every class called from the SparkContext need to be serialized?
- Does every method I define need to be serialized to bytes and sent across the nodes?
- Does the class enclosing the RDD operations need to be serialized?
- As I understand it, the transformation functions are sent across the nodes, so they have to be serialized. What about the new methods I have written?
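A hedged reading of these questions: methods are not serialized one by one. Java serialization writes an object's class name and field values, not method bytecode, so the receiving JVM must already have the class on its classpath (in Spark, usually from the application jar). What does get serialized is the function object passed to the transformation plus everything it references. The Spark-free sketch below (all names hypothetical) compares the question's shape with a variant that extends `Serializable`; moving `verifyAnagrams` into a standalone Scala `object` is another commonly used option that is not shown here.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Same shape as the question's class: no `extends Serializable`.
class NonSerializableAnagram {
  def verifyAnagrams(str1: String, str2: String): Boolean = str1.sorted == str2.sorted
  // Calls an instance method, so the function captures `this`.
  def anagramFilter(name: String): String => Boolean = x => verifyAnagrams(x, name)
}

// Opts in to Java serialization. It has no fields, so serializing the captured
// instance writes its class descriptor only, not the method bodies.
class SerializableAnagram extends Serializable {
  def verifyAnagrams(str1: String, str2: String): Boolean = str1.sorted == str2.sorted
  def anagramFilter(name: String): String => Boolean = x => verifyAnagrams(x, name)
}

object SerializationCheck {
  // Tries to write the object with Java serialization.
  def isSerializable(obj: AnyRef): Boolean =
    try {
      val out = new ObjectOutputStream(new ByteArrayOutputStream())
      try out.writeObject(obj) finally out.close()
      true
    } catch {
      case _: NotSerializableException => false
    }

  def main(args: Array[String]): Unit = {
    println(isSerializable(new NonSerializableAnagram().anagramFilter("Tunring"))) // false
    println(isSerializable(new SerializableAnagram().anagramFilter("Tunring")))    // true
  }
}
```

The only difference between the two classes is the `extends Serializable` clause; the closures and the methods they call are identical.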
Any help appreciated.