How do I count how many messages in total have been sent to a Kafka topic, and how many of them have been consumed or committed by the consumer at that point?
I am initializing the Kafka connector as follows:
Map<String, String> kafkaParams = new HashMap<>();
kafkaParams.put("metadata.broker.list", "localhost:9092");
Set<String> topics = Collections.singleton("mytopic");
JavaPairInputDStream<String, String> directKafkaStream =
    KafkaUtils.createDirectStream(ssc,
        String.class, String.class, StringDecoder.class, StringDecoder.class,
        kafkaParams, topics);
And I process each batch as follows:
directKafkaStream.foreachRDD(rdd -> {
    System.out.println("--- New RDD with " + rdd.partitions().size()
        + " partitions and " + rdd.count() + " records");
    rdd.foreach(record -> System.out.println(record._2));
});
With a batch interval of 2 seconds, the output is:
--- New RDD with 2 partitions and 3 records
value-1
value-0
value-2
--- New RDD with 2 partitions and 7 records
value-3
value-5
value-7
value-9
value-4
value-6
value-8
--- New RDD with 2 partitions and 8 records
value-11
value-10
value-13
...
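To make the two numbers I am after concrete, here is a minimal sketch of the arithmetic in plain Java, with no Spark or Kafka dependency. It assumes each batch can be described by one offset range per partition (the direct stream exposes this through `HasOffsetRanges` and `OffsetRange`, with `fromOffset()` inclusive and `untilOffset()` exclusive) and that the latest offset of each partition is obtained from the broker by some other means. The `Range` class, the method names, and all offset values below are hypothetical and only illustrate the calculation.

```java
import java.util.Arrays;
import java.util.List;

public class OffsetMath {
    /** Hypothetical stand-in for one partition's offset range within a batch. */
    static final class Range {
        final int partition;
        final long fromOffset;   // first offset in the batch (inclusive)
        final long untilOffset;  // end of the batch (exclusive)

        Range(int partition, long fromOffset, long untilOffset) {
            this.partition = partition;
            this.fromOffset = fromOffset;
            this.untilOffset = untilOffset;
        }
    }

    /** Number of records in a batch: sum of (untilOffset - fromOffset) over partitions. */
    static long recordsInBatch(List<Range> ranges) {
        long total = 0;
        for (Range r : ranges) {
            total += r.untilOffset - r.fromOffset;
        }
        return total;
    }

    /**
     * Records written to the topic but not yet read by this batch:
     * sum of (latest offset - untilOffset) over partitions.
     * latestOffsets is indexed by partition number and is assumed
     * to have been fetched from the broker elsewhere.
     */
    static long notYetConsumed(List<Range> ranges, long[] latestOffsets) {
        long lag = 0;
        for (Range r : ranges) {
            lag += latestOffsets[r.partition] - r.untilOffset;
        }
        return lag;
    }

    public static void main(String[] args) {
        // Made-up offsets for a topic with 2 partitions.
        List<Range> batch = Arrays.asList(new Range(0, 3, 7), new Range(1, 0, 3));
        long[] latest = {10, 8};

        System.out.println("records in batch: " + recordsInBatch(batch));
        System.out.println("not yet consumed: " + notYetConsumed(batch, latest));
    }
}
```

With these made-up offsets, the batch holds (7 - 3) + (3 - 0) = 7 records, and (10 - 7) + (8 - 3) = 8 records are still unread. What I do not know is how to get the real equivalents of these values, in particular the latest offset per partition, from the stream above.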