
I'm trying to construct a class that receives a parser as an argument and uses this parser on each line. Below is a minimal example that you can paste into `spark-shell`.

import scala.util.{Success,Failure,Try}
import scala.reflect.ClassTag

class Reader[T : ClassTag](makeParser: () => (String => Try[T])) {

  def read(): Seq[T] = {

    val rdd = sc.parallelize(Seq("1","2","oops","4")) mapPartitions { lines =>

      // Since making a parser can be expensive, we want to make only one per partition.
      val parser: String => Try[T] = makeParser()

      lines flatMap { line =>
        parser(line) match {
          case Success(record) => Some(record)
          case Failure(_) => None
        }
      }
    }

    rdd.collect()
  }
}

class IntParser extends (String => Try[Int]) with Serializable {
  // There could be an expensive setup operation here...
  def apply(s: String): Try[Int] = Try { s.toInt }
}

However, when I try to run something like `new Reader(() => new IntParser).read()` (which type-checks just fine) I get the dreaded `org.apache.spark.SparkException: Task not serializable` error relating to closures.

Why is there an error and is there a way to re-engineer the above to avoid this (while keeping Reader generic)?

Alec
  • Strange. The function is only closing over `makeParser`, but `() => new IntParser` should be serializable. What happens if you replace passing `makeParser` with `parser` as argument? – Alexey Romanov Jun 08 '16 at 06:33
  • @AlexeyRomanov If I just make `parser` the argument to `Reader[T]` I still get the same message (slightly different trace, but still closure related) – Alec Jun 08 '16 at 06:38
  • @Alec - Quick fix: move the line `val parser: String => Try[T] = makeParser()` before the `val rdd = ..` (see the sketch after this comment thread). – Knight71 Jun 08 '16 at 06:47
  • @Knight71 doesn't that mean that I no longer have one parser per partition though? Thanks though, that gives me something to ponder. Maybe with lazy values... – Alec Jun 08 '16 at 06:53
  • You will have one parser per RDD, and this parser will be distributed across the executors. – Knight71 Jun 08 '16 at 06:55
  • @Knight71 I'm a bit new to Spark. Do executors and nodes have a one-to-one relationship (per rdd, of course)? – Alec Jun 08 '16 at 07:12
  • Not necessarily - executors and nodes (workers) don't have to map one-to-one. In general, executors are your app's processes running inside the workers. If you have N apps and M workers, your apps will be distributed equally among the workers. – Knight71 Jun 08 '16 at 07:21
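
A minimal sketch of the quick fix suggested in the comments above, assuming the same spark-shell setup (`sc`) and `IntParser` from the question: the parser is built once on the driver, before the transformation, so the closure captures only the (serializable) parser value rather than the enclosing `Reader`. Note the trade-off raised in the comments: the parser is constructed once on the driver and shipped with the tasks, rather than built once per partition on the executors.

import scala.util.Try
import scala.reflect.ClassTag

class Reader[T : ClassTag](makeParser: () => (String => Try[T])) {

  def read(): Seq[T] = {

    // Built once on the driver, before the transformation; the closure below
    // captures only this local val (serializable for IntParser), not `this`.
    val parser: String => Try[T] = makeParser()

    val rdd = sc.parallelize(Seq("1","2","oops","4")) mapPartitions { lines =>
      lines flatMap { line => parser(line).toOption }
    }

    rdd.collect()
  }
}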

1 Answer


The problem is that `makeParser` is a member of class `Reader`, and since you use it inside an RDD transformation, Spark tries to serialize the entire `Reader` instance, which is not serializable. That is why you get the task-not-serializable exception.

Adding `Serializable` to the class `Reader` will make your code work. But that is not good practice, since it serializes all of the class's fields, most of which are not actually needed on the executors.
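
For concreteness, that workaround is just the following (keeping the body of `read` from the question unchanged):

class Reader[T : ClassTag](makeParser: () => (String => Try[T])) extends Serializable {
  // ... read() exactly as in the question ...
}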

In general, prefer functions over methods to avoid serialization issues: in Scala, functions are objects, so they can be serialized on their own without dragging in an enclosing class.

Refer to this answer: Task not serializable: java.io.NotSerializableException when calling function outside closure only on classes not objects
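
One way to apply that advice, sketched against the question's setup (assuming it is acceptable to move the factory from the constructor to `read`): pass the parser factory to `read` as a function value, so the closure captures only that serializable function and never the enclosing class, while still building one parser per partition.

import scala.util.Try
import scala.reflect.ClassTag

class Reader[T : ClassTag] {

  def read(makeParser: () => (String => Try[T])): Seq[T] = {
    val rdd = sc.parallelize(Seq("1","2","oops","4")) mapPartitions { lines =>
      // makeParser is now a method parameter (a plain function value), so the
      // closure captures that value directly instead of capturing `this`.
      val parser: String => Try[T] = makeParser()
      lines flatMap { line => parser(line).toOption }
    }
    rdd.collect()
  }
}

// usage: new Reader[Int].read(() => new IntParser)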

Knight71
    Alternate fixes: 1. make `makeParser` an argument of `read`, not of `Reader`. 2. Change `read` to store the `makeParser` function in a local variable: `val makeParser0 = makeParser; ... val parser: String => Try[T] = makeParser0() ...`. – Alexey Romanov Jun 08 '16 at 07:32
  • Ahhh! That first paragraph explains it - everything makes sense now. @AlexeyRomanov I will likely end up doing just that! – Alec Jun 08 '16 at 07:39
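
A sketch of the second alternative from the comment above, which keeps `Reader`'s original signature and the one-parser-per-partition behaviour: copy the constructor argument into a local `val` inside `read`, so the closure captures the copy rather than `this`.

import scala.util.Try
import scala.reflect.ClassTag

class Reader[T : ClassTag](makeParser: () => (String => Try[T])) {

  def read(): Seq[T] = {
    // Copy the field into a local val; the closure captures the copy, not `this`.
    val makeParser0 = makeParser

    val rdd = sc.parallelize(Seq("1","2","oops","4")) mapPartitions { lines =>
      // Still one parser per partition, constructed on the executor.
      val parser: String => Try[T] = makeParser0()
      lines flatMap { line => parser(line).toOption }
    }

    rdd.collect()
  }
}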