I am reading a Kafka topic from the beginning. In order to use seekToBeginning(), I first need to make a dummy call to poll(). The following is a snippet of my code:

    // Subscribe
    consumer.subscribe(Collections.singleton(TOPIC_NAME));
    // Seek to beginning
    // consumer.poll(Duration.ZERO);
    consumer.poll(0);
    consumer.seekToBeginning(Collections.singleton(new TopicPartition(TOPIC_NAME, 0)));

Using consumer.poll(0) works fine, but using consumer.poll(Duration.ZERO) results in the following exception:

    [main] INFO org.apache.kafka.clients.consumer.internals.SubscriptionState - [Consumer clientId=consumer-group2-1, groupId=group2] Seeking to EARLIEST offset of partition test-lc-1-0
    Exception in thread "main" java.lang.IllegalStateException: No current assignment for partition test-lc-1-0
        at org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:368)
        at org.apache.kafka.clients.consumer.internals.SubscriptionState.lambda$requestOffsetReset$3(SubscriptionState.java:615)
        at java.base/java.util.Collections$SingletonSet.forEach(Collections.java:4797)
        at org.apache.kafka.clients.consumer.internals.SubscriptionState.requestOffsetReset(SubscriptionState.java:613)
        at org.apache.kafka.clients.consumer.KafkaConsumer.seekToBeginning(KafkaConsumer.java:1659)
        at com.ahmed.ConsumeProtobuf.main(ConsumeProtobuf.java:49)

I looked at the implementation of both APIs. In the end, both end up calling the same method with 0 as the argument. Any idea why poll(Duration.ZERO) would fail?

Thank you, Ahmed.

Ahmed A
  • Duration.ZERO is exactly the same as poll(0), so the failure is because the consumer didn't fetch the partition assignment metadata: the poll returns too quickly. Increase the timeout from 0. – clevertension Aug 22 '20 at 10:31

1 Answer

The correct way to seek when starting a consumer is to use a ConsumerRebalanceListener.

For example, something like:

    // Seek inside the rebalance callback, where the assignment already exists.
    try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(configs)) {
        consumer.subscribe(Collections.singleton(TOPIC_NAME), new ConsumerRebalanceListener() {
            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {}

            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                // Invoked from within poll() once partitions are assigned to this consumer.
                consumer.seekToBeginning(partitions);
            }
        });
        while (true) {
            consumer.poll(Duration.ofSeconds(1L));
            ...
        }
    }

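
To see why the ordering matters, here is a minimal toy model in plain Java. It does not use the Kafka client at all: `ToyConsumer`, `pollsBeforeAssignment`, and the placeholder offset 42 are made up for illustration. The only thing the sketch tries to show is that a seek issued before the assignment exists fails, while a seek issued from the assignment callback succeeds.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.function.Consumer;

public class SeekTimingSketch {

    /** Hypothetical stand-in for a consumer; it only mimics the ordering of events. */
    static class ToyConsumer {
        // Assigned partition -> next offset to read.
        private final Map<String, Long> positions = new HashMap<>();
        private Set<String> pending = new HashSet<>();
        private Consumer<Set<String>> listener = p -> {};
        // Assumption: the "group join" needs this many polls before it completes.
        private int pollsBeforeAssignment;

        ToyConsumer(int pollsBeforeAssignment) {
            this.pollsBeforeAssignment = pollsBeforeAssignment;
        }

        // Subscribing only records intent; nothing is assigned yet.
        void subscribe(Set<String> partitions, Consumer<Set<String>> onAssigned) {
            pending = new HashSet<>(partitions);
            listener = onAssigned;
        }

        // Assignment happens inside poll(), and the listener fires right after it.
        void poll() {
            if (pending.isEmpty()) return;
            if (pollsBeforeAssignment > 0) { pollsBeforeAssignment--; return; }
            Set<String> justAssigned = pending;
            pending = new HashSet<>();
            for (String p : justAssigned) positions.put(p, 42L); // placeholder offset
            listener.accept(justAssigned);
        }

        void seekToBeginning(Set<String> partitions) {
            for (String p : partitions) {
                if (!positions.containsKey(p)) {
                    throw new IllegalStateException("No current assignment for partition " + p);
                }
            }
            for (String p : partitions) positions.put(p, 0L);
        }

        Long position(String partition) { return positions.get(partition); }
    }

    // Scenario 1: one quick poll, then seek. The assignment is not there yet.
    public static boolean seekRightAfterOnePollFails() {
        ToyConsumer c = new ToyConsumer(1);
        c.subscribe(Set.of("test-0"), p -> {});
        c.poll();
        try {
            c.seekToBeginning(Set.of("test-0"));
            return false;
        } catch (IllegalStateException e) {
            return true;
        }
    }

    // Scenario 2: seek from the assignment callback, whenever assignment happens.
    public static long seekFromCallback() {
        ToyConsumer c = new ToyConsumer(1);
        c.subscribe(Set.of("test-0"), c::seekToBeginning);
        c.poll(); // join still in progress
        c.poll(); // assignment completes, callback seeks
        return c.position("test-0");
    }

    public static void main(String[] args) {
        System.out.println("seek right after one poll fails: " + seekRightAfterOnePollFails());
        System.out.println("position after callback seek: " + seekFromCallback());
    }
}
```

Running `main` prints `true` for the first scenario and `0` for the second. In the toy model the callback version does not care how many polls the assignment takes, which is the property the listener-based approach relies on.
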
Mickael Maison
  • What is the correct way to seek while starting a consumer using `assign()`? – tuk May 02 '23 at 13:21
  • I have asked the same as a separate question [here](https://stackoverflow.com/q/76156027/785523) – tuk May 02 '23 at 14:40