I have read this question and also this one, but I could not find any discussion of how to determine that a consumer is ready when it is started with assign().

My code currently looks something like this:

List<TopicPartition> topicPartitions = List.of(new TopicPartition("topic", 1));
// auto.offset.reset = latest
consumer.assign(topicPartitions);
// first poll to initiate connection as mentioned in https://stackoverflow.com/a/54336476/785523
consumer.poll(Duration.ofSeconds(1)); 
boolean keepOnReading = true;
while(keepOnReading){
    final var records = consumer.poll(Duration.ofSeconds(1));
    // process records and stop polling by setting keepOnReading to false
}

However, I observe that if I shorten the first poll to 100 ms (consumer.poll(Duration.ofMillis(100))), then consumer.poll() inside the while loop does not return any records, even though records are being published continuously. What is a reliable way to determine that a consumer started with assign() is ready?

Also, when using assign(), what is the correct way to seek after starting the consumer?

  • Kafka Client Version - 3.4.0
tuk

1 Answer

You're discarding the records returned by the first poll. If no data arrives afterwards, then the poll inside the while loop won't return any records.

So you'll want to either capture the first offset and seek the consumer back to it, call your processing code on the first poll's records before the while loop, or use a do-while loop instead.
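The do-while option can be sketched roughly as follows. To keep the example runnable without a broker, `poll` is a hypothetical stand-in for `() -> consumer.poll(Duration.ofSeconds(1))`, a plain `List` stands in for the `ConsumerRecords` that the real call returns, and `limit` is an invented stop condition. The class and method names are illustrative only.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

class PollLoopSketch {
    /**
     * Handles the result of every poll, including the very first one,
     * until at least `limit` records have been processed.
     * `poll` is a hypothetical stand-in for consumer.poll(...).
     */
    static <R> List<R> drain(Supplier<List<R>> poll, int limit) {
        List<R> processed = new ArrayList<>();
        do {
            List<R> batch = poll.get();   // the first call is not thrown away
            processed.addAll(batch);      // stand-in for "process records"
        } while (processed.size() < limit);
        return processed;
    }
}
```

Because the body runs before the condition is checked, there is no separate warm-up poll whose records could be lost. An empty batch simply leads to another iteration.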

Regarding "ready": if you're not getting a network exception or a WakeupException, then it's "ready".
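If a more explicit signal is wanted, one possible approach (a sketch, not necessarily the canonical one) relies on the fact that manual assignment takes effect immediately: `seek()` can be called right after `assign()`, and `position()` makes the consumer resolve its starting offset before the first real poll, falling back to `auto.offset.reset` when nothing was sought or committed. The class name, method names, and the 10-second timeout below are assumptions made for illustration.

```java
import java.time.Duration;
import java.util.List;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.TopicPartition;

class AssignAndSeekSketch {
    /**
     * Assigns the partition and asks for the resolved start offset.
     * position() may contact the broker; the timeout value is arbitrary.
     */
    static <K, V> long assignAndResolve(Consumer<K, V> consumer, TopicPartition tp) {
        consumer.assign(List.of(tp));
        return consumer.position(tp, Duration.ofSeconds(10));
    }

    /** Assigns the partition and starts reading from an explicit offset. */
    static <K, V> void assignAndSeek(Consumer<K, V> consumer, TopicPartition tp, long offset) {
        consumer.assign(List.of(tp));
        consumer.seek(tp, offset);        // valid immediately after assign()
    }
}
```

Once `assignAndResolve` returns, records produced after that offset should be visible to later polls. If the position cannot be resolved within the timeout, `position()` throws a `TimeoutException`, which is a clearer failure than a silently empty poll.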

OneCricketeer
  • I have updated the question. Records are being published continuously. – tuk May 03 '23 at 18:18
  • The while loop not getting records seems to be caused by `fetch.max.wait.ms` being left at its default value of 500 ms: if I set the poll timeout lower than that, poll does not return records. – tuk May 03 '23 at 18:20
  • Sounds like a network problem. Hard to say without inspecting full TCP traffic. Code similar to what you've written has worked for me before – OneCricketeer May 04 '23 at 02:20
  • But still, `consumer.poll()` returns a list of records. If you increase that duration, you'll get `max.poll.records` sized iterator. And you're not processing those. Therefore, if the producer is also continuous, you're skipping data, but that doesn't mean anything for the while loop, so I'm not sure about that. You'll have to print out the offsets of the records in the previous poll to see if you've reached end of the topic. There's also no point of the loop if you set the boolean to false on the first iteration – OneCricketeer May 04 '23 at 02:28