I have some classpath issues when running a Spark Streaming job with Kafka 0.10 in YARN client mode:
java.lang.ClassNotFoundException: org.apache.spark.streaming.kafka010.KafkaRDDPartition
I already read the (very well explained) questions Add Jars to a Spark Job and Spark Kafka Streaming issues, but I'm still stuck...
My project is a Java Spring Boot application managed with Maven that includes a Scala part for the Spark code. An uber jar is created with the Maven Shade plugin, and the spark-streaming-kafka-0.10_2.11 dependency containing the missing class is inside it.
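For reference, the relevant parts of the pom.xml look roughly like this (a trimmed-down sketch, not my full build; ${spark.version} stands in for the actual version, and note the artifactId is spelled with dashes, 0-10):
<!-- Kafka 0.10 integration for Spark Streaming; this artifact contains
     org.apache.spark.streaming.kafka010.KafkaRDDPartition -->
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
  <version>${spark.version}</version>
</dependency>
<!-- The Shade plugin builds the uber jar at package time -->
<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-shade-plugin</artifactId>
  <executions>
    <execution>
      <phase>package</phase>
      <goals>
        <goal>shade</goal>
      </goals>
    </execution>
  </executions>
</plugin>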
Executing the Spark app in local mode works as expected. However, when executing it in YARN client mode, KafkaRDDPartition is missing, presumably from the executors' classpath.
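The class really is inside the uber jar; a quick check from the shell (assuming the jar is named myApp.jar, as in the launch command further down) confirms it:
# list the uber jar entries and look for the "missing" class;
# this prints org/apache/spark/streaming/kafka010/KafkaRDDPartition.class
jar tf myApp.jar | grep KafkaRDDPartition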
I tried to add the extraClassPath property to my SparkConf, but it didn't change anything (don't hit me for the hard-coded paths, it's just a test):
val conf: SparkConf = new SparkConf()
conf.set("spark.streaming.concurrentJobs", "2")
conf.set("spark.executor.extraClassPath", sparkHome + "/kafka-0.10")
conf.setAppName(classOf[KafkaSubscriber].getSimpleName)
conf.setMaster(sparkMaster)
conf.setSparkHome(sparkHome)
Adding the jars to the SparkContext before creating the StreamingContext didn't change anything either:
val folder: File = new File(sparkHome + "/kafka-0.10")
if (folder.exists && folder.isDirectory) {
  for (f <- folder.listFiles.filter(_.isFile).toSeq) {
    spark.sparkContext.addJar(f.getAbsolutePath)
  }
}
I'm using a Cloudera platform with the Spark2 parcels deployed and activated across the whole cluster, and the Spark2 service is configured to use Kafka 0.10 by default.
Another important thing: as it's a Spring Boot app, I'm not executing my Spark app with spark-submit, but with this:
java -Dloader.path=file:///etc/hadoop/conf,myApp.jar -jar myApp.jar
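For comparison, a spark-submit launch would normally ship extra jars to the executors via --jars, roughly like this (just a sketch to illustrate the difference; com.example.MyApp is a placeholder main class, and this is precisely what I'm not doing here):
# sketch only: spark-submit distributes --jars to the executors,
# which a plain "java -jar" launch does not do
spark-submit \
  --master yarn \
  --deploy-mode client \
  --jars "$(echo /opt/cloudera/parcels/SPARK2/lib/spark/kafka-0.10/*.jar | tr ' ' ',')" \
  --class com.example.MyApp \
  myApp.jar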
What am I missing?
The Spark code looks like this:
import java.io.File

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.{Duration, StreamingContext}
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent

class KafkaSubscriber(sparkMaster: String, duration: Duration, topicSession: String, brokers: String) {

  val sparkHome: String = "/opt/cloudera/parcels/SPARK2/lib/spark"

  // Create Spark Conf
  val conf: SparkConf = new SparkConf()
  conf.set("spark.streaming.concurrentJobs", "2")
  conf.set("spark.executor.extraClassPath", sparkHome + "/kafka-0.10")
  conf.setAppName(classOf[KafkaSubscriber].getSimpleName)
  conf.setMaster(sparkMaster)
  conf.setSparkHome(sparkHome)

  // Create Spark Session
  // **********
  val spark: SparkSession = SparkSession.builder()
    .appName(classOf[KafkaSubscriber].getSimpleName)
    .master(sparkMaster)
    .config(conf)
    .getOrCreate()

  // Set the Kafka dependencies
  val folder: File = new File(sparkHome + "/kafka-0.10")
  if (folder.exists && folder.isDirectory) {
    for (f <- folder.listFiles.filter(_.isFile).toSeq) {
      spark.sparkContext.addJar(f.getAbsolutePath)
    }
  }

  // Create Spark Streaming Context
  // **********
  val ssc: StreamingContext = StreamingContext.getActiveOrCreate(() => new StreamingContext(spark.sparkContext, duration))

  def subscribe(): Unit = {
    // Some code here ...

    // Subscribe to Kafka
    // **********
    val topicSetSession: Array[String] = topicSession.split(",")
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> brokers,
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "KafkaSubscriber",
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean))

    // Get Session Stream
    val rawSessionStream = KafkaUtils.createDirectStream[String, String](ssc, PreferConsistent, Subscribe[String, String](topicSetSession, kafkaParams))

    // Some code here ...
  }

  /**
   * Start the Spark ETL
   */
  def startEtl(): Unit = {
    // Start the Spark Streaming batch
    ssc.start()
    ssc.awaitTermination()
  }

  /**
   * Close the Spark session
   */
  def close(): Unit = {
    ssc.stop(true)
    spark.close()
  }
}
And here's the Java part that invokes the Spark code:
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;

@Component
@Scope("singleton")
public class KafkaStreaming implements InitializingBean {

    @Autowired
    private KafkaSubscriber kafkaSubscriber;

    @Override
    public void afterPropertiesSet() throws Exception {
        // Get and store the HbbTV Config
        kafkaSubscriber.subscribe();

        // Start Spark
        kafkaSubscriber.startEtl();
    }
}
Thanks for any help!