1

How do count that how many total messages have been sent to the topic in Kafka and how many have been consumed or committed by the consumer at that time?

I am initiatting kafka connector as-

Map<String, String> kafkaParams = new HashMap<>();
kafkaParams.put("metadata.broker.list", "localhost:9092");
Set<String> topics = Collections.singleton("mytopic");

JavaPairInputDStream<String, String> directKafkaStream = 
KafkaUtils.createDirectStream(ssc,
   String.class, String.class, StringDecoder.class, StringDecoder.class, 
   kafkaParams, topics);

And processing as -

directKafkaStream.foreachRDD(rdd -> {
 System.out.println("--- New RDD with " + rdd.partitions().size()
        + " partitions and " + rdd.count() + " records");
 rdd.foreach(record -> System.out.println(record._2));
});

For 2 seconds

--- New RDD with 2 partitions and 3 records
value-1
value-0
value-2
--- New RDD with 2 partitions and 7 records
value-3
value-5
value-7
value-9
value-4
value-6
value-8
--- New RDD with 2 partitions and 8 records
value-11
value-10
value-13
...
Ninja
  • 115
  • 1
  • 1
  • 12

0 Answers0