18

I am currently using Spring Integration Kafka to compute real-time statistics. However, because of the consumer group name, Kafka replays all the earlier messages that the listener has not yet read.

@Value("${kafka.consumer.group.id}")
private String consumerGroupId;

@Bean
public ConsumerFactory<String, String> consumerFactory() {
    return new DefaultKafkaConsumerFactory<>(getDefaultProperties());
}

public Map<String, Object> getDefaultProperties() {
    Map<String, Object> properties = new HashMap<>();
    properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);

    properties.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroupId);

    properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
    return properties;
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {

    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    return factory;
}

@Bean
public KafkaMessageListener listener() {
    return new KafkaMessageListener();
}

I would like to start from the latest offset and ignore the old values. Is it possible to reset the offset of the group?

bachrc

5 Answers

34

Because I didn't see any example of this, I'll explain here how I did it.

The class containing your @KafkaListener must implement the ConsumerSeekAware interface, which lets the listener control offset seeking when partitions are assigned. (source: https://docs.spring.io/spring-kafka/reference/htmlsingle/#seek )

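import java.util.Map;

import org.apache.kafka.common.TopicPartition;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.listener.ConsumerSeekAware;

public class KafkaMessageListener implements ConsumerSeekAware {
    @KafkaListener(topics = "your.topic")
    public void listen(byte[] payload) {
        // ...
    }

    @Override
    public void registerSeekCallback(ConsumerSeekCallback callback) {
        // Not needed here: we only seek when partitions are assigned.
    }

    @Override
    public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
        // Skip everything already in the log: move each assigned partition to its end.
        assignments.forEach((t, o) -> callback.seekToEnd(t.topic(), t.partition()));
    }

    @Override
    public void onIdleContainer(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
        // Nothing to do when the container is idle.
    }
}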

Here, whenever partitions are assigned (for example after a rebalance), we use the given callback to seek to the end of each assigned partition. Thanks to Artem Bilan ( https://stackoverflow.com/users/2756547/artem-bilan ) for guiding me to the answer.

bachrc
6

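Another way: you can always consume only the latest messages, without committing the group offset, by passing {"enable.auto.commit:false", "auto.offset.reset:latest"} as the properties attribute of the @KafkaListener annotation. Note that auto.offset.reset only takes effect when the group has no committed offset for a partition.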

@KafkaListener(id = "example-group",
        properties = {"enable.auto.commit:false", "auto.offset.reset:latest"},
        topics = "example")
public void listen(String message) {
    // ...
}
bovenson
2

You can pass a ConsumerRebalanceListener to the Kafka consumer when you subscribe to topics. In it, you can get the latest offset of each partition with the KafkaConsumer.endOffsets() method and apply it to the consumer with the KafkaConsumer.seek() method, like this:

kafkaConsumer.subscribe(Collections.singletonList(topics),
    new ConsumerRebalanceListener() {
        @Override
        public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
            //do nothing
        }

        @Override
        public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
            // look up the latest offset of each assigned partition and seek to it
            kafkaConsumer.endOffsets(partitions)
                .forEach((partition, offset) -> kafkaConsumer.seek(partition, offset));
        }
    }
);
Suraj Rao
A.Chinese
1

For a new consumer group that has no committed offset in Kafka yet, you can set AUTO_OFFSET_RESET_CONFIG:

properties.put(ConsumerConfig.GROUP_ID_CONFIG, "consumer-group-id");
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");

For an existing consumer group, you can:

  1. Change the group id so that it appears as a new group, e.g. consumer-group-id-v2
  2. Implement ConsumerSeekAware so that you can seek to the desired offset during initialization (see the Spring Kafka docs)
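
As a rough sketch of option 1, the group id can be generated at startup so that it is new on every run. This uses plain string keys (the standard Kafka consumer property names) rather than the ConsumerConfig constants so that it compiles without any Kafka dependency. The class FreshGroupConfig, the method freshGroupProperties, and the "stats" prefix are hypothetical names chosen for illustration.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

public class FreshGroupConfig {

    /**
     * Builds consumer properties whose group id differs on every call.
     * A group id the broker has never seen has no committed offsets,
     * so auto.offset.reset=latest decides where consumption starts.
     */
    public static Map<String, Object> freshGroupProperties(String bootstrapServers, String groupPrefix) {
        Map<String, Object> properties = new HashMap<>();
        properties.put("bootstrap.servers", bootstrapServers);
        // Hypothetical naming scheme: prefix plus a random suffix.
        properties.put("group.id", groupPrefix + "-" + UUID.randomUUID());
        properties.put("auto.offset.reset", "latest");
        return properties;
    }

    public static void main(String[] args) {
        Map<String, Object> first = freshGroupProperties("localhost:9092", "stats");
        Map<String, Object> second = freshGroupProperties("localhost:9092", "stats");
        // The two group ids differ, so each run starts as a brand-new group.
        System.out.println(first.get("group.id"));
        System.out.println(second.get("group.id"));
    }
}
```

The resulting map could be handed to DefaultKafkaConsumerFactory in place of the fixed group id. One trade-off to keep in mind: every restart leaves an abandoned group id behind, and instances that are supposed to share the load must agree on the same generated id.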
Ahmad Abdelghany
0

You can use the partitionOffsets attribute of @TopicPartition to start from an exact offset, for example:

@KafkaListener(id = "bar", topicPartitions =
    { @TopicPartition(topic = "topic1", partitions = { "0", "1" }),
      @TopicPartition(topic = "topic2", partitions = "0",
         partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "100"))
    })
public void listen(ConsumerRecord<?, ?> record) {
    // ...
}
link
    Since Kafka topics usually have retention times associated with them, this approach might fail because offset "100" might no longer exist (the brokers delete data after X days). The docs aren't too clear about what happens if the offset does not exist. – PragmaticProgrammer Jul 17 '19 at 18:17
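
One way to sidestep the concern in the comment above is to clamp the requested offset to the range that still exists before seeking. The sketch below is a plain-Java helper with no Kafka dependency; clampOffset and OffsetClamp are hypothetical names, and it assumes the earliest and latest offsets of the partition have already been looked up (for example with KafkaConsumer.beginningOffsets() and KafkaConsumer.endOffsets()).

```java
public class OffsetClamp {

    /**
     * Returns the requested offset if it lies within [earliest, latest],
     * otherwise the nearest bound. Assumes both bounds were fetched for the
     * same partition just before the call.
     */
    public static long clampOffset(long requested, long earliest, long latest) {
        if (earliest > latest) {
            throw new IllegalArgumentException("earliest must not exceed latest");
        }
        return Math.max(earliest, Math.min(requested, latest));
    }

    public static void main(String[] args) {
        // Offset 100 was removed by retention; the log now starts at 250.
        System.out.println(clampOffset(100, 250, 900));  // prints 250
        // Offset 500 still exists and is returned unchanged.
        System.out.println(clampOffset(500, 250, 900));  // prints 500
    }
}
```

The clamped value could then be passed to a seek call instead of a hard-coded initialOffset.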