I want to start consuming from the beginning of the topic. I have set the property "AUTO_OFFSET_RESET_CONFIG" to earliest, but the consumer still does not read from the beginning.

Any thoughts on what I am missing? I am creating a new consumer group every time.

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory
            = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.setBatchListener(true);

    return factory;
}

@Bean
public ConsumerFactory<String, String> consumerFactory() {
    return new DefaultKafkaConsumerFactory<>(getConsumerConfigs(false));
}

private Map<String, Object> getConsumerConfigs(boolean isEmbedded) {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, isEmbedded ? embeddedBootstrapServers : bootstrapServers);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroupId + "temp");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

    props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeoutMillis);
    props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, heartbeatIntervalMillis);

    return props;
}
Gary Russell
itontips
  • Possible duplicate of [How to read data using Kafka Consumer API from beginning?](https://stackoverflow.com/questions/28561147/how-to-read-data-using-kafka-consumer-api-from-beginning) – Valentin Michalak Feb 01 '18 at 17:58
  • My question is specific to the spring-kafka implementation; the linked question covers the generic Kafka consumer API, so probably not a duplicate. Thanks for sharing the link, though. – itontips Feb 02 '18 at 13:34

1 Answer

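That property applies only when the consumer group has no committed offset for a partition, for example a new group that has never consumed. To read from the beginning regardless of committed offsets, use a ConsumerSeekAware message listener and call seekToBeginning for each assigned topic/partition.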
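
As a rough illustration of the seek-on-assignment idea, here is a minimal sketch that compiles without spring-kafka on the classpath. The nested `TopicPartition`, `SeekCallback`, and `SeekAware` types are local stand-ins that only mirror the shape of Kafka's `TopicPartition` and spring-kafka's `ConsumerSeekAware` / `ConsumerSeekCallback`; the class name `ReplayListener` and the topic `demo-topic` are hypothetical. In a real application you would implement the real interface, whose exact set of methods depends on the spring-kafka version. The sketch uses `seek(topic, partition, 0)` because the 1.2.2 callback reportedly has only `seek()`; where `seekToBeginning` is available it expresses the same intent more directly.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class SeekToBeginningSketch {

    // Stand-in for org.apache.kafka.common.TopicPartition (assumption: only
    // the two accessors used below are mirrored).
    static final class TopicPartition {
        private final String topic;
        private final int partition;

        TopicPartition(String topic, int partition) {
            this.topic = topic;
            this.partition = partition;
        }

        String topic() { return topic; }
        int partition() { return partition; }
    }

    // Stand-in for spring-kafka's ConsumerSeekAware.ConsumerSeekCallback.
    interface SeekCallback {
        void seek(String topic, int partition, long offset);
    }

    // Stand-in for spring-kafka's ConsumerSeekAware (assumption: reduced to
    // the two methods this sketch needs).
    interface SeekAware {
        void registerSeekCallback(SeekCallback callback);
        void onPartitionsAssigned(Map<TopicPartition, Long> assignments, SeekCallback callback);
    }

    // Hypothetical listener: rewinds every partition it is assigned.
    static class ReplayListener implements SeekAware {
        @Override
        public void registerSeekCallback(SeekCallback callback) {
            // Only needed for seeking at arbitrary later times; unused here.
        }

        @Override
        public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, SeekCallback callback) {
            // The map values are the current positions; they are ignored
            // because every partition is rewound to offset 0.
            for (TopicPartition tp : assignments.keySet()) {
                callback.seek(tp.topic(), tp.partition(), 0L);
            }
        }
    }

    public static void main(String[] args) {
        // Simulate the container handing two partitions to the listener.
        Map<TopicPartition, Long> assigned = new LinkedHashMap<>();
        assigned.put(new TopicPartition("demo-topic", 0), 42L);
        assigned.put(new TopicPartition("demo-topic", 1), 7L);

        // Record the seeks instead of talking to a broker.
        List<String> seeks = new ArrayList<>();
        new ReplayListener().onPartitionsAssigned(assigned,
                (topic, partition, offset) -> seeks.add(topic + "-" + partition + "@" + offset));

        System.out.println(seeks);
    }
}
```

If offset 0 has already been removed by retention, the seek position is out of range and the consumer falls back to its `auto.offset.reset` policy, which is `earliest` in the question's configuration.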

akokskis
Gary Russell
  • Thanks Gary. I am trying to implement the ConsumerSeekAware interface. I am using the 1.2.2 release, which has only the seek() method, and that is fine. However, the seek() method is somehow not being called on my listener. I am using a BatchListener. Does it work with all kinds of listeners? Any idea? I ask because KafkaMessageListenerContainer registers the callback only for a GenericListener: `if (this.genericListener instanceof ConsumerSeekAware) { ((ConsumerSeekAware) this.genericListener).registerSeekCallback(this); }` – itontips Feb 02 '18 at 13:27
  • It should work; it was a problem before 1.2.2 [fixed here](https://github.com/spring-projects/spring-kafka/pull/324). – Gary Russell Feb 02 '18 at 15:28