
I am trying to call a method on a value inside `map`, but I get a "Task not serializable" error. The call is:

processDate(p(2))

The values look something like `20160125204123`.

This is my class.

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

class ScalaJob(sc: SparkContext) {
  def run(filePath: String): RDD[(String, String, String)] = {
    // read the file
    val file = sc.textFile(filePath)
    // extract the values from every row
    val values = file.map { dataRaw =>
      val p = dataRaw.split("[|]", -1)
      (p(1), processDate(p(2)), p(32))
    }
    values
  }
}

My method should return a string:

def processDate(s: String): String = {
  // ...
}

Is there a way to make it work?

gjin

2 Answers


Any code used inside `RDD.map` (in this case, `file.map`) will be serialized and shipped to the executors, so that code must be serializable. Here you use the method `processDate`, which is defined elsewhere; make sure the class in which it is defined is serializable. Note that the entire dependency chain needs to be serializable. One quick option is to bind `processDate` to a `val` as a function and use that inside the RDD. Alternatively, define the method in an object. Example:

class ScalaJob(sc: SparkContext) {
  def run(filePath: String): RDD[(String, String, String)] = {
    // read the file
    val file = sc.textFile(filePath)
    // bind the method to a function value outside the closure
    val process = processDate _
    // extract the values from every row
    val values = file.map { dataRaw =>
      val p = dataRaw.split("[|]", -1)
      (p(1), process(p(2)), p(32))
    }
    values
  }
}
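To see why binding to an object's method helps, here is a minimal plain-Scala sketch (no Spark required). The `DateUtils` object and its `processDate` body are hypothetical stand-ins; the snippet simulates what Spark does when shipping a closure by serializing the bound function value with `ObjectOutputStream`, which succeeds because a method on a top-level object does not capture an enclosing class.

```scala
import java.io.{ByteArrayOutputStream, ObjectOutputStream}

// Hypothetical helper object. Methods on a top-level object do not drag
// an enclosing class into the closure, so binding one to a function
// value produces something Spark can serialize.
object DateUtils {
  // Stand-in body: keep the yyyyMMdd prefix of a timestamp string.
  def processDate(s: String): String = s.take(8)
}

object SerializationCheck {
  def main(args: Array[String]): Unit = {
    // Bind the method to a function value, as you would before RDD.map.
    val process: String => String = DateUtils.processDate
    // Simulate Spark shipping the closure to an executor.
    val out = new ObjectOutputStream(new ByteArrayOutputStream())
    out.writeObject(process) // no NotSerializableException
    out.close()
    println(process("20160125204123"))
  }
}
```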

See Spark Task Not Serializable for more details

Aravindh S
  • I gave a sample; I have followed this approach in my development and it works. The concept is simple: when you use a method inside `map`, it gets closed over and serialized. For the serialization to succeed, the complete dependency chain must be serializable. When you instead bind the method to a function value outside of `map`, a method-local immutable copy is created that has no serialization issues. – Aravindh S Oct 21 '16 at 00:05

The error occurs because `sc: SparkContext` is not serializable by nature. Spark will try to serialize the entire `ScalaJob` for the closure, which includes `sc`; that is where the problem comes from. The fix is to mark `sc` as `@transient`.

Your class definition should look like this:

class ScalaJob(@transient sc: SparkContext) extends Serializable { }
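A minimal sketch of why `@transient` helps, using a hypothetical `FakeContext` as a stand-in for the non-serializable `SparkContext`: Java serialization skips `@transient` fields, so serializing the job succeeds even though the context itself is not `Serializable`.

```scala
import java.io.{ByteArrayOutputStream, ObjectOutputStream}

// Stand-in for SparkContext: any class that is not Serializable.
class FakeContext

// Marking the field @transient excludes it from serialization;
// without @transient, writeObject below would throw
// java.io.NotSerializableException for FakeContext.
class ScalaJobSketch(@transient val ctx: FakeContext) extends Serializable {
  def tag(s: String): String = s + "-processed"
}

object TransientCheck {
  def main(args: Array[String]): Unit = {
    val job = new ScalaJobSketch(new FakeContext)
    // Simulate Spark serializing the enclosing instance for a closure.
    val out = new ObjectOutputStream(new ByteArrayOutputStream())
    out.writeObject(job) // succeeds because ctx is @transient
    out.close()
    println(job.tag("row"))
  }
}
```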
Balaji Reddy