
I have some classpath issues when running a Spark Streaming job with Kafka 0.10 in YARN client mode:

java.lang.ClassNotFoundException: org.apache.spark.streaming.kafka010.KafkaRDDPartition

I already read the (very well explained) questions Add Jars to a Spark Job and Spark Kafka Streaming issues, but I'm still stuck...

My project is a Java Spring Boot application managed with Maven that includes a Scala part for the Spark code. An uber jar is created with the Maven Shade plugin, and the spark-streaming-kafka-0-10_2.11 dependency containing the missing class is inside it.

Executing the Spark app in local mode works as expected. However, when executing it in YARN client mode, KafkaRDDPartition is missing, presumably from the executors' classpath.

I tried adding the extraClassPath property to my SparkConf, but it didn't change anything (don't hit me for the hard-coded paths, it's just a test):

val conf: SparkConf = new SparkConf()
  conf.set("spark.streaming.concurrentJobs", "2")
  conf.set("spark.executor.extraClassPath",sparkHome + "/kafka-0.10")
  conf.setAppName(classOf[KafkaSubscriber].getSimpleName)
  conf.setMaster(sparkMaster)
  conf.setSparkHome(sparkHome)
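
If I understand correctly, extraClassPath entries are resolved on each node where the JVM starts, and a bare directory on a Java classpath only picks up .class files, not jars, so presumably the setting would need a /* wildcard (and a matching driver-side entry). A sketch of what I mean, untested:

// Sketch only: assumes the kafka-0.10 directory exists at the same path on every
// YARN node; the /* wildcard is what makes Java add the jars inside it, since a
// bare directory on the classpath only exposes .class files.
conf.set("spark.executor.extraClassPath", sparkHome + "/kafka-0.10/*")
conf.set("spark.driver.extraClassPath", sparkHome + "/kafka-0.10/*")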

Adding the jars to the SparkContext before creating the StreamingContext didn't change anything either:

val folder: File = new File(sparkHome + "/kafka-0.10")
  if (folder.exists && folder.isDirectory) {
    for (f <- folder.listFiles.filter(_.isFile).toSeq) {
      spark.sparkContext.addJar(f.getAbsolutePath)
    }
  }
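
As far as I know, SparkContext.listJars() (available since Spark 2.0) shows which jars were actually registered for shipping to the executors, so something like this sketch could confirm whether the Kafka jars made it in:

// Sketch: prints the jars registered via addJar; these are served to the
// executors from the driver's file server when tasks run.
spark.sparkContext.listJars().foreach(println)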

I'm using a Cloudera platform with the Spark2 parcel deployed and activated across the whole cluster, and the Spark2 service is configured to use Kafka 0.10 by default.

Another important point: since it's a Spring Boot app, I'm not launching my Spark app with spark-submit, but with

java -Dloader.path=file:///etc/hadoop/conf,myApp.jar -jar myApp.jar
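
Since spark-submit isn't involved, nothing tells Spark to ship the uber jar itself to the YARN executors; as far as I know, that's what the spark.jars property would normally do. A sketch of what that could look like (the path below is just a placeholder, not my real setup):

// Sketch: spark.jars is a comma-separated list of jars that Spark distributes to
// the executors, which spark-submit --jars would otherwise take care of.
// "/path/to/myApp.jar" stands in for the shaded uber jar's actual location.
conf.set("spark.jars", "/path/to/myApp.jar")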

What am I missing?

The Spark code looks like this:

class KafkaSubscriber(sparkMaster: String, duration: Duration, topicSession: String, brokers: String) {

  val sparkHome: String = "/opt/cloudera/parcels/SPARK2/lib/spark"

  // Create Spark Conf
  val conf: SparkConf = new SparkConf()
  conf.set("spark.streaming.concurrentJobs", "2")
  conf.set("spark.executor.extraClassPath",sparkHome + "/kafka-0.10")
  conf.setAppName(classOf[KafkaSubscriber].getSimpleName)
  conf.setMaster(sparkMaster)
  conf.setSparkHome(sparkHome)

  // Create Spark Session
  // **********
  val spark: SparkSession = SparkSession.builder()
    .appName(classOf[KafkaSubscriber].getSimpleName)
    .master(sparkMaster)
    .config(conf)
    .getOrCreate()

  // Set the Kafka dependencies
  val folder: File = new File(sparkHome + "/kafka-0.10")
  if (folder.exists && folder.isDirectory) {
    for (f <- folder.listFiles.filter(_.isFile).toSeq) {
      spark.sparkContext.addJar(f.getAbsolutePath)
    }
  }

  // Create Spark Streaming Context
  // **********
  val ssc: StreamingContext = StreamingContext.getActiveOrCreate(() => new StreamingContext(spark.sparkContext, duration))

  def subscribe {

    // Some code here ...

    // Subscribe to Kafka
    // **********
    val topicSetSession: Array[String] = topicSession.split(",")
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> brokers,
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "KafkaSubscriber",
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean))

    // Get Session Stream
    val rawSessionStream = KafkaUtils.createDirectStream[String, String](ssc, PreferConsistent, Subscribe[String, String](topicSetSession, kafkaParams))

    // Some code here ...

  }

  /**
   * Start the Spark ETL
   */
  def startEtl {
    // Start the Spark Streaming batch
    ssc.start()
    ssc.awaitTermination()
  }

  /**
   * Close the spark session
   */
  def close {
    ssc.stop(true)
    spark.close
  }
}

And the Java part that invokes the Spark code:

@Component
@Scope("singleton")
public class KafkaStreaming implements InitializingBean {

    @Autowired
    private KafkaSubscriber kafkaSubscriber;

    @Override
    public void afterPropertiesSet() throws Exception {

        // Get and store the HbbTV Config
        kafkaSubscriber.subscribe();

        // Start Spark
        kafkaSubscriber.startEtl();
    }
}

Thanks for any help!

Cheloute

1 Answer


Well, my issue was a mistake in the Cloudera Manager configuration for the Kafka integration. By default, Kafka 0.9 was selected, which put the Kafka 0.9 jars on the classpath instead of the Kafka 0.10 ones.
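
For anyone hitting the same thing, a quick way to check which jar the class comes from on the executors is something like this sketch (illustrative only):

// Sketch: the lookup runs inside a task, i.e. on an executor; it prints where
// KafkaRDDPartition was loaded from, or the ClassNotFoundException message if
// the Kafka 0.10 integration jar is not on that executor's classpath.
val report = spark.sparkContext.parallelize(Seq(1)).map { _ =>
  try Class.forName("org.apache.spark.streaming.kafka010.KafkaRDDPartition")
    .getProtectionDomain.getCodeSource.getLocation.toString
  catch { case e: ClassNotFoundException => "missing: " + e.getMessage }
}.collect().head
println(report)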

Cheloute