I'm running a streaming job on Spark 2 (CDH 5.9) using the Kafka 0.8 client. The aim is simply to persist the data in Impala, record by record.
I can't get rid of the following error, because I don't know where it is coming from:
16/12/14 08:43:28 ERROR scheduler.JobScheduler: Error running job streaming job 1481726608000 ms.0
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 25.0 failed 4 times, most recent failure: Lost task 0.3 in stage 25.0 (TID 132, datanode1, executor 1):
java.io.InvalidClassException: org.apache.commons.lang3.time.FastDateFormat; local class incompatible: stream classdesc serialVersionUID = 1, local class serialVersionUID = 2
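As far as I understand it, a serialVersionUID mismatch like this means the object was serialized by one version of the class and deserialized against another, so different commons-lang3 jars may be in use on the driver and the executors. To narrow that down, here is a minimal sketch of a check that reports which jar a class was loaded from and which serialVersionUID it has. It is not part of the job itself; `ClassOrigin` and `describe` are hypothetical names introduced only for this illustration.

```scala
import java.io.ObjectStreamClass

// Hypothetical helper, for diagnosis only: reports where a class was
// loaded from and which serialVersionUID the local copy has.
object ClassOrigin {
  def describe(className: String): String = {
    val cls = Class.forName(className)
    // ObjectStreamClass.lookup returns null for non-serializable classes
    val uid = Option(ObjectStreamClass.lookup(cls))
      .map(_.getSerialVersionUID.toString)
      .getOrElse("n/a")
    // getCodeSource is null for classes loaded by the bootstrap class loader
    val source = Option(cls.getProtectionDomain.getCodeSource)
      .flatMap(cs => Option(cs.getLocation))
      .map(_.toString)
      .getOrElse("bootstrap/unknown")
    s"$className serialVersionUID=$uid loadedFrom=$source"
  }
}
```

On the driver this could be called as `println(ClassOrigin.describe("org.apache.commons.lang3.time.FastDateFormat"))`. To see what the executors load, the same call could be run inside a small job, for example `spark.sparkContext.parallelize(1 to 100).map(_ => ClassOrigin.describe("org.apache.commons.lang3.time.FastDateFormat")).distinct().collect().foreach(println)`, assuming the helper is available on the executor classpath. If the two outputs differ, that would point at the conflicting jar.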
The direct Kafka stream is created as follows:
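import kafka.serializer.StringDecoder
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

// `spark` is the existing SparkSession; each batch covers 2 seconds
val streamingContext = new StreamingContext(spark.sparkContext, Seconds(2))
val kafkaParams = Map[String, String](
  "bootstrap.servers" -> "datanode1:9092,datanode2:9092,datanode3:9092",
  "group.id" -> "myconsumergroup",
  "auto.offset.reset" -> "largest")
val topics: Set[String] = Set("kafkatest")
val directKafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  streamingContext, kafkaParams, topics)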
It is then processed as follows:
// Static lookup data, cached once and reused for every batch
val deviceMap = spark.read.parquet("/user/admin/temp/joinData.parquet").cache()
directKafkaStream.foreachRDD { rdd =>
  // Parse the message values (second tuple element) as JSON
  val avgData = spark.read.schema(jsonDatastruct).json(rdd.map(i => i._2)).select("data.*").as[JsonInfo]
  val deviceEnriched = avgData.join(deviceMap, Seq("COMMON_KEY"), "left")
  deviceEnriched.show(false)
  spark.sql("use my_database")
  deviceEnriched.write.mode("append").saveAsTable("tbl_persisted_kafka_stream")
}
streamingContext.start()
streamingContext.awaitTermination()