
I have the following code:

import org.apache.spark.{SparkConf, SparkContext}
import com.datastax.spark.connector._
import java.text.SimpleDateFormat
import java.util.Calendar

case class event(imei: String, date: String, gpsdt: String, entrygpsdt: String, lastgpsdt: String)

object recalculate extends Serializable {
def main(args: Array[String]) {
  val conf = new SparkConf()
  .setMaster("local[2]")
  .setAppName("RecalculateOdo")
  .set("spark.cassandra.connection.host", "192.168.0.78")
  .set("spark.cassandra.connection.keep_alive_ms", "20000")

 val sc = SparkContext.getOrCreate(conf)

 val rdd = sc.cassandraTable("db", "table").select("imei", "date", "gpsdt").where("imei=? and date=? and gpsdt>? and gpsdt<?", entry(0), entry(1), entry(2), entry(3))
var lastgpsdt = "2018-04-06 10:10:10"
 rdd.foreach(f => 
      {

      val imei = f.get[String]("imei")
      val date = f.get[String]("date")
      val gpsdt = f.get[String]("gpsdt")
      val now = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(Calendar.getInstance().getTime())
      val collection = sc.parallelize(Seq(event(imei, date, gpsdt,now,lastgpsdt)))
      collection.saveToCassandra("db", "table", SomeColumns("imei", "date", "gpsdt", "entrygpsdt", "lastgpsdt"))
      lastgpsdt = gpsdt
      })
 }
}

Whenever I try to run the code, I get a Task not serializable error:

Exception in thread "main" org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298)
at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)

Suggestions please, thanks.

jAi
  • Where have you declared the sparkContext variable? This issue seems to be because of "event". – Shrinivas Deshmukh Apr 10 '18 at 06:49
  • @ShrinivasDeshmukh - please recheck, I've edited the question with the sc details. – jAi Apr 10 '18 at 06:51
  • What changes should I make to the "event" case class to get this resolved? – jAi Apr 10 '18 at 06:54
  • If I skip the event class and write val collection = sc.parallelize(Seq(imei, date, gpsdt, now)), it still gives the same error. – jAi Apr 10 '18 at 06:55
  • https://stackoverflow.com/questions/35018033/spark-on-java-what-is-the-right-way-to-have-a-static-object-on-all-workers/35040994#35040994 This link might be useful. – Shrinivas Deshmukh Apr 10 '18 at 07:04
  • As pointed out in the answer below, sparkContext is not serializable as it is to be used only by the driver node and not the worker nodes. – Shrinivas Deshmukh Apr 10 '18 at 07:05
  • The link I shared explains how to declare and use objects on all worker nodes, please have a look (a sketch of that pattern follows below). – Shrinivas Deshmukh Apr 10 '18 at 07:07
  • Let us [continue this discussion in chat](https://chat.stackoverflow.com/rooms/168616/discussion-between-aditya-jain-and-shrinivas-deshmukh). – jAi Apr 10 '18 at 07:25
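
For reference, the pattern from the linked answer keeps non-serializable resources inside a Scala object, so each executor initializes its own copy rather than receiving one from the driver. A minimal sketch of that idea; the object name Formatters is illustrative and not from the original code:

import java.text.SimpleDateFormat

// Initialized lazily on each executor; nothing here is serialized from the driver.
object Formatters {
  lazy val timestamp = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
}

// Inside a transformation the closure only references the object,
// so Spark never tries to serialize the SimpleDateFormat itself:
// rdd.map(row => Formatters.timestamp.format(new java.util.Date()))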

1 Answer


SparkContext is not serializable; you can only access it from the driver. Instead of rdd.foreach, use rdd.map and return event(imei, date, gpsdt, now), then save the result to Cassandra. Something like:

val eventsRdd = rdd.map { f => 
  val imei = f.get[String]("imei")
  val date = f.get[String]("date")
  val gpsdt = f.get[String]("gpsdt")
  val now = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(Calendar.getInstance().getTime())
  event(imei, date, gpsdt,now)
}
eventsRdd.saveToCassandra("db", "table", SomeColumns("imei", "date", "gpsdt", "entrygpsdt"))

On another note, if you have a lot of events, I would avoid creating a new date formatter and recomputing the current time for every event. You can do this once before the computation starts, or at least once per partition (see mapPartitions).
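
For illustration, a hedged sketch of that per-partition variant, reusing the column names from the question and the four-column event used in the answer above:

val eventsRdd = rdd.mapPartitions { rows =>
  // Build one formatter and one timestamp per partition instead of one per row.
  val formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
  val now = formatter.format(Calendar.getInstance().getTime())
  rows.map { f =>
    event(f.get[String]("imei"), f.get[String]("date"), f.get[String]("gpsdt"), now)
  }
}
eventsRdd.saveToCassandra("db", "table", SomeColumns("imei", "date", "gpsdt", "entrygpsdt"))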

dvir
  • Hi, it worked, but if you check my question again I have another field that needs to be saved after the event is created. In your answer the event statement has to be the last one, but I want to store lastgpsdt, which is to be sent with the next entry. – jAi Apr 10 '18 at 07:15
  • You edited it after I posted the answer :) What does `lastgpsdt` actually represent? I am afraid that if you do what you are trying above you will not get the result you expect, due to concurrency issues. However, if you want the max gpsdt, you can project eventsRdd to the gpsdt and take the max value, so it should be something like `eventsRdd.map(_.gpsdt).max()` – dvir Apr 10 '18 at 07:28
  • Let me explain what I'm trying to do. I'll iterate through the Cassandra rows and save the gpsdt of the current row into the next row, where it represents the gpsdt of the previous row under the name "lastgpsdt". Is my query clear? – jAi Apr 10 '18 at 07:30
  • So basically each row will have two columns: "gpsdt", the time of that entry, and "lastgpsdt", which is simply the "gpsdt" of the previous row. – jAi Apr 10 '18 at 07:32
  • So my current idea is to create the RDD, save it to Cassandra, and then store gpsdt in a "lastgpsdt" variable to be used in the next iteration (a sketch of this idea follows below). – jAi Apr 10 '18 at 07:33
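
One way to implement what these comments describe, i.e. carrying each row's gpsdt forward as the next row's lastgpsdt: since the query covers a single imei and date (one Cassandra partition), a simple approach is to collect the rows on the driver, sort them by gpsdt, shift that column down by one position, and write the result back. This is only a sketch under that small-data assumption; the seed value for the first row is taken from the question, and for larger data a window function such as lag over a DataFrame would be preferable:

val rows = rdd
  .map(f => (f.get[String]("imei"), f.get[String]("date"), f.get[String]("gpsdt")))
  .collect()
  .sortBy(_._3)

val now = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(Calendar.getInstance().getTime())
// Each row's lastgpsdt is the previous row's gpsdt; the first row gets the seed value.
val lastGpsdts = "2018-04-06 10:10:10" +: rows.map(_._3).dropRight(1)

val events = rows.zip(lastGpsdts).map { case ((imei, date, gpsdt), last) =>
  event(imei, date, gpsdt, now, last)
}
sc.parallelize(events).saveToCassandra("db", "table",
  SomeColumns("imei", "date", "gpsdt", "entrygpsdt", "lastgpsdt"))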