I have read this question and also this one, but I cannot find any discussion of how to determine whether a consumer is ready when it is started with assign().
My code currently looks something like this:
List<TopicPartition> topicPartitions = List.of(new TopicPartition("topic", 1));
// auto.offset.reset = latest
consumer.assign(topicPartitions);
// first poll to initiate connection as mentioned in https://stackoverflow.com/a/54336476/785523
consumer.poll(Duration.ofSeconds(1));
boolean keepOnReading = true;
while (keepOnReading) {
    final var records = consumer.poll(Duration.ofSeconds(1));
    // process records and stop polling by setting keepOnReading to false
}
However, if I shorten the first poll to 100 ms (consumer.poll(Duration.ofMillis(100))), the consumer.poll() calls in the while loop do not return any records, even though records are being published continuously. What is a reliable way to determine that a consumer is ready when using assign()?
Also, when using assign(), what is the correct way to do a seek after starting a consumer?
- Kafka client version: 3.4.0