
I'm running a streaming job with Spark 2 on CDH 5.9, using the Kafka 0.8 client. The simple aim is to persist the information in Impala, record by record.

I can't get rid of this error, and I don't know where it's coming from:

16/12/14 08:43:28 ERROR scheduler.JobScheduler: Error running job streaming
job 1481726608000 ms.0
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in
stage 25.0 failed 4 times, most recent failure: Lost task 0.3 in stage 25.0
(TID 132, datanode1, executor 1):
java.io.InvalidClassException: org.apache.commons.lang3.time.FastDateFormat;
local class incompatible: stream classdesc serialVersionUID = 1, 
local class serialVersionUID = 2

The direct Kafka stream is created simply by:

import kafka.serializer.StringDecoder
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

val streamingContext = new StreamingContext(spark.sparkContext, Seconds(2))
val kafkaParams = Map[String, String](
  "bootstrap.servers" -> "datanode1:9092,datanode2:9092,datanode3:9092",
  "group.id" -> "myconsumergroup",
  "auto.offset.reset" -> "largest")
val topics: Set[String] = Set("kafkatest")
val directKafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](streamingContext, kafkaParams, topics)

And processed by:

val deviceMap = spark.read.parquet("/user/admin/temp/joinData.parquet").cache()

directKafkaStream.foreachRDD { rdd =>
  // parse the JSON payload of each Kafka message into a typed Dataset
  val avgData = spark.read.schema(jsonDatastruct).json(rdd.map(i => i._2)).select("data.*").as[JsonInfo]

  // enrich the stream with the cached reference data
  val deviceEnriched = avgData.join(deviceMap, Seq("COMMON_KEY"), "left")

  deviceEnriched.show(false)
  spark.sql("use my_database")
  deviceEnriched.write.mode("append").saveAsTable("tbl_persisted_kafka_stream")
}

streamingContext.start()
streamingContext.awaitTermination()
Carlos Delgado

1 Answer

Short answer: the messages were serialized with a version of the commons-lang3 JAR that is not compatible with the one you are using with Spark.

Long answer: if you had just Googled that error message, then searched the Apache Commons source code, you would have found...

  • this post that digs into the Java "class incompatible" serialization issue, in general
  • the source code for FastDateFormat stating that serialVersionUID = 1L until V3.1 but switching to serialVersionUID = 2L with V3.2 (because the binary structure has changed at that time)
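
If it helps to pin down which JVM is loading which version, here is a minimal diagnostic sketch (not from the original post, just one way you might check): run it on the driver, and again inside a foreachRDD/foreachPartition so it runs on the executors, then compare the outputs.

import java.io.ObjectStreamClass
import org.apache.commons.lang3.time.FastDateFormat

// which commons-lang3 JAR this particular JVM actually loaded the class from
val jarLocation = classOf[FastDateFormat].getProtectionDomain.getCodeSource.getLocation
// 1L for commons-lang3 <= 3.1, 2L for >= 3.2
val suid = ObjectStreamClass.lookup(classOf[FastDateFormat]).getSerialVersionUID
println(s"FastDateFormat loaded from $jarLocation with serialVersionUID = $suid")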

By the way, I just checked and CDH 5.9 ships with commons-lang3 in V3.1 (for Hive, Impala, Sentry, Hive-in-Oozie, Sqoop-in-Oozie) and V3.3.2 (for Spark-in-Oozie) and V3.4 (for Sqoop) while Spark itself is not supposed to need it at all. Go figure.
And since CDH does not ship with Spark 2 yet, I guess you either downloaded the "beta" parcel or the Apache version -- and I checked, the Apache version (V2.0.2) ships with commons-lang3 V3.3.2.

My 2 cents: just force --jars /opt/cloudera/parcels/CDH/jars/commons-lang3-3.1.jar in your Spark 2 command line, and see if that's enough to solve your issue.
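
For example, something like this (a sketch only: spark2-submit is the Spark 2 launcher name on CDH, and the main class and application JAR below are placeholders -- the point is the --jars flag):

spark2-submit \
  --master yarn \
  --jars /opt/cloudera/parcels/CDH/jars/commons-lang3-3.1.jar \
  --class com.example.MyStreamingJob \
  my-streaming-job.jar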

Edit: For 2 extra cents, make sure that your "custom" JAR gets precedence over whatever JAR was already on the YARN classpath, with --conf spark.yarn.user.classpath.first=true
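
That is, combined with the --jars option above, the relevant flags in the launch sketch become:

  --jars /opt/cloudera/parcels/CDH/jars/commons-lang3-3.1.jar \
  --conf spark.yarn.user.classpath.first=true \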

Samson Scharfrichter
  • Thanks Samson. That solved the issue :) BTW, the Spark 2 parcel was released GA this week by Cloudera, and comes with **V3.3.2**. As you correctly said: Go figure. My underlying problem was that I couldn't figure out which object was being serialized, and from where to where, but forcing V3.1 the way you pointed out solved the problem. – Carlos Delgado Dec 16 '16 at 09:38
  • ... for a while. The exception is back again, and it doesn't matter whether I include **V3.1** or **V3.3.2**, the exception is always the same and on the same node (I'm running this on three nodes). So I'm thinking it could be something related to Spark but not to my job? Any other ideas? – Carlos Delgado Dec 16 '16 at 11:35
  • Stopping that node solves the problem, so I guess there's a stale configuration somewhere on that node. Is there any way to refresh it? As it's a VM, I'm tempted to create it again from scratch – Carlos Delgado Dec 16 '16 at 11:58
  • Try `--conf spark.yarn.user.classpath.first=true` so that Spark activates the YARN property -- otherwise the actual order of JARs in the CLASSPATH might be random *(it's "false" by default on the YARN side; I thought Spark forced it to "true" by default, because it causes so many problems, but...)* – Samson Scharfrichter Dec 16 '16 at 12:28
  • Yay... thanks, solved again, hope this time for good. I definitely want to include this parameter in all my jobs. – Carlos Delgado Dec 16 '16 at 12:42