7

Spark throws "Task not serializable" when I use a case class or a class/object that extends Serializable inside a closure.

object WriteToHbase extends Serializable {
    def main(args: Array[String]) {
        val csvRows: RDD[Array[String]] = ...
        val dateFormatter = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss")
        val usersRDD = csvRows.map(row => {
            new UserTable(row(0), row(1), row(2), row(9), row(10), row(11))
        })
        processUsers(sc, usersRDD, dateFormatter)
    }
}

def processUsers(sc: SparkContext, usersRDD: RDD[UserTable], dateFormatter: DateTimeFormatter): Unit = {

    usersRDD.foreachPartition(part => {

        val conf = HBaseConfiguration.create()
        val table = new HTable(conf, tablename)

        part.foreach(userRow => {
            val id = userRow.id
            val date1 = dateFormatter.parseDateTime(userRow.date1)
        })
        table.flushCommits()
        table.close()
    })
}

My first attempt was to use a case class:

case class UserTable(id: String, name: String, address: String, ...) extends Serializable

My second attempt was to use a class instead of a case class:

class UserTable (val id: String, val name: String, val address: String, ...) extends Serializable {
}

My third attempt was to use a companion object in the class:

object UserTable extends Serializable {
    def apply(id: String, name: String, address: String, ...) = new UserTable(id, name, address, ...)
}
sophie

2 Answers

5

Most likely the "doSomething" function is defined on your class, which isn't serializable. Instead, move the "doSomething" function to a companion object (i.e. make it effectively static).
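
For illustration, a minimal sketch of the difference, using a hypothetical Pipeline class and doSomething helper (not from the question's code):

import org.apache.spark.rdd.RDD

// doSomething defined on an instance: the map closure captures `this`,
// so the whole Pipeline (and every one of its fields) must be serializable.
class Pipeline(config: AnyRef) {
    def doSomething(s: String): String = s.trim
    def run(rdd: RDD[String]): RDD[String] = rdd.map(doSomething)
}

// doSomething on an object is effectively static: the closure only references
// the object, so nothing from an enclosing instance gets dragged in.
object PipelineHelpers {
    def doSomething(s: String): String = s.trim
}

class FixedPipeline(config: AnyRef) {
    def run(rdd: RDD[String]): RDD[String] = rdd.map(PipelineHelpers.doSomething)
}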

Holden
  • Hi Holden, the "doSomething" method is inside the same class, which extends Serializable. – sophie May 28 '15 at 05:44
  • @sophie Can you try putting it in a companion object anyway? Even if the class doSomething lives in is marked as serializable, Spark's closure cleaner will try to pick up the whole thing if any of its fields are not. – Holden May 28 '15 at 06:06
  • Sorry, instead of a class I'm using an object, and the main method is there. Should I create a class and make my object a companion object? – sophie May 28 '15 at 07:15
  • I mean, if it's in an object it shouldn't be capturing any extra bits, so I think I would need to see the code to say where it might be picking up a non-serializable reference. – Holden May 28 '15 at 07:29
  • Hi Holden, I figured the case class was the culprit, but I don't know how to make it Serializable. – sophie May 30 '15 at 07:54
  • @sophie case classes are inherently serializable – Vijay Krishna Aug 09 '17 at 18:53
0

It was the dateFormatter; I moved it inside the partition loop and it works now.

usersRDD.foreachPartition(part => {
    val dateFormatter = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss")
    part.foreach(userRow => {
        val id = userRow.id
        val date1 = dateFormatter.parseDateTime(userRow.date1)
    })
})
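
For completeness, a sketch of how this could look applied to processUsers from the question, creating both the HBase table and the date formatter once per partition rather than per row. The signature is adjusted here as an assumption: the formatter no longer needs to come from the driver, and tablename is taken as a parameter since the question does not show where it is defined.

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.HTable
import org.apache.spark.rdd.RDD
import org.joda.time.format.DateTimeFormat

def processUsers(usersRDD: RDD[UserTable], tablename: String): Unit = {
    usersRDD.foreachPartition(part => {
        // Non-serializable, expensive objects are built on the executor, once per partition.
        val conf = HBaseConfiguration.create()
        val table = new HTable(conf, tablename)
        val dateFormatter = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss")

        part.foreach(userRow => {
            val id = userRow.id
            val date1 = dateFormatter.parseDateTime(userRow.date1)
            // ... build a Put from the row and call table.put(...) here
        })

        table.flushCommits()
        table.close()
    })
}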
sophie
  • Consider creating expensive objects once per partition rather than every iteration. http://stackoverflow.com/questions/35018033/spark-on-java-what-is-the-right-way-to-have-a-static-object-on-all-workers/35040994#35040994 – Alex Naspo Apr 11 '16 at 17:13