I'm working with Kafka and Spark Streaming in Java. Versions: ZooKeeper 3.3.6, Kafka 2.11-0.10.1.0, Spark 2.0.1-bin-hadoop2.7, Scala 2.11.8.

No configuration changes were made apart from renaming a few files in the conf directory.

Code to read from Kafka:

JavaPairInputDStream<String, String> kafkaStream =
        KafkaUtils.createDirectStream(jssc,
                String.class,
                String.class,
                StringDecoder.class,
                StringDecoder.class,
                kafkaParams,
                topicsSet);

JavaDStream<String> data = kafkaStream.map(new Function<Tuple2<String, String>, String>() {
    private static final long serialVersionUID = 1L;

    public String call(Tuple2<String, String> message) {
        return message._2();
    }
});

data.foreachRDD(new Function<JavaRDD<String>, Void>() {
    private static final long serialVersionUID = 1L;

    public Void call(JavaRDD<String> data) throws Exception {
        if (data != null) {
            List<String> result = data.collect();
            for (String jString : result) {
                System.out.println("========> " + jString);
            }
        } else {
            System.out.println("Got no data in this window");
        }
        return null;
    }
});
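
For reference, kafkaParams and topicsSet are not shown above; for this 0.8-style direct stream they are typically built along these lines (the broker address, group id and topic name below are placeholders, not the actual values used):

    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.Map;
    import java.util.Set;

    // Placeholder values for illustration only.
    Map<String, String> kafkaParams = new HashMap<String, String>();
    kafkaParams.put("metadata.broker.list", "localhost:9092"); // Kafka broker list, not ZooKeeper
    kafkaParams.put("group.id", "spark-streaming-consumer");

    Set<String> topicsSet = new HashSet<String>(Arrays.asList("my-topic"));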

pom.xml:

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.11</artifactId>
            <version>1.5.2</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-kafka_2.10 -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka_2.11</artifactId>
            <version>1.6.2</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka_2.11 -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.11</artifactId>
            <version>0.8.2.1</version>
        </dependency>


        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>2.11.1</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>1.5.2</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.json/json -->
        <dependency>
            <groupId>org.json</groupId>
            <artifactId>json</artifactId>
            <version>20160810</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.json4s/json4s-ast_2.11 -->
        <dependency>
            <groupId>org.json4s</groupId>
            <artifactId>json4s-ast_2.11</artifactId>
            <version>3.2.11</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-common -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>2.2.0</version>
        </dependency>


        <!--<dependency> <groupId>org.apache.storm</groupId> <artifactId>storm-hdfs</artifactId> 
            <version>0.10.0-beta1</version> </dependency> <dependency> <groupId>org.apache.solr</groupId> 
            <artifactId>solr-solrj</artifactId> <version>5.3.1</version> </dependency> -->
        <dependency>
            <groupId>org.mongodb</groupId>
            <artifactId>mongo-java-driver</artifactId>
            <version>3.0.1</version>
        </dependency>
        <dependency>
            <groupId>com.googlecode.json-simple</groupId>
            <artifactId>json-simple</artifactId>
            <version>1.1</version>
        </dependency>

The above code ran successfully on Windows, but on Linux I'm getting the following error:

[INFO] main 2016-11-23 11:46:16,985 VerifiableProperties info - Verifying properties
[INFO] main 2016-11-23 11:46:16,987 VerifiableProperties info - Property group.id is overridden to
[INFO] main 2016-11-23 11:46:16,987 VerifiableProperties info - Property zookeeper.connect is overridden to
[DEBUG] main 2016-11-23 11:46:17,046 SimpleConsumer debug - Disconnecting from localhost:9092
[DEBUG] main 2016-11-23 11:46:17,049 BlockingChannel debug - Created socket with SO_TIMEOUT = 30000 (requested 30000), SO_RCVBUF = 65536 (requested 65536), SO_SNDBUF = 1313280 (requested -1), connectTimeoutMs = 30000.
[DEBUG] main 2016-11-23 11:46:17,079 SimpleConsumer debug - Disconnecting from localhost:9092
[DEBUG] main 2016-11-23 11:46:17,084 SimpleConsumer debug - Disconnecting from localhost:9092
[DEBUG] main 2016-11-23 11:46:17,084 BlockingChannel debug - Created socket with SO_TIMEOUT = 30000 (requested 30000), SO_RCVBUF = 65536 (requested 65536), SO_SNDBUF = 1313280 (requested -1), connectTimeoutMs = 30000.
[DEBUG] main 2016-11-23 11:46:17,086 SimpleConsumer debug - Disconnecting from localhost:9092
[DEBUG] main 2016-11-23 11:46:17,094 SimpleConsumer debug - Disconnecting from mc4:9092
[DEBUG] main 2016-11-23 11:46:17,095 BlockingChannel debug - Created socket with SO_TIMEOUT = 30000 (requested 30000), SO_RCVBUF = 65536 (requested 65536), SO_SNDBUF = 1313280 (requested -1), connectTimeoutMs = 30000.
[DEBUG] main 2016-11-23 11:46:17,106 SimpleConsumer debug - Disconnecting from mc4:9092
[DEBUG] main 2016-11-23 11:46:17,286 ClosureCleaner logDebug - +++ Cleaning closure <function1> (org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1) +++
[DEBUG] main 2016-11-23 11:46:17,302 ClosureCleaner logDebug -  + declared fields: 2
[DEBUG] main 2016-11-23 11:46:17,303 ClosureCleaner logDebug -      public static final long org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1.serialVersionUID
[DEBUG] main 2016-11-23 11:46:17,303 ClosureCleaner logDebug -      private final org.apache.spark.api.java.function.Function org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1.fun$1
[DEBUG] main 2016-11-23 11:46:17,303 ClosureCleaner logDebug -  + declared methods: 1
[DEBUG] main 2016-11-23 11:46:17,304 ClosureCleaner logDebug -      public final java.lang.Object org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1.apply(java.lang.Object)
[DEBUG] main 2016-11-23 11:46:17,304 ClosureCleaner logDebug -  + inner classes: 0
[DEBUG] main 2016-11-23 11:46:17,305 ClosureCleaner logDebug -  + outer classes: 0
[DEBUG] main 2016-11-23 11:46:17,306 ClosureCleaner logDebug -  + outer objects: 0
[DEBUG] main 2016-11-23 11:46:17,308 ClosureCleaner logDebug -  + populating accessed fields because this is the starting closure
[DEBUG] main 2016-11-23 11:46:17,311 ClosureCleaner logDebug -  + fields accessed by starting closure: 0
[DEBUG] main 2016-11-23 11:46:17,312 ClosureCleaner logDebug -  + there are no enclosing objects!
[DEBUG] main 2016-11-23 11:46:17,313 ClosureCleaner logDebug -  +++ closure <function1> (org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1) is now cleaned +++
[ERROR] main 2016-11-23 11:46:17,320 root flush - Exception in thread "main"
[ERROR] main 2016-11-23 11:46:17,320 root flush - java.lang.NoSuchMethodError: org.apache.spark.streaming.api.java.JavaDStream.foreachRDD(Lorg/apache/spark/api/java/function/Function;)V
[ERROR] main 2016-11-23 11:46:17,320 root flush -       at org.labs.spark.streaming.StreamingKafkaConsumer.run(StreamingKafkaConsumer.java:87)
[ERROR] main 2016-11-23 11:46:17,320 root flush -       at org.labs.spark.streaming.StreamingApi.main(StreamingApi.java:35)
[ERROR] main 2016-11-23 11:46:17,320 root flush -       at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
[ERROR] main 2016-11-23 11:46:17,320 root flush -       at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
[ERROR] main 2016-11-23 11:46:17,320 root flush -       at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
[ERROR] main 2016-11-23 11:46:17,320 root flush -       at java.lang.reflect.Method.invoke(Method.java:498)
[ERROR] main 2016-11-23 11:46:17,320 root flush -       at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:736)
[ERROR] main 2016-11-23 11:46:17,320 root flush -       at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:185)
[ERROR] main 2016-11-23 11:46:17,320 root flush -       at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:210)
[ERROR] main 2016-11-23 11:46:17,320 root flush -       at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:124)
[ERROR] main 2016-11-23 11:46:17,321 root flush -       at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
[INFO] Thread-1 2016-11-23 11:46:17,327 SparkContext logInfo - Invoking stop() from shutdown hook

I have also tried changing the Kafka version to 2.10-0.10.1.0 and 2.9.2-0.8.1.1, but couldn't get past this error.

Your dependencies should be in the proper order and consistent with each other; you can see an example at http://stackoverflow.com/questions/40756938/geting-error-noclassdeffounderror-org-apache-spark-internal-logging-on-kafka-sp – Vimal Dhaduk Nov 25 '16 at 05:17

1 Answer

This issue is caused by a version mismatch in pom.xml.

You are running Spark 2.0.1, but in pom.xml you have added the Spark 1.5.x dependencies.

Replace them with:

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.11</artifactId>
    <version>2.0.1</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.11</artifactId>
    <version>2.0.1</version>
</dependency>
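
Since the code uses the 0.8-style createDirectStream with StringDecoder, the Kafka connector dependency will most likely also need the matching Spark 2.x artifact (the old spark-streaming-kafka_2.11 line stops at 1.6.x), something along these lines:

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
    <version>2.0.1</version>
</dependency>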

and try again.

Hope this helps.
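
Note also that Spark 2.x removed the foreachRDD(Function<JavaRDD<T>, Void>) overload that the question's code calls (it is exactly the method named in the NoSuchMethodError), so after upgrading the dependencies the driver code has to switch to the VoidFunction form. A minimal sketch:

    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.function.VoidFunction;

    // ... after building the JavaDStream<String> "data" as in the question:
    data.foreachRDD(new VoidFunction<JavaRDD<String>>() {
        private static final long serialVersionUID = 1L;

        @Override
        public void call(JavaRDD<String> rdd) {
            // collect() pulls the whole micro-batch to the driver; fine for debug printing
            for (String jString : rdd.collect()) {
                System.out.println("========> " + jString);
            }
        }
    });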

– Nirmal Ram