I wish to count the number of records in a given Kafka topic to set up a monitoring endpoint as a kind of 'sanity check' for data-in vs. data-out.
I noticed that doing this with confluent-kafka 1.9.2 produces results that differ from those produced by e.g. ksqldb.
The source topic is partitioned, has infinite retention and is not compacted.
When I try to count the number of records with confluent-kafka 1.9.2:
import logging
from typing import List

from confluent_kafka.admin import AdminClient
from confluent_kafka import Consumer, TopicPartition

log = logging.getLogger(__name__)

consumer = Consumer({
    "bootstrap.servers": BROKER_URI,
    "group.id": GROUP_ID,
    "isolation.level": "read_committed",
})

def seek_to_offset(consumer: Consumer, partitions: List[TopicPartition]):
    for current_partition in partitions:
        current_partition.offset = -2  # earliest
    consumer.assign(partitions)

def get_topic_size(topic_name: str = DEFAULT_TOPIC):
    consumer.subscribe([topic_name], on_assign=seek_to_offset)
    total_record_count = 0
    while True:
        record = consumer.poll(30.0)  # default session timeout
        if record is None:
            log.debug('No more records')
            break
        if record.error():
            log.error(f'Consumer error: {record.error()}')
            continue
        total_record_count += 1
    consumer.close()
    return total_record_count

log.info(f'Number of records: {get_topic_size()}')
Number of records: 61316573
The result is 61316573.
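For a quicker cross-check that does not require consuming the whole topic, the same consumer can also sum the per-partition watermark offsets. This is only a rough sketch reusing the consumer and DEFAULT_TOPIC placeholders from above; with transactional producers the high watermark also covers transaction control records and any aborted records, so the result is an upper bound rather than an exact committed-record count:
def get_topic_size_from_watermarks(topic_name: str = DEFAULT_TOPIC) -> int:
    # Rough upper bound: sum (high - low) over all partitions of the topic.
    # Offsets used by transaction markers and aborted records are included.
    metadata = consumer.list_topics(topic_name, timeout=30.0)
    total = 0
    for partition_id in metadata.topics[topic_name].partitions:
        low, high = consumer.get_watermark_offsets(
            TopicPartition(topic_name, partition_id), timeout=30.0
        )
        total += high - low
    return total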
On the other hand, I have a confluentinc/ksqldb-server:0.28.2 together with confluentinc/ksqldb-cli:0.28.2 running that is supposed to also tally the number of records for a similar check. I do that as follows:
- I set the ksql consumer isolation level to read_committed so that I only get committed messages, and the processing guarantee to exactly_once. Prior to running the queries below, I set auto.offset.reset to earliest:
# As environment variables in my ksqldb service via docker-compose
---
version: '2'
services:
  primary-ksqldb-server:
    image: confluentinc/ksqldb-server:0.28.2
    hostname: primary-ksqldb-server
    container_name: primary-ksqldb-server
    environment:
      # ...
      KSQL_KSQL_STREAMS_PROCESSING_GUARANTEE: exactly_once
      KSQL_KSQL_STREAMS_CONSUMER_ISOLATION_LEVEL: read_committed
This is the recommended way of passing this configuration when starting via docker according to the documentation.
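As a sanity check on my side, the effective settings can also be listed over the ksqlDB REST API. The sketch below assumes the API is reachable at http://primary-ksqldb-server:8088 (adjust to your setup) and that the SHOW PROPERTIES response carries a 'properties' list:
import requests

KSQLDB_URL = "http://primary-ksqldb-server:8088"  # assumed address of the ksqlDB REST API

resp = requests.post(
    f"{KSQLDB_URL}/ksql",
    json={"ksql": "SHOW PROPERTIES;", "streamsProperties": {}},
)
resp.raise_for_status()
# Print only the settings relevant here (processing guarantee and isolation level).
for prop in resp.json()[0].get("properties", []):
    name = str(prop.get("name", ""))
    if "processing.guarantee" in name or "isolation.level" in name:
        print(prop)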
Lastly, inside ksqlDB:
ksql> set 'auto.offset.reset' = 'earliest';
- I created a stream based off my topic.
ksql> create stream mytopic with (kafka_topic='mytopic', key_format='KAFKA', value_format='AVRO');
- I created a table to count the number of records per date, based on a timestamp field in the record (creationdate):
ksql> create table mytopic_count_per_date with (kafka_topic='mytopic_count_per_date', partitions=18, replicas=1) AS
select count(*) num_records, TIMESTAMPTOSTRING(mytopic.creationdate, 'yyyy-MM-dd') date
from mytopic mytopic
group by TIMESTAMPTOSTRING(mytopic.creationdate, 'yyyy-MM-dd')
emit changes;
The output of this table looks as follows:
ksql> select * from mytopic_count_per_date;
+-------------------------------------------------------------------------------+-------------------------------------------------------------------------------+
|DATE |NUM_RECORDS |
+-------------------------------------------------------------------------------+-------------------------------------------------------------------------------+
|2022-04-04 |2260363 |
|2022-04-06 |2407432 |
|2022-04-27 |915754 |
|2021-12-22 |6 |
|2021-12-31 |16 |
...
- Lastly, after waiting for the above table to be fully up-to-date and not have any consumer lag, I created another table that sums up these grouped per-date values:
ksql> create table mytopic_total with (kafka_topic='mytopic_total', partitions=18, replicas=1) AS
select sum(mytopic_count_per_date.num_records) total_records, 'x' X
from mytopic_count_per_date mytopic_count_per_date
GROUP BY 'x'
EMIT CHANGES;
Aggregate functions in pull queries are unfortunately not supported, hence the need to do it like this.
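One workaround I considered instead of the second table (sketched below, not what I ended up using) is to pull the per-date rows over the REST API and sum them client-side; it assumes the same ksqlDB endpoint as above and the /query response shape of a header item followed by row items:
import requests

KSQLDB_URL = "http://primary-ksqldb-server:8088"  # assumed address of the ksqlDB REST API

resp = requests.post(
    f"{KSQLDB_URL}/query",
    json={"ksql": "SELECT * FROM mytopic_count_per_date;", "streamsProperties": {}},
)
resp.raise_for_status()
# The first array element is the schema header; the remaining elements carry
# the rows as {"row": {"columns": [DATE, NUM_RECORDS]}}.
rows = [item["row"]["columns"] for item in resp.json() if "row" in item]
total_records = sum(columns[1] for columns in rows)
print(f"Client-side total: {total_records}")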
I expected the total_records column in my mytopic_total table to contain exactly the same value that I got with confluent-kafka above. However, I get a different value:
ksql> select * from mytopic_total;
+--------------------------------------+--------------------------------------+
|X |TOTAL_RECORDS |
+--------------------------------------+--------------------------------------+
|x |61316958 |
Since with only two disagreeing sources I cannot tell which one is at fault, I tried a third approach using kcat (note that the librdkafka default isolation level is also read_committed):
$ docker run edenhill/kcat:1.7.1 -C -b broker:9092 -o beginning -f '%o\n' -u -t mytopic -e -G kcat-console mytopic | wc -l
61316573
Thus it seems like the confluent-kafka (i.e. Python) result is correct, and the ksqldb one is not.
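To narrow down where the 385 extra records come from, one further check I could run is the same polling loop with the isolation level switched to read_uncommitted; if that count is close to the ksqldb figure, aborted transactions being read somewhere would be the likely explanation. A minimal sketch reusing the placeholders from the first snippet (the group id is just an assumed name):
from confluent_kafka import Consumer

# Same counting loop as get_topic_size(), but reading uncommitted records.
uncommitted_consumer = Consumer({
    "bootstrap.servers": BROKER_URI,               # same placeholder as above
    "group.id": f"{GROUP_ID}-read-uncommitted",    # assumed, separate group id
    "isolation.level": "read_uncommitted",
})
# ...then run the subscribe/poll loop from get_topic_size() against
# uncommitted_consumer and compare its total with the two numbers above.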
The versions I am running are:
- confluent-kafka: '1.9.2', 17367552 / librdkafka: '1.9.2', 17367807
- ksqldb-server: 0.28.2 / ksqldb-cli: 0.28.2
- kcat: 1.7.1 / librdkafka: '1.8.2' (according to GitHub)
My questions are:
- Why is the ksqldb value different? Are there other configuration options that I have to enable in ksqldb? Is this a bug?
- In step 4 above I waited for the first table to be fully up-to-date before trying to sum up the grouped terms. Is this always necessary? I saw that the result differed when I did not do this.