0

I have a simple Spark Streaming Java program:

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.*;
import org.apache.spark.streaming.kafka010.*;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

/**
 * Minimal Spark Streaming job that subscribes to a Kafka topic and prints the value of every
 * record it receives, in 5-second micro-batches.
 *
 * <p>NOTE(review): the reported {@code NoSuchMethodError} on
 * {@code scala.collection.immutable.Map$.apply(Lscala/collection/immutable/Seq;)} is thrown from
 * inside {@code DirectKafkaInputDStream}, not from this class. That method signature is what
 * Scala 2.13 bytecode calls. Spark 3.3.1 distributions default to Scala 2.12, so this looks like
 * a Scala binary mismatch. Confirm that the {@code spark-streaming-kafka-0-10} artifact in the
 * uber JAR uses the same Scala suffix ({@code _2.12} vs {@code _2.13}) as the cluster's Spark.
 * Also mark Spark core/streaming as {@code provided} so the uber JAR does not bundle a second
 * copy of Spark or the Scala library.
 */
public class KafkaStreamingExample {

    /**
     * Entry point: builds the streaming context, wires the Kafka direct stream, and blocks until
     * the streaming context terminates.
     *
     * @param args unused
     * @throws InterruptedException if interrupted while waiting for termination
     */
    public static void main(String[] args) throws InterruptedException {
        // Properties set directly on SparkConf take precedence over spark-submit flags, so a
        // hard-coded setMaster("local[2]") would override "--master yarn" and run the job
        // locally. Only fall back to local[2] when no master was supplied (e.g. IDE runs).
        SparkConf conf = new SparkConf()
                .setAppName("KafkaStreamingExample")
                .setIfMissing("spark.master", "local[2]");
        JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(5));

        Map<String, Object> kafkaParams = new HashMap<>();
        kafkaParams.put("bootstrap.servers", "localhost:9092");
        kafkaParams.put("key.deserializer", StringDeserializer.class);
        kafkaParams.put("value.deserializer", StringDeserializer.class);
        kafkaParams.put("group.id", "group1");
        kafkaParams.put("auto.offset.reset", "latest");
        // Offsets are committed explicitly below, after each batch's output has run.
        kafkaParams.put("enable.auto.commit", false);

        Collection<String> topics = Arrays.asList("dbserver1.inventory.customers");

        JavaInputDStream<ConsumerRecord<String, String>> stream =
                KafkaUtils.createDirectStream(
                        jssc,
                        LocationStrategies.PreferConsistent(),
                        ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams)
                );

        stream.foreachRDD(rdd -> {
            // Grab the offset ranges first: only the RDD produced directly by the direct stream
            // implements HasOffsetRanges.
            OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();

            // This closure runs on the executors. In YARN mode the output appears in the
            // executor logs, not on the driver console.
            rdd.foreach(record -> System.out.println(record.value()));

            // Auto-commit is disabled, so commit explicitly once this batch's output is done.
            // Without this, group1's position is never stored, and a restart falls back to
            // auto.offset.reset=latest, skipping records published while the job was down.
            ((CanCommitOffsets) stream.inputDStream()).commitAsync(offsetRanges);
        });

        jssc.start();
        jssc.awaitTermination();
    }
}

This program is compiled with the Java 1.8 JDK and is supposed to stream data from a Kafka topic.

I have built an uber JAR, so all dependencies are included.

I am submitting this to my Spark 3.3.1 cluster using this command:

./bin/spark-submit --class KafkaStreamingExample --master yarn --deploy-mode client --executor-memory 2g --num-executors 1 kafka-streaming-example-jar-with-dependencies.jar

I get this error when the job is submitted:

Exception in thread "main" java.lang.NoSuchMethodError: scala.collection.immutable.Map$.apply(Lscala/collection/immutable/Seq;)Ljava/lang/Object;
    at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.<init>(DirectKafkaInputDStream.scala:68)
    at org.apache.spark.streaming.kafka010.KafkaUtils$.createDirectStream(KafkaUtils.scala:136)
    at org.apache.spark.streaming.kafka010.KafkaUtils$.createDirectStream(KafkaUtils.scala:115)
    at org.apache.spark.streaming.kafka010.KafkaUtils$.createDirectStream(KafkaUtils.scala:156)
    at org.apache.spark.streaming.kafka010.KafkaUtils.createDirectStream(KafkaUtils.scala)
    at KafkaStreamingExample.main(KafkaStreamingExample.java:28)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
    at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:958)
    at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180)
    at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203)
    at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90)
    at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1046)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1055)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

What should I change to overcome this error?

Dmytro Mitin
  • 48,194
  • 3
  • 28
  • 66
Eugene Goldberg
  • 14,286
  • 20
  • 94
  • 167
  • 1
    Maybe the answer is here: https://stackoverflow.com/a/67779298/6770614 – Boris Azanov Jan 23 '23 at 19:31
  • `NoSuchMethodError` https://stackoverflow.com/questions/35186/how-do-i-fix-a-nosuchmethoderror https://stackoverflow.com/questions/8168052/java-lang-nosuchmethoderror-when-the-method-definitely-exists https://stackoverflow.com/questions/59706633/nosuchmethoderror-java – Dmytro Mitin Jan 24 '23 at 07:38
  • *"I have compiled an Uber JAR, so all dependencies are included."* How do you manage duplicates? (For example in sbt with sbt-assembly plugin there can be issues if a wrong merge strategy is chosen: https://stackoverflow.com/questions/74800073/ https://stackoverflow.com/questions/74809158/ https://stackoverflow.com/questions/74879217 https://stackoverflow.com/questions/25144484 https://stackoverflow.com/questions/74881933/) – Dmytro Mitin Jan 24 '23 at 07:45
  • Could you show your build file? (`pom.xml`?) – Dmytro Mitin Jan 24 '23 at 07:45

0 Answers