
Assuming I have two topics (both with two partitions and infinite retention):

  • my_topic_a
  • my_topic_b

and one consumer group:

  • my_consumer

At some point, the group was consuming both topics, but due to some changes it is no longer interested in my_topic_a, so it stopped consuming that topic and lag is now accumulating there:

kafka-consumer-groups.sh --bootstrap-server=kafka.core-kafka.svc.cluster.local:9092 --group my_consumer --describe
TOPIC                                PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                                  HOST            CLIENT-ID
my_topic_a                           0          300000          400000          100000          -                                                            -               -
my_topic_a                           1          300000          400000          100000          -                                                            -               -
my_topic_b                           0          500000          500000          0               -                                                            -               -
my_topic_b                           1          500000          500000          0               -                                                            -               -

This lag is annoying me because:

  • My consumer-lag graph in Grafana is tainted.
  • An automatic alarm is triggered, reminding me about a consumer lagging too much.

Thus I want to get rid of my_consumer's offsets for my_topic_a, to get to a state as if my_consumer had never consumed my_topic_a.

The following attempt fails:

kafka-consumer-groups.sh --bootstrap-server kafka:9092 --group my_consumer --delete --topic my_topic_a

With this output:

The consumer does not support topic-specific offset deletion from a consumer group.

How can I achieve my goal? (Temporarily stopping all consumers of this group would be a feasible option in my use-case.)

(I'm using Kafka version 2.2.0.)


My guess is that something can be done by writing something to the topic __consumer_offsets, but I don't know what that would be. Currently, this topic looks as follows (again, simplified):

kafka-console-consumer.sh --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" --bootstrap-server kafka:9092 --topic __consumer_offsets --from-beginning
...
[my_consumer,my_topic_a,0]::OffsetAndMetadata(offset=299999, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1605000000000, expireTimestamp=None)
[my_consumer,my_topic_a,0]::OffsetAndMetadata(offset=300000, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1605000100000, expireTimestamp=None)
...
[my_consumer,my_topic_a,1]::OffsetAndMetadata(offset=299999, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1605000000000, expireTimestamp=None)
[my_consumer,my_topic_a,1]::OffsetAndMetadata(offset=300000, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1605000100000, expireTimestamp=None)
...
[my_consumer,my_topic_b,0]::OffsetAndMetadata(offset=499999, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1607000000000, expireTimestamp=None)
[my_consumer,my_topic_b,0]::OffsetAndMetadata(offset=500000, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1607000100000, expireTimestamp=None)
...
[my_consumer,my_topic_b,1]::OffsetAndMetadata(offset=499999, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1607000000000, expireTimestamp=None)
[my_consumer,my_topic_b,1]::OffsetAndMetadata(offset=500000, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1607000100000, expireTimestamp=None)
Tobias Hermann
  • Isn't there a way to filter out data from the Grafana dashboard? Or maybe mute the alerts on topic tags for the duration of the offset retention period? – OneCricketeer Dec 10 '20 at 18:28
  • @OneCricketeer Sure, I likely could find a way to adjust the configurations of all dependent things (Prometheus for Grafana, the alert manager, and whatnot) to filter out this stale offset. And then do it again every time some other consumer group stops consuming one of its topics. But I'd highly prefer a cleaner and more conclusive solution. – Tobias Hermann Dec 10 '20 at 20:06
  • @OneCricketeer Also, from my understanding, it would not just be for the offset retention period, because that only applies to consumers that stopped consuming all topics. My cluster has an `offsets.retention.minutes` of one day, and `my_consumer` has not consumed `my_topic_a` for a few weeks now, but since it's still actively reading other topics, nothing was removed and the lag is still there. – Tobias Hermann Dec 10 '20 at 20:06
  • That topic is compacted, meaning only closed log segments would get cleaned. The default segment size is 1G, which is a lot of data there since that `OffsetAndMetadata` is a compact binary format. But it also means that if, say, the key `[my_consumer,my_topic_b,0]` is not seen for longer than 1 day and does not exist in the currently open log segment, it would then get removed. – OneCricketeer Dec 10 '20 at 22:21
  • @OneCricketeer Thanks for the explanation. Since I use `log.cleaner.max.compaction.lag.ms=86400000`, I think even if the log segment does not reach `log.segment.bytes`, `[my_consumer,my_topic_a,...]` should be removed after one day. Nevertheless, in reality, in my case, it's still there after multiple weeks have passed. So something seems not to be working as intended in my cluster. – Tobias Hermann Dec 11 '20 at 09:00
  • Did you write the code that generates the lag metric? Or is it being sourced from Burrow or some other tool? – s7vr Dec 20 '20 at 16:03
  • @s7vr The lag built up naturally because `my_consumer` intentionally stopped consuming `my_topic_a`. (In reality, the lag is not such a round number as 100000, but for this example I simplified things a bit, as well as the number of partitions.) – Tobias Hermann Dec 21 '20 at 06:54

2 Answers


In the meantime (Kafka 2.8) it has become possible with the new --delete-offsets parameter for kafka-consumer-groups.sh. :-)
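For example, with the group, topic, and broker address from the question (a sketch; as far as I know, offsets can only be deleted for a topic the group is no longer actively subscribed to, which is the case here):

kafka-consumer-groups.sh --bootstrap-server kafka:9092 --group my_consumer --topic my_topic_a --delete-offsets

Afterwards, --describe should no longer list the my_topic_a partitions for the group, so the stale lag disappears from the graphs and alerts.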

Tobias Hermann

The output you are given:

"The consumer does not support topic-specific offset deletion from a consumer group."

indicates that it is not possible to delete the offsets of a single topic from a consumer group (at least in the Kafka version you are using).

You could switch the application, which now reads only my_topic_b, to a new consumer group, restart it, and then remove the old, idle consumer group completely. With that approach you will be able to track the consumer lag without any distraction or alerts popping up. When restarting the application with a new consumer group it is usually best to stop the producer for my_topic_b during the restart to make sure you are not missing any messages.
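A sketch of the cleanup step, using the broker address from the question and assuming the application is already running under its new group id (the old group must have no active members, otherwise the deletion is rejected):

kafka-consumer-groups.sh --bootstrap-server kafka:9092 --delete --group my_consumer

Once the old group is gone, its stale offsets, and with them the lag and the alert, disappear.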

I would really avoid playing around manually with the topic __consumer_offsets.

As an alternative, you could regularly run a command-line tool that comes with Kafka to reduce the lag of your consumer group:

> bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --reset-offsets --group my_consumer --topic my_topic_a --to-latest 

You may need to add the --execute option.
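Spelled out (a sketch; without --execute recent versions of the tool only print the planned reset as a dry run):

bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --reset-offsets --group my_consumer --topic my_topic_a --to-latest --execute

Note that the tool may refuse to reset offsets while the group has active members; in that case the consumers would have to be stopped briefly, which the question says is acceptable.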

Michael Heil
  • Yes, if it were feasible to stop the producer, I would simply have switched to a new consumer-group name in the way you describe. Sadly, it's not a viable option in my use-case. – Tobias Hermann Dec 14 '20 at 14:11
  • Okay, maybe the alternative I just added to my answer might help you solve your problem. Still, this is another workaround... but I haven't seen any reliable solution for manipulating the __consumer_offsets topic in the way you need. – Michael Heil Dec 15 '20 at 20:41
  • Thanks, the solution is very pragmatic, but I see three problems with it. First, it adds complexity by having to maintain the scheduling of that bogus script. Second, in a similar case in the future, I'd have to add more such bogus scripts. Third, the graphs will look as if the consumer is still consuming this topic, while in fact it's not, which could lead to drawing false conclusions in the future. – Tobias Hermann Dec 16 '20 at 07:28