2

I am developing a Spark application in Java that consumes a Kafka stream.

I use kafka_2.10-0.10.2.1.

I have set various parameters for Kafka properties: bootstrap.servers, key.deserializer, value.deserializer, etc.

My application compiles fine, but when I submit it, it fails with the following error:

Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/kafka/common/serialization/StringDeserializer

I do use StringDeserializer for key.deserializer and value.deserializer, so the error is indeed related to how I wrote my application.

The Maven dependencies used in pom.xml:

    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
        <version>2.1.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.10</artifactId>
        <version>2.1.1</version>
    </dependency>

    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
        <version>2.1.1</version>
    </dependency>

I have tried updating the versions of the Spark Streaming and Kafka dependencies, but the error persists. I could not find much about it anywhere.

Jacek Laskowski
mohiitg

4 Answers

3

As you mentioned in the comment above:

Turned out issue was with uber jar not building correctly.

That's exactly the issue. It relates to how you assemble your Spark application, and I'm worried that you may have chosen the uber jar route. In my opinion, it wastes your time at both assembly and spark-submit time.

I'd personally prefer the --packages command-line option, which pulls down all the necessary dependencies when needed.

$ ./bin/spark-submit --help
...
  --packages                  Comma-separated list of maven coordinates of jars to include
                              on the driver and executor classpaths. Will search the local
                              maven repo, then maven central and any additional remote
                              repositories given by --repositories. The format for the
                              coordinates should be groupId:artifactId:version.
...

That makes your life as a Spark developer easier: you no longer have to wait until maven/sbt downloads the dependencies and assembles them together. It's done at spark-submit time (and perhaps it's someone else's job, too! :))

You should spark-submit as follows:

spark-submit --packages org.apache.spark:spark-streaming-kafka-0-10_2.11:2.1.1 ...

The reason for this extra requirement is that the spark-streaming-kafka-0-10 module is not included by default in Spark's CLASSPATH (as it's considered unnecessary most of the time). Passing --packages as above triggers loading the module (with its transitive dependencies).

You should not bundle the module in your Spark application's uber jar.
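
If you generate the submit command from code rather than typing it, the coordinate format is easy to get wrong. Here is a minimal sketch in plain Java (no Spark API involved) that formats a groupId:artifactId:version coordinate and assembles the argument list. `SparkSubmitCommand`, `application.jar` and the `Main` class name are placeholders made up for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper, not part of Spark: it only assembles the command line.
final class SparkSubmitCommand {

    // Formats a Maven coordinate the way --packages expects it: groupId:artifactId:version.
    static String coordinate(String groupId, String artifactId, String version) {
        for (String part : new String[] {groupId, artifactId, version}) {
            if (part == null || part.isEmpty() || part.contains(":") || part.contains(",")) {
                throw new IllegalArgumentException("Invalid coordinate part: " + part);
            }
        }
        return groupId + ":" + artifactId + ":" + version;
    }

    // Builds: spark-submit [--packages c1,c2,...] --class <mainClass> <appJar>
    static List<String> build(String mainClass, String appJar, List<String> coordinates) {
        List<String> command = new ArrayList<>();
        command.add("spark-submit");
        if (!coordinates.isEmpty()) {
            command.add("--packages");
            command.add(String.join(",", coordinates));
        }
        command.add("--class");
        command.add(mainClass);
        command.add(appJar);
        return command;
    }

    public static void main(String[] args) {
        String kafka = coordinate("org.apache.spark", "spark-streaming-kafka-0-10_2.11", "2.1.1");
        // "Main" and "application.jar" are placeholders for your own main class and jar.
        System.out.println(String.join(" ", build("Main", "application.jar", Arrays.asList(kafka))));
    }
}
```

The resulting list could be handed to a `ProcessBuilder`, but that part is left out on purpose since it depends on how spark-submit is installed on your machine.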

Jacek Laskowski
1

spark-streaming_2.10

This is dependent upon Scala 2.10

Your other dependencies are using Scala 2.11

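Upgrading it to spark-streaming_2.11, so that every Spark dependency uses the same Scala version, is the correct solution for the current error.

Also make sure that the Kafka client version pulled in by spark-streaming-kafka-0-10 matches the version of Kafka you're running.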
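
To make the mismatch concrete, here is a small sketch in plain Java that extracts the Scala binary version suffix (`_2.10`, `_2.11`) from a list of artifact IDs and reports whether they agree. `ScalaSuffixCheck` is a hypothetical helper written for this answer; it only looks at `_<major>.<minor>` suffixes and ignores artifacts without one.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper: checks that Scala-suffixed artifact IDs agree on one Scala version.
final class ScalaSuffixCheck {

    // Matches a trailing Scala binary version such as "_2.10" or "_2.11".
    private static final Pattern SUFFIX = Pattern.compile("_(\\d+\\.\\d+)$");

    // Collects the distinct Scala binary versions found in the artifact IDs.
    static Set<String> scalaVersions(List<String> artifactIds) {
        Set<String> versions = new TreeSet<>();
        for (String id : artifactIds) {
            Matcher m = SUFFIX.matcher(id);
            if (m.find()) {
                versions.add(m.group(1));
            }
        }
        return versions;
    }

    // Consistent means at most one distinct Scala version across all artifacts.
    static boolean isConsistent(List<String> artifactIds) {
        return scalaVersions(artifactIds).size() <= 1;
    }

    public static void main(String[] args) {
        // Artifact IDs taken from the pom.xml in the question.
        List<String> fromQuestion = Arrays.asList(
                "spark-core_2.11", "spark-streaming_2.10", "spark-streaming-kafka-0-10_2.11");
        System.out.println(scalaVersions(fromQuestion)); // prints [2.10, 2.11]
        System.out.println(isConsistent(fromQuestion));  // prints false
    }
}
```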

Application is compiling fine but when I am trying to submit the spark job, its showing error: Exception in thread "main" java.lang.NoClassDefFoundError:

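By default, Maven does not include dependency jars in the artifact it builds, so the Kafka classes are missing at runtime unless you build an uber jar or supply them some other way.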
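
A quick way to confirm that the class really is missing at runtime is to probe the classpath before creating the stream. This is only a sketch under the assumption that you can add a few lines to your driver's startup; `ClasspathProbe` is a hypothetical name, and the only API it uses is the JDK's `Class.forName`.

```java
// Hypothetical diagnostic helper: reports whether a class is visible at runtime.
final class ClasspathProbe {

    // Returns true if the named class can be located by this class's class loader.
    static boolean isOnClasspath(String className) {
        try {
            // initialize=false: only locate the class, do not run its static initializers.
            Class.forName(className, false, ClasspathProbe.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    public static void main(String[] args) {
        String name = "org.apache.kafka.common.serialization.StringDeserializer";
        System.out.println(name + " on classpath: " + isOnClasspath(name));
    }
}
```

If this reports false when run through spark-submit, the problem is in packaging or submission, not in the Kafka properties.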

OneCricketeer
  • I tried changing all the versions to 2.10 as well as 2.11 (keeping all other dependencies either 2.10 or 2.11) but still error persists. Can there be any other issue also? – mohiitg Jun 25 '17 at 17:55
  • If the code compiles, but fails at runtime, the classpath is wrong, and you're not building an uber jar when you submit the job – OneCricketeer Jun 25 '17 at 18:00
1

It turned out the issue was that the uber jar was not being built correctly. If you would like to assemble the application and package an uber jar, you can proceed as follows.

Create an assembly file in src/assembly/assembly.xml

<assembly xmlns="http://maven.apache.org/ASSEMBLY/2.1.0"
          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
          xsi:schemaLocation="http://maven.apache.org/ASSEMBLY/2.1.0 http://maven.apache.org/xsd/assembly-2.1.0.xsd">
    <id>bin</id>
    <formats>
        <format>jar</format>
    </formats>
    <includeBaseDirectory>false</includeBaseDirectory>
    <dependencySets>
        <dependencySet>
            <unpack>true</unpack>
            <scope>provided</scope>
        </dependencySet>
    </dependencySets>
</assembly>

And add the maven-assembly-plugin to the pom.xml


        <plugins>
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.3.0</version>
                <configuration>
                    <descriptors>
                        <descriptor>src/assembly/assembly.xml</descriptor>
                    </descriptors>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>

The descriptor above selects dependencies by provided scope, so if you would like to add a dependency to the uber jar, just give it provided scope.
In your case, it will look like this:


    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
        <version>2.1.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.11</artifactId>
        <version>2.1.1</version>
    </dependency>

    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
        <version>2.1.1</version>
        <scope>provided</scope>
    </dependency>

    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>0.10.2.1</version>
        <scope>provided</scope>
    </dependency>

Then submit the assembled jar:

$ spark-submit --class Main application-bin.jar

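
Since the original problem was an uber jar that was not built correctly, it can help to verify that the assembled jar actually contains the missing class before submitting it. Below is a minimal sketch using only the JDK's `java.util.jar.JarFile`; `UberJarCheck` is a hypothetical name, and the jar path is whatever your build produces.

```java
import java.io.IOException;
import java.util.jar.JarFile;

// Hypothetical helper: checks whether a jar bundles a given class.
final class UberJarCheck {

    // Returns true if the jar contains the .class entry for the fully qualified class name.
    static boolean containsClass(String jarPath, String className) throws IOException {
        String entry = className.replace('.', '/') + ".class";
        try (JarFile jar = new JarFile(jarPath)) {
            return jar.getEntry(entry) != null;
        }
    }

    public static void main(String[] args) throws IOException {
        // args[0] is the path to the assembled jar, e.g. the application-bin.jar used above.
        String className = "org.apache.kafka.common.serialization.StringDeserializer";
        System.out.println(className + " bundled: " + containsClass(args[0], className));
    }
}
```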
Gara Walid
0

You can use the shade plugin to generate fat jars if you prefer that over the --packages approach Jacek proposed.

            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.2.3</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>

You can also use the maven-dependency-plugin to fetch some of the dependencies, put them in a lib directory in your assembly, and later supply them to Spark.

            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-dependency-plugin</artifactId>
                <version>3.1.2</version>
                <executions>
                    <execution>
                        <id>copy</id>
                        <phase>initialize</phase>
                        <goals>
                            <goal>copy</goal>
                        </goals>
                    </execution>
                </executions>
                <configuration>
                    <artifactItems>
                        <artifactItem>
                            <groupId>org.apache.logging.log4j</groupId>
                            <artifactId>log4j-core</artifactId>
                            <version>${log4j2.version}</version>
                            <type>jar</type>
                            <overWrite>true</overWrite>
                            <outputDirectory>${project.build.directory}/log4j-v2-jars</outputDirectory>
                            <destFileName>log4j-v2-core.jar</destFileName>
                        </artifactItem>
                        <artifactItem>
                            <groupId>org.apache.logging.log4j</groupId>
                            <artifactId>log4j-api</artifactId>
                            <version>${log4j2.version}</version>
                            <type>jar</type>
                            <overWrite>true</overWrite>
                            <outputDirectory>${project.build.directory}/log4j-v2-jars</outputDirectory>
                            <destFileName>log4j-v2-api.jar</destFileName>
                        </artifactItem>
                        <artifactItem>
                            <groupId>org.apache.logging.log4j</groupId>
                            <artifactId>log4j-1.2-api</artifactId>
                            <version>${log4j2.version}</version>
                            <type>jar</type>
                            <overWrite>true</overWrite>
                            <outputDirectory>${project.build.directory}/log4j-v2-jars</outputDirectory>
                            <destFileName>log4j-v2-1.2-api.jar</destFileName>
                        </artifactItem>
                        <artifactItem>
                            <groupId>org.apache.logging.log4j</groupId>
                            <artifactId>log4j-slf4j-impl</artifactId>
                            <version>${log4j2.version}</version>
                            <type>jar</type>
                            <overWrite>true</overWrite>
                            <outputDirectory>${project.build.directory}/log4j-v2-jars</outputDirectory>
                            <destFileName>log4j-v2-slf4j-impl.jar</destFileName>
                        </artifactItem>
                    </artifactItems>
                    <outputDirectory>${project.build.directory}/wars</outputDirectory>
                    <overWriteReleases>false</overWriteReleases>
                    <overWriteSnapshots>true</overWriteSnapshots>
                </configuration>
            </plugin>

I am proposing this because in your case (as was the case at my work) your cluster may be behind a very strict firewall, and Spark may not be allowed to talk to Nexus to resolve packages at the submit step. In that case you really do need to handle this when preparing the artifact, and either of these approaches might help you.

In my example with maven-dependency I fetch log4jv2 to pass it to spark 2.3 in order to have log4j-v2 log outputs (you can place your dependencies instead).
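
One way to supply the copied jars to Spark is spark-submit's `--jars` option, which takes a comma-separated list of jar paths. I am assuming that is how you would pass them; the sketch below, in plain Java, just builds that list from a directory such as the `log4j-v2-jars` output directory configured above. `JarsArgument` is a hypothetical helper name.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical helper: turns a directory of jars into a value for --jars.
final class JarsArgument {

    // Joins every *.jar directly inside dir into a sorted, comma-separated list of absolute paths.
    static String fromDirectory(Path dir) throws IOException {
        try (Stream<Path> files = Files.list(dir)) {
            return files
                    .filter(p -> p.getFileName().toString().endsWith(".jar"))
                    .map(p -> p.toAbsolutePath().toString())
                    .sorted()
                    .collect(Collectors.joining(","));
        }
    }

    public static void main(String[] args) throws IOException {
        // args[0]: directory holding the copied jars, e.g. target/log4j-v2-jars.
        System.out.println("--jars " + fromDirectory(Paths.get(args[0])));
    }
}
```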

milos