I've a following code :-
case class event(imei: String, date: String, gpsdt: String, entrygpsdt: String,lastgpsdt: String)
object recalculate extends Serializable {
def main(args: Array[String]) {
val conf = new SparkConf()
.setMaster("local[2]")
.setAppName("RecalculateOdo")
.set("spark.cassandra.connection.host", "192.168.0.78")
.set("spark.cassandra.connection.keep_alive_ms", "20000")
val sc = SparkContext.getOrCreate(conf)
val rdd = sc.cassandraTable("db", "table").select("imei", "date", "gpsdt").where("imei=? and date=? and gpsdt>? and gpsdt<?", entry(0), entry(1), entry(2), entry(3))
var lastgpsdt = "2018-04-06 10:10:10"
rdd.foreach(f =>
{
val imei = f.get[String]("imei")
val date = f.get[String]("date")
val gpsdt = f.get[String]("gpsdt")
val now = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(Calendar.getInstance().getTime())
val collection = sc.parallelize(Seq(event(imei, date, gpsdt,now,lastgpsdt)))
collection.saveToCassandra("db", "table", SomeColumns("imei", "date", "gpsdt", "entrygpsdt","lastgpsdt")
lastgpsdt = gpsdt
})
}
}
Whenever I'm trying to run the code , getting Task serializable error :-
Exception in thread "main" org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298)
at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
Suggestions Please, Thanks,