I am trying to read from Kafka. My issue is the same as the one discussed here, and my code and pom.xml are very similar too. The code runs fine on my local machine, but it fails when I build an uber JAR and run it on a Dataproc cluster. I have tried all the suggestions from the original post, but none of them helped.

I get the following error:

Failed to find data source: kafka. Please deploy the application as per the deployment section of "Structured Streaming + Kafka Integration Guide"

Can someone please help?

Spark version: 2.4.8, Scala version: 2.12

pom.xml:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>*****</groupId>
    <artifactId>*****</artifactId>
    <version>****</version>
    <name>${project.artifactId}</name>


    <properties>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>


        <!-- Scala Version-->
        <scala.version>2.12.10</scala.version>
        <scala.major.version>2.12</scala.major.version>

        <!-- Spark/Hadoop Version-->
        <spark.version>2.4.8</spark.version>


        <!-- Open Source Dependencies -->
        <google.cloud.libraries.bom.version>16.2.0</google.cloud.libraries.bom.version>
        <cloud.bigdataoss.gcs.connector.version>hadoop2-2.1.3</cloud.bigdataoss.gcs.connector.version>
        <google.spark.bigquery.version>0.19.1</google.spark.bigquery.version>
        <commons.io.version>2.4</commons.io.version>
        <protobuf.java.version>3.14.0</protobuf.java.version>
        <com.google.guava.version>30.1-jre</com.google.guava.version>
        <google-cloud-storage.version>1.79.0</google-cloud-storage.version>

        <!-- Plugin Versions-->
        <maven.compiler.version>3.8.1</maven.compiler.version>
        <maven.deploy.plugin.version>2.8.2</maven.deploy.plugin.version>
        <maven.release.plugin.version>2.5.3</maven.release.plugin.version>
        <maven.site.plugin.version>3.9.1</maven.site.plugin.version>
        <scalatest.maven.plugin.version>1.0</scalatest.maven.plugin.version>
        <maven.source.plugin.version>3.2.0</maven.source.plugin.version>
    </properties>

    <dependencyManagement>
        <!-- reference : https://github.com/GoogleCloudPlatform/cloud-opensource-java/wiki/The-Google-Cloud-Platform-Libraries-BOM -->
        <dependencies>
            <dependency>
                <groupId>com.google.cloud</groupId>
                <artifactId>libraries-bom</artifactId>
                <version>${google.cloud.libraries.bom.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>


    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_${scala.major.version}</artifactId>
            <version>${spark.version}</version>
            <scope>provided</scope>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_${scala.major.version}</artifactId>
            <version>${spark.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>com.google.protobuf</groupId>
                    <artifactId>protobuf-java</artifactId>
                </exclusion>
            </exclusions>
            <scope>provided</scope>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql-kafka-0-10 -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql-kafka-0-10_${scala.major.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-core</artifactId>
            <version>${log4j.version}</version>
            <scope>provided</scope>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.logging.log4j/log4j-api -->
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-api</artifactId>
            <version>${log4j.version}</version>
            <scope>provided</scope>
        </dependency>

        <!-- https://mvnrepository.com/artifact/com.typesafe.scala-logging/scala-logging -->
        <dependency>
            <groupId>com.typesafe.scala-logging</groupId>
            <artifactId>scala-logging_2.12</artifactId>
            <version>3.9.3</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/com.holdenkarau/spark-testing-base -->
        <dependency>
            <groupId>com.holdenkarau</groupId>
            <artifactId>spark-testing-base_2.12</artifactId>
            <version>2.4.7_1.1.1</version>
            <scope>test</scope>
        </dependency>

        <!-- https://mvnrepository.com/artifact/com.google.cloud/google-cloud-secretmanager -->
        <dependency>
            <groupId>com.google.cloud</groupId>
            <artifactId>google-cloud-secretmanager</artifactId>
            <!--<version>2.0.4</version>-->
        </dependency>

        <!-- https://mvnrepository.com/artifact/com.github.mrpowers/spark-fast-tests -->
        <dependency>
            <groupId>com.github.mrpowers</groupId>
            <artifactId>spark-fast-tests_2.12</artifactId>
            <version>1.2.0</version>
            <scope>test</scope>
        </dependency>


       
        <!-- https://mvnrepository.com/artifact/com.google.auth/google-auth-library-oauth2-http -->

        <!-- https://mvnrepository.com/artifact/com.google.protobuf/protobuf-java -->
        <dependency>
            <groupId>com.google.protobuf</groupId>
            <artifactId>protobuf-java</artifactId>
            <version>[${protobuf.java.version}]</version>
            <scope>provided</scope>
        </dependency>


        <!-- https://mvnrepository.com/artifact/com.google.guava/guava -->
        <dependency>
            <artifactId>guava</artifactId>
            <groupId>com.google.guava</groupId>
            <version>[${com.google.guava.version}]</version>
        </dependency>

        <dependency>
            <groupId>com.google.cloud</groupId>
            <artifactId>google-cloud-storage</artifactId>
            <version>${google-cloud-storage.version}</version>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>com.google.cloud.bigdataoss</groupId>
            <artifactId>gcs-connector</artifactId>
            <version>${cloud.bigdataoss.gcs.connector.version}</version>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>com.google.cloud.spark</groupId>
            <artifactId>spark-bigquery-with-dependencies_${scala.major.version}</artifactId>
            <version>0.22.2</version>
            <scope>provided</scope>
        </dependency>

    </dependencies>


    <!--      Added to enable jar creation using mvn command-->
    <build>
        <sourceDirectory>src/main/scala</sourceDirectory>
        <testSourceDirectory>src/test/scala</testSourceDirectory>
        <plugins>


            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.3.0</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <createDependencyReducedPom>false</createDependencyReducedPom>

                            <artifactSet>
                                <excludes>
                                    <exclude>org.scalactic:*</exclude>
                                    <exclude>org.scalatest:*</exclude>
                                    <exclude>org.apache.spark:*</exclude>
                                </excludes>
                            </artifactSet>

                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>com.org.mainClass</mainClass>
                                </transformer>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.IncludeResourceTransformer">
                                    <resource>META-INF/services</resource>
                                    <file>io.grpc.LoadBalancerProvider</file>
                                </transformer>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.IncludeResourceTransformer">
                                    <resource>META-INF/services</resource>
                                    <file>io.grpc.NameResolverProvider</file>
                                </transformer>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                            </transformers>

                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <!-- <exclude>META-INF/maven/**</exclude> -->
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                        <exclude>**/logback.xml</exclude>
                                        <exclude>**/log4j.properties</exclude>
                                        <exclude>module-info.class</exclude>
                                    </excludes>
                                </filter>
                            </filters>

                            <relocations>
                                <relocation>
                                    <pattern>com</pattern>
                                    <shadedPattern>repackaged.com</shadedPattern>
                                    <includes>
                                        <include>com.microsoft.**</include>
                                        <include>com.google.common.**</include>
                                        <include>com.google.protobuf.**</include>
                                    </includes>
                                </relocation>
                            </relocations>

                        </configuration>
                    </execution>
                </executions>
            </plugin>

            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-source-plugin</artifactId>
                <version>${maven.source.plugin.version}</version>
                <executions>
                    <execution>
                        <id>attach-sources</id>
                        <goals>
                            <goal>jar</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>


            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>4.5.6</version>
                <executions>
                    <execution>
                        <id>scala-compile-first</id>
                        <phase>process-resources</phase>
                        <goals>
                            <goal>add-source</goal>
                            <goal>compile</goal>
                        </goals>
                        <configuration>
                            <args>
                                <arg>-dependencyfile</arg>
                                <arg>${project.build.directory}/.scala_dependencies</arg>
                            </args>
                        </configuration>
                    </execution>
                    <execution>
                        <id>scala-test-compile</id>
                        <phase>process-test-resources</phase>
                        <goals>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

    <packaging>jar</packaging>


</project>
  • If you run [`jar tf`](https://docs.oracle.com/javase/tutorial/deployment/jar/view.html) on the shaded JAR, which Kafka packages do you see? – OneCricketeer Jul 12 '22 at 19:41
  • Those are classes from `kafka-clients` dependency. Search for `kafka010`. The class you want is called `org.apache.spark.sql.kafka010.KafkaSourceProvider` – OneCricketeer Jul 12 '22 at 20:14
  • No, I could not find anything for kafka010. I even tried searching for just "KafkaSourceProvider". It is not in the list. – Harikrishnan Balachandran Jul 12 '22 at 20:25
  • Then that's the problem Spark is reporting. And you're saying passing `--packages org.apache.spark:spark-sql-kafka-0-10_2.12:2.4.8` didn't work either? – OneCricketeer Jul 12 '22 at 20:53
  • Try removing the `org.apache.spark:*` exclude from the shade plugin's `artifactSet`; it also matches `spark-sql-kafka-0-10`. – OneCricketeer Jul 13 '22 at 17:08
  • It didn't work. The required class is still missing from the uber JAR. I am also getting the same "failed to find data source" error for bigquery; I had to pass `--packages com.google.cloud.spark:spark-bigquery-with-dependencies_2.12:0.22.2` to resolve it. Is there something wrong with the way I am building the uber JAR? Could you please help? – Harikrishnan Balachandran Jul 13 '22 at 17:46
  • I typically use the [`maven-assembly-plugin`'s `jar-with-dependencies`](https://stackoverflow.com/a/574650/2308683) builder, not the shade plugin. – OneCricketeer Jul 13 '22 at 17:47
  • I had to use the shade plugin because I ran into a few class-not-found errors, and it worked after I relocated the affected packages with the shade plugin. Is there a way to use both maven-assembly-plugin and the shade plugin in the same pom? Sorry if my questions are naive; this is my first project in Scala. – Harikrishnan Balachandran Jul 13 '22 at 17:51
  • You're effectively still getting "class not found" with that error from Spark. Scala isn't the issue here. I don't really have any suggestions other than to continue using `--packages` for any external Spark dependencies you need. – OneCricketeer Jul 13 '22 at 18:15
  • One alternative, though, would be to add `spark.jars.packages` to your SparkSession config options – OneCricketeer Jul 13 '22 at 18:17
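
The `org.apache.spark:*` exclude in the shade plugin's `artifactSet` matches every artifact in that group, including `spark-sql-kafka-0-10_2.12`, which is consistent with `KafkaSourceProvider` not showing up in the `jar tf` output. A minimal sketch of a narrower exclude list follows. It assumes the cluster already ships `spark-core` and `spark-sql` (both are `provided` in the pom above) and that nothing else in the build filters the connector out. The asker reported that dropping the exclude alone did not fix the JAR, so treat this as one thing to rule out, not a confirmed fix.

```xml
<artifactSet>
    <excludes>
        <exclude>org.scalactic:*</exclude>
        <exclude>org.scalatest:*</exclude>
        <!-- Assumption: only the Spark modules shipped with the cluster are excluded.
             spark-sql-kafka-0-10 is deliberately absent from this list so that
             the shade plugin is allowed to bundle it. -->
        <exclude>org.apache.spark:spark-core_${scala.major.version}</exclude>
        <exclude>org.apache.spark:spark-sql_${scala.major.version}</exclude>
    </excludes>
</artifactSet>
```

After rebuilding, `jar tf` on the shaded JAR should list `org/apache/spark/sql/kafka010/KafkaSourceProvider.class`, and `META-INF/services/org.apache.spark.sql.sources.DataSourceRegister` should name that class. The `ServicesResourceTransformer` already present in the pom is what merges those service files.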

0 Answers