
I am using Spark to get data from Kafka and insert it into Cassandra. My program is:

public static void fetchAndValidateData() {
    SparkConf sparkConf = new SparkConf().setAppName("name")
            .set("spark.cassandra.connection.host", "127.0.0.1")
            .set("spark.cleaner.ttl", "3600");
    JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(2000));
    Map<String,String> kafkaParams = new HashMap<>();
    kafkaParams.put("zookeeper.connect", "127.0.0.1");
    kafkaParams.put("group.id", App.GROUP);
    JavaPairReceiverInputDStream<String, EventLog> messages =
            KafkaUtils.createStream(jssc, String.class, EventLog.class, StringDecoder.class, EventLogDecoder.class,
                    kafkaParams, topicMap, StorageLevel.MEMORY_AND_DISK_SER_2());
    JavaDStream<EventLog> lines = messages.map(new Function<Tuple2<String, EventLog>, EventLog>() {
        @Override
        public EventLog call(Tuple2<String, EventLog> tuple2) {
            return tuple2._2();
        }
    });
    lines.foreachRDD(rdd ->
            javaFunctions(rdd)
                    .writerBuilder("test", "event_log", mapToRow(EventLog.class))
                    .saveToCassandra());
    jssc.start();
    try {
        jssc.awaitTermination();
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    jssc.stop();
    jssc.close();
}

My spark-submit command is:

C:\spark-1.6.2-bin-hadoop2.6\bin\spark-submit --packages org.apache.spark:spark-streaming-kafka_2.10:1.5.2 --class "com.jtv.spark.atnt.App" --master local[4] target\spark.atnt-0.0.1-SNAPSHOT.jar

My POM file is:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>com.jtv</groupId>
  <artifactId>spark.atnt</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <packaging>jar</packaging>

  <name>spark.atnt</name>
  <url>http://maven.apache.org</url>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
  </properties>

  <build>
    <plugins>  
       <plugin>
          <artifactId>maven-assembly-plugin</artifactId>
          <executions>
              <execution>
                  <phase>package</phase>
                  <goals>
                      <goal>single</goal>
                  </goals>
              </execution>
          </executions>
          <configuration>
              <descriptorRefs>
                  <descriptorRef>jar-with-dependencies</descriptorRef>
              </descriptorRefs>
          </configuration>
      </plugin>
    </plugins>
  </build>

  <dependencies>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
        <version>2.0.0</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.11</artifactId>
        <version>2.0.0</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>com.datastax.spark</groupId>
        <artifactId>spark-cassandra-connector-java_2.11</artifactId>
        <version>1.5.1</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming-kafka_2.11</artifactId>
      <version>1.6.2</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>javax.json</groupId>
        <artifactId>javax.json-api</artifactId>
        <version>1.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.10</artifactId>
        <version>0.8.0</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.11</artifactId>
        <version>0.10.0.1</version>
    </dependency>    
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>3.8.1</version>
      <scope>test</scope>
    </dependency>
  </dependencies>
</project>

I am getting a java.lang.ClassNotFoundException: com.datastax.spark.connector.japi.CassandraJavaUtil error.

How do I solve it?


Edit:

I figured out what is causing the problem: org.apache.kafka:kafka_2.10:0.8.0. When I add provided to it, mvn package fails with:

Failed to execute goal org.apache.maven.plugins:maven-assembly-plugin:2.2-beta-5:single (default) on project spark.atnt: Failed to create assembly: Failed to resolve dependencies for project: com.jtv:spark.atnt:jar:0.0.1-SNAPSHOT: Could not transfer artifact com.sun.jdmk:jmxtools:jar:1.2.1 from/to java.net (https://maven-repository.dev.java.net/nonav/repository): Cannot access https://maven-repository.dev.java.net/nonav/repository with type legacy using the available connector factories: BasicRepositoryConnectorFactory

When I remove it, spark-submit fails again with java.lang.ClassNotFoundException: com.datastax.spark.connector.japi.CassandraJavaUtil.

khateeb

1 Answer


The easiest way to solve this problem is to package the Cassandra library within your jar file.

In order to do this you can use the maven-assembly-plugin in your pom.xml:

    <plugin>
        <artifactId>maven-assembly-plugin</artifactId>
        <executions>
            <execution>
                <phase>package</phase>
                <goals>
                    <goal>single</goal>
                </goals>
            </execution>
        </executions>
        <configuration>
            <descriptorRefs>
                <descriptorRef>jar-with-dependencies</descriptorRef>
            </descriptorRefs>
        </configuration>
    </plugin>
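Note that this snippet does not pin a maven-assembly-plugin version, so Maven falls back to an old default (the 2.2-beta-5 visible in your error message). Pinning a more recent release is usually safer; a sketch (the version number here is an assumption, use whatever is current for you):

```xml
<plugin>
    <artifactId>maven-assembly-plugin</artifactId>
    <!-- assumed version; any modern release avoids the old 2.2-beta-5 default -->
    <version>2.6</version>
    <executions>
        <execution>
            <phase>package</phase>
            <goals>
                <goal>single</goal>
            </goals>
        </execution>
    </executions>
    <configuration>
        <descriptorRefs>
            <descriptorRef>jar-with-dependencies</descriptorRef>
        </descriptorRefs>
    </configuration>
</plugin>
```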

This plugin packages all of your dependencies into your jar file. If you want to keep certain dependencies out of the packaged jar (e.g. Spark itself, which the cluster already provides), add the tag <scope>provided</scope> to them. For example:

    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.10</artifactId>
        <version>1.5.2</version>
        <scope>provided</scope>
    </dependency>

Please note that if you use the assembly plugin as described above you will obtain two jar files in your target folder. If you want to use the full jar, you will need to run:

C:\spark-1.6.2-bin-hadoop2.6\bin\spark-submit --packages org.apache.spark:spark-streaming-kafka_2.10:1.5.2 --class "com.jtv.spark.atnt.App" --master local[4] target\spark.atnt-0.0.1-SNAPSHOT-jar-with-dependencies.jar
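If mvn package then fails because a transitive dependency resolves against a dead repository (as with com.sun.jdmk:jmxtools, which old Kafka artifacts pull in from the long-defunct java.net repository), you can usually exclude the offending artifacts instead of marking the whole dependency provided. A sketch, assuming kafka_2.10:0.8.0 is the dependency dragging them in:

```xml
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.10</artifactId>
    <version>0.8.0</version>
    <exclusions>
        <!-- these JMX artifacts are hosted on defunct java.net repositories
             and are not needed at runtime for a Spark consumer -->
        <exclusion>
            <groupId>com.sun.jdmk</groupId>
            <artifactId>jmxtools</artifactId>
        </exclusion>
        <exclusion>
            <groupId>com.sun.jmx</groupId>
            <artifactId>jmxri</artifactId>
        </exclusion>
    </exclusions>
</dependency>
```

With the exclusions in place, the assembly plugin no longer tries to resolve those artifacts, so the jar-with-dependencies build can still bundle the Cassandra connector.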

Edo
  • I tried your solution. On running `mvn package`, I am getting `Failed to execute goal org.apache.maven.plugins:maven-assembly-plugin:2.2-beta-5:single (default) on project spark.atnt: Failed to create assembly: Failed to resolve dependencies for project: com.jtv:spark.atnt:jar:0.0.1-SNAPSHOT: Could not transfer artifact com.sun.jmx:jmxri:jar:1.2.1 from/to java.net (https://maven-repository.dev.java.net/nonav/repository): Cannot access https://maven-repository.dev.java.net/nonav/repository with type legacy using the available connector factories: BasicRepositoryConnectorFactory` error. – khateeb Aug 25 '16 at 13:07
  • It looks like you might have a problem similar to this stackoverflow question: http://stackoverflow.com/questions/4908651/the-following-artifacts-could-not-be-resolved-javax-jmsjmsjar1-1 – Edo Aug 25 '16 at 16:00
  • Remember to set `provided` for the dependecies you don't need in your target jar file. e.g. anything in the group `org.apache.spark` – Edo Aug 25 '16 at 16:03
  • I figured out what is causing the problem. It is `org.apache.kafka:kafka_2.10:0.8.0`. When I add `provided` to it, I get the `Cannot access https://maven-repository.dev.java.net/nonav/repository with type legacy using the available connector factories: BasicRepositoryConnectorFactory` and when I remove it, I get `java.lang.ClassNotFoundException: com.datastax.spark.connector.japi.CassandraJavaUtil`. – khateeb Aug 26 '16 at 09:50