
I've been trying to submit a Spark Streaming application with spark-submit to a cluster of mine consisting of a master and two worker nodes. The application is written in Scala and built with Maven. Importantly, the Maven build is configured to produce a fat JAR containing all dependencies. Furthermore, the JAR has been distributed to all of the nodes. The streaming job is submitted with the following command:

bin/spark-submit --class topology.SimpleProcessingTopology --jars /tmp/spark_streaming-1.0-SNAPSHOT.jar --master spark://10.0.0.8:7077 --verbose /tmp/spark_streaming-1.0-SNAPSHOT.jar /tmp/streaming-benchmark.properties 

where 10.0.0.8 is the IP address of the master node within the VNET.

However, I keep getting the following exception while starting the streaming application:

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1435)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1423)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1422)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1422)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)

Caused by: java.lang.ClassNotFoundException: topology.SimpleProcessingTopology$$anonfun$main$1$$anonfun$apply$1
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:348)
    at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:67)

I've checked the contents of the JAR using jar tvf, and as you can see in the output below, it does contain the class in question.

 1735 Wed Mar 01 12:29:20 UTC 2017 topology/SimpleProcessingTopology$$anonfun$main$1.class
   702 Wed Mar 01 12:29:20 UTC 2017 topology/SimpleProcessingTopology.class
  2415 Wed Mar 01 12:29:20 UTC 2017 topology/SimpleProcessingTopology$$anonfun$main$1$$anonfun$apply$1$$anonfun$apply$2.class
  2500 Wed Mar 01 12:29:20 UTC 2017 topology/SimpleProcessingTopology$$anonfun$main$1$$anonfun$apply$1.class
  7045 Wed Mar 01 12:29:20 UTC 2017 topology/SimpleProcessingTopology$.class

This exception is caused by the anonymous function passed to the foreachPartition call:

rdd.foreachPartition(partition => {
  val outTopic = props.getString("application.simple.kafka.out.topic")
  val producer = new KafkaProducer[Array[Byte], Array[Byte]](kafkaParams)
  partition.foreach(record => {
    val producerRecord = new ProducerRecord[Array[Byte], Array[Byte]](outTopic, record.key(), record.value())
    producer.send(producerRecord)
  })
  producer.close()
})
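
For reference, those $$anonfun classes are simply what the Scala compiler (2.10/2.11-style closure encoding) generates for the nested lambdas. The following self-contained sketch (ClosureNamingSketch is a made-up object, not part of my application) only illustrates that naming scheme: compiling it yields one class file per nested lambda, with the same nesting pattern as the SimpleProcessingTopology entries in the jar listing above.

object ClosureNamingSketch {
  def main(args: Array[String]): Unit = {
    // Each nested lambda is compiled into its own class file, named by nesting level,
    // e.g. ClosureNamingSketch$$anonfun$main$1 for the outer lambda and
    // ClosureNamingSketch$$anonfun$main$1$$anonfun$apply$1 for the one nested inside it.
    // Every one of these classes has to be loadable wherever the closure is deserialized.
    Seq(Seq("a", "b"), Seq("c")).foreach(batch => {
      batch.foreach(record => {
        Option(record).foreach(r => println(r))
      })
    })
  }
}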

Unfortunately, I have not been able to find the root cause of this so far. I would appreciate it if anyone could help me fix this issue.

  • Are you distributing the JAR properly to all your executor nodes? – Yuval Itzchakov Mar 01 '17 at 13:14
  • @YuvalItzchakov As already mentioned in the question, the JAR is distributed properly. I've double-checked the existence of the JAR on all of the nodes. – dsafa Mar 01 '17 at 13:16
  • Sorry, I missed that. Did you properly add them to the classpath on the executor via `spark.executor.extraClassPath`? – Yuval Itzchakov Mar 01 '17 at 13:20
  • @YuvalItzchakov Is it really required to explicitly add the classpath in the source code when the path is already supplied via the --jars flag of spark-submit? – dsafa Mar 01 '17 at 13:22
  • Yes. See my answer [here](http://stackoverflow.com/a/37348234/1870803) for the full details. – Yuval Itzchakov Mar 01 '17 at 13:23
  • What about using spark.jars? Since spark.jars applies to both the driver and the executors, common sense tells me it should resolve the issue. – dsafa Mar 01 '17 at 13:25
  • Nope. `spark.jars` is identical to `--jars`. – Yuval Itzchakov Mar 01 '17 at 13:26
  • Let us [continue this discussion in chat](http://chat.stackoverflow.com/rooms/136963/discussion-between-dsafa-and-yuval-itzchakov). – dsafa Mar 01 '17 at 13:39
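
To make the two options discussed in the comments above concrete, here is a minimal sketch of where each setting would go. The jar path and app name are taken from the question; this is an illustration of the configuration surface, not a confirmed fix.

import org.apache.spark.SparkConf

// Sketch only, not a confirmed fix for this question.
object ConfSketch {
  def buildConf(): SparkConf =
    new SparkConf()
      .setAppName("SimpleProcessingTopology")
      // Programmatic equivalent of --jars / spark.jars: the jar is shipped with the
      // application and fetched by executors, which add it to the task classloader.
      .setJars(Seq("/tmp/spark_streaming-1.0-SNAPSHOT.jar"))
      // spark.executor.extraClassPath assumes the jar already exists at this exact
      // path on every worker node; it is prepended to the executor's own classpath.
      .set("spark.executor.extraClassPath", "/tmp/spark_streaming-1.0-SNAPSHOT.jar")
}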

0 Answers