28

I use HDP-2.6.3.0 with Spark2 package 2.2.0.

I'm trying to write a Kafka consumer using the Structured Streaming API, but I get the following error after submitting the job to the cluster:

Exception in thread "main" java.lang.ClassNotFoundException: Failed to find data source: kafka. Please find packages at http://spark.apache.org/third-party-projects.html
at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:553)
at org.apache.spark.sql.execution.datasources.DataSource.providingClass$lzycompute(DataSource.scala:89)
at org.apache.spark.sql.execution.datasources.DataSource.providingClass(DataSource.scala:89)
at org.apache.spark.sql.execution.datasources.DataSource.sourceSchema(DataSource.scala:198)
at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo$lzycompute(DataSource.scala:90)
at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo(DataSource.scala:90)
at org.apache.spark.sql.execution.streaming.StreamingRelation$.apply(StreamingRelation.scala:30)
at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:150)
at com.example.KafkaConsumer.main(KafkaConsumer.java:21)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$runMain(SparkSubmit.scala:782)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:119)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.ClassNotFoundException: kafka.DefaultSource
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at org.apache.spark.sql.execution.datasources.DataSource$anonfun$22$anonfun$apply$14.apply(DataSource.scala:537)
at org.apache.spark.sql.execution.datasources.DataSource$anonfun$22$anonfun$apply$14.apply(DataSource.scala:537)
at scala.util.Try$.apply(Try.scala:192)
at org.apache.spark.sql.execution.datasources.DataSource$anonfun$22.apply(DataSource.scala:537)
at org.apache.spark.sql.execution.datasources.DataSource$anonfun$22.apply(DataSource.scala:537)
at scala.util.Try.orElse(Try.scala:84)
at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:537)
... 17 more

I submit the job with the following spark-submit command:

$SPARK_HOME/bin/spark-submit \
     --master yarn \
     --deploy-mode client \
     --class com.example.KafkaConsumer \
     --executor-cores 2 \
     --executor-memory 512m \
     --driver-memory 512m \
     sample-kafka-consumer-0.0.1-SNAPSHOT.jar

My Java code:

package com.example;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class KafkaConsumer {

    public static void main(String[] args) {

        SparkSession spark = SparkSession
                  .builder()
                  .appName("kafkaConsumerApp")
                  .getOrCreate();

        Dataset<Row> ds = spark
                  .readStream()
                  .format("kafka")
                  .option("kafka.bootstrap.servers", "dog.mercadoanalitico.com.br:6667")
                  .option("subscribe", "my-topic")
                  .load();
    }
}

pom.xml:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.example</groupId>
  <artifactId>sample-kafka-consumer</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <packaging>jar</packaging>

    <dependencies>

        <!-- spark -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>2.2.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>2.2.0</version>
        </dependency>


        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql-kafka-0-10_2.11</artifactId>
            <version>2.2.0</version>
        </dependency>

        <!-- kafka -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.11</artifactId>
            <version>0.10.1.0</version>
        </dependency>


    </dependencies>  


    <repositories>
        <repository>
            <id>local-maven-repo</id>
            <url>file:///${project.basedir}/local-maven-repo</url>
        </repository>
    </repositories> 

    <build>

        <!-- Include resources folder in the .jar -->
        <resources>
            <resource>
                <directory>${basedir}/src/main/resources</directory>
            </resource>
        </resources>

        <plugins>

            <!-- Plugin to compile the source. -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.6.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>       

            <!-- Plugin to include all the dependencies in the .jar and set the main class. -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.0.0</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <filters>
                                <!-- This filter is to workaround the problem caused by included signed jars.
                                     java.lang.SecurityException: Invalid signature file digest for Manifest main attributes
                                -->
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer
                                    implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>com.example.KafkaConsumer</mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>

        </plugins>
    </build>    
</project>

[UPDATE] UBER-JAR

Below is the configuration used in the pom.xml to generate the uber-jar:

            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.0.0</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <filters>
                                <!-- This filter is to workaround the problem caused by included signed jars.
                                     java.lang.SecurityException: Invalid signature file digest for Manifest main attributes
                                -->
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer
                                    implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>com.example.KafkaConsumer</mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
zero323
Kleyson Rios
  • It's been a while since I worked with maven (I'm with sbt), but `maven-shade-plugin` is for shading not uber-jar, isn't it? Shouldn't it be `maven-assembly-plugin` with `jar-with-dependencies` configuration? – Jacek Laskowski Dec 30 '17 at 13:25
  • I'm not an expert in the Java world, but from my research, maven-shade-plugin is the option used to generate the uber-jar file. – Kleyson Rios Dec 30 '17 at 17:15
  • If you are using scala's `build.sbt` and discarding `META-INF` files blindly as part of your `assemblyMergeStrategy` while building the uber jar, that can cause the "kafka" alias to go unregistered. Check this SO answer: https://stackoverflow.com/a/48061746/1628839 – Vivek Sethi Feb 04 '20 at 10:27

8 Answers

44

The kafka data source is an external module and is not available to Spark applications by default.

You have to define it as a dependency in your pom.xml (as you have done), but that's just the very first step to have it in your Spark application.

    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql-kafka-0-10_2.11</artifactId>
        <version>2.2.0</version>
    </dependency>

With that dependency in place, you have to decide whether to build a so-called uber-jar that bundles all the dependencies (which results in a fairly big jar file and makes submission slower) or to use the --packages option (or the less flexible --jars) to add the dependency at spark-submit time.

(There are other options, like storing the required jars on HDFS or using Hadoop distribution-specific ways of defining dependencies for Spark applications, but let's keep things simple.)

I'd recommend using --packages first and only when it works consider the other options.

Use spark-submit --packages to include the spark-sql-kafka-0-10 module as follows.

spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.0

Include the other command-line options as you wish.

Uber-Jar Approach

Including all the dependencies in a so-called uber-jar may not always work due to how META-INF directories are handled.

For the kafka data source to work (and other data sources in general), you have to ensure that the META-INF/services/org.apache.spark.sql.sources.DataSourceRegister files of all the data sources are merged, not replaced, reduced to the first one found, or handled by whatever other strategy your build uses.

The kafka data source ships its own META-INF/services/org.apache.spark.sql.sources.DataSourceRegister, which registers org.apache.spark.sql.kafka010.KafkaSourceProvider as the data source provider for the kafka format.
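To check whether a given classpath (for example, the built uber-jar) actually carries the Kafka registration, a small diagnostic can help. The following is a minimal sketch and not part of Spark: the class name `DataSourceRegisterCheck` and its methods are hypothetical, and it relies only on the standard `ClassLoader.getResources` call to list every provider class named in the DataSourceRegister service files it can see.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical diagnostic helper; not part of Spark.
class DataSourceRegisterCheck {

    static final String SERVICE_FILE =
        "META-INF/services/org.apache.spark.sql.sources.DataSourceRegister";

    // Extracts provider class names from the lines of one service file:
    // '#' starts a comment, blank lines are ignored.
    static List<String> parseProviders(List<String> lines) {
        List<String> providers = new ArrayList<>();
        for (String line : lines) {
            int hash = line.indexOf('#');
            String entry = (hash >= 0 ? line.substring(0, hash) : line).trim();
            if (!entry.isEmpty()) {
                providers.add(entry);
            }
        }
        return providers;
    }

    // Collects provider names from every copy of the service file
    // that the given class loader can see.
    static List<String> providersOnClasspath(ClassLoader loader) throws IOException {
        List<String> providers = new ArrayList<>();
        Enumeration<URL> urls = loader.getResources(SERVICE_FILE);
        while (urls.hasMoreElements()) {
            URL url = urls.nextElement();
            try (BufferedReader reader = new BufferedReader(
                    new InputStreamReader(url.openStream(), StandardCharsets.UTF_8))) {
                providers.addAll(parseProviders(reader.lines().collect(Collectors.toList())));
            }
        }
        return providers;
    }

    public static void main(String[] args) throws IOException {
        List<String> providers =
            providersOnClasspath(DataSourceRegisterCheck.class.getClassLoader());
        providers.forEach(System.out::println);
        boolean kafka =
            providers.contains("org.apache.spark.sql.kafka010.KafkaSourceProvider");
        System.out.println("kafka provider registered: " + kafka);
    }
}
```

Run with the uber-jar on the classpath, the output should include org.apache.spark.sql.kafka010.KafkaSourceProvider; if it does not, the service file was most likely replaced or dropped during packaging.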

Jacek Laskowski
  • Also, I've just updated the question with the UBER-JAR configuration. If you see any improvement on it, please let me know. – Kleyson Rios Dec 30 '17 at 13:15
  • I need to correct my answer: even maven-assembly-plugin couldn't merge files under META-INF/services/ from different jars (dependencies). It provides a way to handle this situation with ContainerDescriptorHandler, but that didn't work. The maven-shade-plugin can do the merge, though. @KleysonRios you are only missing the below in your Maven settings – Taejun Jang Apr 27 '19 at 15:25
  • @wyx Can you ask a separate question to help you out with the error? Please post the error, how you bundle the deps and how you execute the app. Ok? – Jacek Laskowski Dec 17 '19 at 09:49
6

The top answer is correct. This sbt-assembly merge strategy solved the issue for me:

assemblyMergeStrategy in assembly := {
  case "reference.conf" => MergeStrategy.concat
  case "META-INF/services/org.apache.spark.sql.sources.DataSourceRegister" => MergeStrategy.concat
  case PathList("META-INF", xs@_*) => MergeStrategy.discard
  case _ => MergeStrategy.first
}
  • Welcome to Stack Overflow. Code-only answers are discouraged on Stack Overflow because they don't explain how it solves the problem. Please edit your answer to explain how this answers the question and what and how it improves on the existing answers, so that it is useful to users with similar issues. – FluffyKitten Aug 20 '20 at 09:19
3

I had a similar issue; it started when we upgraded the Cloudera Spark version from 2.2 to 2.3.

The issue was that, in my uber-jar, META-INF/services/org.apache.spark.sql.sources.DataSourceRegister was being overwritten by a file with the same name from another jar. As a result, Spark could not find the Kafka provider entry in the DataSourceRegister file.

Resolution: adding an AppendingTransformer for that file to the shade plugin configuration in pom.xml helped me.

<configuration>
  <transformers>
    <!-- Append same-named service files instead of letting one overwrite the other -->
    <transformer
        implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
      <resource>META-INF/services/org.apache.spark.sql.sources.DataSourceRegister</resource>
    </transformer>
  </transformers>
</configuration>
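Why overwriting loses the kafka alias can be illustrated without Spark or Maven. The sketch below is a simplified model, not the shade plugin's actual implementation: the class `ServiceFileMerge` is hypothetical and the provider lists are sample entries standing in for the contents of each jar's DataSourceRegister file. It contrasts a "last copy wins" strategy with an appending one for the same file path.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Simplified, hypothetical model of two merge strategies for one service-file path.
class ServiceFileMerge {

    // "Last copy wins": each jar's copy of the file replaces the previous one.
    static List<String> overwrite(List<List<String>> copies) {
        List<String> result = new ArrayList<>();
        for (List<String> copy : copies) {
            result = new ArrayList<>(copy);
        }
        return result;
    }

    // Append: the lines of every copy are concatenated in order.
    static List<String> append(List<List<String>> copies) {
        List<String> result = new ArrayList<>();
        for (List<String> copy : copies) {
            result.addAll(copy);
        }
        return result;
    }

    public static void main(String[] args) {
        // Sample contents of the same service file in two different jars.
        List<String> kafkaJar = Arrays.asList(
            "org.apache.spark.sql.kafka010.KafkaSourceProvider");
        List<String> sqlJar = Arrays.asList(
            "org.apache.spark.sql.execution.datasources.csv.CSVFileFormat",
            "org.apache.spark.sql.execution.datasources.json.JsonFileFormat");
        List<List<String>> copies = Arrays.asList(kafkaJar, sqlJar);

        System.out.println("overwrite: " + overwrite(copies));
        System.out.println("append:    " + append(copies));
    }
}
```

With the overwrite strategy only the entries of the last jar survive, so the Kafka provider disappears from the merged file; with append, all three entries remain.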
OneCricketeer
RajashekharC
2

For the uber-jar, adding ServicesResourceTransformer to the shade plugin worked for me.

<transformers>
    <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
</transformers>
pppk520
1

My solution was different: I specified the spark-sql-kafka package directly on the spark-submit command line:

.\bin\spark-submit --master local --class "org.myspark.KafkaStream" --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.0  <path_to_jar>

Related: http://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html#deploying

Codigo Morsa
0

I faced the same error. It took me a couple of days to figure out. When you copy dependency from maven repository, in particular "spark-sql-kafka", it contains the line:

<scope> provided </scope> 

The solution was to remove this line so that the dependency uses the default "compile" scope. The same is true if you use SBT. It's probably worth removing it from other dependencies as well if they have it, just in case.

George
0

I had the same problem, but with Gradle and shadowJar. It worked after adding:

shadowJar {
    mergeServiceFiles()
}
assemble.dependsOn shadowJar

0

I faced the same error because I had excluded everything under META-INF in the shade plugin to get rid of the overlapping-resource warning:

<exclude>META-INF/**</exclude>

But the class loader needs the following resource to know which data sources are registered, so after removing this exclude it works fine for me.

 <resource>
      META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
 </resource>

Hope this helps someone.

Rand Chen