
I have configured several Kafka consumers in Spring Boot. This is what kafka.properties looks like (only the config for one consumer is listed here):

kafka.topics=
bootstrap.servers=
group.id=
enable.auto.commit=
auto.commit.interval.ms=
session.timeout.ms=
schema.registry.url=
auto.offset.reset=
kafka.enabled=

Here is the config:

@Configuration
@PropertySource({"classpath:kafka.properties"})
public class KafkaConsumerConfig {

    @Autowired
    private Environment env;

    @Bean
    public ConsumerFactory<String, String> pindropConsumerFactory() {
        Map<String, Object> dataRiverProps = new HashMap<>();

        dataRiverProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, env.getProperty("bootstrap.servers"));
        dataRiverProps.put(ConsumerConfig.GROUP_ID_CONFIG, env.getProperty("group.id"));
        dataRiverProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, env.getProperty("enable.auto.commit"));
        dataRiverProps.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, env.getProperty("auto.commit.interval.ms"));
        dataRiverProps.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, env.getProperty("session.timeout.ms"));

        dataRiverProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        dataRiverProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        dataRiverProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, env.getProperty("auto.offset.reset"));

        return new DefaultKafkaConsumerFactory<>(dataRiverProps);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(pindropConsumerFactory());
        return factory;
    }
}

And this is the consumer:

@Component
public class KafkaConsumer {

    @Autowired
    private MessageProcessor messageProcessor;


    @KafkaListener(topics = "#{'${kafka.topics}'.split(',')}", containerFactory = "kafkaListenerContainerFactory")
    public void consumeJson(String message) {
        // processing message
    }
}

Is there a way for me to use the property `kafka.enabled` to control the creation of this consumer, or maybe its message retrieval? Thanks so much!

OneCricketeer
Hua

2 Answers


You can do this with the `autoStartup` property on `@KafkaListener`, which accepts a property placeholder, like below:

@KafkaListener(id = "foo", topics = "Topic1", groupId = "group_id",
        containerFactory = "kafkaListenerContainerFactory", autoStartup = "${listen.auto.start:false}")
public void consume(String message) {
    // System.out.println("Consumed message: " + message);
}
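Applied to the question's consumer, the same idea could look like the sketch below. `kafka.enabled` is the property from the question; the `:true` default is an assumption (adjust as needed), and placeholder support for `autoStartup` requires spring-kafka 2.2 or later:

```java
// Sketch: the question's listener gated by kafka.enabled.
// ":true" makes the listener start when the property is absent (an assumption).
@KafkaListener(topics = "#{'${kafka.topics}'.split(',')}",
        containerFactory = "kafkaListenerContainerFactory",
        autoStartup = "${kafka.enabled:true}")
public void consumeJson(String message) {
    // processing message
}
```

With `kafka.enabled=false` the listener container bean is still created, but it never starts polling.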
sameer.nuna
    `autoStartup` is available since `spring-kafka:2.2` – mohitmayank May 16 '19 at 05:29
  • `@Bean public ConcurrentKafkaListenerContainerFactory kafkaListener() { ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory(); factory.setConsumerFactory(consumerFactory()); factory.setAutoStartup(false); return factory; }` – Reza Jul 06 '20 at 08:59
  • I personally was looking for how to do this at runtime to an already enabled KafkaListener and found this helpful: https://github.com/spring-projects/spring-kafka/issues/938 – buddyp450 Nov 04 '20 at 16:25
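Following the issue linked in the comment above, starting and stopping a listener at runtime goes through `KafkaListenerEndpointRegistry`. A minimal sketch, assuming the listener was declared with `id = "foo"` as in this answer (`ConsumerSwitch` is a hypothetical name):

```java
@Service
public class ConsumerSwitch {

    @Autowired
    private KafkaListenerEndpointRegistry registry;

    // Start or stop the listener container with id "foo" at runtime.
    public void setConsumerEnabled(boolean enabled) {
        MessageListenerContainer container = registry.getListenerContainer("foo");
        if (enabled) {
            container.start();  // begins (or resumes) polling
        } else {
            container.stop();   // stops polling; committed offsets are kept
        }
    }
}
```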

To disable the Kafka configuration entirely you can, for example:

  1. Annotate KafkaConsumerConfig with

    @ConditionalOnProperty(value = "kafka.enabled", matchIfMissing = true)

  2. Remove @Component on KafkaConsumer class and define it as @Bean in KafkaConsumerConfig.
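Put together, the two steps above might look like the sketch below (the bean bodies are the ones from the question, elided here):

```java
@Configuration
@ConditionalOnProperty(value = "kafka.enabled", matchIfMissing = true)
@PropertySource({"classpath:kafka.properties"})
public class KafkaConsumerConfig {

    // ... pindropConsumerFactory() and kafkaListenerContainerFactory()
    //     beans exactly as in the question ...

    // Formerly a @Component; declared here so that when kafka.enabled=false
    // the whole configuration, including this consumer, is skipped.
    @Bean
    public KafkaConsumer kafkaConsumer() {
        return new KafkaConsumer();
    }
}
```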

To control message retrieval in KafkaConsumer:

  1. Inject the property value inside KafkaConsumer: @Value("${kafka.enabled}") private Boolean enabled; (note the ${...} placeholder syntax — @Value("kafka.enabled") would inject the literal string).

  2. And then use a simple if check in the method annotated with @KafkaListener.
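As a sketch, that second approach could look like this; note that the listener still connects, polls, and commits offsets when disabled — it only skips processing. The `:true` default is an assumption:

```java
@Component
public class KafkaConsumer {

    // ${...} placeholder; defaults to true when kafka.enabled is absent.
    @Value("${kafka.enabled:true}")
    private boolean enabled;

    @KafkaListener(topics = "#{'${kafka.topics}'.split(',')}",
            containerFactory = "kafkaListenerContainerFactory")
    public void consumeJson(String message) {
        if (!enabled) {
            return; // message is still consumed and committed, just not processed
        }
        // processing message
    }
}
```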

Oleksii Zghurskyi
  • Sorry for my late reply. I have been on the road on a business trip and things are going crazy... I would like to go with option 1. But I am not following point 2 in your option 1. Could you be a bit more specific? Thanks so much! – Hua Jan 30 '19 at 21:30
  • I think he means adding a property to your config and then read it in the method where you consume messages and check: if(enabled) { consume message } I am currently facing similar problem of conditional enabling kafka consumer and will opt for the first solution – Iwavenice Jan 31 '19 at 08:30
  • Hi @Hua! You can find the answer [here](https://github.com/zghurskyi/kafka-troubleshooting). Point 2 is not strictly required - it's basically targeted at keeping Spring context clean. So when Kafka is disabled no beans are created that depend somehow on Kafka. – Oleksii Zghurskyi Jan 31 '19 at 13:27
  • @Zgurskyi Thanks so much for your help! I went back to my original post and could see I did not make it clear enough. My case is that I have multiple consumers, and I would like to control the creation of each individually -- some on and others off based on a config file. Your example shows how to control the creation of all consumers. But I think it's a good example, and I will see if the annotation works on individual consumers! Thanks again! – Hua Jan 31 '19 at 17:05
  • @OleksiiZghurskyi: the link you are providing points to '404' (page not found). Would be great to provide the intended link – Hubert Schumacher Feb 01 '20 at 04:57
  • What is `kafka.enabled` property? Why just not exclude `KafkaAutoConfiguration`? – Edgar Asatryan Jun 04 '23 at 08:55