Assuming I have two topics (both with two partitions and infinite retention):
my_topic_a
my_topic_b
and one consumer group:
my_consumer
At some point, it was consuming both topics, but due to some changes, it's no longer interested in my_topic_a
, so it stopped consuming it and now is accumulating lag:
kafka-consumer-groups.sh --bootstrap-server=kafka.core-kafka.svc.cluster.local:9092 --group my_consumer --describe
TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
my_topic_a 0 300000 400000 100000 - - -
my_topic_a 1 300000 400000 100000 - - -
my_topic_b 0 500000 500000 0 - - -
my_topic_b 1 500000 500000 0 - - -
This lag is annoying me because:
- My consumer-lag graph in Grafana is tainted.
- An automatic alarm is triggered, reminding me about a consumer lagging too much.
Thus I want to get rid of the offsets for my_topic_a
of my_consumer
, to get to a state as if my_consumer
had never consumed my_topic_a
.
The following attempt fails:
kafka-consumer-groups.sh --bootstrap-server kafka:9092 --group my_consumer_group --delete --topic domain.user
With this output:
The consumer does not support topic-specific offset deletion from a consumer group.
How can I achieve my goal? (Temporarily stopping all consumers of this group would be a feasible option in my use-case.)
(I'm using Kafka version 2.2.0
.)
My guess is, something can be done by writing something to topic __consumer_offsets
, but I don't know what it would be. Currently, this topic looks as follows (again, simplified):
kafka-console-consumer.sh --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" --bootstrap-server kafka:9092 --topic __consumer_offsets --from-beginning
...
[my_consumer_group,my_topic_a,0]::OffsetAndMetadata(offset=299999, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1605000000000, expireTimestamp=None)
[my_consumer_group,my_topic_a,0]::OffsetAndMetadata(offset=300000, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1605000100000, expireTimestamp=None)
...
[my_consumer_group,my_topic_a,1]::OffsetAndMetadata(offset=299999, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1605000000000, expireTimestamp=None)
[my_consumer_group,my_topic_a,1]::OffsetAndMetadata(offset=300000, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1605000100000, expireTimestamp=None)
...
[my_consumer_group,my_topic_b,0]::OffsetAndMetadata(offset=499999, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1607000000000, expireTimestamp=None)
[my_consumer_group,my_topic_b,0]::OffsetAndMetadata(offset=500000, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1607000100000, expireTimestamp=None)
...
[my_consumer_group,my_topic_b,1]::OffsetAndMetadata(offset=499999, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1607000000000, expireTimestamp=None)
[my_consumer_group,my_topic_b,1]::OffsetAndMetadata(offset=500000, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1607000100000, expireTimestamp=None)