0

I am trying to receive very big message with spark from kafka. But it seems that spark have a limit for the size of the message that can be read. I have changed in kafka config to be able to consume and send big message but this is not enough (I think this is related to spark not to kafka) because when using kafka.consumer script I don't have any problem displaying the content of the message.

Maybe this is related to spark.streaming.kafka.consumer.cache.maxCapacity but I don't know how to set it in a spark java based program.

Thank you.

Update

I am using this to connect to Kafka normally args[0] is zookeeper address and the args[1] is the groupID.

if (args.length < 4) {
        System.err.println("Usage: Stream Car data <zkQuorum> <group> <topics> <numThreads>");
        System.exit(1);
    }

    SparkConf sparkConf = new SparkConf().setAppName("stream cars data");

    final JavaSparkContext jSC = new JavaSparkContext(sparkConf);
    // Creer le contexte avec une taille de batch de 2 secondes
    JavaStreamingContext jssc = new JavaStreamingContext(jSC,new Duration(2000));

    int numThreads = Integer.parseInt(args[3]);

    Map<String, Integer> topicMap = new HashMap<>();
    String[] topics = args[2].split(",");

    for (String topic: topics) {
        topicMap.put(topic, numThreads);
    }

    JavaPairReceiverInputDStream<String, String> messages =
            KafkaUtils.createStream(jssc, args[0], args[1], topicMap);

    JavaDStream<String> data = messages.map(Tuple2::_2);

and this is the error that I get

18/04/13 17:20:33 WARN scheduler.ReceiverTracker: Error reported by receiver for stream 0: Error handling message; exiting - kafka.common.MessageSizeTooLargeException: Found a message larger than the maximum fetch size of this consumer on topic Hello-Kafka partition 0 at fetch offset 3008. Increase the fetch size, or decrease the maximum message size the broker will allow.
        at kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:90)
        at kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:33)
        at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:66)
        at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:58)
        at org.apache.spark.streaming.kafka.KafkaReceiver$MessageHandler.run(KafkaInputDStream.scala:133)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Mohamed Amine Ouali
  • 575
  • 1
  • 8
  • 23
  • Could you please provide any error logs that helps you claim that "spark have a limit for the size of the message that can be read"? Also, are you using spark-streaming or structured streaming to ingest data from Kafka? – void Apr 13 '18 at 19:09
  • what do you mean by spark-streaming or structured streaming – Mohamed Amine Ouali Apr 13 '18 at 19:44
  • https://www.quora.com/What-are-the-differences-between-Spark-streaming-and-Spark-structured-streaming-Is-Spark-structured-streaming-the-future-of-Spark-streaming – void Apr 13 '18 at 19:51
  • I have posted the code can you figure it out from there. thanks for helping me – Mohamed Amine Ouali Apr 13 '18 at 19:54
  • Please check if the solution solves the issue for you! – void Apr 13 '18 at 20:04

2 Answers2

0

Depending on the version of Kafka you are using, you need to set the following consumer config in the consumer.properties file available (or to be created) in Kafka config files.

for version 0.8.X or below.

fetch.message.max.bytes

for Kafka version 0.9.0 or above, set

fetch.max.bytes 

to appropriate values based on your application.

Eg. fetch.max.bytes=10485760

Refer this and this.

void
  • 2,403
  • 6
  • 28
  • 53
  • I have set it. but I think that file is not considered in the config – Mohamed Amine Ouali Apr 13 '18 at 20:36
  • according to this, this files are just examples https://stackoverflow.com/questions/48321503/use-of-producer-properties-and-consumer-properties-file-in-apache-kafka?utm_medium=organic&utm_source=google_rich_qa&utm_campaign=google_rich_qa – Mohamed Amine Ouali Apr 13 '18 at 20:54
0

So I have found a solution to my problem, In fact as I said in the comment the files in the config file are just examples and they aren't taken into consideration when stating the server. So all the configuration of consumer including fetch.message.max.bytes need to be done in the consumer code.

And this is how I did it:

if (args.length < 4) {
        System.err.println("Usage: Stream Car data <zkQuorum> <group> <topics> <numThreads>");
        System.exit(1);
    }

    SparkConf sparkConf = new SparkConf().setAppName("stream cars data");

    final JavaSparkContext jSC = new JavaSparkContext(sparkConf);
    // Creer le contexte avec une taille de batch de 2 secondes
    JavaStreamingContext jssc = new JavaStreamingContext(jSC,new Duration(2000));

    int numThreads = Integer.parseInt(args[3]);

    Map<String, Integer> topicMap = new HashMap<>();
    String[] topics = args[2].split(",");

    for (String topic: topics) {
        topicMap.put(topic, numThreads);
    }



    Set<String> topicsSet = new HashSet<>(Arrays.asList(topics));
    Map<String, String> kafkaParams = new HashMap<>();
    kafkaParams.put("metadata.broker.list", args[0]);
    kafkaParams.put("group.id", args[1]);
    kafkaParams.put("zookeeper.connect", args[0]);
    kafkaParams.put("fetch.message.max.bytes", "1100000000");



    JavaPairReceiverInputDStream<String, String> messages=KafkaUtils.createStream(jssc,
            String.class,
            String.class,
            StringDecoder.class,
            StringDecoder.class,
            kafkaParams,
            topicMap,MEMORY_ONLY() );


    JavaDStream<String> data = messages.map(Tuple2::_2);
Mohamed Amine Ouali
  • 575
  • 1
  • 8
  • 23