30

I'm using Kafka version 0.9.0.0, and I want to count the number of messages in a topic without using the admin script kafka-console-consumer.sh.

I have tried all the commands in the answers to "Java, How to get number of messages in a topic in apache kafka", but none of them yields the result. Can anyone help me out here?

moffeltje
jack AKA karthik

7 Answers

71

You could try to execute the command below:

bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092,localhost:9093,localhost:9094 --topic test-topic --time -1

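Then sum up the offsets reported for all partitions. Note that `--time -1` returns the latest (end) offset of each partition, so this sum equals the number of messages only if no messages have been deleted yet; otherwise, subtract the earliest offsets, which `--time -2` returns.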
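A minimal sketch of the summing step in Java, assuming each output line of GetOffsetShell has the form `topic:partition:offset` (the format the shell pipelines in the other answers split on). The class and method names are hypothetical, and the sample lines are made up for illustration. As above, the result is a message count only if every partition still starts at offset 0.

```java
import java.util.List;

/** Hypothetical helper: sums the offsets printed by GetOffsetShell. */
class OffsetShellSum {

    /**
     * Adds up the last field of every "topic:partition:offset" line.
     * Lines without exactly three colon-separated fields are skipped.
     */
    static long sumOffsets(List<String> lines) {
        long total = 0;
        for (String line : lines) {
            String[] fields = line.trim().split(":");
            if (fields.length != 3) {
                continue; // not an offset line (e.g. a log message)
            }
            total += Long.parseLong(fields[2]);
        }
        return total;
    }

    public static void main(String[] args) {
        // Made-up output for a three-partition topic.
        List<String> sample = List.of(
                "test-topic:0:77",
                "test-topic:1:75",
                "test-topic:2:78");
        System.out.println(sumOffsets(sample)); // 230
    }
}
```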

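Update: a Java implementation. It relies on `KafkaConsumer.beginningOffsets`/`endOffsets` (added in client 0.10.1) and `poll(Duration)` (added in 2.0), so it needs a newer client than 0.9.0.0.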

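import java.time.Duration;
import java.util.Arrays;
import java.util.Map;
import java.util.Properties;
import java.util.Set;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

Properties props = new Properties();
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
......
try (final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
    consumer.subscribe(Arrays.asList("your_topic"));
    // Poll until the group coordinator has assigned partitions to this consumer.
    // Only assigned partitions are counted, so this assumes the consumer is the
    // only member of its group.
    Set<TopicPartition> assignment;
    while ((assignment = consumer.assignment()).isEmpty()) {
        consumer.poll(Duration.ofMillis(100));
    }
    // Per partition, the number of retained messages is end offset - beginning offset.
    final Map<TopicPartition, Long> endOffsets = consumer.endOffsets(assignment);
    final Map<TopicPartition, Long> beginningOffsets = consumer.beginningOffsets(assignment);
    assert (endOffsets.size() == beginningOffsets.size());
    assert (endOffsets.keySet().equals(beginningOffsets.keySet()));

    long totalCount = beginningOffsets.entrySet().stream().mapToLong(entry -> {
            TopicPartition tp = entry.getKey();
            Long beginningOffset = entry.getValue();
            Long endOffset = endOffsets.get(tp);
            return endOffset - beginningOffset;
        }).sum();
    System.out.println(totalCount);
}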
amethystic
11

Technically speaking, you can simply consume all messages from the topic and count them.

Example (SimpleConsumerShell reads one partition at a time, so run it for each partition):

kafka-run-class.sh kafka.tools.SimpleConsumerShell --broker-list localhost:9092 --topic XYZ --partition 0

However, the kafka.tools.GetOffsetShell approach gives you offsets, not the actual number of messages in the topic. If the topic is compacted, counting messages by consuming them and computing the count from offsets will give you two different numbers.

Topic compaction: https://kafka.apache.org/documentation.html#design_compactionbasics
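To illustrate why the two numbers differ, here is a small in-memory simulation (not Kafka code) of a single compacted partition. It assumes a simplified model in which compaction keeps only the latest record for each key, surviving records keep their original offsets, and the partition's start offset does not move. The class name, record type, and data are hypothetical.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical simulation of log compaction on one partition. */
class CompactionDemo {

    /** One record in a partition: its offset and its key. */
    static final class Rec {
        final long offset;
        final String key;

        Rec(long offset, String key) {
            this.offset = offset;
            this.key = key;
        }
    }

    /** Simplified compaction: keep the latest record per key, keep original offsets. */
    static List<Rec> compact(List<Rec> log) {
        Map<String, Rec> latest = new LinkedHashMap<>();
        for (Rec r : log) {
            latest.put(r.key, r); // later offsets overwrite earlier ones
        }
        List<Rec> survivors = new ArrayList<>(latest.values());
        survivors.sort(Comparator.comparingLong(r -> r.offset));
        return survivors;
    }

    public static void main(String[] args) {
        String[] keys = {"a", "b", "a", "c", "b", "a"};
        List<Rec> log = new ArrayList<>();
        for (int i = 0; i < keys.length; i++) {
            log.add(new Rec(i, keys[i]));
        }
        List<Rec> compacted = compact(log);

        // Offset-based count: next offset to be written minus the start offset.
        long startOffset = log.get(0).offset;
        long endOffset = log.get(log.size() - 1).offset + 1;
        long offsetBased = endOffset - startOffset;
        // A consumer reading the compacted partition only sees the survivors.
        long consumed = compacted.size();

        // prints "offset-based: 6, consumed: 3"
        System.out.println("offset-based: " + offsetBased + ", consumed: " + consumed);
    }
}
```

In this model the offset-based number can only be greater than or equal to the consumed count, since compaction removes records but never renumbers the survivors.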

Smittey
Cosmos
  • Reading potentially untold numbers (millions?) of messages off a topic in Kafka, where messages persist until they are purged (unlike JMS, where they persist until read), is not viable unless time is not a concern. – Darrell Teague May 03 '17 at 15:20
  • Which count could potentially be higher, the offset number or the number of messages consumed? I guess the first? – Aydin K. Apr 06 '18 at 11:46
6

You can sum up the offsets of all partitions in a single pipeline:

.../bin/kafka-run-class kafka.tools.GetOffsetShell --broker-list <<broker_1>>:9092,<<broker_2>>:9092... --topic <<your_topic_name>> --time -1 | while IFS=: read topic_name partition_id number; do echo "$number"; done | paste -sd+ - | bc
Quentin Geff
  • Thanks! A bit simpler summing: kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list $KAFKA_CLUSTER_HOSTS --topic $TOPIC_NAME --time -1 | tr ":" " " | awk '{ sum += $3 } END { print sum }' – ozma Oct 09 '17 at 08:35
  • @ozma Instead of `tr`, you can also use `awk -F:` :D – Dmitry Minkovsky Jan 13 '18 at 16:44
2

If you don't want to buy into the hassle around the "original" Kafka scripts, there's also kafkacat.

The basic idea is to

  • consume the last message of each partition and
  • add up the offsets (correcting for zero-based offsets).

Let's develop this.

kafkacat -C -b <broker> -t <topic> -o -1 -f '%p\t%o\n'

This will output something like this (plus "reached end of partition" notices on stderr):

0    77
1    75
2    78

Now, kafkacat doesn't terminate but keeps waiting for new messages. We can circumvent this by adding a timeout (choose a value large enough so you get all partitions in your given environment):

timeout --preserve-status 1 kafkacat <snip>

Now we could go ahead and add up the second column (+1 each) -- but if there are new messages during that timeout interval, we might get something like this:

0    77
1    75
2    78
1    76

So we have to account for this, which is easy enough to do with a little awk:

timeout --preserve-status 1 kafkacat <snip> 2> /dev/null \
| awk '{lastOffsets[$1] = $2} END {count = 0; for (i in lastOffsets) { count += lastOffsets[i] + 1 }; print count}'

Note how we use a (hash)map to remember the last seen offset for each partition until the timeout triggers, and then loop over the array to compute the sum.
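The same bookkeeping in Java, as a sketch that assumes the `partition<TAB>offset` lines produced by the `-f '%p\t%o\n'` format above have already been collected into a list. The class and method names are hypothetical. As with the awk version, adding 1 to each last offset gives the message count only if every partition starts at offset 0.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical port of the awk one-liner: last offset per partition, plus one, summed. */
class LastOffsetCount {

    static long count(List<String> lines) {
        // partition -> last offset seen (later lines overwrite earlier ones, as in awk)
        Map<Integer, Long> lastOffsets = new HashMap<>();
        for (String line : lines) {
            String[] fields = line.trim().split("\\s+");
            if (fields.length != 2) {
                continue; // skip anything that is not "partition offset"
            }
            lastOffsets.put(Integer.parseInt(fields[0]), Long.parseLong(fields[1]));
        }
        long count = 0;
        for (long offset : lastOffsets.values()) {
            count += offset + 1; // offsets are zero-based
        }
        return count;
    }

    public static void main(String[] args) {
        // The example output from above, including a late message on partition 1.
        List<String> lines = List.of("0\t77", "1\t75", "2\t78", "1\t76");
        System.out.println(count(lines)); // 78 + 77 + 79 = 234
    }
}
```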

Raphael
2

We can use the kafkacat command below to count the number of messages in a topic. It works even if your messages are multiline, because only the offset of each message is printed, one per line.

kafkacat -b <broker_1_ip:port>,<broker_2_ip:port> -t <topic-name> -C -e -q -f 'Offset: %o\n' | wc -l

Subtract 1 from the number printed on the console, and that's the answer.

Hammad Akhtar
1

You can also do this with awk and a simple loop:

sum=0; for i in $(kafka-run-class kafka.tools.GetOffsetShell --broker-list broker:9092 --time -1 --topic topic_name | awk -F : '{print $3}'); do sum=$((sum + i)); done; echo "$sum"
0

To get the number of records in a topic, subtract the sum of the earliest offsets (`--time -2`) from the sum of the latest offsets (`--time -1`):

brokers="<broker1:port>"
topic="<topic-name>"
# Sum of the latest (end) offsets of all partitions
sum_1=$(/usr/hdp/current/kafka-broker/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list "$brokers" --topic "$topic" --time -1 | grep -e ':[[:digit:]]*:' | awk -F  ":" '{sum += $3} END {print sum}')
# Sum of the earliest offsets still retained
sum_2=$(/usr/hdp/current/kafka-broker/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list "$brokers" --topic "$topic" --time -2 | grep -e ':[[:digit:]]*:' | awk -F  ":" '{sum += $3} END {print sum}')
echo "Number of records in topic ${topic}: $((sum_1 - sum_2))"
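The same latest-minus-earliest calculation done per partition in Java, as a sketch. It assumes the `topic:partition:offset` lines from the `--time -1` and `--time -2` runs have already been captured; the class and method names are hypothetical, and treating a partition missing from the earliest output as starting at 0 is an assumption of this sketch. Working per partition gives the same total as subtracting the two sums, but also exposes each partition's contribution.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical helper: records per partition = latest offset - earliest offset. */
class OffsetRange {

    /** Parses "topic:partition:offset" lines into partition -> offset. */
    static Map<Integer, Long> parse(List<String> lines) {
        Map<Integer, Long> offsets = new TreeMap<>();
        for (String line : lines) {
            String[] f = line.trim().split(":");
            if (f.length == 3) {
                offsets.put(Integer.parseInt(f[1]), Long.parseLong(f[2]));
            }
        }
        return offsets;
    }

    /** Sums (latest - earliest) over the partitions present in the latest offsets. */
    static long count(List<String> latestLines, List<String> earliestLines) {
        Map<Integer, Long> latest = parse(latestLines);
        Map<Integer, Long> earliest = parse(earliestLines);
        long total = 0;
        for (Map.Entry<Integer, Long> e : latest.entrySet()) {
            // Assumption: a partition missing from the earliest output starts at 0.
            total += e.getValue() - earliest.getOrDefault(e.getKey(), 0L);
        }
        return total;
    }

    public static void main(String[] args) {
        // Made-up output: retention has already deleted old data from partition 0.
        List<String> latest = List.of("t:0:120", "t:1:75");
        List<String> earliest = List.of("t:0:40", "t:1:0");
        System.out.println(count(latest, earliest)); // (120 - 40) + (75 - 0) = 155
    }
}
```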
spats