1

I am trying to get employeeId from employee_table and use this id to query employee_address table to fetch the address.

There is nothing wrong with tables. But when I run the below code, I get org.apache.spark.SparkException: Task not serializable

I think I know the issue. The issue is sparkContext is with master and not with worker. But I don't know how to get my head around this.

val employeeRDDRdd = sc.cassandraTable("local_keyspace", "employee_table")


try {

  val data = employeeRDDRdd
    .map(row => {
      row.getStringOption("employeeID") match {
        case Some(s) if (s != null) && s.nonEmpty => s
        case None => ""
      }
    })

    //create tuple of employee id and address. Filtering out cases when  for an employee address is empty.

  val id = data
    .map(s => (s,getID(s)))
    filter(tups => tups._2.nonEmpty)

    //printing out total size of rdd.
    println(id.count())




} catch {
  case e: Exception => e.printStackTrace()
}

def getID(employeeID: String): String = {
  val addressRDD = sc.cassandraTable("local_keyspace", "employee_address")
  val data = addressRDD.map(row => row.getStringOption("address") match {
    case Some(s) if (s != null) && s.nonEmpty => s
    case None => ""
  })
  data.collect()(0)
}

Exception ==>

rg.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298)
    at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:2039)
    at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:366)
    at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:365)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:358)
    at org.apache.spark.rdd.RDD.map(RDD.scala:365)
Nikunj Kakadiya
  • 2,689
  • 2
  • 20
  • 35
Ankur
  • 417
  • 1
  • 4
  • 22
  • 1
    How does your outer classes look like, i.e. the one that contains this code and Main method (in case they are different)? One of the possibilities is that you are trying to call `SparkContext` from the Worker node and that's not working. Try reading this: http://stackoverflow.com/questions/31448069/sparkcontext-not-serializable-inside-a-companion-object and possibly this: http://stackoverflow.com/questions/22592811/task-not-serializable-java-io-notserializableexception-when-calling-function-ou – Nikola Stojiljkovic Dec 14 '16 at 11:05
  • I think you are correct. But how do I get it fixed? How can I get data from one table, use the data to query another table? – Ankur Dec 14 '16 at 11:58
  • You shouldn't be using `SparkContext` to read the data from the table in the worker nodes. Since you don't want that data to be `RDD` anyway, you should use Java's Cassandra API to do that. However, it doesn't sound like you should be doing that with spark. You should read the data you need as two separate RDDs and call the `join()` method if you want to join them. Also, your code above is not using `employeeID` in the `getID` method at all, so I am not quite sure what is the goal there? – Nikola Stojiljkovic Dec 14 '16 at 12:09

1 Answers1

4

Serialization Error Caused by SparkContext Captured in Lambda

The serialization issue is caused by

val addressRDD = sc.cassandraTable("local_keyspace", "employee_address")

This portion is used inside of a serialized lambda here :

val id = data
  .map(s => (s,getID(s)))

All RDD transformations represent remotely executed code which means their entire contents must be serializable.

The Spark Context is not serializable but it is necessary for "getIDs" to work so there is an exception. The basic rule is you cannot touch the SparkContext within any RDD transformation.

If you are actually trying to join with data in cassandra you have a few options.

If you are just pulling rows based on Partition Key

https://github.com/datastax/spark-cassandra-connector/blob/master/doc/2_loading.md#using-joinwithcassandratable

If you are trying to join on some other field

Load both RDDs seperately and do a Spark Join

val leftrdd = sc.cassandraTable(test, table1)
val rightrdd = sc.cassandraTable(test, table2)
leftrdd.join(rightRdd)
Community
  • 1
  • 1
RussS
  • 16,476
  • 1
  • 34
  • 62