3

I have an issue where my commit fails because poll() is too long (why this happens I don't know, there was no messages and it was simply read/committing on an empty queue, and my poll-interval is set to hours). Then when it hits read() again it doesn't rebalance for some reason. However this only happens when my code is running on bluemix, locally when I reproduce the exception the next read() causes a rebalance.

What's the proper way to recover from a CommitFailedException? Should I close() and recreate my consumer? Or is calling read() supposed to rebalance and let me continue?

kyl
  • 475
  • 1
  • 5
  • 16
  • What version are you using as the heartbeat behavior was changed significantly in 0.10.x and higher – Hans Jespersen Jul 29 '17 at 03:13
  • We're using 0.10.x – kyl Jul 31 '17 at 21:30
  • 0.10.0 or 0.10.1 or 0.10.2? If 0.10.1 or higher what is your `max.poll.interval.ms` set to? I ask because of KIP-62 https://cwiki.apache.org/confluence/display/KAFKA/KIP-62%3A+Allow+consumer+to+send+heartbeats+from+a+background+thread – Hans Jespersen Aug 01 '17 at 01:27
  • I'm using compile 'org.apache.kafka:kafka-clients:0.10.2.1' specifically. – kyl Aug 01 '17 at 13:39
  • OK at that version you have both `max.poll.interval.ms` and `session.timeout.ms` to configure. The default values are: session.timeout.ms: = 10secs, max.poll.interval.ms = 5min, and also max.poll.records= 500 messages so you need to be calling poll() at least once every 5 minutes and you will be getting up to 500 messages to process in the next 5 minutes before you have to poll() again or your consumer will get kicked out of the consumer group. – Hans Jespersen Aug 01 '17 at 13:54
  • how long is the wait time in your poll() call? ```The poll API is designed to ensure consumer liveness. As long as you continue to call poll, the consumer will stay in the group and continue to receive messages from the partitions it was assigned. Underneath the covers, the consumer sends periodic heartbeats to the server. If the consumer crashes or is unable to send heartbeats for a duration of session.timeout.ms, then the consumer will be considered dead and its partitions will be reassigned.``` – Hans Jespersen Aug 01 '17 at 14:10
  • You should not need to call commit to stay in the group. You only need to call poll(). Even if it returns 0 records it will still keep you in the consumer group. – Hans Jespersen Aug 01 '17 at 14:21
  • I call commit so that my offset is not deleted by the broker if I have no messages after 24 hours (I can't change the default 24 hr setting enforced by the message hub service). I call poll every 5 seconds if there's no messages, and if there is a message to process I even added a heartbeat thread to empty poll() just in case. So regardless of why I got kicked out, I want to have a plan in place to reconnect, but don't know what the proper way to reconnect a consumer is, with a close() and recreation, or if just calling poll() is sufficient – kyl Aug 03 '17 at 20:09
  • So there really is two questions to answer. 1) if you are getting a commit failed exception, what is the right way to recover 2) why (or are) you dropping out of the group in the first place. Both are important to answer because if you call poll so frequently and params are set correctly then your consumer should not be ever kicked out. If you only answer question #1 you will be joining and leaving a group repeatedly and causing rebalancing over and over which is just bad practice for Kafka. However I will post the answer to #1 as a first step – Hans Jespersen Aug 03 '17 at 20:19
  • Yes you are exactly right. I want to know about 1) so that I can have my prod deployment recover, while I try to figure out why 2) happens. Thanks for all your help so far :) – kyl Aug 08 '17 at 19:33

2 Answers2

1

@kyl so I believe with the default kafka-java client, the consumer will heartbeat every 3s and the session timeout is 10s, so your consumer should stay within the group without being taken out and a rebalance occurring. What was the message included with your CommitFailedException? I'm assuming the commit is failing because you've been kicked out.

a few other questions on this:

  1. do you have multiple consumers coming & going, and/or are you intentionally wanting to use consumer groups rather than just a single consumer?

  2. what do you mean by "my poll interval is set to hours"?

  3. what do you mean by "committing on an empty queue" ?

Can you share a snippet of your consumer loop code, as that might help better explain what you're doing

Dominic Evans
  • 370
  • 1
  • 4
  • The code might be a bit complex, but let me answer your questions first. The message was that my poll interval was too long. 1) Multiple consumers that stay up, used to process messages concurrently. 2) max.poll.interval.ms=1800000 3) I call commit() even when a message is not read to keep my offset "alive" in the broker. Regardless of why I got a rebalance, I still need to handle commit failed and I want to know the proper way to start "re-listening" to a broker with a consumer that got this error. Thanks – kyl Jul 28 '17 at 17:49
1

The commitSync method will automatically retry indefinitely so if you are getting CommitFailedException then it's not a retriable condition and calling commit again is not likely to help. You are getting this exception because your consumer has been kicked out of the consumer group.

If you were using commitAsync to commit offsets then retries are not automatic and you may receive a RetriableCommitFailedException to indicate a potentially transient error for which you can manually retry the commit again. Sounds like this is not what is happening in your case but I include it for completeness of this answer.

Once your consumer is kicked out of the group and you get this CommitFailedException exception you can just keep calling poll() until the rebalancing is complete and you are admitted back into the consumer group (potentially with a new set of partitions than before) and it will continue.

If your application is not tolerant to the condition in which the partitions (and therefore the keys) you are receiving change mid-stream then you should implement a rebalance listener which will be called upon change of partition assignment. See http://kafka.apache.org/0110/javadoc/index.html?org/apache/kafka/clients/consumer/ConsumerRebalanceListener.html

If you are just trying to work around the fact that offsets expire every 24hours then you will need to call commit at least once per day to keep the offset up to date in addition to calling poll() periodically to stay in the consumer group

Hans Jespersen
  • 8,024
  • 1
  • 24
  • 31
  • Thank you. I do call poll() after being kicked out, however this is just part of the normal logic and it calls commit() after. Locally this works fine, on deployment it seems like sometimes the poll() does not trigger the rebalance and I keep getting the error since I call commit again(). Do you know if one poll() is supposed to be sufficient or if I should keep it in some sort of loop and then detect the rebalance? – kyl Aug 08 '17 at 19:40
  • It's more likely that the rebalance is just taking longer on Bluemix than it does locally. There will be a rebalance when your consumer is kicked out, and another one when the consumer rejoins. You may need to wait until the first leaving rebalance finishes before to can trigger the second rejoining. I know in 0.11 there are some improvements to add a delay to allow multiple consumers to join at once rather than having to do a bunch of joins and rebalances in series. – Hans Jespersen Aug 08 '17 at 19:53
  • Looked into this more. You should not need to wait for the rebalance to finish. Just call poll() and the group coordinator will trigger the right rebalancing needed to get the consumer back into the group. – Hans Jespersen Aug 08 '17 at 20:39
  • Ok, thanks. It seems my specific case is very weird then, since I do call poll() after being kicked, but it doesn't seem to trigger a rebalance and it continues on and commits() (which obviously fails). It might be some weird bluemix issue since they were having network problems before. I do also have a rebalancelistener, where I read my offsets from another storage in onPartitionsAssigned and I do nothing in onPartitionsRevoked - could that be an issue? When I read the docs it seems like onPartitionsRevoked is used to save offsets, and is not to "stop" the consumer. – kyl Aug 08 '17 at 21:16
  • Sounds like you are doing all the right things. Maybe network problems are causing your consumer to get kicked out because it's missing 10 seconds of heartbeats. Try increasing that interval and see if it helps. – Hans Jespersen Aug 09 '17 at 01:59