
I wish to count the number of records in a given Kafka topic to set up a monitoring endpoint as a kind of 'sanity check' for data-in vs. data-out.

I noticed that doing this with confluent-kafka 1.9.2 produces results that differ from those produced by e.g. ksqlDB.

The source topic is partitioned, has infinite retention and is not compacted.

When counting the records with confluent-kafka 1.9.2:

import logging
from typing import List

from confluent_kafka import Consumer, TopicPartition, OFFSET_BEGINNING

log = logging.getLogger(__name__)

# BROKER_URI, GROUP_ID and DEFAULT_TOPIC are defined elsewhere
consumer = Consumer({
    "bootstrap.servers": BROKER_URI,
    "group.id": GROUP_ID,
    "isolation.level": "read_committed",
})

def seek_to_offset(consumer: Consumer, partitions: List[TopicPartition]):
    # on_assign callback: rewind every assigned partition to the start
    for current_partition in partitions:
        current_partition.offset = OFFSET_BEGINNING  # -2, i.e. earliest


    consumer.assign(partitions)


def get_topic_size(topic_name: str = DEFAULT_TOPIC):
    consumer.subscribe([topic_name], on_assign=seek_to_offset)
    total_record_count = 0
    while True:
        # A 30s quiet poll is treated as having reached the end of the topic
        record = consumer.poll(30.0)  # default session timeout
        if record is None:
            log.debug('No more records')
            break
        if record.error():
            log.error(f'Consumer error: {record.error()}')
            continue

        total_record_count += 1

    consumer.close()
    return total_record_count

log.info(f'Number of records: {get_topic_size()}')

Number of records: 61316573 

The result is 61316573.
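
As an aside, a much cheaper cross-check is to sum the watermark deltas per partition. For a transactionally written topic this only gives an upper bound, though: transaction control markers (and any aborted records) also occupy offsets, so the watermark delta can legitimately exceed the true record count. A minimal sketch of what I mean, reusing the consumer from above (this is not what produced the numbers in this post):

def get_topic_size_from_watermarks(topic_name: str = DEFAULT_TOPIC) -> int:
    # Sum (high - low) over all partitions. For a transactional producer
    # this is an upper bound, not an exact count, because control records
    # also occupy offsets.
    metadata = consumer.list_topics(topic_name, 30.0)
    total = 0
    for partition_id in metadata.topics[topic_name].partitions:
        low, high = consumer.get_watermark_offsets(
            TopicPartition(topic_name, partition_id), 30.0)
        total += high - low
    return total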

On the other hand, I have confluentinc/ksqldb-server:0.28.2 running (together with confluentinc/ksqldb-cli:0.28.2), which is supposed to tally the number of records for the same check. I do that as follows:

  1. I set the ksqlDB consumer isolation level to read_committed (so that I only read committed messages) and the processing guarantee to exactly_once. Prior to running the queries below, I also set auto.offset.reset to earliest:
# As environment variables in my ksqldb service via docker-compose
---
version: '2'
services:
  primary-ksqldb-server:
    image: confluentinc/ksqldb-server:0.28.2
    hostname: primary-ksqldb-server
    container_name: primary-ksqldb-server
    environment:
      # ... 
      KSQL_KSQL_STREAMS_PROCESSING_GUARANTEE: exactly_once
      KSQL_KSQL_STREAMS_CONSUMER_ISOLATION_LEVEL: read_committed

This is the recommended way of passing this configuration when starting via docker according to the documentation.

Then, inside the ksqlDB CLI:

ksql> set 'auto.offset.reset' = 'earliest';
  2. I created a stream based on my topic:
ksql> create stream mytopic with (kafka_topic='mytopic', key_format='KAFKA', value_format='AVRO');
  3. I created a table that counts the records per day, based on a timestamp field in the record (creationdate):
ksql> create table mytopic_count_per_date with (kafka_topic='mytopic_count_per_date', partitions=18, replicas=1) AS 
    select count(*) num_records, TIMESTAMPTOSTRING(mytopic.creationdate, 'yyyy-MM-dd') date
    from mytopic mytopic
    group by TIMESTAMPTOSTRING(mytopic.creationdate, 'yyyy-MM-dd')
    emit changes;

The output of this table looks as follows:

ksql> select * from mytopic_count_per_date;
+-------------------------------------------------------------------------------+-------------------------------------------------------------------------------+
|DATE                                                                           |NUM_RECORDS                                                                    |
+-------------------------------------------------------------------------------+-------------------------------------------------------------------------------+
|2022-04-04                                                                     |2260363                                                                        |
|2022-04-06                                                                     |2407432                                                                        |
|2022-04-27                                                                     |915754                                                                         |
|2021-12-22                                                                     |6                                                                              |
|2021-12-31                                                                     |16                                                                             |
...
  4. Lastly, after waiting for the above table to be fully up to date with no consumer lag, I created another table that sums up the per-date counts:
ksql> create table mytopic_total with (kafka_topic='mytopic_total', partitions=18, replicas=1) AS
    select sum(mytopic_count_per_date.num_records) total_records, 'x' X
    from mytopic_count_per_date mytopic_count_per_date
    GROUP BY 'x'
    EMIT CHANGES;

Aggregate functions in pull queries are unfortunately not supported, hence the need for this second table.
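
Once this second table exists, the monitoring endpoint can read it with a plain pull query over ksqlDB's REST API rather than the CLI. A minimal sketch of that, assuming the server is reachable at http://localhost:8088 and using the stock /query endpoint:

import requests

def get_ksqldb_total(server: str = 'http://localhost:8088') -> int:
    # Pull query against the mytopic_total table via the ksqlDB REST API.
    response = requests.post(
        f'{server}/query',
        headers={'Accept': 'application/vnd.ksql.v1+json'},
        json={'ksql': 'SELECT total_records FROM mytopic_total;',
              'streamsProperties': {}},
    )
    response.raise_for_status()
    # The response is a JSON array: a header element followed by row elements.
    rows = [e['row']['columns'] for e in response.json() if 'row' in e]
    return rows[0][0]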

I expected the total_records column in my mytopic_total table to contain the exact same value that I got using confluent-kafka above. However, I get a different value:

ksql> select * from mytopic_total;
+--------------------------------------+--------------------------------------+
|X                                     |TOTAL_RECORDS                         |
+--------------------------------------+--------------------------------------+
|x                                     |61316958                              |

Since I can't tell which source is at fault with only two data points, I tried a third approach using kcat (note that librdkafka's default isolation level is also read_committed):

$ docker run edenhill/kcat:1.7.1 -C -b broker:9092 -o beginning -f '%o\n' -u -t mytopic -e -G kcat-console mytopic | wc -l
61316573

Thus it seems that the confluent-kafka (Python) result is correct, and the ksqlDB one is not.

The versions I am running are:

  • confluent-kafka: '1.9.2', 17367552 / librdkafka: '1.9.2', 17367807
  • ksqldb-server: 0.28.2 / ksqldb-cli: 0.28.2
  • kcat: 1.7.1 / librdkafka: '1.8.2' (according to GitHub)

My questions are:

  1. Why is the ksqlDB value different? Are there other configuration options that I have to enable in ksqlDB? Is this a bug?
  2. In step 4 above I waited for the first table to be fully up to date before summing the grouped values. Is this always necessary? I saw that the result differed when I did not do this.
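
Regarding question 2, this is roughly how I check that a query's consumer group has no lag before reading downstream. The group id below is hypothetical; in reality it would be the _confluent-ksql-...-prefixed group that ksqlDB assigns to the persistent query:

def get_consumer_lag(group_id: str, topic_name: str) -> int:
    # Lag = sum over partitions of (high watermark - committed offset).
    # group_id is assumed to be ksqlDB's persistent-query group, e.g.
    # something like '_confluent-ksql-<service-id>query_<query-id>'.
    check_consumer = Consumer({
        'bootstrap.servers': BROKER_URI,
        'group.id': group_id,
        'enable.auto.commit': False,  # only read offsets, never commit
    })
    metadata = check_consumer.list_topics(topic_name, 30.0)
    partitions = [TopicPartition(topic_name, p)
                  for p in metadata.topics[topic_name].partitions]
    lag = 0
    for tp in check_consumer.committed(partitions, timeout=30.0):
        _, high = check_consumer.get_watermark_offsets(tp, 30.0)
        # tp.offset is negative if the group has no committed offset yet
        lag += high - tp.offset if tp.offset >= 0 else high
    check_consumer.close()
    return lag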
  • _set the ksql consumer isolation level to read_committed so that I only get committed messages and the processing guarantee to exactly_once_ - What exact properties did you set? Also, keep in mind, that records could actively be removed from compaction or retention, so you should not "count records" very frequently. Your "sanity check" should rather monitor JMX ingest rate to the topic/broker without actually consuming records to do so... – OneCricketeer Oct 14 '22 at 22:16
  • I added some clarification to the question w.r.t. your remarks. The source topic has infinite retention without compaction, so compaction/deletion should not play a role here. Re: *should rather monitor JMX ingest rate*: I was under the impression that the only way to get a precise reading on the actual number of records in a topic written to by a transactional producer, is to actually consume and count the records. Is this not the case? I'm all for a simpler solution, if there is one... – filpa Oct 17 '22 at 16:04
  • To add on the above: the plan is to use the data contained in the topic for reporting/aggregations of invoice data (when/what/how much) later on, and so the requirement is for this data to be verifiably exact. The sanity check described above was the simplest way I could find to *repeatably* and *verifiably* check that the data is complete. – filpa Oct 17 '22 at 16:17
  • Okay, well, in my experience, counting of consumed records will be slower-and-slower over time using any client (let's assume your consume restarts). I think it'd be better to dump the topic to a database or file storage, then count from there. – OneCricketeer Oct 17 '22 at 19:13
  • I think we're misunderstanding each other. I don't restart counting the records. The consumers may restart, sure, but the idea was to still compare with a pre-computed tally from the source topic after (re-)ingestion was done. – filpa Oct 17 '22 at 23:21
  • *I think it'd be better to [...]* That's kind of what I was going for in using ksqlDB - keep a table of the current tallies using a push query and compare vs. transferred data to e.g. AWS. On the AWS side we could easily query the count using e.g. Athena, but that still doesn't help me in ascertaining the number of records that *should* have been transferred. And dumping the topic into another non-streaming DB kind of defeats the purpose of using Kafka in the first place, I would think... – filpa Oct 17 '22 at 23:22
  • There are streaming databases, but I guess you are creating a stateful stream rather than always counting from the beginning, so assuming the `group by` works as expected (what if `creationdate` is null?), then I suggest opening a Github issue in KSQL – OneCricketeer Oct 18 '22 at 04:46
  • Yes, I am creating a stateful stream and not recounting past records every time. Re: `creationdate` - the record field is fortunately guaranteed to be non-null. Either way, thanks for the suggestion. For the record, I've gone ahead and heeded it ([ksql#9650](https://github.com/confluentinc/ksql/issues/9650)). I'll update this post if/when I learn something new there. – filpa Oct 19 '22 at 16:41
  • One thing I noticed - `exactly_once` config option... Are you setting this anywhere in librdkafka configs? Or maybe this is the default? Basically, seems to me that KSQL is not skipping over some transaction markers, perhaps, and there is a different config needed for that, so it is counting extra/duplicate records. Also, you should try `v2`? https://docs.ksqldb.io/en/latest/operate-and-deploy/exactly-once-semantics/ – OneCricketeer Oct 19 '22 at 22:46
