I'm trying to construct a class which receives a parser as an argument and uses this parser on each line. Below is a minimal example that you can paste into spark-shell.
import scala.util.{Success, Failure, Try}
import scala.reflect.ClassTag

class Reader[T: ClassTag](makeParser: () => (String => Try[T])) {
  def read(): Seq[T] = {
    val rdd = sc.parallelize(Seq("1", "2", "oops", "4")) mapPartitions { lines =>
      // Since making a parser can be expensive, we want to make only one per partition.
      val parser: String => Try[T] = makeParser()
      lines flatMap { line =>
        parser(line) match {
          case Success(record) => Some(record)
          case Failure(_)      => None
        }
      }
    }
    rdd.collect()
  }
}
class IntParser extends (String => Try[Int]) with Serializable {
  // There could be an expensive setup operation here...
  def apply(s: String): Try[Int] = Try { s.toInt }
}
However, when I try to run new Reader(() => new IntParser).read() (which type-checks just fine), I get the dreaded org.apache.spark.SparkException: Task not serializable error relating to closures.

Why is there an error, and is there a way to re-engineer the above to avoid it (while keeping Reader generic)?
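Edit: my current guess is that the closure passed to mapPartitions references makeParser, which is a constructor parameter of Reader, so the closure captures the whole Reader instance, and Reader itself is not Serializable. Based on that guess, the workaround I'm experimenting with is to copy the parameter into a local val first, so the closure captures only the function value; I'm not sure this is the idiomatic fix. As above, this assumes spark-shell's implicit sc:

```scala
import scala.util.Try
import scala.reflect.ClassTag

class Reader[T: ClassTag](makeParser: () => (String => Try[T])) {
  def read(): Seq[T] = {
    // Local copy: the closure below captures `mp` (a serializable function
    // value) instead of `this.makeParser`, which would drag the whole
    // non-serializable Reader instance into the serialized task.
    val mp = makeParser
    val rdd = sc.parallelize(Seq("1", "2", "oops", "4")) mapPartitions { lines =>
      val parser: String => Try[T] = mp()
      lines flatMap (line => parser(line).toOption)
    }
    rdd.collect()
  }
}
```

With this change, new Reader(() => new IntParser).read() runs for me without the exception, but I'd still like to understand whether this is the right way to structure it.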