
I am trying to connect Spark Structured Streaming with Kafka and it throws the error below:

Exception in thread "main" java.lang.ClassNotFoundException: Failed to find data source: kafka. Please find packages at ...


Based on the documentation, I have added the required dependencies,

and my Kafka and ZooKeeper servers are running. I am not sure what the issue is. Also, I am using it this way:

import spark.implicits._

val feedback = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:2181")
  .option("subscribe", "kafka_input_topic")
  .load()
  .as[InputMessage]
  .filter(_.lang.equals("en"))

Any help is appreciated. Thank you.

  • Do you have `"org.apache.spark" % "spark-sql-kafka-0-10_2.11" % SPARK_VERSION` added to your dependency list? (SPARK_VERSION is a placeholder for your proper spark version). – Yuval Itzchakov Feb 11 '18 at 11:52
  • yes. @YuvalItzchakov `<groupId>org.apache.spark</groupId> <artifactId>spark-sql-kafka-0-10_2.11</artifactId> <version>2.2.0</version> <scope>provided</scope>` – Rahul Kumar Feb 11 '18 at 11:55

4 Answers


The problem, as you mentioned in your comments, is this:

<scope>provided</scope>

Remove the `provided` scope for spark-sql-kafka: unlike the core Spark artifacts, it is not provided by the Spark installation, so it must be packaged with your application.
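
For example, the dependency from the comment above with the scope removed (a sketch that keeps the question's version; adjust it to your Spark version):

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql-kafka-0-10_2.11</artifactId>
    <version>2.2.0</version>
    <!-- no <scope>provided</scope>: this connector is not on Spark's runtime classpath -->
</dependency>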

– Yuval Itzchakov

You could use the Kafka data source by its fully-qualified name (not the alias) as follows:

spark.readStream.format("org.apache.spark.sql.kafka010.KafkaSourceProvider").load
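
A minimal sketch of the question's stream with the fully-qualified provider (note that Kafka brokers listen on 9092 by default; 2181 is ZooKeeper's port, which the Kafka source does not use):

import spark.implicits._

val feedback = spark.readStream
  .format("org.apache.spark.sql.kafka010.KafkaSourceProvider")
  .option("kafka.bootstrap.servers", "localhost:9092") // Kafka broker, not ZooKeeper
  .option("subscribe", "kafka_input_topic")
  .load()
  .as[InputMessage]
  .filter(_.lang.equals("en"))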

The issue is that the necessary jar is not on the CLASSPATH at runtime (not at build time).

Based on the documentation you linked to, you added the required dependencies to your build definition file (pom.xml, build.sbt, or build.gradle), but the exception happens when you run the application, which is after it is built, doesn't it?

What you are missing is the part of the documentation about deployment, i.e. Deploying:

As with any Spark applications, spark-submit is used to launch your application. spark-sql-kafka-0-10_2.11 and its dependencies can be directly added to spark-submit using --packages, such as,

./bin/spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.0 ..

You have to add this --packages option, or create an uber-jar that makes the dependency part of your jar file (see the sketch below).
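
If you prefer the uber-jar route with sbt, a minimal sbt-assembly sketch (the plugin and library versions here are assumptions; align them with your setup):

// project/plugins.sbt
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.6")

// build.sbt
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-sql" % "2.2.0" % "provided",   // supplied by the Spark installation
  "org.apache.spark" %% "spark-sql-kafka-0-10" % "2.2.0"      // not provided: bundled into the uber-jar
)

Running `sbt assembly` then produces a single jar that already contains the Kafka source, so no --packages is needed.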

– Jacek Laskowski

If you are using Maven, then the following way of building a jar with dependencies might solve your issue.

Add the Spark dependencies as below:

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.11</artifactId>
    <version>2.2.1</version>
    <scope>${spark.scope}</scope>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql-kafka-0-10_2.11</artifactId>
    <version>2.2.1</version>
</dependency>

Then configure your Maven profiles as below:

<profiles>
    <profile>
        <id>default</id>
        <properties>
            <profile.id>dev</profile.id>
            <spark.scope>compile</spark.scope>
        </properties>
        <activation>
            <activeByDefault>true</activeByDefault>
        </activation>
    </profile>
    <profile>
        <id>test</id>
        <properties>
            <profile.id>test</profile.id>
            <spark.scope>provided</spark.scope>
        </properties>
    </profile>
    <profile>
        <id>online</id>
        <properties>
            <profile.id>online</profile.id>
            <spark.scope>provided</spark.scope>
        </properties>
    </profile>
</profiles>

Add the following plugin:

<plugin>
    <artifactId>maven-assembly-plugin</artifactId>
    <version>3.1.0</version>
    <configuration>
        <descriptorRefs>
            <descriptorRef>jar-with-dependencies</descriptorRef>
        </descriptorRefs>
    </configuration>
    <executions>
        <execution>
            <id>make-assembly</id> <!-- this is used for inheritance merges -->
            <phase>package</phase> <!-- bind to the packaging phase -->
            <goals>
                <goal>single</goal>
            </goals>
        </execution>
    </executions>
</plugin>

Then build your jar using mvn clean install -Ponline -DskipTests. This should solve your issue.
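
Once built, the assembled jar can be submitted without --packages, for example (the class and jar names below are hypothetical placeholders):

./bin/spark-submit \
  --class com.example.StreamingApp \
  target/your-app-1.0-jar-with-dependencies.jar

The jar-with-dependencies suffix comes from the assembly descriptor configured above.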

– Akhil Bojedla