
I read the docs on using the pause and resume methods for a Kafka consumer, and they seem easy enough to implement. However, do I need another thread to keep calling the poll() method while the consumer is paused, so that it meets the heartbeat requirements and does not trigger a rebalance?

My consumer runs SQL scripts after polling the topic, and depending on the messages returned, the scripts may take longer than the current session.timeout.ms interval. (We have increased this value, but the time the scripts take can vary quite a bit, and whatever the interval, we will exceed it at times.) I also want to avoid a rebalance, because safe ordering and data integrity are more important to us than throughput and error detection.

H.Ç.T
Bros0912
  • Why not reduce the consumer's batch size so that processing doesn't take so long? – Ryuzaki L Feb 06 '20 at 18:59
  • We tested with varying batch sizes, and given the amount of processing a single message requires, we still run the risk of timing out (think car dealerships with hundreds of vehicles and associated coverages). – Bros0912 Feb 06 '20 at 19:09

2 Answers


Since version 0.10.1.0, heartbeats are sent from a separate background thread, so pausing your processing thread doesn't affect the heartbeat thread.

You can check this for more information.
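The disagreement in the comments comes down to two separate deadlines. Below is a deliberately simplified model, not the Kafka client's actual code, of the rule quoted from KIP-62 in the other answer: the heartbeat thread keeps the session alive on its own, but once poll() has not been called for longer than max.poll.interval.ms it stops heartbeating and leaves the group. The class name, method name, and the assumption that the deadline is only checked at heartbeat ticks are illustrative simplifications.

```java
public class PollDeadlineModel {

    /**
     * Simplified model of the KIP-62 rule (not the real client code): poll() returns at t = 0,
     * processing then runs for processingMs without another poll(), and the heartbeat thread
     * wakes every heartbeatIntervalMs. Returns the time at which the heartbeat thread would
     * stop heartbeating and send LeaveGroup, or -1 if processing finishes first.
     */
    static long leaveGroupAtMs(long processingMs, long heartbeatIntervalMs, long maxPollIntervalMs) {
        for (long t = heartbeatIntervalMs; t <= processingMs; t += heartbeatIntervalMs) {
            if (t > maxPollIntervalMs) {
                // Poll deadline missed: heartbeats stop even though this thread is still alive.
                return t;
            }
            // Otherwise a heartbeat is sent at time t, so session.timeout.ms never expires.
        }
        // Processing ended in time; the next poll() resets the max.poll.interval.ms deadline.
        return -1;
    }

    public static void main(String[] args) {
        // 10-minute batch, 3 s heartbeats, 5-minute max.poll.interval.ms
        System.out.println(leaveGroupAtMs(600_000, 3_000, 300_000)); // 303000
        // 4-minute batch with the same settings
        System.out.println(leaveGroupAtMs(240_000, 3_000, 300_000)); // -1
    }
}
```

In other words, the separate heartbeat thread removes session.timeout.ms as a constraint on processing time, but not max.poll.interval.ms.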

H.Ç.T
  • This answer is wrong. If you stop calling poll(), your consumer will eventually leave the consumer group (it's described in the link posted in this answer). – radai Feb 08 '20 at 07:04
  • @radai I didn't say the opposite. You should read both the question and the answer carefully. – H.Ç.T Feb 08 '20 at 07:15
  • The question reads `do I need another thread to continue calling the poll() method while paused to meet the heartbeat requirements`. The answer is yes: poll() needs to keep being called. The fact that heartbeats are sent by a separate thread does not mean `pausing your process thread wouldn't affect heartbeat thread`, as you said; the heartbeat thread will initiate a group leave if poll() isn't called often enough. – radai Feb 08 '20 at 07:24
  • @radai I see your point, but there is still no need for **another thread** to keep calling poll() to send heartbeats. There is already a processing thread that keeps calling poll() as usual. Heartbeating is a separate mechanism, and pausing partitions doesn't stop heartbeats from being sent. As the question shows, there is a misunderstanding about calling poll() versus sending heartbeats (it assumes a single thread is responsible for both polling and heartbeating), and in my view, explaining that heartbeats come from a separate thread is enough to answer this question. – H.Ç.T Feb 08 '20 at 10:33
  • That's true: it doesn't matter which thread calls poll() (as long as only one thread at a time does so). All that matters is that poll() must still be called regularly, even on consumers whose assigned partitions are all paused. – radai Feb 08 '20 at 17:48
  • To add some clarity: my process calls poll() and does the batch processing in the same thread. If I pause the consumer to allow for long batch times, I will not call the poll method until the batch is completed. Consider this scenario: I call poll() and get back 1000k messages, I pause the consumer and begin processing, and 10 minutes later the batch is done. No call to poll() took place in that time, and my timeout is 5 minutes, so the long batch caused a rebalance. **Do I need to create a thread to call poll() while the consumer is paused to avoid a rebalance?** – Bros0912 Feb 10 '20 at 14:17
  • @Bros0912 As long as you call poll() before `max.poll.interval.ms` expires, long batch processes won't be a problem for the heartbeat thread, even if your batch process takes longer than `session.timeout.ms`, because heartbeats are sent from a separate thread. BTW, I couldn't really understand what you mean by "pause the consumer to allow for long batch times". Is it different from this: https://kafka.apache.org/24/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#pause-java.util.Collection- – H.Ç.T Feb 10 '20 at 14:35
  • @Bros0912 In your scenario, I suggest increasing max.poll.interval.ms or reducing max.poll.records to avoid a rebalance. – H.Ç.T Feb 10 '20 at 14:39
  • @Bros0912 You may want to check this link to make things clearer: https://stackoverflow.com/a/60052771/10550738 – H.Ç.T Feb 10 '20 at 14:58
  • @H.C.T We don't want to increase max.poll.interval.ms just to handle long processing batches, as this would also prevent timely error detection in case of an exception. In addition, in some cases even a small batch can run for quite a long time. We previously tested different batch sizes and need to keep the batch size at a minimum number to stay 'current' with data availability (current as defined by our business consumers). – Bros0912 Feb 10 '20 at 16:20
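The knobs discussed in these comments are ordinary consumer properties. Below is a sketch that uses plain string keys so it compiles without kafka-clients on the classpath; the broker address, group id, and every value are illustrative assumptions, not recommendations. 300000 ms is the five-minute poll interval from Bros0912's scenario.

```java
import java.util.Properties;

public class ConsumerTimeouts {

    static Properties consumerProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumed broker address
        props.put("group.id", "sql-script-runner");       // hypothetical group id
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // Heartbeat thread: how long the broker waits for a heartbeat, and how often one is sent.
        props.put("session.timeout.ms", "10000");
        props.put("heartbeat.interval.ms", "3000");
        // Processing thread: the longest allowed gap between two poll() calls.
        props.put("max.poll.interval.ms", "300000");
        // Caps how many records a single poll() returns (the "batch size").
        props.put("max.poll.records", "100");
        // Commit offsets explicitly after a batch succeeds instead of on a timer.
        props.put("enable.auto.commit", "false");
        return props;
    }

    public static void main(String[] args) {
        consumerProps().forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

Only the last two timeouts interact with slow SQL processing: session.timeout.ms is satisfied by the background thread, while max.poll.interval.ms and max.poll.records together bound how long one batch may take between polls.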

Yes, you need to keep calling poll() on the consumer even if you pause all partitions, or it will be kicked out of any consumer group it's a member of, and its assigned partitions will be transferred to another consumer. As for which thread ends up calling poll(), that doesn't matter (so long as only a single thread interacts with the consumer at a time).

Quoting from KIP-62:

max.poll.interval.ms. This config sets the maximum delay between client calls to poll(). When the timeout expires, the consumer will stop sending heartbeats and send an explicit LeaveGroup request.
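A minimal sketch of the pattern described in this answer, assuming the SQL work can be moved onto a worker thread: the polling thread pauses its partitions, hands the batch to an ExecutorService, and keeps calling poll() until the worker finishes, then commits and resumes. To keep it runnable without a broker, PausableConsumer, FakeConsumer and runSqlScripts are hypothetical stand-ins; with the real client, the calls would map to KafkaConsumer's poll(Duration), pause(Collection), resume(Collection) and commitSync().

```java
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class PausedPollLoop {

    // Hypothetical stand-in for the KafkaConsumer methods this pattern relies on.
    interface PausableConsumer {
        List<String> poll(long timeoutMs); // real client: poll(Duration), empty while all partitions are paused
        void pauseAll();                   // real client: consumer.pause(consumer.assignment())
        void resumeAll();                  // real client: consumer.resume(consumer.paused())
        void commitSync();                 // real client: commitSync()
    }

    // Hypothetical long-running work standing in for the SQL scripts.
    static void runSqlScripts(List<String> batch) throws InterruptedException {
        Thread.sleep(350);
    }

    /** Runs one batch on a worker thread while the calling thread keeps calling poll(). */
    static int processBatch(PausableConsumer consumer, List<String> batch,
                            ExecutorService worker) throws Exception {
        consumer.pauseAll();
        Future<Void> job = worker.submit(() -> {
            runSqlScripts(batch);
            return null;
        });
        int pollsWhilePaused = 0;
        while (!job.isDone()) {
            // Keeps resetting the max.poll.interval.ms deadline; paused partitions return nothing.
            List<String> records = consumer.poll(100);
            if (!records.isEmpty()) {
                throw new IllegalStateException("records returned while paused");
            }
            pollsWhilePaused++;
        }
        job.get();             // throws ExecutionException if processing failed, so nothing is committed
        consumer.commitSync(); // commit offsets only after the whole batch succeeded
        consumer.resumeAll();
        return pollsWhilePaused;
    }

    // In-memory fake used only to exercise the loop without a broker.
    static class FakeConsumer implements PausableConsumer {
        boolean paused;
        boolean committed;

        public List<String> poll(long timeoutMs) {
            try {
                Thread.sleep(timeoutMs); // a real poll() blocks up to the timeout when nothing is fetchable
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return List.of();
        }

        public void pauseAll() { paused = true; }
        public void resumeAll() { paused = false; }
        public void commitSync() { committed = true; }
    }

    public static void main(String[] args) throws Exception {
        ExecutorService worker = Executors.newSingleThreadExecutor();
        FakeConsumer consumer = new FakeConsumer();
        int polls = processBatch(consumer, List.of("dealer-1", "dealer-2"), worker);
        worker.shutdown();
        System.out.println("polls while paused: " + polls + ", committed: " + consumer.committed);
    }
}
```

Only the polling thread touches the consumer, which respects the single-thread rule. A production version would also need a ConsumerRebalanceListener, since partitions assigned during a rebalance may not be paused, and a policy for failed batches (in this sketch a failure leaves the partitions paused and uncommitted).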

radai