We're using the Kafka consumer client 0.10.2.0 with the following configuration:

    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
    props.put(ConsumerConfig.RECEIVE_BUFFER_CONFIG, 64 * 1024);
    props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 16 * 1024);
    props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, RoundRobinAssignor.class.getName());
    props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
    props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, "40000");
    props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "10000");
    props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "100");

As you can see, we're using autocommit. The consumer API version we're using has a dedicated thread for autocommit, so an autocommit happens every second, which means we have a heartbeat every second.

Our application's processing may occasionally take more than 40 seconds (the request timeout interval).

What I wanted to ask is:

1 - If processing takes, for example, a minute, will there be a rebalance even though the autocommit heartbeat happens every second?

2 - What's weirder is that with long execution times we seem to get the same message more than once. Is that normal? If the consumer has committed an offset, why does the rebalance cause the same offset to be consumed again?

Thanks, Orel

Orel

3 Answers

0

Just to clarify: the auto-commit check runs on every poll. It checks whether the time elapsed since the last commit is greater than the configured interval, and only then does it commit.

E.g. if the commit interval is 5 seconds and poll is called every 7 seconds, the commit will happen after 7 seconds.
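The timing rule described above can be sketched as a toy model. This is not the Kafka client's code; `AutoCommitModel` and `onPoll` are hypothetical names used only to illustrate "commit on the first poll after the interval has elapsed".

```java
/**
 * Toy model of the auto-commit timing rule (hypothetical class,
 * not part of the Kafka client): a commit is only attempted from
 * inside a poll, and only once the commit interval has elapsed.
 */
public class AutoCommitModel {
    private final long intervalMs;
    private long nextCommitDeadlineMs;

    public AutoCommitModel(long intervalMs, long startMs) {
        this.intervalMs = intervalMs;
        this.nextCommitDeadlineMs = startMs + intervalMs;
    }

    /** Called on each simulated poll; returns true if a commit would be issued. */
    public boolean onPoll(long nowMs) {
        if (nowMs >= nextCommitDeadlineMs) {
            // The interval has elapsed: commit now and schedule the next deadline.
            nextCommitDeadlineMs = nowMs + intervalMs;
            return true;
        }
        return false;
    }
}
```

With a 5-second interval and polls 7 seconds apart, `onPoll` first returns true at the 7-second poll, because no poll happened at the 5-second mark.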

For your questions

  1. Auto commit doesn't count as a heartbeat. If processing takes a long time, the commit will not happen, which leads to a session timeout, which in turn triggers a rebalance.

  2. This shouldn't happen unless you are seeking or resetting the offset to a previously committed offset, or a consumer rebalance has occurred.

Liju John
  • Thanks for the response, John! Actually, we're using Kafka client 0.10.2.0, which has a dedicated thread for autocommit. Our autocommit interval is set to 1 second, and indeed we see " Completed auto-commit of offsets {-2=OffsetAndMetadata{offset=2214, metadata=''}} for group..." in our logs every second – Orel Feb 12 '18 at 08:33
  • Regarding the heartbeat point: I know that a commit is considered a heartbeat, so I thought autocommit would be treated the same – Orel Feb 12 '18 at 08:41
0

Since Kafka v0.10.1.0, you don't need to trigger a commit manually to send a heartbeat. The Kafka consumer itself starts a background thread for the heartbeat mechanism. To learn more, read KIP-62.

In your case, you can set max.poll.interval.ms to the maximum time your processor takes to handle max.poll.records records.
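As a rough sketch of that sizing, the snippet below derives max.poll.interval.ms from max.poll.records and a worst-case per-record processing time. The helper `PollIntervalConfig.build` and the 700 ms figure in the usage note are assumptions for illustration; only the two property names come from the Kafka consumer configuration.

```java
import java.util.Properties;

/** Hypothetical helper: sizes max.poll.interval.ms from the batch size. */
public class PollIntervalConfig {

    public static Properties build(int maxPollRecords, long worstCaseMsPerRecord) {
        Properties props = new Properties();
        // Upper bound on the number of records returned by one poll().
        props.put("max.poll.records", Integer.toString(maxPollRecords));
        // Worst case for handling one full batch between two poll() calls.
        // The per-record figure is an assumption; measure your own processing.
        long maxPollIntervalMs = maxPollRecords * worstCaseMsPerRecord;
        props.put("max.poll.interval.ms", Long.toString(maxPollIntervalMs));
        return props;
    }
}
```

For example, `PollIntervalConfig.build(100, 700)` yields max.poll.interval.ms=70000; these entries would be merged into the consumer properties shown in the question. Lowering max.poll.records is the other lever if a batch cannot be processed in time.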

Kamal Chandraprakash
0

You can use KafkaConsumer.pause() / KafkaConsumer.resume() to prevent consumer rebalancing during long processing pauses. JavaDocs. Take a look at this question.

Re. 2: Are you sure that these offsets are committed?
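A minimal sketch of the pause/poll/resume pattern follows. To keep it runnable without the Kafka client on the classpath, it uses a hypothetical `PausableConsumer` interface as a stand-in for the three consumer calls involved; the comments name the real calls each method is assumed to wrap.

```java
import java.util.concurrent.CompletableFuture;

public class PauseWhileProcessing {

    /** Hypothetical stand-in for the consumer calls used by this pattern. */
    public interface PausableConsumer {
        void pauseAssigned();   // real client: consumer.pause(consumer.assignment())
        void resumeAssigned();  // real client: consumer.resume(consumer.assignment())
        void pollOnce();        // real client: consumer.poll(timeout), no records while paused
    }

    /**
     * Runs slowTask on another thread while the calling thread keeps polling
     * the paused consumer. Returns the number of keep-alive polls issued.
     */
    public static int processWithKeepAlive(PausableConsumer consumer,
                                           Runnable slowTask,
                                           long pollEveryMs) {
        consumer.pauseAssigned();
        int polls = 0;
        try {
            CompletableFuture<Void> work = CompletableFuture.runAsync(slowTask);
            while (!work.isDone()) {
                consumer.pollOnce(); // keeps the consumer active in the group
                polls++;
                try {
                    Thread.sleep(pollEveryMs);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    break;
                }
            }
            work.join(); // waits for completion and rethrows a processing failure
        } finally {
            consumer.resumeAssigned();
        }
        return polls;
    }
}
```

All consumer calls stay on the calling thread, since KafkaConsumer is not safe for multi-threaded access; only the processing itself is handed to another thread.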

Arek