I am querying a Kafka broker with the admin client API to get the committed offsets of CONSUMER_GROUP, using the code below:
Map<TopicPartition, OffsetAndMetadata> offsets =
admin.listConsumerGroupOffsets(CONSUMER_GROUP)
.partitionsToOffsetAndMetadata().get();
As I understand it, the above code triggers a query against the special internal __consumer_offsets topic to get the committed offset for each topic partition that CONSUMER_GROUP is responsible for.
On the other hand, I am using the code below to retrieve the latest (end) offset for each topic partition of CONSUMER_GROUP:
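// Build one "latest offset" request per partition the group has committed offsets for
Map<TopicPartition, OffsetSpec> requestLatestOffsets = new HashMap<>();
for (TopicPartition tp : offsets.keySet()) {
    requestLatestOffsets.put(tp, OffsetSpec.latest());
}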
Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> latestOffsets =
admin.listOffsets(requestLatestOffsets).all().get();
for (Map.Entry<TopicPartition, OffsetAndMetadata> e : offsets.entrySet()) {
    long latestOffset = latestOffsets.get(e.getKey()).offset();
}
My understanding is that the committed and latest offsets are therefore requested from two different topics: the committed offsets from the __consumer_offsets topic, and the latest (end) offsets from the actual topic(s) that CONSUMER_GROUP consumes.
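To make clear why both values are needed, here is a minimal sketch of how the two results might be combined into a per-partition lag (end offset minus committed offset). It is only an illustration under stated assumptions: the class name LagSketch, the method computeLag, and the "topic-partition" string keys are hypothetical stand-ins for TopicPartition so that the snippet runs without the Kafka client library, and the offsets are made-up sample values.

```java
import java.util.HashMap;
import java.util.Map;

class LagSketch {
    // Hypothetical helper: keys are "topic-partition" strings standing in for TopicPartition.
    // Lag is the end offset minus the committed offset, floored at zero.
    static Map<String, Long> computeLag(Map<String, Long> committed, Map<String, Long> latest) {
        Map<String, Long> lag = new HashMap<>();
        for (Map.Entry<String, Long> e : committed.entrySet()) {
            Long end = latest.get(e.getKey());
            if (end == null) {
                continue; // no end offset was returned for this partition
            }
            lag.put(e.getKey(), Math.max(0L, end - e.getValue()));
        }
        return lag;
    }

    public static void main(String[] args) {
        // Made-up sample values, not real broker output
        Map<String, Long> committed = new HashMap<>();
        committed.put("orders-0", 40L);
        committed.put("orders-1", 75L);

        Map<String, Long> latest = new HashMap<>();
        latest.put("orders-0", 45L);
        latest.put("orders-1", 75L);

        System.out.println(computeLag(committed, latest));
    }
}
```

With the real admin client, the committed side would come from OffsetAndMetadata.offset() and the end side from ListOffsetsResultInfo.offset(), as in the snippets above.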
(1) Is the above description of how the committed and latest offsets are requested accurate?
(2) Is it possible to query the __consumer_offsets topic directly?
Thank you.