
As the title says, I want to get the number of records in my topic, and I can't find a solution using the kafka-python library. Does anyone have any idea?

LilyAZ
  • Does this answer your question? [Counting Number of messages stored in a kafka topic](https://stackoverflow.com/questions/41792703/counting-number-of-messages-stored-in-a-kafka-topic) – Michael Heil Oct 07 '20 at 15:55
  • Yes @mike, this solution is very similar. I'll try to reproduce it in Python (I'm not very good at Java). – LilyAZ Oct 08 '20 at 08:20
  • I can't use the command line. In my project, I need to use the Python Kafka API. – LilyAZ Oct 08 '20 at 08:23

4 Answers


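The main idea is to count how many messages there are in each partition of the topic (the high watermark offset minus the low watermark offset) and sum these numbers. The result is the number of messages currently retained in the topic; on compacted or transactional topics offsets can have gaps, so this can overcount. I am using confluent_kafka as the main library.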

```python
from concurrent.futures import ThreadPoolExecutor

from confluent_kafka import Consumer, TopicPartition

consumer = Consumer({"bootstrap.servers": "localhost:6667", "group.id": "test"})


def get_partition_size(topic_name: str, partition_key: int) -> int:
    # Messages in one partition = high watermark - low watermark
    topic_partition = TopicPartition(topic_name, partition_key)
    low_offset, high_offset = consumer.get_watermark_offsets(topic_partition)
    return high_offset - low_offset


def get_topic_size(topic_name: str) -> int:
    # Fetch the topic metadata to discover its partitions
    topic = consumer.list_topics(topic=topic_name)
    partitions = topic.topics[topic_name].partitions
    max_workers = len(partitions) or 1

    # Query the watermarks of all partitions in parallel
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        workers = [
            executor.submit(get_partition_size, topic_name, partition_key)
            for partition_key in partitions
        ]

    # Sum the per-partition counts to get the topic total
    return sum(w.result() for w in workers)


print(get_topic_size('my.kafka.topic'))
consumer.close()
```
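Since the question asks about kafka-python specifically, here is a minimal sketch of the same watermark idea with that library. It assumes a reasonably recent kafka-python release, where `KafkaConsumer.partitions_for_topic()`, `beginning_offsets()` and `end_offsets()` exist, and a reachable broker; the function names `count_topic_messages` and `total_from_offsets` are hypothetical. As above, it counts what is currently retained, not everything ever produced.

```python
def total_from_offsets(beginning: dict, end: dict) -> int:
    # Messages retained in a partition = end offset - beginning offset;
    # the topic total is the sum over all partitions.
    return sum(end[tp] - beginning[tp] for tp in end)


def count_topic_messages(topic: str, bootstrap_servers) -> int:
    # Imported here so total_from_offsets() can be used without kafka-python installed.
    from kafka import KafkaConsumer, TopicPartition

    # group_id=None: no consumer group, nothing is committed
    consumer = KafkaConsumer(bootstrap_servers=bootstrap_servers, group_id=None)
    try:
        partition_ids = consumer.partitions_for_topic(topic)
        if not partition_ids:
            raise ValueError("unknown topic or no partitions: {}".format(topic))
        partitions = [TopicPartition(topic, p) for p in partition_ids]
        # Both calls return a dict {TopicPartition: offset}
        beginning = consumer.beginning_offsets(partitions)
        end = consumer.end_offsets(partitions)
        return total_from_offsets(beginning, end)
    finally:
        consumer.close()
```

Usage would look like `count_topic_messages("my.kafka.topic", ["localhost:9092"])`. Only metadata and offsets are fetched, so no messages are consumed.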
Dorcioman

There is no specific API to count the number of records in a topic. You need to consume the records and count the ones you receive from the Kafka consumer.
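A minimal consume-and-count sketch with kafka-python, assuming a reachable broker and that the topic is not being written to while you count. By default a `KafkaConsumer` iterator waits forever for new messages; setting `consumer_timeout_ms` makes the iteration stop after that many milliseconds without a message. The function names and the 10-second default are hypothetical choices.

```python
def count_records(records) -> int:
    # Count every record the iterable yields. A KafkaConsumer with
    # consumer_timeout_ms set stops iterating after that idle period.
    return sum(1 for _ in records)


def count_topic_records(topic: str, bootstrap_servers, idle_timeout_ms: int = 10000) -> int:
    # Imported here so count_records() can be used without kafka-python installed.
    from kafka import KafkaConsumer

    consumer = KafkaConsumer(
        topic,
        bootstrap_servers=bootstrap_servers,
        group_id=None,                        # no consumer group, no committed offsets
        auto_offset_reset="earliest",         # start from the oldest retained record
        consumer_timeout_ms=idle_timeout_ms,  # end iteration after this idle period
    )
    try:
        return count_records(consumer)
    finally:
        consumer.close()
```

With `group_id=None`, kafka-python assigns every partition of the subscribed topic to this consumer and commits nothing, so each run starts again from the earliest retained offset. The timeout should comfortably exceed the time between fetches, otherwise counting can stop early.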

Steephen

One option is to produce one message to each partition and read back the offset it was written at. From those offsets you can calculate the total number of messages sent to the topic so far.

But this is not the right approach: it doesn't tell you how many messages consumers have already consumed or how many messages Kafka has already deleted (for example, through retention). The only reliable way is to consume the messages and count them.

Ravi
  • When I consume messages from my topic, it returns only one message. Do you know how I can consume all messages? My consumer function: `bootstrap_servers = bootstrap_server_list.split(",") consumer_kafka = KafkaConsumer(topic, bootstrap_servers=bootstrap_servers, auto_offset_reset='earliest', consumer_timeout_ms=10000) consumer_kafka.subscribe([topic]) for message in consumer_kafka: if len(message) > 0: print(message)` – LilyAZ Oct 08 '20 at 08:48
  • I'm not sure how to consume messages in kafka-python, but I tried running `from kafka import KafkaConsumer bootstrap_servers = ['localhost:9092'] consumer_kafka = KafkaConsumer(bootstrap_servers=bootstrap_servers, auto_offset_reset='earliest',consumer_timeout_ms=10000) consumer_kafka.subscribe(['test']) for message in consumer_kafka: if len(message) > 0: print(message)` and it works fine, consuming all the messages. You can look at this code; I hope it helps. – Ravi Oct 08 '20 at 10:19
  • This actually goes into an infinite loop. How do we get out of it? – sandeep P Nov 10 '20 at 11:06

I wasn't able to get this working with kafka-python, but I was able to do it fairly easily with the confluent-kafka library:

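```python
from confluent_kafka import Consumer

topic = "test_topic"
broker = "localhost:9092"


def get_count():
    consumer = Consumer({
        'bootstrap.servers': broker,
        'group.id': 'my-group',
        'auto.offset.reset': 'earliest',
        # Don't commit offsets, so a re-run does not resume from where this run stopped
        'enable.auto.commit': False,
    })

    consumer.subscribe([topic])

    total_message_count = 0
    while True:
        # Wait up to 1 second for the next message; None means nothing arrived
        msg = consumer.poll(1.0)

        if msg is None:
            # Note: polls can also return None while the consumer group is
            # still being joined, so a short timeout may stop too early.
            print("No more messages")
            break
        if msg.error():
            print("Consumer error: {}".format(msg.error()))
            continue

        total_message_count += 1
        value = msg.value()  # may be None for tombstone records
        print('Received message {}: {}'.format(
            total_message_count,
            value.decode('utf-8') if value is not None else None))

    consumer.close()
    return total_message_count


print(get_count())
```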
Simon Tower