
I got a "Task not serializable" error when running this code:

import org.apache.spark.{SparkContext, SparkConf}
import scala.collection.mutable.ArrayBuffer
object Task1 {
  def findHighestRatingUsers(movieRating: String): (String) = {
    val tokens = movieRating.split(",", -1)
    val movieTitle = tokens(0)
    val ratings = tokens.slice(1, tokens.size)
    val maxRating = ratings.max
    var userIds = ArrayBuffer[Int]()

    for(i <- 0 until ratings.length){
      if (ratings(i) == maxRating) {
        userIds += (i+1)
      }
    }

    return movieTitle + "," + userIds.mkString(",")

    return movieTitle
  }

  def main(args: Array[String]) {

    val conf = new SparkConf().setAppName("Task 1")
    val sc = new SparkContext(conf)

    val Lines = sc.textFile(args(0))


    val TitleAndMaxUserIds = Lines.map(findHighestRatingUsers)
      .saveAsTextFile(args(1))
  }
}

The error occurs at line:

val TitleAndMaxUserIds = Lines.map(findHighestRatingUsers)
      .saveAsTextFile(args(1))

I believe this is due to something in the function 'findHighestRatingUsers'. Could somebody explain why this happens and how to fix it?

The full exception is:

Exception in thread "main" org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:416)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:406)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:162)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:2362)
    at org.apache.spark.rdd.RDD.$anonfun$map$1(RDD.scala:396)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:388)
    at org.apache.spark.rdd.RDD.map(RDD.scala:395)
    at Task1$.main(Task1.scala:63)
    at Task1.main(Task1.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
    at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:928)
    at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180)
    at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203)
    at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90)
    at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1007)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1016)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.io.NotSerializableException: Task1$
Serialization stack:
    - object not serializable (class: Task1$, value: Task1$@3c770db4)
    - element of array (index: 0)
    - array (class [Ljava.lang.Object;, size 1)
    - field (class: java.lang.invoke.SerializedLambda, name: capturedArgs, type: class [Ljava.lang.Object;)
    - object (class java.lang.invoke.SerializedLambda, SerializedLambda[capturingClass=class Task1$, functionalInterfaceMethod=scala/Function1.apply:(Ljava/lang/Object;)Ljava/lang/Object;, implementation=invokeStatic Task1$.$anonfun$main$1:(LTask1$;Ljava/lang/String;)Ljava/lang/String;, instantiatedMethodType=(Ljava/lang/String;)Ljava/lang/String;, numCaptured=1])
    - writeReplace data (class: java.lang.invoke.SerializedLambda)
    - object (class Task1$$$Lambda$1023/20408451, Task1$$$Lambda$1023/20408451@4f59a516)
    at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:41)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:101)
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:413)
    ... 22 more

I checked the post Difference between object and class in Scala and tried to use an object to enclose the function:

import org.apache.spark.{SparkContext, SparkConf}
import scala.collection.mutable.ArrayBuffer

object Function{
    def _findHighestRatingUsers(movieRating: String): (String) = {
      val tokens = movieRating.split(",", -1)
      val movieTitle = tokens(0)
      val ratings = tokens.slice(1, tokens.size)
      val maxRating = ratings.max
      var userIds = ArrayBuffer[Int]()

      for(i <- 0 until ratings.length){
        if (ratings(i) == maxRating) {
          userIds += (i+1)
        }
      }

      return movieTitle + "," + userIds.mkString(",")
    }

}

object Task1 {

  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("Task 1")
    val sc = new SparkContext(conf)

    val textFile = sc.textFile(args(0))

    val output = textFile.map(Function._findHighestRatingUsers)
      .saveAsTextFile(args(1))
  }
}

But I still got the exception, with a huge number of errors...


This time I tried to put the object Function inside the object Task1, like this:

import org.apache.spark.{SparkContext, SparkConf}
import scala.collection.mutable.ArrayBuffer
object Task1 {
    
  object Function{
    def _findHighestRatingUsers(movieRating: String): (String) = {
      val tokens = movieRating.split(",", -1)
      val movieTitle = tokens(0)
      val ratings = tokens.slice(1, tokens.size)
      val maxRating = ratings.max
      var userIds = ArrayBuffer[Int]()

      for(i <- 0 until ratings.length){
        if (ratings(i) == maxRating) {
          userIds += (i+1)
        }
      }

      return movieTitle + "," + userIds.mkString(",")
    }
  }

  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("Task 1")
    val sc = new SparkContext(conf)

    val textFile = sc.textFile(args(0))

    val output = textFile.map(Function._findHighestRatingUsers)
      .saveAsTextFile(args(1))
  }
}

And the problem was solved. But I still don't understand why nesting the object solves the problem. Could somebody explain it? Furthermore, there are several points I'm not sure about:

  1. What is the main function in Scala? Is it the entry point of the program?
  2. Why do we use an object to hold the main function?
  3. Could somebody give a common structure of a Scala program containing functions, classes, and other basic components?

1 Answer

First, I would recommend that you get familiar with both Scala and Spark by reading the documentation, as your questions suggest you are just getting started with them.

I'll give you some insight into your original question about "Task not serializable" (without answering it precisely), and suggest you open separate questions for the ones you added later in your post; otherwise this answer will become a mess.

As you probably know, Spark allows distributed computation. To do so, one of the things Spark does is take the code you write, serialize it, and send it to executors somewhere to actually run it. The key point here is that your code must be serializable.
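
As an illustration, here is a minimal, self-contained sketch of that mechanism (local mode; the object name and data are made up and not taken from the question): the lambda passed to map, together with everything it captures, is serialized and shipped to the executors.

import org.apache.spark.{SparkContext, SparkConf}

object ClosureDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("Closure demo").setMaster("local[*]")
    val sc = new SparkContext(conf)

    val bonus = 10                              // captured by the closure below
    val result = sc.parallelize(Seq(1, 2, 3))
      .map(x => x + bonus)                      // this lambda and `bonus` must be serializable
      .collect()

    println(result.mkString(","))               // prints 11,12,13
    sc.stop()
  }
}

If the lambda, or anything it references (for instance a field of an enclosing non-serializable object), cannot be serialized, Spark fails with exactly the "Task not serializable" error shown above.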

The error you got is telling you that Spark cannot serialize your code.

Now, how do you make it serializable? This is where it can become challenging, and even though Spark tries to help you by providing a "serialization stack", sometimes the information it provides is not that helpful.

In your case (the first code example), findHighestRatingUsers must be serialized, but to do so Spark has to serialize the whole object Task1, which is not serializable.

Why is Task1 not serializable? I'll admit I'm not really sure, but I would bet on the main method, though I would have expected your second example to be serializable in that case.
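
For reference, here is a minimal sketch of a common way around this kind of error, adapted from the code in the question (it is not your exact fix, and it keeps your original semantics, including comparing ratings as strings): define the mapping logic as a function literal inside main, so the closure passed to map does not need to capture the enclosing object at all.

import org.apache.spark.{SparkContext, SparkConf}

object Task1 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("Task 1")
    val sc = new SparkContext(conf)

    // A local function literal: it only uses its own parameter and local values,
    // so the closure shipped to the executors does not drag the Task1 object along.
    val findHighestRatingUsers = (movieRating: String) => {
      val tokens = movieRating.split(",", -1)
      val movieTitle = tokens(0)
      val ratings = tokens.drop(1)
      val maxRating = ratings.max
      val userIds = ratings.indices.filter(i => ratings(i) == maxRating).map(_ + 1)
      movieTitle + "," + userIds.mkString(",")
    }

    sc.textFile(args(0))
      .map(findHighestRatingUsers)
      .saveAsTextFile(args(1))
  }
}

Another common workaround is to declare the enclosing object as serializable (object Task1 extends Serializable), which lets Spark ship the captured object with the task, at the cost of serializing it for every task.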

You can read more about this in the documentation or in blog posts on the web. For instance: https://medium.com/swlh/spark-serialization-errors-e0eebcf0f6e6
