I am reading a Kafka topic from the beginning. To use seekToBeginning(),
I first need to make a dummy call to poll(). Here is a snippet of my code:
// Subscribe
consumer.subscribe(Collections.singleton(TOPIC_NAME));
// Seek to beginning
// consumer.poll(Duration.ZERO);
consumer.poll(0);
consumer.seekToBeginning(Collections.singleton(new TopicPartition(TOPIC_NAME, 0)));
Using consumer.poll(0) works fine. When I use consumer.poll(Duration.ZERO) instead,
it results in the following exception:
[main] INFO org.apache.kafka.clients.consumer.internals.SubscriptionState - [Consumer clientId=consumer-group2-1, groupId=group2] Seeking to EARLIEST offset of partition test-lc-1-0
Exception in thread "main" java.lang.IllegalStateException: No current assignment for partition test-lc-1-0
at org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:368)
at org.apache.kafka.clients.consumer.internals.SubscriptionState.lambda$requestOffsetReset$3(SubscriptionState.java:615)
at java.base/java.util.Collections$SingletonSet.forEach(Collections.java:4797)
at org.apache.kafka.clients.consumer.internals.SubscriptionState.requestOffsetReset(SubscriptionState.java:613)
at org.apache.kafka.clients.consumer.KafkaConsumer.seekToBeginning(KafkaConsumer.java:1659)
at com.ahmed.ConsumeProtobuf.main(ConsumeProtobuf.java:49)
I looked at the implementation of both APIs. In the end, both seem to call the same method with 0
as the argument. Any idea why poll(Duration.ZERO) would fail?
Thank you, Ahmed.
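
A possible explanation, offered as a hedged note rather than a confirmed diagnosis: the Javadoc for the deprecated poll(long) says to use poll(Duration) instead, "which does not block beyond the timeout awaiting partition assignment" (KIP-266). With Duration.ZERO the call therefore likely returns before the group join has finished, so the consumer owns no partitions yet and seekToBeginning() throws "No current assignment". One way to avoid the dummy poll altogether is to seek from a ConsumerRebalanceListener, which runs inside poll() once partitions have actually been assigned. The sketch below assumes a broker at localhost:9092 and byte-array deserializers; the class names SeekOnAssignSketch and SeekToBeginningOnAssign are made up for illustration, while the topic and group names are taken from the log above.

```java
import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class SeekOnAssignSketch {

    /** Hypothetical helper: rewinds every newly assigned partition to its earliest offset. */
    static final class SeekToBeginningOnAssign implements ConsumerRebalanceListener {
        private final Consumer<?, ?> consumer;

        SeekToBeginningOnAssign(Consumer<?, ?> consumer) {
            this.consumer = consumer;
        }

        @Override
        public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
            // Nothing to clean up in this sketch.
        }

        @Override
        public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
            // Invoked from within poll() after the assignment is in place,
            // so the consumer owns these partitions when we seek.
            consumer.seekToBeginning(partitions);
        }
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumed broker address
        props.put("group.id", "group2");
        // Assumed deserializers; substitute whatever the topic actually needs.
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            // No dummy poll: the listener performs the seek once partitions arrive.
            consumer.subscribe(Collections.singleton("test-lc-1"),
                    new SeekToBeginningOnAssign(consumer));
            while (true) {
                ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<byte[], byte[]> record : records) {
                    System.out.printf("partition=%d offset=%d%n",
                            record.partition(), record.offset());
                }
            }
        }
    }
}
```

Note that this listener rewinds on every rebalance, not only the first one, so it suits a single consumer that always wants to re-read the topic. If only partition 0 is ever needed, consumer.assign(...) followed by seekToBeginning(...) is another option, since assign() sets the assignment immediately without a group join.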