I am trying to receive a very large message from Kafka with Spark, but it seems that Spark has a limit on the size of the message it can read. I have changed the Kafka configuration so that it can send and consume large messages, but this is not enough. I think the problem is related to Spark rather than Kafka, because when I use the kafka.consumer script the content of the message is displayed without any problem.
Maybe this is related to spark.streaming.kafka.consumer.cache.maxCapacity, but I don't know how to set it in a Java-based Spark program.
Thank you.
Update
This is how I connect to Kafka; args[0] is the ZooKeeper address and args[1] is the group ID.
    if (args.length < 4) {
        System.err.println("Usage: Stream Car data <zkQuorum> <group> <topics> <numThreads>");
        System.exit(1);
    }
    SparkConf sparkConf = new SparkConf().setAppName("stream cars data");
    final JavaSparkContext jSC = new JavaSparkContext(sparkConf);
    // Create the streaming context with a batch interval of 2 seconds
    JavaStreamingContext jssc = new JavaStreamingContext(jSC, new Duration(2000));
    int numThreads = Integer.parseInt(args[3]);
    // Map each topic name to the number of receiver threads that consume it
    Map<String, Integer> topicMap = new HashMap<>();
    String[] topics = args[2].split(",");
    for (String topic : topics) {
        topicMap.put(topic, numThreads);
    }
    // Receiver-based stream: args[0] is the ZooKeeper quorum, args[1] the consumer group
    JavaPairReceiverInputDStream<String, String> messages =
        KafkaUtils.createStream(jssc, args[0], args[1], topicMap);
    // Keep only the message value of each (key, value) pair
    JavaDStream<String> data = messages.map(Tuple2::_2);
And this is the error that I get:
    18/04/13 17:20:33 WARN scheduler.ReceiverTracker: Error reported by receiver for stream 0: Error handling message; exiting - kafka.common.MessageSizeTooLargeException: Found a message larger than the maximum fetch size of this consumer on topic Hello-Kafka partition 0 at fetch offset 3008. Increase the fetch size, or decrease the maximum message size the broker will allow.
        at kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:90)
        at kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:33)
        at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:66)
        at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:58)
        at org.apache.spark.streaming.kafka.KafkaReceiver$MessageHandler.run(KafkaInputDStream.scala:133)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
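
Since the exception asks to increase the fetch size, one thing I could try is the createStream overload that takes a map of Kafka parameters instead of just the ZooKeeper address and group ID. Below is a minimal sketch of building that map. It assumes that the old high-level consumer property fetch.message.max.bytes is the setting the receiver honors. The class name KafkaFetchParams, the helper build, the host name and the 10 MB value are made up for illustration, and the createStream call appears only as a comment because it needs the Spark and Kafka jars, so that part is untested.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper (not part of Spark or Kafka): collects the consumer
// settings that the receiver-based stream would be created with.
class KafkaFetchParams {

    static Map<String, String> build(String zkQuorum, String groupId, int maxFetchBytes) {
        Map<String, String> params = new HashMap<>();
        params.put("zookeeper.connect", zkQuorum);
        params.put("group.id", groupId);
        // Assumption: this old-consumer setting must be at least as large as
        // the biggest message on the topic for the fetch to succeed.
        params.put("fetch.message.max.bytes", Integer.toString(maxFetchBytes));
        return params;
    }

    public static void main(String[] args) {
        // Placeholder values; in my program these would come from args[0] and args[1].
        Map<String, String> kafkaParams = build("localhost:2181", "cars", 10 * 1024 * 1024);
        System.out.println(kafkaParams.get("fetch.message.max.bytes")); // prints 10485760

        // Untested sketch of the intended use (needs spark-streaming-kafka on the classpath):
        // KafkaUtils.createStream(jssc, String.class, String.class,
        //         StringDecoder.class, StringDecoder.class,
        //         kafkaParams, topicMap, StorageLevel.MEMORY_AND_DISK_SER_2());
    }
}
```

If this is the right direction, the value would presumably also have to stay consistent with the maximum message size already configured on the broker.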