11

I have just begun with Spark Streaming and I am trying to build a sample application that counts words from a Kafka stream. Although it compiles with sbt package, when I run it I get a NoClassDefFoundError. This post seems to describe the same problem, but the solution there is for Maven and I have not been able to reproduce it with sbt.

KafkaApp.scala:

import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._

object KafkaApp {
  def main(args: Array[String]) {

    val conf = new SparkConf().setAppName("kafkaApp").setMaster("local[*]")
    val ssc = new StreamingContext(conf, Seconds(1))
    val kafkaParams = Map(
        "zookeeper.connect" -> "localhost:2181",
        "zookeeper.connection.timeout.ms" -> "10000",
        "group.id" -> "sparkGroup"
    )

    val topics = Map(
        "test" -> 1
    )

    // stream of (topic, message) pairs from Kafka
    val messages = KafkaUtils.createStream(ssc, kafkaParams, topics, storage.StorageLevel.MEMORY_AND_DISK)
    println(s"Number of words: ${messages.count()}")
  }
}

build.sbt:

name := "Simple Project"

version := "1.1"

scalaVersion := "2.10.4"

libraryDependencies ++= Seq(
    "org.apache.spark" %% "spark-core" % "1.1.1",
    "org.apache.spark" %% "spark-streaming" % "1.1.1",
    "org.apache.spark" %% "spark-streaming-kafka" % "1.1.1"
)

resolvers += "Akka Repository" at "http://repo.akka.io/releases/"

And I submit it with:

bin/spark-submit \
  --class "KafkaApp" \
  --master local[4] \
  target/scala-2.10/simple-project_2.10-1.1.jar

Error:

14/12/30 19:44:57 INFO AkkaUtils: Connecting to HeartbeatReceiver: akka.tcp://sparkDriver@192.168.5.252:65077/user/HeartbeatReceiver
Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/spark/streaming/kafka/KafkaUtils$
    at KafkaApp$.main(KafkaApp.scala:28)
    at KafkaApp.main(KafkaApp.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:329)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.ClassNotFoundException: org.apache.spark.streaming.kafka.KafkaUtils$
    at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
kahlo

9 Answers

16

spark-submit does not automatically include the package containing KafkaUtils; it has to be in your project JAR. For that you need to build an all-inclusive uber-jar using sbt assembly. Here is an example build.sbt:

https://github.com/tdas/spark-streaming-external-projects/blob/master/kafka/build.sbt

You also need to add the sbt-assembly plugin to your project:

https://github.com/tdas/spark-streaming-external-projects/tree/master/kafka/project
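For reference, a minimal sketch of what such a setup could look like for the versions in the question; the "provided" scoping and the plugin version are assumptions, not taken from the linked repo.

build.sbt:

name := "Simple Project"

version := "1.1"

scalaVersion := "2.10.4"

libraryDependencies ++= Seq(
  // Spark itself is supplied by spark-submit at runtime, so it can be marked "provided"
  "org.apache.spark" %% "spark-core" % "1.1.1" % "provided",
  "org.apache.spark" %% "spark-streaming" % "1.1.1" % "provided",
  // the Kafka connector is NOT bundled with Spark, so it must end up in the uber-jar
  "org.apache.spark" %% "spark-streaming-kafka" % "1.1.1"
)

project/plugins.sbt:

addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.1")

Then run sbt assembly and pass the resulting ...-assembly-... jar to spark-submit instead of the jar produced by sbt package.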

Tathagata Das
  • I am also getting the same issue while using Maven. After that I included "org.apache.maven.plugins" in my pom.xml but the issue is unsolved. Is there any other parameter I have to check? – Shrinivas Kulkarni Oct 01 '15 at 11:18
  • With that change, if I run sbt package, I get an error: error: not found: object AssemblyKeys import AssemblyKeys._ ^ [error] Type error in expression – johnsam Mar 26 '16 at 17:38
  • @johnsam Just leave out the first import line and the "assemblySettings" line; works for me. – jurgispods Aug 04 '16 at 13:48
7

Try including all the dependency jars when submitting the application:

./spark-submit --name "SampleApp" \
  --deploy-mode client \
  --master spark://host:7077 \
  --class com.stackexchange.SampleApp \
  --jars $SPARK_INSTALL_DIR/spark-streaming-kafka_2.10-1.3.0.jar,$KAFKA_INSTALL_DIR/libs/kafka_2.10-0.8.2.0.jar,$KAFKA_INSTALL_DIR/libs/metrics-core-2.2.0.jar,$KAFKA_INSTALL_DIR/libs/zkclient-0.3.jar \
  spark-example-1.0-SNAPSHOT.jar
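For the setup in the question (Spark 1.1.1 in local mode) the equivalent submit would look roughly like this; the exact jar names and versions are assumptions and have to match the jars actually present on your machine:

bin/spark-submit \
  --class "KafkaApp" \
  --master local[4] \
  --jars spark-streaming-kafka_2.10-1.1.1.jar,kafka_2.10-0.8.0.jar,zkclient-0.3.jar,metrics-core-2.2.0.jar \
  target/scala-2.10/simple-project_2.10-1.1.jar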

Sandeep
1

The following build.sbt worked for me. It also requires you to put the sbt-assembly plugin in a file under the project/ directory.

build.sbt

name := "NetworkStreaming" // https://github.com/sbt/sbt-assembly/blob/master/Migration.md#upgrading-with-bare-buildsbt

libraryDependencies ++= Seq(
  "org.apache.spark" % "spark-streaming_2.10" % "1.4.1",
  "org.apache.spark" % "spark-streaming-kafka_2.10" % "1.4.1",         // kafka
  "org.apache.hbase" % "hbase" % "0.92.1",
  "org.apache.hadoop" % "hadoop-core" % "1.0.2",
  "org.apache.spark" % "spark-mllib_2.10" % "1.3.0"
)

assemblyMergeStrategy in assembly := {
  case m if m.toLowerCase.endsWith("manifest.mf")          => MergeStrategy.discard
  case m if m.toLowerCase.matches("meta-inf.*\\.sf$")      => MergeStrategy.discard
  case "log4j.properties"                                  => MergeStrategy.discard
  case m if m.toLowerCase.startsWith("meta-inf/services/") => MergeStrategy.filterDistinctLines
  case "reference.conf"                                    => MergeStrategy.concat
  case _                                                   => MergeStrategy.first
}

project/plugins.sbt

addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.1")
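With that in place, the workflow is roughly the following (a sketch; the class name is a placeholder and the exact jar name depends on your name and version settings):

sbt assembly
spark-submit --class YourMainClass --master local[4] target/scala-2.10/<your-assembly-jar>.jar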

Vibhuti
0

I met the same problem and solved it by building the jar with dependencies.

Add the code below to pom.xml:

<build>
    <sourceDirectory>src/main/java</sourceDirectory>
    <testSourceDirectory>src/test/java</testSourceDirectory>
    <plugins>
      <!--
        Bind the maven-assembly-plugin to the package phase;
        this will create a jar file that bundles the dependencies,
        suitable for deployment to a cluster.
       -->
      <plugin>
        <artifactId>maven-assembly-plugin</artifactId>
        <configuration>
          <descriptorRefs>
            <descriptorRef>jar-with-dependencies</descriptorRef>
          </descriptorRefs>
          <archive>
            <manifest>
              <mainClass></mainClass>
            </manifest>
          </archive>
        </configuration>
        <executions>
          <execution>
            <id>make-assembly</id>
            <phase>package</phase>
            <goals>
              <goal>single</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
    </plugins>
</build>    

Run mvn package and submit the resulting "example-jar-with-dependencies.jar".
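A rough sketch of that workflow (the main class and the artifact name are placeholders; the actual jar name comes from your pom's artifactId and version):

mvn package
spark-submit --class your.main.Class --master local[4] target/example-jar-with-dependencies.jar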

Nilesh
0

Added the dependency externally: project --> properties --> Java Build Path --> Libraries --> Add External JARs, and added the required jar.

This solved my issue.

Suresh
0

Using Spark 1.6 did the job for me, without the hassle of handling so many external jars... it can get quite complicated to manage.

Gi1ber7
0

You could also download the jar file and put it in the Spark jars folder, because it is not shipped with Spark, instead of beating your head trying to get sbt and build.sbt to work.

http://central.maven.org/maven2/org/apache/spark/spark-streaming-kafka-0-10_2.10/2.1.1/spark-streaming-kafka-0-10_2.10-2.1.1.jar

copy it to:

/usr/local/spark/spark-2.1.0-bin-hadoop2.6/jars/
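For example (a sketch; adjust the paths to your own installation):

wget http://central.maven.org/maven2/org/apache/spark/spark-streaming-kafka-0-10_2.10/2.1.1/spark-streaming-kafka-0-10_2.10-2.1.1.jar
cp spark-streaming-kafka-0-10_2.10-2.1.1.jar /usr/local/spark/spark-2.1.0-bin-hadoop2.6/jars/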

Walker Rowe
0

Use the --packages argument of spark-submit; it takes Maven coordinates in the format group:artifact:version,...
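For example, with the 0-8 connector used in the answer below (the coordinates must match your Spark and Scala versions, and the application jar name is a placeholder):

spark-submit \
  --class KafkaApp \
  --master local[4] \
  --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.2.0 \
  target/scala-2.11/your-app_2.11-0.1.jar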

Joseph Thomas
-1
import org.apache.spark.streaming.kafka.KafkaUtils

Use the following in build.sbt:


name := "kafka"

version := "0.1"

scalaVersion := "2.11.12"

retrieveManaged := true

fork := true

//libraryDependencies += "org.apache.spark" % "spark-streaming_2.11" % "2.2.0"
//libraryDependencies += "org.apache.spark" % "spark-streaming-kafka-0-8_2.11" % "2.1.0"

libraryDependencies += "org.apache.spark" %% "spark-core" % "2.2.0"

//libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.2.0"

libraryDependencies += "org.apache.spark" %% "spark-streaming" % "2.2.0"

// https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-kafka-0-8
libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka-0-8" % "2.2.0" % "provided"

// https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-kafka-0-8-assembly
libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka-0-8-assembly" % "2.2.0"

This will fix the issue

Brian Tompsett - 汤莱恩