
I am using Spring Kafka for the first time and I am not able to use the Acknowledgment.acknowledge() method for manual commits in my consumer code. Please let me know if anything is missing in my consumer configuration or listener code, or whether there is another way to acknowledge the offset based on a condition. I am looking for a solution where, if the offset is not committed/acknowledged manually, the consumer picks up the same message/offset again.

Configuration

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.AbstractMessageListenerContainer.AckMode;

@EnableKafka
@Configuration
// renamed from "ConsumerConfig": that name clashes with the imported
// org.apache.kafka.clients.consumer.ConsumerConfig used below
public class KafkaConsumerConfig {

    @Value(value = "${kafka.bootstrapAddress}")
    private String bootstrapAddress;

    @Value(value = "${kafka.groupId}")
    private String groupId;

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> containerFactory() {
        Map<String, Object> props = new HashMap<String, Object>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 100); // ignored when enable.auto.commit is false
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                StringDeserializer.class);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<String, String>();
        factory.setConsumerFactory(new DefaultKafkaConsumerFactory<String, String>(
                props));
        factory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE);
        factory.getContainerProperties().setSyncCommits(true);
        return factory;
    }
}

Listener

private static int value = 1;

@KafkaListener(id = "baz", topics = "${message.topic.name}", containerFactory = "containerFactory")
public void listenPEN_RE(@Payload String message,
        @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
        @Header(KafkaHeaders.OFFSET) int offsets,
        Acknowledgment acknowledgment) {

    if (value%2==0){
        acknowledgment.acknowledge();
    }
    value++;
}
Anirudh
user2550140
  • What happens if we then switch the setting from `props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false)` to `props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true)` ? Does any of the other code need to change or will spring ignore `acknowledgment.acknowledge()`? – alex May 21 '20 at 20:48
  • Good question. I think setting it to `true` completely bypasses the container ack config. See https://docs.spring.io/spring-kafka/reference/html/#committing-offsets:~:text=If%20the%20enable.auto.commit%20consumer%20property%20is%20true%2C%20Kafka%20auto%2Dcommits%20the%20offsets%20according%20to%20its%20configuration. Manually acking it won't make a difference. So yes, the code needs to be changed because it is useless. – Boss Man Sep 29 '22 at 18:48

5 Answers


Set the enable-auto-commit property to false:

propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

Set the ack-mode to MANUAL_IMMEDIATE:

factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL_IMMEDIATE);

Then, in your consumer/listener code, you can commit the offset manually, like this:

@KafkaListener(topics = "testKafka")
public void receive(ConsumerRecord<?, ?> consumerRecord,  
        Acknowledgment acknowledgment) {

    System.out.println("Received message: ");
    System.out.println(consumerRecord.value().toString());

    acknowledgment.acknowledge();
}

Update: I created a small POC for this. Check it out here; it might help you.

contactsunny
    For Spring 2.3.3 the config is factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE); – Christian Altamirano Ayala Dec 02 '19 at 20:36
  • I'm trying to commit a single record and unfortunately it commits previous or next records as well; committing just a single record doesn't work. Do you have an idea what approach to take? – Tiago Medici Jan 09 '20 at 14:59

You need to do the following:

1) Set the enable-auto-commit property to false:

consumerConfigProperties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

2) Set the ack mode to MANUAL_IMMEDIATE:

factory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE);

3) For successfully processed records, call acknowledgment.acknowledge();

4) For failed records, call acknowledgment.nack(10); Note: the nack method takes a long parameter, the sleep time in milliseconds, and it should be less than max.poll.interval.ms

Below is a sample code

@KafkaListener(id = "baz", topics = "${message.topic.name}", containerFactory = "containerFactory")
public void listenPEN_RE(@Payload String message,
        @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
        @Header(KafkaHeaders.OFFSET) int offsets,
        Acknowledgment acknowledgment) {

    if (value%2==0){
        acknowledgment.acknowledge();
    } else {
        acknowledgment.nack(10); //sleep time should be less than max.poll.interval.ms
    }
    value++;
}
Amit Nargund

You can do the following:
1. Store the current record's offset in a file or DB.
2. Implement your Kafka listener class with ConsumerSeekAware.
3. Seek to the saved offset in registerSeekCallback, like this:

@Override
public void registerSeekCallback(ConsumerSeekCallback callback) {
    // seek to the offset previously stored in your DB/file
    callback.seek(topic, partition, offset);
}

So when the consumer goes down or a new consumer is assigned, it starts reading from the offset stored in your DB.
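A minimal, compilable sketch of those steps. The nested interface below is a local stand-in for spring-kafka's ConsumerSeekAware.ConsumerSeekCallback so the example runs without the framework; the topic name and in-memory offset store are illustrative assumptions:

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of the "store offsets, seek on assignment" idea. In a real
// application you would implement ConsumerSeekAware on the listener class
// and persist offsets in a file or DB instead of this in-memory map.
class SeekOnStartupSketch {

    // stub of the spring-kafka callback, declared locally so the sketch compiles
    interface ConsumerSeekCallback {
        void seek(String topic, int partition, long offset);
    }

    // stands in for the file/DB of last-processed offsets (step 1)
    private final Map<Integer, Long> storedOffsets = new HashMap<>();

    void saveOffset(int partition, long offset) {
        storedOffsets.put(partition, offset);
    }

    // step 3: replay the stored positions when partitions are assigned,
    // so a restarted consumer resumes from where it left off
    void onPartitionsAssigned(String topic, ConsumerSeekCallback callback) {
        storedOffsets.forEach((partition, offset) -> callback.seek(topic, partition, offset));
    }
}
```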

Nishu Tayal
Zubair A.

That doesn't work that way in Apache Kafka.

For the currently running consumer we never need to worry about committing offsets: they need to be persisted only for new consumers in the same consumer group. The running consumer tracks its position in memory; the committed offsets live on the broker.

If you need to re-fetch the same message in the same consumer, perhaps on the next poll, you should consider using the seek() functionality: https://docs.spring.io/spring-kafka/docs/2.0.1.RELEASE/reference/html/_reference.html#seek

Artem Bilan

I have a small nitpick with the OP's configuration. If you use @KafkaListener with a ConcurrentKafkaListenerContainerFactory, don't forget to make shared state thread-safe. private static int value = 1; isn't going to help; use an AtomicInteger instead.
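A minimal sketch of that suggestion (the class and method names are illustrative; it just replaces the listener's static int with an AtomicInteger):

```java
import java.util.concurrent.atomic.AtomicInteger;

// Thread-safe counter: several listener threads can call shouldAcknowledge()
// concurrently without losing updates, unlike a plain static int.
class AckCounter {
    private static final AtomicInteger value = new AtomicInteger(1);

    // Same even/odd check the listener performs before acknowledging,
    // but the read-and-advance happens as one atomic operation.
    static boolean shouldAcknowledge() {
        return value.getAndIncrement() % 2 == 0;
    }
}
```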

Boss Man